Big Data框架--MapReduce笔记

MapReduce是一个分布式运算程序的编程框架,是基于Hadoop的数据分析的核心框架


MapReduce概述


简介

  • MapReduce是一个分布式运算程序的编程框架,是基于Hadoop的数据分析的核心框架
  • map负责把一个任务分解成多个任务
  • reduce负责把分解后的多任务处理结果汇总

优点

  1. MapReduce 易于编程

    它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。

  2. 良好的扩展性

    当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

  3. 高容错性

    MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

  4. 适合PB级以上海量数据的离线处理

    可以实现上千台服务器集群并发工作,提供数据处理能力.

缺点

  1. 不擅长实时计算

    MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果

  2. 不擅长流式计算

    流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的

  3. 不擅长DAG(有向图)计算

    多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

核心编程思想


MapReduce工作流程


Maptask阶段

  • Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。

  • Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

  • Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

  • Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

    ​ 溢写阶段详情:

    1. 利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

    2. 按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

    3. 将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

  • Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

​ 当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

​ 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

​ 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

shuffle阶段

  1. MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中

  2. 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

  3. 多个溢出文件会被合并成大的溢出文件

  4. 在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序

  5. ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据

  6. ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)

  7. 合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)

    注意:

    Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。

    缓冲区的大小可以通过参数调整,参数:io.sort.mb默认100M。

Reduce阶段

  • Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

  • Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

  • Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

  • Reduce阶段:reduce()函数将计算结果写到HDFS上。

总结

  1. 输入数据接口: InputFormat

    • 默认使用的实现类是: TextInputFormat

    • TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。

    • KeyValueTextInputFormat每一行均为一条记录,被分隔符分割为key value,默认分隔符是tab (\t )。

    • NlineInputFormat按照指定的行数N来划分切片。

    • CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。

    • 用户还可以自定义InputFormat。

  2. 逻辑处理接口:

    ​ Mapper用户根据业务需求实现其中三个方法: map,setup),cleanup

  3. Partitioner分区

    • 有默认实现HashPartitioner ,逻辑是根据key的哈希值和 numReduces来返回一个分区号; key.hashCode()&IntegerMAXVALUE % numReduces

    • 如果业务上有特别的需求,可以自定义分区。

  4. Comparable排序

    • 当我们用自定义的对象作为key来输出时,就必须要实现 WritableComparable接口,重写其中的compareTo()方法

    • 部分排序:对最终输出的每一个文件进行内部排序。

    • 全排序:对所有数据进行排序,通常只有一个Reduce.

    • 二次排序:排序的条件有两个。

  5. Combiner合并

    Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。

  6. Reduce端分组: GroupingComparator

    在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想上一个或几个字段相同(全部字段比较不相同)的 key进入到同一个reduce方法时,可以采用分组排序

  7. 逻辑处理接口: Reducer

    用户根据业务需求实现其中三个方法: reduce() setup() cleanup()


Hadoop数据压缩


概述

压缩技术能够有效减少底层存储系统(HDFS)读写字节数。压缩提高了网络带宽和磁盘空间的效率。在运行MR程序时,I/O操作、网络数据传输、 Shuffle和Merge要花大量的时间,尤其是数据规模很大和工作负载密集的情况下,因此,使用数据压缩显得非常重要。

鉴于磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输非常有帮助可以在任意MapReduce阶段启用压缩。不过,尽管压缩与解压操作的CPU开销不高,其性能的提升和资源的节省并非没有代价。

压缩策略和原则

压缩是提高Hadoop运行效率的一种优化策略。通过对Mapper, Reducer运行过程的数据进行压缩,以减少磁盘IO,提高MR程序运行速度。

注意:采用压缩技术减少了磁盘IO,但同时增加了CPU运算负担。所以,压缩特性运用得当能提高性能,但运用不当也可能降低性能

压缩基本原则

  • 运算密軍型的job ,少用压缩

  • IO密集型的job,多用压缩

压缩方式选择

Gzip压缩

优点:压缩率比较高,而且压缩/解压速度也比较快;Hadoop本身支持,在应用中处理Gzip格式的文件就和直接处理文本一样;大部分Linux系统都自带Gzip命令,使用方便。

缺点:不支持Split。

应用场景:当每个文件压缩之后在130M以内的(1个块大小内),都可以考虑用Gzip压缩格式。例如说一天或者一个小时的日志压缩成一个Gzip文件。

Bzip2压缩

优点:支持Split;具有很高的压缩率,比Gzip压缩率都高;Hadoop本身自带,使用方便。

缺点:压缩/解压速度慢。

应用场景

  • 适合对速度要求不高,但需要较高的压缩率的时候;
  • 或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;
  • 或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持Split,而且兼容之前的应用程序的情况。

Lzo压缩

优点:压缩/解压速度也比较快,合理的压缩率;支持Split,是Hadoop中最流行的压缩格式;可以在Linux系统下安装lzop命令,使用方便。

缺点:压缩率比Gzip要低一些;Hadoop本身不支持,需要安装;在应用中对Lzo格式的文件需要做一些特殊处理(为了支持Split需要建索引,还需要指定InputFormat为Lzo格式)。

应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,Lzo优点越越明显。

Snappy压缩

优点:高速压缩速度和合理的压缩率。

缺点:不支持Split;压缩率比Gzip要低;Hadoop本身不支持,需要安装。

应用场景

  • 当MapReduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式;
  • 或者作为一个MapReduce作业的输出和另外一个MapReduce作业的输入

压缩位置选择


MapReduce优化

缓慢原因

MapReduce程序效率的瓶颈在于两点:

  • 计算机性能

    CPU、内存、磁盘健康、网络.

  • IO操作优化

    • 数据倾斜
    • Map和Reduce教设置不合理
    • Map运行时间太长,导致Reduce等待过久
    • 小文件过多
    • 大量的不可分块的超大文件
    • Spill次数过多
    • MergeX数过多等。

优化方法

MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。

数据输入

  • 合并小文件:在执行MR任务前将小文件进行合开,大量的小文件会产生大量的Map任务,增大Map任务装载次数,而任务的装载比较耗时,从而·导致MR运行较慢,
  • 采用CombineTextnputFormat来作为输入,解决输入端大量小文件场景。

Map阶段

  • 减少溢写( Spill)次数:通过调整io.sort.mb及sort.spill.perc ent参数值,增大触发Spill的内存上限,减少Spill次数,从而减少磁盘IO
  • 减少合井(Merge )次数:通过调整io.sort.factor参数,增大Merge的文件数目,减少Merge的次数,从而缩短MR处理时间
  • 在Map之后,不影响业务逻辑前提下,先进行Combine处理,减少IO

Reduce阶段

  • **合理设置Map和Reduce数:**两个都不能设置太少,也不能设置太多。太少,会导致Tak等待,延长处理时间;太多,会导致Map, Reduce任务间竞争资源,造成处理超时等错误。
  • 设置Map, Reduce共存:调整slowstart.completedmaps参数,使Map运行到一定程度后, Reducet开始运行,减少Reduce的等待时间。
  • 规避使用Reduce :因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。
  • 合理设置Reduce端的Buffer:默认情况下,数据达到一个阈值的时候,Buffer中的数据就会写入磁盘,然后Reduce会从磁盘中获得所有的数据。也就是说,Buffer和Reduce是没有直接关联的,中间多次写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得Buffer中的一部分数据可以直接输送到Reduce,从而减少IO开销:mapreduce.reduce.input.buffer.percent,默认为0.0。当值大于0的时候,会保留指定比例的内存读Buffer中的数据直接拿给Reduce使用。这样一来,设置Buffer需要内存,读取数据需要内存,Reduce计算也要内存,所以要根据作业的运行情况进行调整。

I/O传输

  • 采用数据压缩的方式:减少网络IO的时间,安装snappy和LZO压缩编码器
  • 使用SequenceFile二进制文件

数据倾斜问题

  1. 数据倾斜现象

    数据频率倾斜:某一个区域的数据量要远远大于其他区域。

    数据大小倾斜:部分记录的大小远远大于平均值。

  2. 减少数据倾斜的方法方法

    • 方法1:抽样和范围分区

      可以通过对原始封居进行抽样得到的结果集来预设分区边界值。

    • 方法2 :自定义分区基于输出键的背景知识进行自定义分区

      例如,如果Ma出键的单词来源于一本书。且其中某几个专业词汇较多。那么就可以自定义分区将这这些专业词汇发送给固定的一部分Redauce实例。而得其他的都发送给剩的Reduce实例。

    • 方法3: Combine使用Combine可以大量地减小数据倾斜。

      在可能的情况下, Combine的目的就是聚合并精简数据。

    • 方法4:采用Map Join,尽量少使用Reduce Join

HDFS小文件优化方法

弊端:

HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢。

小文件的优化无非以下几种方式:

(1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。

(2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。

(3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。

解决方案:

  1. Hadoop Archive

    是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样就减少了NameNode的内存使用。

  2. Sequence File

    Sequence File由一系列的二进制key/value组成,如果key为文件名, value为文件内容,则可以将大批小文件台并成一个大文件。

  3. CombineFilelnputForm at

    CombineFilelnputFormat是一种新的InputFormat,用于将多个文件合并成一个单独的Split ,另外,它会考虑教据的存储位置。

  4. 开启JVM重用对于大量小文件Job ,可以开启JVM重用会减少45%运行时间。

    JVM重用原理:一个Map运行在一个JVM上,开启重用的话,该Map在JVM上运行完毕后,JVM继续运行其他Map.

    具体设置: mapreduce.job.jvm.nurntasks值在10-20之间。

0%