分布式计算框架——MapReduce

分布式计算框架——MapReduce

基本概念

MapReduce是一个分布式计算框架,主要用来解决海量数据的计算问题。

MapReduce核心思想

MapReduce由两个阶段组成,Map阶段和Reduce阶段。
用户只需要实现map()和reduce()两个函数,即可实现分布式计算。

Map阶段的MapTask并发实例,完全并行运行,互不相干。
Reduce阶段的ReduceTask并发实例互不相干,但是它们的数据依赖上一阶段的所有MapTask并发实例的输出。

一个完整的MapReduce程序在运行时有三类实例进程:

  • MrAppMaster:负责整个程序的过程调度及状态协调
  • MapTask:负责Map阶段的整个数据处理
  • ReduceTask:负责Reduce阶段的整个数据处理

MapReduce编程规范

用户编写的程序分为三个部分:Mapper、Reducer、Driver。

Mapper

用户自定义的Mapper要继承自己的父类
Mapper的输入数据类型是Key-Value键值对的形式
Mapper的输出数据类型是Key-Value键值对的形式
Mapper中的业务逻辑写在map()方法中
map()方法(MapTask)进程对每一个<k,v>调用一次

Reducer

用户自定义的Reducer方法要继承自己的父类
Reducer的输出数据类型对应Mapper的输出数据类型,也是Key-Value键值对的形式
Reducer的业务逻辑写在reduce()方法中
reduce()方法(ReduceTask)进程对每一组相同key的<k,v>组调用一次

Driver

用于提交整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象。

WordCount案例

pom文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.1</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>

demo代码

WordCountMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* MapReduce编程规范
* Map阶段
* <p>
* KEYIN :输入的key类型——LongWritable,这里是偏移量
* VALUEIN:输入的value类型——Text,文本内容
* KEYOUT:输出的key类型——Text
* VALUEOUT:输出的value类型——IntWritable
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(",");
for (String word : words) {
outKey.set(word);
context.write(outKey, outValue);
}
}
}

WordCountReducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* MapReduce编程规范
* Reduce阶段
* <p>
* KEYIN :输入的key类型——Map的输出key类型
* VALUEIN:输入的value类型——Map的输出value类型
* KEYOUT:输出的key类型——Text
* VALUEOUT:输出的value类型——IntWritable
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outValue = new IntWritable(1);

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 累加计算
for (IntWritable value : values) {
sum += value.get();
}
// 输出
outValue.set(sum);
context.write(key, outValue);
}
}

WordCountDriver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* MapReduce编程规范
* Driver类
*/
public class WordCountDriver {
public static void main(String[] args) throws Exception {
// 1.获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2.设置jar包路径
job.setJarByClass(WordCountDriver.class);
// 3.关联mapper和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4.设置mapper的输出k-v
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5.设置最终的输出k-v
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6.设置输入和输出路径。结果以文件夹的形式体现。
FileInputFormat.setInputPaths(job, new Path("D://abc.txt"));
FileOutputFormat.setOutputPath(job, new Path("D://test"));
// 7.提交job作业
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2015-2023 henrrywan

请我喝杯咖啡吧~

支付宝
微信