WordCount 의 결과를 가지고 빈도수가 높은 단어를 추출할 수 있다. 상위 N개의 단어를 보여준다. WordCount 결과는 단어와 빈도를 하나의 라인으로 가지고 있는 파일이다. 우선 순위 큐를 만들고, 매퍼에서 라인을 하나씩 읽으면서 단어와 빈도를 추출한다. 우선순위 큐의 아이템 갯수가 N보다 적거나 우선 순위 큐의 가장 빈도가 낮은 아이템의 빈도보다 읽혀진 단어의 빈도가 크면 우선순위 큐에 집어 넣고 queue size가 넘치지 않게 조정한다.
다음은 우선순위 큐에 넣을 아이템을 정의하는 클래스이다. 단어와 빈도를 저장하고 설정, 조회할 수 있다.
public class ItemFreq { private String item; private Long freq; public ItemFreq() { this.item = ""; this.freq = 0L; } public ItemFreq(String item, long freq) { this.item = item; this.freq = freq; } public String getItem() { return item; } public void setItem(String item) { this.item = item; } public Long getFreq() { return freq; } public void setFreq(Long freq) { this.freq = freq; } } |
다음은 상위 빈도 10개 단어를 추출하는 MR 프로그램이다.
public class TopN { public static void insert(PriorityQueue queue, String item, Long lValue, int topN) {
ItemFreq head = (ItemFreq)queue.peek();
if (queue.size() < topN || head.getFreq() < lValue) {
ItemFreq itemFreq = new ItemFreq(item, lValue);
queue.add(itemFreq);
if (queue.size() > topN) {
queue.remove();
}
}
} public static class ItemFreqComparator implements Comparator<ItemFreq> {
public int compare(ItemFreq x, ItemFreq y) {
if (x.getFreq() < y.getFreq()) {
return -1;
}
if (x.getFreq() > y.getFreq()) {
return 1;
}
return 0;
}
} public static class Map extends Mapper<Text, Text, Text, LongWritable> { private final static LongWritable one = new LongWritable(1); Comparator<ItemFreq> comparator = new ItemFreqComparator(); PriorityQueue<ItemFreq> queue = new PriorityQueue<ItemFreq>(10, comparator); int topN = 10; @Override protected void setup(Context context) throws IOException, InterruptedException { topN = context.getConfiguration().getInt("topN", 10); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { while (queue.size() != 0) { ItemFreq item = (ItemFreq)queue.remove(); context.write(new Text(item.getItem()), new LongWritable(item.getFreq())); } } public void map(Text key, Text value, Context context) throws IOException, InterruptedException { Long lValue = (long)Integer.parseInt(value.toString()); insert(queue, key.toString(), lValue, topN); } } public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> { Comparator<ItemFreq> comparator = new ItemFreqComparator(); PriorityQueue<ItemFreq> queue = new PriorityQueue<ItemFreq>(10, comparator); int topN = 10; @Override protected void setup(Context context) throws IOException, InterruptedException { topN = context.getConfiguration().getInt("topN", 10); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { while (queue.size() != 0) { ItemFreq item = (ItemFreq)queue.remove(); context.write(new Text(item.getItem()), new LongWritable(item.getFreq())); } } public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0;
for (LongWritable val : values) {
sum += val.get();
} insert(queue, key.toString(), sum, topN);
}
}
public static void main(String[] args) throws Exception { Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "TopN");
job.setJarByClass(TopN.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setNumReduceTasks(1); job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1])); job.getConfiguration().setInt("topN", Integer.parseInt(args[2]));
job.waitForCompletion(true) } } |