Hadoop小组实验之Maven工程java代码分析
原始数据
输入:tail -10 SogouQ.reduced | iconv -f GBK -t UTF-8查看原始数据前10条。
原始数据的格式为:
依次代表:访问时间 用户ID [查询词] 该URL在返回结果中的排名 用户点击的顺序号 用户点击的URL
数据清洗: 因为原始数据中有些行的字段数不为6 ,且原始数据的字段分隔符不是Hive表规定的逗号’,’,所以需要对原始数据进行数据清洗。
通过编写MapReduce程序完成数据清洗:
将不满足6个字段的行删除
将字段分隔符由不等的空格变为逗号‘,’分隔符
代码分析
SogouMapper.java
首先看SogouMapper.java类,完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class SogouMapper extends Mapper <LongWritable ,Text ,Text ,NullWritable > { @Override protected void setup (Context context) throws IOException, InterruptedException { super .setup(context); } @Override protected void map (LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { String data = new String(v1.getBytes(),0 ,v1.getLength(),"GBK" ); String words[] = data.split("\\s+" ); if (words.length != 6 ){ return ; } String newData = data.replaceAll("\\s+" ,"," ); context.write(new Text(newData),NullWritable.get()); } @Override protected void cleanup (Context context) throws IOException, InterruptedException { super .cleanup(context); } }
setup和cleanup
其中在开头和结尾,重写的两个方法setup和cleanup:
1 2 3 4 5 6 7 @Override protected void setup (Context context) throws IOException, InterruptedException { super .setup(context); }
1 2 3 4 5 6 7 @Override protected void cleanup (Context context) throws IOException, InterruptedException { super .cleanup(context); }
hadoop中的MapReduce框架里已经预定义了相关的接口,其中如Mapper类下的方法setup()和 cleanup() 。
setup() 资源初始化
此方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作 。
若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高!
cleanup() 释放资源
此方法被MapReduce框架仅且执行一次,在执行完毕Map任务后,进行相关变量或资源的释放工作 。
若是将释放资源工作放入方法map()中,也会导 致Mapper任务在解析、处理每一行文本后释放资源,
而且在下一行文本解析前还要重复初始化,导致反复重复,程序运行效率不高!
所以,建议资源初始化及释放工作,分别放入方法setup()和cleanup()中进行 。
map
在中间重写的map方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Override protected void map (LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { String data = new String(v1.getBytes(),0 ,v1.getLength(),"GBK" ); String words[] = data.split("\\s+" ); if (words.length != 6 ){ return ; } String newData = data.replaceAll("\\s+" ,"," ); context.write(new Text(newData),NullWritable.get()); }
避免乱码:
1 2 String data = new String(v1.getBytes(),0 ,v1.getLength(),"GBK" );
数据格式为:
20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/
分别表示:访问时间 用户ID [查询词] 该URL在返回结果中的排名 用户点击的顺序号 用户点击的URL
其中的v1是hadoop中的Text类对象,v1.getBytes()将Text类的v1转换成字节数组byte bytes[];v1.getLength()返回v1的byte舒长度。
而new String(v1.getBytes(),0,v1.getLength(),“GBK”)则是Java中string类的其中一个构造方法,其源码为:
1 2 3 4 5 6 7 public String (byte bytes[], int offset, int length, String charsetName) throws UnsupportedEncodingException { if (charsetName == null ) throw new NullPointerException("charsetName" ); checkBounds(bytes, offset, length); this .value = StringCoding.decode(charsetName, bytes, offset, length); }
其中的四个参数分别代表:
byte bytes[]:要解码为字符的字节数组
int offset:要解码字节数组中的第一个 byte 的索引
int length:要解码的字节数组的长度
String charsetName:指定编码(此处使用GBK)
将不满足6个字段的行删除::
1 2 3 4 5 6 String words[] = data.split("\\s+" ); if (words.length != 6 ){ return ; }
split方法为String类中定义的切割字符串方法,这里我们利用正则表达式将上述转码后的字符串data分割为一个字符串数组words[]
如果字符串数组words[]的元素不为6,那么我们就不执行后续代码。
将字段分隔符由不等的空格变为逗号‘,’分隔符:
如果words[]的元素为6,那么就执行后续代码:
1 2 String newData = data.replaceAll("\\s+" ,"," );
String类中的replaceAll方法,将若干空格替换成逗号","
然后输出结果:
1 2 context.write(new Text(newData),NullWritable.get());
其中Mapper中map方法下context.write的流程与代码详解可以看此博客 ,这里简单讲一下:
Map和Reduce中的Context对象的write方法都是调用RecordWriter的write方法 ,其中RecordWriter有很多的实现类如classMapTask下的privateclass NewOutputCollector、private class NewDirectOutputCollector
在理论知识中,map操作会将集合中的元素从一种形式转化成另一种形式,在这种情况下,输入的键值对会被转换成零到多个键值对输出 。其中输入和输出的键必须完全不同,而输入和输出的值则可能完全不同。而上述代码正是完成了这一步。
App.java
启动类App.java代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class App { public static void main ( String[] args ) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(App.class ) ; job.setMapperClass(SogouMapper.class ) ; job.setMapOutputKeyClass(Text.class ) ; job.setMapOutputValueClass(NullWritable.class ) ; job.setOutputKeyClass(Text.class ) ; job.setMapOutputValueClass(NullWritable.class ) ; FileInputFormat.setInputPaths(job,new Path(args[0 ])); FileOutputFormat.setOutputPath(job,new Path(args[1 ])); job.waitForCompletion(true ); } }
其中:
1 2 Configuration conf = new Configuration(); Job job = Job.getInstance(conf);
什么也不设置时,如果在安装了hadoop的机器上运行时,自动读取/home/hadoop/app/hadoop-2.9.2/etc/hadoop/core-site.xml文件放入Configuration中
剩下代码的解释如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 Job job = Job.getInstance(conf); job.setJarByClass(App.class ) ; job.setMapperClass(SogouMapper.class ) ; job.setMapOutputKeyClass(Text.class ) ; job.setMapOutputValueClass(NullWritable.class ) ; job.setOutputKeyClass(Text.class ) ; job.setMapOutputValueClass(NullWritable.class ) ; FileInputFormat.setInputPaths(job,new Path(args[0 ])); FileOutputFormat.setOutputPath(job,new Path(args[1 ])); job.waitForCompletion(true );
(MapReduce快速入门 )