Hadoop 里执行 MapReduce 任务的几种常见方式

MR中一个InputSplit对应着一个map任务,而一个InputSplit可能包含一个或者多个block,block是HDFS对数据的物理划分而InputSplit是MR对文件的逻辑划分,通常来说InputSplit的大小和block的大小相同最适宜。其中单个
block
的操作是本地操作,将更多的时间用于计算,而不是数据传输。也就是说这个map任务在运行的过程中只从本节点上读取所需要的数据,而不会与其他节点进行通信获取数据,map任务之间的独立性,对任意一个map任务,都会在单独的Java进程中被初始化,map之间也不会进行通信。那同样对于reduce任务也是的,其中在reduce中不同key关联的value
list
操作也是相互独立的。这种任务独立性就保证了容错性,TaskTracker定时会以心跳的方式向JobTracker通信,超过这个时间JobTracker任务这个任务失败,在其它地方重新执行这个任务,不需要任务的中间状态,而是直接重启,这个节点的选择也要考虑到数据所在的节点位置,尽量保证数据本地性,移动代码而不是移动数据。

MapReduce的执行过程主要包含是三个阶段:Map阶段、Shuffle阶段、Reduce阶段

说明:

Hadoop的MapReduce
shuffle过程,非常重要。只有熟悉整个过程才能对业务了如指掌。

独立性也可以扩展到cluster
node上面,也就是执行mapper或reducer的过程,(以用户的角度)各node也是不会进行通信的,那各node节点唯一发生的通信的map和reduce中间的shuffle过程,此时通信也是隐式的,由于key的存在,便知道目的节点,所有的数据传送操作都是Hadoop平台去做的,对用户是透明的。

图片 1

测试文件:

MapReduce执行流程

TaskTracker 向 JobTracker  报告任务运行情况

MapReuce 过程分解

echo -e “aa\tbb \tcc\nbb\tcc\tdd” > 3.txt

图片 2

JobTracker 向 TaskTracker 分配任务 监视任务运行情况

Map 阶段

  • split: 会将输入的大文件 split 成一个 HDFS 的 block,每个 map
    处理一个 block 的数据
  • map:对输入分片中的每个键值对调用map()函数进行运算,然后输出一个结果键值对

Partitioner:对 map
的输出进行partition,即根据key或value及reduce的数量来决定当前的这对键值对最终应该交由哪个reduce处理。默认是对key哈希后再以reduce
task数量取模,默认的取模方式只是为了避免数据倾斜。这个 partition
过程可以通过指定 partitioner 自定义

  • sort:在溢写到磁盘之前,使用快排对缓冲区数据按照partitionIdx,
    key排序。(每个partitionIdx表示一个分区,一个分区对应一个reduce)

Combiner:如果设置了Combiner,那么在Sort之后,还会对具有相同key的键值对进行合并,减少溢写到磁盘的数据量。

  • spill:
    map输出写在内存中的环形缓冲区,默认当缓冲区满80%,启动溢写线程,以
    round-robin的方式将缓冲的数据写出到 mapreduce.cluster.local.dir
    指定的目录磁盘
  • merge:溢写可能会生成多个文件,这时需要将多个文件合并成一个文件。合并的过程中会不断地进行
    sort & combine 操作,最后合并成了一个已分区且已排序的文件

Hadoop fs -put 3.txt
/tmp/3.txt

输入和拆分:

DataNode 向 NameNode 请求data meat-data 和 更新自己的data meat-data   
change

Shuffle阶段

广义上Shuffle阶段横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和merge/sort过程。通常认为Shuffle阶段就是将map的输出作为reduce的输入的过程

  • Copy:Reduce端启动一些copy线程,通过HTTP方式将map端输出文件中属于自己的部分拉取到本地。Reduce会从多个map端拉取数据,并且每个map的数据都是有序的。
  • Merge:Copy过来的数据会先放入内存缓冲区中,这里的缓冲区比较大;当缓冲区数据量达到一定阈值时,将数据溢写到磁盘(与map端类似,溢写过程会执行
    sort &
    combine)。如果生成了多个溢写文件,它们会被merge成一个有序的最终文件。这个过程也会不停地执行
    sort & combine 操作。

全文的例子均以该文件做测试用例,统计单词出现的次数(WordCount)。

不属于map和reduce的主要过程,但属于整个计算框架消耗时间的一部分,该部分会为正式的map准备数据。

DataNode 向 DataNode 请求data

Reduce阶段

  • reduce:Shuffle阶段最终生成了一个有序的文件作为Reduce的输入,对于该文件中的每一个
    <key, [value1,value2…]>调用reduce()方法,并将结果写到HDFS

1、原生态的方式:java 源码编译打包成jar包后,由 hadoop
脚本调度执行,举例:

分片(split)操作:

那么什么是speculative execution呢?可能由于部分节点 I/O CPU
网络带宽的限制,部分任务运行慢,然而大部分作业已经完成,那么为了不让这些运行节点称为任务的瓶颈,hadoop会在几个空闲的节点上调度执行剩余任务的拷贝,其中某一个任务完成时便向JobTracker报告,首先完成的拷贝称为权威拷贝,其他任务放弃执行,其输出也会被放弃。reducer
则从权威拷贝获取所需数据或者将数据接入到结果文件。mapred.map.tasks.speculative.execution
可以设置禁止任务的推测性执行。

import java.io.IOException;
import java.util.StringTokenizer;

split只是将源文件的内容分片形成一系列的 InputSplit,每个 InputSpilt
中存储着对
应分片的数据信息(例如,文件块信息、起始位置、数据长度、所在节点列表…),并不是将源文件分割成多个小文件,每个InputSplit
都由一个 mapper 进行后续处理。

计算资源:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

每个分片大小参数是很重要的,splitSize
是组成分片规则很重要的一个参数,该参数由三个值来确定:

1. 处理器时间

public class WordCount {

minSize:splitSize 的最小值,由 mapred-site.xml 配置文件中
mapred.min.split.size 参数确定。

2. 内存

 public static class TokenizerMapper extends
   Mapper<Object, Text, Text, IntWritable> {
  /** 
        * LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装
Java 数据类型的类,
        * 这些类实现了WritableComparable接口, 
        * 都能够被串行化从而便于在分布式环境中进行数据交换,
        * 你可以将它们分别视为long,int,String 的替代品。 
        */ 
  // IntWritable one 相当于 java 原生类型 int 1
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

maxSize:splitSize 的最大值,由 mapred-site.xml
配置文件中mapreduce.jobtracker.split.metainfo.maxsize 参数确定。

3. 磁盘

  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   // 每行记录都会调用 map 方法处理,此处是每行都被分词
   StringTokenizer itr = new StringTokenizer(value.toString());
   while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    // 输出每个词及其出现的次数 1,类似
<word1,1><word2,1><word1,1>
    context.write(word, one);
   }
  }
 }

blockSize:HDFS 中文件存储的快大小,由 hdfs-site.xml 配置文件中
dfs.block.size 参数确定。

4. 网络带宽

 public static class IntSumReducer extends
   Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable result = new IntWritable();

splitSize的确定规则:splitSize=max{minSize,min{maxSize,blockSize}}

对于Hadoop来说, 磁盘,网络带宽可能是个瓶颈。

  public void reduce(Text key, Iterable<IntWritable> values,
    Context context) throws IOException, InterruptedException {
   // key 相同的键值对会被分发到同一个 reduce中处理
   // 例如 <word1,<1,1>>在 reduce1
中处理,而<word2,<1>> 会在 reduce2 中处理
   int sum = 0;
   // 相同的key(单词)的出现次数会被 sum 累加
   for (IntWritable val : values) {
    sum += val.get();
   }
   result.set(sum);
   // 1个 reduce 处理完1 个键值对后,会输出其
key(单词)对应的结果(出现次数)
   context.write(key, result);
  }
 }

数据格式化(Format)操作:

图片 3

 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  // 多队列hadoop集群中,设置使用的队列
  conf.set(“mapred.job.queue.name”, “regular”);
  // 之所以此处不直接用 argv[1]
这样的,是为了排除掉运行时的集群属性参数,例如队列参数,
  // 得到用户输入的纯参数,如路径信息等
  String[] otherArgs = new GenericOptionsParser(conf, args)
    .getRemainingArgs();
  if (otherArgs.length != 2) {
   System.err.println(“Usage: wordcount <in> <out>”);
   System.exit(2);
  }
  Job job = new Job(conf, “word count”);
  job.setJarByClass(WordCount.class);
  // map、reduce 输入输出类
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  // 输入输出路径
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  // 多子job的类中,可以保证各个子job串行执行
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

将划分好的 InputSplit 格式化成键值对形式的数据。其中 key 为偏移量,value
是每一行的内容。

执行:

值得注意的是,在map任务执行过程中,会不停的执行数据格式化操作,每生成一个键值对就会将其传入
map,进行处理。所以map和数据格式化操作并不存在前后时间差,而是同时进行的。

bin/hadoop jar /tmp/wordcount.jar WordCount /tmp/3.txt /tmp/5

spacer.gif

结果:

2)Map 映射:

hadoop fs -cat /tmp/5/*
aa      1
bb      2
cc      2
dd      1

是 Hadoop 并行性质发挥的地方。根据用户指定的map过程,MapReduce
尝试在数据所在机器上执行该 map 程序。在
HDFS中,文件数据是被复制多份的,所以计算将会选择拥有此数据的最空闲的节点。

参考资料:

在这一部分,map内部具体实现过程,可以由用户自定义。

Hadoop – Map/Reduce 通过WordCount例子的变化来了解新版hadoop接口的变化
http://www.linuxidc.com/Linux/2013-04/82868.htm

3)Shuffle 派发:

Hadoop示例程序WordCount运行及详解
http://www.linuxidc.com/Linux/2013-04/82871.htm

Shuffle 过程是指Mapper 产生的直接输出结果,经过一系列的处理,成为最终的
Reducer
直接输入数据为止的整个过程。这是mapreduce的核心过程。该过程可以分为两个阶段:

官方的 wordcount v1.0 例子

Mapper 端的Shuffle:由 Mapper
产生的结果并不会直接写入到磁盘中,而是先存储在内存中,当内存中的数据量达到设定的阀值时,一次性写入到本地磁盘中。并同时进行
sort(排序)、combine(合并)、partition(分片)等操作。其中,sort 是把
Mapper 产 生的结果按照 key 值进行排序;combine
是把key值相同的记录进行合并;partition 是把 数据均衡的分配给 Reducer。

图片 4

Reducer 端的 Shuffle:由于Mapper和Reducer往往不在同一个节点上运行,所以
Reducer
需要从多个节点上下载Mapper的结果数据,并对这些数据进行处理,然后才能被
Reducer处理。

4)Reduce 缩减:

Reducer
接收形式的数据流,形成形式的输出,具体的过程可以由用户自定义,最终结果直接写入hdfs。每个reduce进程会对应一个输出文件,名称以part-开头。

更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

本文永久更新链接地址:http://www.linuxidc.com/Linux/2017-10/147428.htm

图片 5

相关文章