A MapReduce/Hadoop TopN Solution: the Unique-Key Case
Published: 2019-05-11


The TopN problem: Which 10 URLs were visited most often last week? Which 10 cats are the heaviest of all the cats?

This article presents a MapReduce/Hadoop solution to TopN under the assumption that all input keys are unique. That is, for a given input set {<K,V>}, every K occurs exactly once.

For example, in the cat data below, cat1 never appears a second time.

Input: cat.txt

12,cat1,cat1
13,cat2,cat2
14,cat3,cat3
15,cat4,cat4
10,cat5,cat5
100,cat100,cat100
200,cat200,cat200
300,cat300,cat300
1,cat001,cat001
67,cat67,cat67
22,cat22,cat22
23,cat23,cat23
1000,cat1000,cat1000
2000,cat2000,cat2000

Expected output:

2000	cat2000,cat2000
1000	cat1000,cat1000
300	cat300,cat300
200	cat200,cat200
100	cat100,cat100
67	cat67,cat67
23	cat23,cat23
22	cat22,cat22
15	cat4,cat4
14	cat3,cat3
1. The core data structure: Java's SortedMap and TreeMap. A SortedMap keeps its entries ordered by key, so the smallest and largest keys are always directly accessible. Consider the following test class:

package topN_hadoop1;

import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;

public class Test {
    public static void main(String[] args) {
        SortedMap<Integer, String> top = new TreeMap<Integer, String>();
        top.put(1, "chenjie,1");
        top.put(10, "zhanghan,10");
        top.put(3, "renbo,3");
        for (Entry<Integer, String> entry : top.entrySet()) {
            System.out.println(entry);
        }
        System.out.println("------------------------------------------------------");
        System.out.println("firstKey:" + top.firstKey());
        System.out.println("first:" + top.get(top.firstKey()));
        System.out.println("lastKey:" + top.lastKey());
        System.out.println("last:" + top.get(top.lastKey()));
        top.remove(top.firstKey());
        System.out.println("remove first ");
        System.out.println("------------------------------------------------------");
        for (Entry<Integer, String> entry : top.entrySet()) {
            System.out.println(entry);
        }
        top.remove(top.lastKey());
        System.out.println("remove last ");
        System.out.println("------------------------------------------------------");
        for (Entry<Integer, String> entry : top.entrySet()) {
            System.out.println(entry);
        }
    }
}
The output is:

1=chenjie,1
3=renbo,3
10=zhanghan,10
------------------------------------------------------
firstKey:1
first:chenjie,1
lastKey:10
last:zhanghan,10
remove first 
------------------------------------------------------
3=renbo,3
10=zhanghan,10
remove last 
------------------------------------------------------
3=renbo,3
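
This is exactly the idiom the mapper and reducer below rely on: each record is inserted keyed by its frequency, and whenever the map grows past N entries the smallest key (firstKey()) is evicted, so only the N largest keys survive. A minimal standalone sketch of that idiom (plain Java; the class name and sample weights are illustrative, not part of the original code):

import java.util.SortedMap;
import java.util.TreeMap;

public class TopNIdiom {
    public static void main(String[] args) {
        int N = 3; // keep only the 3 largest keys, for illustration
        SortedMap<Integer, String> top = new TreeMap<Integer, String>();
        int[] weights = {12, 13, 14, 15, 10, 100};
        for (int w : weights) {
            top.put(w, "cat" + w);          // key = weight/frequency
            if (top.size() > N) {
                top.remove(top.firstKey()); // evict the current smallest key
            }
        }
        System.out.println(top); // {14=cat14, 15=cat15, 100=cat100}
    }
}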

2. The code

package topN_hadoop1;

import java.io.IOException;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private int N = 10; // default
    private SortedMap<Integer, String> top = new TreeMap<Integer, String>();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] lines = value.toString().split(",");
        String keyAsString = value.toString();
        int frequency = Integer.valueOf(lines[0]);
        String compositeValue = keyAsString + "," + frequency;
        top.put(frequency, compositeValue);
        if (top.size() > N) {
            top.remove(top.firstKey());
        }
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.N = context.getConfiguration().getInt("N", 10); // default is top 10
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (String str : top.values()) {
            context.write(NullWritable.get(), new Text(str));
        }
    }
}

package topN_hadoop1;

import java.io.IOException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.List;
import java.util.ArrayList;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<NullWritable, Text, IntWritable, Text> {

    private int N = 10; // default
    private SortedMap<Integer, String> top = new TreeMap<Integer, String>();

    @Override
    public void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // value: 2000,cat2000,cat2000,2000
            String valueAsString = value.toString().trim();
            System.out.println(value);
            String[] tokens = valueAsString.split(",");
            String url = tokens[1] + "," + tokens[2]; // cat2000,cat2000
            int frequency = Integer.parseInt(tokens[0]); // 2000
            top.put(frequency, url);
            if (top.size() > N) {
                top.remove(top.firstKey());
            }
        }

        // emit final top N
        List<Integer> keys = new ArrayList<Integer>(top.keySet());
        for (int i = keys.size() - 1; i >= 0; i--) {
            context.write(new IntWritable(keys.get(i)), new Text(top.get(keys.get(i))));
        }
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.N = context.getConfiguration().getInt("N", 10); // default is top 10
    }
}
package topN_hadoop1;

import org.apache.log4j.Logger;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopNDriver extends Configured implements Tool {

    private static Logger THE_LOGGER = Logger.getLogger(TopNDriver.class);

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        int N = Integer.parseInt(args[0]); // top N
        job.getConfiguration().setInt("N", N);
        job.setJobName("TopNDriver");

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);
        job.setNumReduceTasks(1);

        // map()'s output (K,V)
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // reduce()'s output (K,V)
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // args[1] = input directory
        // args[2] = output directory
        FileInputFormat.setInputPaths(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        boolean status = job.waitForCompletion(true);
        THE_LOGGER.info("run(): status=" + status);
        return status ? 0 : 1;
    }

    private static final String INPATH = "input/cat.txt";    // input file path
    private static final String OUTPATH = "output/cat_out1"; // output file path

    public static void main(String[] args) throws Exception {
        args = new String[3];
        args[0] = "10";
        args[1] = INPATH;
        args[2] = OUTPATH;

        // Make sure there are exactly 3 parameters
        if (args.length != 3) {
            THE_LOGGER.warn("usage TopNDriver <N> <input-dir> <output-dir>");
            System.exit(1);
        }
        THE_LOGGER.info("N=" + args[0]);
        THE_LOGGER.info("inputDir=" + args[1]);
        THE_LOGGER.info("outputDir=" + args[2]);
        int returnStatus = ToolRunner.run(new TopNDriver(), args);
        System.exit(returnStatus);
    }
}

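Two points about the driver are easy to miss. First, because the mapper writes every local top-N record under a NullWritable key and the job uses setNumReduceTasks(1), all of the mappers' lists arrive at a single reduce() call, which merges them into the global top N. Second, main() overwrites args with hard-coded values before the args.length check, so that check can never fail. A sketch of a drop-in replacement for main() in TopNDriver that honors real command-line arguments and only falls back to the hard-coded defaults when none are supplied (my adjustment, not the original author's code):

    public static void main(String[] args) throws Exception {
        // fall back to the hard-coded defaults only when nothing was passed in
        if (args.length == 0) {
            args = new String[] { "10", INPATH, OUTPATH };
        }
        // Make sure there are exactly 3 parameters
        if (args.length != 3) {
            THE_LOGGER.warn("usage TopNDriver <N> <input-dir> <output-dir>");
            System.exit(1);
        }
        THE_LOGGER.info("N=" + args[0]);
        THE_LOGGER.info("inputDir=" + args[1]);
        THE_LOGGER.info("outputDir=" + args[2]);
        int returnStatus = ToolRunner.run(new TopNDriver(), args);
        System.exit(returnStatus);
    }
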
4. Extensions

1. What about a Top 5? Pass 5 instead of 10 as the first argument (args[0]); the driver stores it in the configuration as N.

2. What if you want the bottom 10 instead of the top 10? Change

if (top.size() > N) {
    top.remove(top.firstKey());
}

so that it removes top.lastKey() instead, as sketched below.
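
Concretely, the eviction has to flip in both TopNMapper and TopNReducer, since each keeps its own TreeMap; you would probably also want the reducer's final loop to emit keys in ascending rather than descending order. A minimal sketch of the flipped eviction (assuming the surrounding code stays unchanged):

// keep the N smallest keys: when the TreeMap overflows, evict the current largest key
// (in TopNMapper the stored value is compositeValue; in TopNReducer it is url)
top.put(frequency, compositeValue);
if (top.size() > N) {
    top.remove(top.lastKey()); // lastKey() is the largest key in a TreeMap
}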
