Hadoop小组实验之Maven工程java代码分析

原始数据

输入:tail -10 SogouQ.reduced | iconv -f GBK -t UTF-8查看原始数据前10条。

原始数据的格式为:

依次代表:访问时间 用户ID [查询词] 该URL在返回结果中的排名 用户点击的顺序号 用户点击的URL

数据清洗: 因为原始数据中有些行的字段数不为6,且原始数据的字段分隔符不是Hive表规定的逗号’,’,所以需要对原始数据进行数据清洗。

通过编写MapReduce程序完成数据清洗:

  • 将不满足6个字段的行删除
  • 将字段分隔符由不等的空格变为逗号‘,’分隔符

代码分析

SogouMapper.java

首先看SogouMapper.java类,完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// k1, v1, k2, v2
public class SogouMapper extends Mapper<LongWritable,Text,Text,NullWritable> {

@Override
/**
* 在任务开始时,被调用一次。且只会被调用一次。
*/
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
}

@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
//避免乱码
//数据格式:20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/
String data = new String(v1.getBytes(),0,v1.getLength(),"GBK");

//split("\\s+") \\s+为正则表达式,意思是匹配一个或多个空白字符,包括空格、制表、换页符等。
//参考:http://www.runoob.com/java/java-regular-expressions.html
String words[] = data.split("\\s+");

//判断数据如果不等于6个字段,则退出程序
if(words.length != 6){
return;//return语句后不带返回值,作用是退出该程序的运行 https://www.cnblogs.com/paomoopt/p/3746963.html
}
//用逗号代替空白字符
String newData = data.replaceAll("\\s+",",");
//输出
context.write(new Text(newData),NullWritable.get());
}

@Override
/**
* 在任务结束时,被调用一次。且只会被调用一次。
*/
protected void cleanup(Context context) throws IOException, InterruptedException {
super.cleanup(context);
}
}

setup和cleanup

其中在开头和结尾,重写的两个方法setup和cleanup:

1
2
3
4
5
6
7
@Override
/**
* 在任务开始时,被调用一次。且只会被调用一次。
*/
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
}
1
2
3
4
5
6
7
@Override
/**
* 在任务结束时,被调用一次。且只会被调用一次。
*/
protected void cleanup(Context context) throws IOException, InterruptedException {
super.cleanup(context);
}

hadoop中的MapReduce框架里已经预定义了相关的接口,其中如Mapper类下的方法setup()cleanup()

setup() 资源初始化

此方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作

若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高!

cleanup() 释放资源

此方法被MapReduce框架仅且执行一次,在执行完毕Map任务后,进行相关变量或资源的释放工作

若是将释放资源工作放入方法map()中,也会导 致Mapper任务在解析、处理每一行文本后释放资源,

而且在下一行文本解析前还要重复初始化,导致反复重复,程序运行效率不高!

所以,建议资源初始化及释放工作,分别放入方法setup()和cleanup()中进行

map

在中间重写的map方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
//避免乱码
//数据格式:20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/
String data = new String(v1.getBytes(),0,v1.getLength(),"GBK");

//split("\\s+") \\s+为正则表达式,意思是匹配一个或多个空白字符,包括空格、制表、换页符等。
//参考:http://www.runoob.com/java/java-regular-expressions.html
String words[] = data.split("\\s+");

//判断数据如果不等于6个字段,则退出程序
if(words.length != 6){
return;//return语句后不带返回值,作用是退出该程序的运行 https://www.cnblogs.com/paomoopt/p/3746963.html
}
//用逗号代替空白字符
String newData = data.replaceAll("\\s+",",");
//输出
context.write(new Text(newData),NullWritable.get());
}

避免乱码:

1
2
//避免乱码
String data = new String(v1.getBytes(),0,v1.getLength(),"GBK");

数据格式为:

20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/

分别表示:访问时间 用户ID [查询词] 该URL在返回结果中的排名 用户点击的顺序号 用户点击的URL

其中的v1是hadoop中的Text类对象,v1.getBytes()将Text类的v1转换成字节数组byte bytes[];v1.getLength()返回v1的byte舒长度。

而new String(v1.getBytes(),0,v1.getLength(),“GBK”)则是Java中string类的其中一个构造方法,其源码为:

1
2
3
4
5
6
7
public String(byte bytes[], int offset, int length, String charsetName)
throws UnsupportedEncodingException {
if (charsetName == null)
throw new NullPointerException("charsetName");
checkBounds(bytes, offset, length);
this.value = StringCoding.decode(charsetName, bytes, offset, length);
}

其中的四个参数分别代表:

  • byte bytes[]:要解码为字符的字节数组
  • int offset:要解码字节数组中的第一个 byte 的索引
  • int length:要解码的字节数组的长度
  • String charsetName:指定编码(此处使用GBK)

将不满足6个字段的行删除::

1
2
3
4
5
6
//split("\\s+") \\s+为正则表达式,意思是匹配一个或多个空白字符,包括空格、制表、换页符等。
//参考:http://www.runoob.com/java/java-regular-expressions.html
String words[] = data.split("\\s+");
if(words.length != 6){
return;//return语句后不带返回值,作用是退出该程序的运行 https://www.cnblogs.com/paomoopt/p/3746963.html
}

split方法为String类中定义的切割字符串方法,这里我们利用正则表达式将上述转码后的字符串data分割为一个字符串数组words[]

如果字符串数组words[]的元素不为6,那么我们就不执行后续代码。

将字段分隔符由不等的空格变为逗号‘,’分隔符:

如果words[]的元素为6,那么就执行后续代码:

1
2
//用逗号代替空白字符
String newData = data.replaceAll("\\s+",",");

String类中的replaceAll方法,将若干空格替换成逗号","

然后输出结果:

1
2
//输出                k2              v2
context.write(new Text(newData),NullWritable.get());

其中Mapper中map方法下context.write的流程与代码详解可以看此博客,这里简单讲一下:

Map和Reduce中的Context对象的write方法都是调用RecordWriter的write方法,其中RecordWriter有很多的实现类如classMapTask下的privateclass NewOutputCollector、private class NewDirectOutputCollector

在理论知识中,map操作会将集合中的元素从一种形式转化成另一种形式,在这种情况下,输入的键值对会被转换成零到多个键值对输出。其中输入和输出的键必须完全不同,而输入和输出的值则可能完全不同。而上述代码正是完成了这一步。

App.java

启动类App.java代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
* 数据清洗器 主类
*
*/
public class App
{
public static void main( String[] args ) throws Exception {
Configuration conf = new Configuration();

Job job = Job.getInstance(conf);
job.setJarByClass(App.class);

//指定map输出
job.setMapperClass(SogouMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);

//指定reduce的输出
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);

//指定输入、输出
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));

//提交job,等待结束
job.waitForCompletion(true);

}
}

其中:

1
2
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

什么也不设置时,如果在安装了hadoop的机器上运行时,自动读取/home/hadoop/app/hadoop-2.9.2/etc/hadoop/core-site.xml文件放入Configuration中

剩下代码的解释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Job job = Job.getInstance(conf);
// 指定本程序的jar包所在的本地路径(App.class)
job.setJarByClass(App.class);

//指定map输出
//指定本业务job要使用的mapper业务类(为SogouMapper.class)
job.setMapperClass(SogouMapper.class);
//指定mapper输出数据的kv类型(k1 Text和v1 NullWritable)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);

//指定reduce的输出
//指定最终输出的数据的kv类型(k2 Text 和v2 NullWritable)
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);

//指定输入、输出
//指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定job的输出结果所在目录
FileOutputFormat.setOutputPath(job,new Path(args[1]));

//提交job,等待结束
//将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
job.waitForCompletion(true);

MapReduce快速入门