码迷,mamicode.com
首页 > 其他好文 > 详细

Hadoop小结

时间:2018-08-08 21:23:09      阅读:151      评论:0      收藏:0      [点我收藏+]

标签:work   sel   hdf   pts   简单   阻塞   http   cte   NPU   

Google大数据技术:MapReduce、BigTable、GFS

Hadoop:一个模仿Google大数据技术的开源实现

技术分享图片技术分享图片?

 

HDFS的概念

数据块

磁盘中的关系:

技术分享图片技术分享图片?

HDFS同样也有块(block)的概念,但是大很多,默认为64MB。与单一磁盘上的文件系统相似,HDFS上的文件也被划分为块大小的多个分块(chunk),作为独立的存储单元。但与其他文件系统不同的是,HDFS中小于 一 个块大小的文件不会占据整个块的空间。

 

名称节点和数据节点(NameNode and DataNode)

  • HDFS集群有两种节点,以管理者-工作者的模式运行,即一个名称节点(NameNode,管理者)和多个数据节点(DataNode,工作者)。名称节点管理文件系统的命名空间。它维护着这个文件系统树及这个树内所有的文件和索引目录。这些信息以两种形式将文件永久保存在本地磁盘上:命名空间镜像和编辑日志。名称节点也记录着每个文件的每个块所在的数据节点,但它并不永久保存块的位置,因为这些信息会在系统启动时由数据节点重建。
  • 客户端代表用户通过与名称节点和数据节点交互来访问整个文件系统。客户端提供一个类似POSIX(可移植操作系统界面)的文件系统接口,因此用户在编程时并不需要知道名称节点和数据节点及其功能。
  • 数据节点是文件系统的工作者。它们存储并提供定位块的服务(被用户或名称节点调用时),并且定时的向名称节点发送它们存储的块的列表。
  • 没有名称节点,文件系统将无法使用。事实上,如果运行名称节点的机器被毁坏,文件系统上所有的文件都会丢失,因为我们不知道如何通过数据节点上的块来重建文件。因此,名称节点能够经受故障是非常重要的,hadoop提供了两种机制来确保这一点。
  • 第一种机制就是复制那些组成文件系统元数据持久状态的文件。hadoop可以通过配置使名称节点在多个文件系统上写入持久化状态。这些写操作是具同步性和原子性的。一般的配置选择是,在本地磁盘上写入的同时,写入一个远程NFS(网络文件系统)挂载(mount)。
  • 另一种可行的方法是运行一个二级名称节点,虽然它不能作为名称节点使用。这个二级名称节点的重要作用就是定期的通过编辑日志合并命名空间镜像,以防止编辑日志过大。这个二级名称节点一般在其他单独的物理计算机上运行,因为它也需要占用大量CPU和内存来执行合并操作。它会保存合并后的命名空间镜像的副本,在名称节点失效后就可以使用。但是,二级名称节点的状态是比主节点滞后的,所以主节点的数据若全部丢失,损失仍然在所难免。这种情况下,一般把存在NFS(Network File System,网络文件系统)上的主名称节点元数据复制到二级名称节点上并将其作为新的主名称节点。

 有关二级名称节点更多的介绍:https://blog.csdn.net/x_i_y_u_e/article/details/52430932

数据流

  •  文件读取剖析,为了了解客户端及与之交互的HDFS、名称节点和数据节点之间的数据流是怎样的,如图

 技术分享图片

  • 客户端通过调用FileSystem对象的open()来读取希望打开的文件,对于HDFS来说,这个对象是分布式文件系统的一个实例。DistributedFilesystem通过使用RPC来调用名称节点,以确定文件开头部分的块的位置(步骤2)。对于每一个块,名称节点返回具有该块副本的数据节点地址。此外,这些数据节点根据他们与客户端的距离来排序(根据网络集群的拓扑)。如果该客户端本身就是一个数据节点,便从本地数据节点上读取,Distributed Filesystem返回一个FSData InputStream对象(一个支持文件定位的输入流)给客户端读取数据。FSData InputStream转而包装了一个DFSInputStream对象。
  • 接着,客户端对这个输入流调用read()(步骤3)。存储着文件开头部分的块的数据节点地址的DFSInptStream随即与这些块最近的数据节点相连接。通过在数据流中重复调用read(),数据会从数据节点返回客户端(步骤4)。到达块的末端时,DFSInputStream会关闭与数据节点的联系,然后为下一个块找到最佳的数据节点(步骤5)。客户端只需要读取一个连续的流,这些对于客户端来说是透明的。
  • 客户端从流中读取数据时,块是按照DFSInputStream打开与数据节点的新连接的顺序读取的。它也会调用名称节点来检索下一组需要的块的数据节点的位置。一旦客户端完成读取,就对文件系统数据输入流调用close()(步骤6)

       在读取的时候,如果客户端在与数据节点通信时遇到一个错误,那么它就会去尝试对这个块来说下一个最近的块。它也会记住那个故障的数据节点,以保证不会再对之后的块进行徒劳无益的尝试。客户端也会确认从数据节点发来的数据的校验和。如果发现一个损坏的块,它就会在客户端试图从别的数据节点中读取一个块的副本之前报告给名称节点。

       这个设计的一个重点是,客户端直接联系数据节点去检索数据,并被名称节点指引到每个块中最好的数据节点。因为数据流动在此集群中是在所有数据节点分散进行的,所以这种设计能使HDFS可扩展到最大的并发客户端数量。同时,名称节点只不过是提供块位置请求(存储在内存中,因而非常高效),不是提供数据。否则如果客户端数量增长,名称节点会快速成为一个“瓶颈”。

 

  • 文件写入剖析,此处考虑的是创建一个新的文件,向其写入数据后关闭该文件。

 技术分享图片

  • 客户端通过Distributed Filesystem中调用create()来创建文件(图中步骤1)。DistributedFilesystem一个RPC去调用名称节点,在文件系统的命名空间中创建一个新的文件,没有块与之相联系(步骤2)。名称节点执行各种不同的检查后确保这个文件不会已经存在,并且客户端有可以创建文件的适当的许可。如果这些检查通过,名称节点就会生成一个新文件的记录;否则,文件创建失败并向客户端抛出一个IOException异常。分布式文件系统返回一个文件系统数据输出流,让客户端开始写入数据。就像读取事件一样,文件系统数据输出流控制一个DFSOutputStream,负责处理数据节点和名称节点之间的通信。
  • 在客户端写入数据时(步骤3),DFSOutputStream将它分成一个个的包,写入内部的队列,称为数据队列。数据队列随数据流流动,数据流的责任是根据适合的数据节点的列表来要求这些节点为副本分配新的块。这个数据节点的列表形成一个管线——我们假设这个副本数是3,所以有3个节点在管线中。数据流将包分流给管线中第一个的数据节点,这个节点会存储包并发送给管线中的第二个数据节点。同样地,第二个数据节点存储包并且传给管线中第三个(也是最后一个)数据节点(步骤4)。
  • DFSOutputStream也有一个内部的包队列来等待数据节点收到确认,称为确认队列。一个包只有在被管线中所有节点确认后才会被移出确认队列(步骤5)。
  • 如果在有数据写入期间,数据节点发生故障,则会执行下面的操作,当然这对写入数据的客户端而言,是透明的。首先管线被关闭,确认队列中的任何包都会被添加回数据队列的前面,以确保数据节点从失败的节点处是顺流的,不会漏掉任意一个包。当前的块在正常工作的数据节点中被给予一个新的身份并联系名称节点,以便能在故障数据节点后期恢复时其中的部分数据块会被删除。故障数据节点会从管线中删除并且余下块的数据会被写入管线中的两个好的数据节点。名称节点注意到块副本不足时,会在另一个节点上安排创建一个副本,随后,后续的块会继续正常处理。
  • 在一个块被写入期间多个数据节点发生故障的可能性虽然有但很少见。只要dfs.replication.min的副本(默认为1)被写入,写操作就是成功的,并且这个块会在集群中被异步复制,直到满足其目标副本数(dfs.replication的默认设置为3)。
  • 客户端完成数据的写入后,就会在流中调用close()(步骤6)。向名称节点发送完信息之前,此方法会将余下的所有包放入数据节点管线并等待确认(步骤7)。名称节点已经知道文件由哪些块组成(通过Data stream询问块分配),所以它只需在返回成功前等待块进行最小量的复制。

 

副本的放置

       名称节点如何选择哪个数据节点来保存副本?我们需要在可靠性与写入带宽和读取带宽之间进行权衡。例如,因为副本管线都在单独一个节点上运行,所以把所有副本都放在一个节点基本上不会损失写入带宽,但这并没有实现真的冗余(如果节点发生故障,那么该块中的数据会丢失)。同样,离架读取的带宽是很高的,另一个极端,把副本放在不同的数据中心会最大限度地增大冗余,但会以带宽为代价。即使在相同的数据中心(所有的Hadoop集群到目前为止都运行在同一个数据中心),也有许多不同的放置策略。其实,Hadoop在发布的0.17.0版本中改变了放置策略来帮助保护块在集群间有相对平均的分布。

  Hadoop的策略是在与客户端相同的节点上放置第一个副本(若客户端运行在集群之外,就可以随机选择节点,不过系统会避免挑选那些太满或太忙的节点)。第二个副本被放置在第一个不同的随机选择的机架上(离架)。第三个副本被放置在与第二个相同的机架上,但放在不同的节点。更多的副本被放置在集群中的随机节点上,不过系统会尽量避免在相同的机架上放置太多的副本。

  一旦选定副本放置的位置,就会生成一个管线,会考虑到网络拓扑。副本数为3的管道看起来如图

技术分享图片

  总的来说,这样的方法在稳定性(块存储在两个机架中)、写入宽带(写入操作只需要做一个单一网络转换)、读取性能(选择从两个机架中进行读取)和集群中块的分布(客户端只在本地机架写入一个块)之间,进行较好的平衡。

 

MapReduce的工作原理

技术分享图片

  • 整个过程如图,在最上层,有4个独立的实体:
  1. 客户端,提交MapReduce作业。
  2. jobtracker,协调作业的运行。jobtracker是一个Java应用程序,它的主类是JobTracker。
  3. tasktracker,运行作业划分后的任务。tasktracker是一个java应用程序,它的主类是TaskTracker。
  4. 分布式文件系统(一般为HDFS),用来在其他实体间共享作业文件。

  提交作业

  JobClient的runJob()方法用于新建JobClient实例和调用其submitJob()方法的简便方法(如图步骤1)。提交作业后,runJob()将每秒轮询作业的进度,如果发现与上一个记录不同,便把报告显示到控制台。作业完成后,如果成功,就显示作业计数器。否则,导致作业失败的错误指令会被记录到控制台。

  JobClient的submitJob()方法所实现的作业提交过程如下:

  1. 向jobtracker请求一个新的作业ID(通过调用JobTracker的getNewJobId())(步骤2)
  2. 检查作业的输出说明。比如,如果没有指定输出目录或者它已经存在,作业就不会被提交,并有错误返回给MapReduce程序。
  3. 计算作业的输入部分。如果划分无法计算,比如因为输入路径不存在,作业就不会被提交,并有错误返回给MapReduce程序。
  4. 将运行作业所需要的资源——包括作业JAR文件、配置文件和计算所得的输入划分——复制到一个以作业ID号命名的目录中jobtracker的文件系统。作业JAR的副本较多(由mapred.submit.replication属性控制,默认为10),如此一来,在tasktracker运行作业任务时,集群能为它们提供许多副本进行访问(步骤3)
  5. 告诉jobtracker作业准备执行(通过调用JobTracker的submitJob()方法)(步骤4)

  作业的初始化

  • JobTracker接收到对其submitJob()方法的调用后,会把此调用放入一个内部的队列中,交由作业调度器进行调度,并对其进行初始化。初始化包括创建一个代表该正在运行的作业的对象,它封装任务和记录信息,以便跟踪任务的状态和进程(步骤5)
  • 要创建运行任务列表,作业调度器首先从共享文件系统中获取JobClient已计算好的输入划分信息(步骤6)。然后为每个划分创建一个map任务。创建的reduce任务的数量由JobConf的mapred.reduce.tasks属性决定,它是用setNumReduceTasks()方法来设置的,然后调度器便创建这么多reduce任务来运行。任务在此时指定ID号。

  任务的分配

  TaskTracker执行一个简单的循环,定期发送心跳(heartbeat)方法调用JobTracker。心跳方法告诉jobtracker,tasktracker是否还存活,同时也充当两者之间的消息通道。作为心跳方法调用的一部分,tasktracker会指明它是否已经准备运行新的任务,如果是,jobtracker会为它分配一个任务,并使用心跳方法的返回值与tasktracker进行通信(步骤7)。

  在jobtracker为tasktracker选择任务之前,jobtracker先选定任务所在的作业,但是默认的方法是简单维护一个作业优先级列表。选择好作业后,jobtracker就可以为该作业选定一个任务。

  针对map任务和reduce任务,tasktracker有固定数量的槽。例如,一个tasktracker可能可以同时运行两个map任务和reduce任务。(准确数量由tasktracker核的数量和内存大小来决定)。默认调度器在处理reduce任务槽之前,会填满空闲的map任务槽,因此,如果tasktracker至少有一个空闲的map任务槽,jobtracker会为它选择一个map任务,否则选择一个reduce任务。

  要选择一个reduce任务,jobtracker只是简单地从尚未运行的reduce任务列表中选取下一个来执行,并没有考虑数据的本地化,然而,对于一个map任务,它考虑的是tasktracker的网络位置和选取一个距离其输入划分文件最近的tasktracker。在最理想的情况下,任务是data-local(数据本地化)的,与分割文件所在节点运行在相同的节点上。同样,任务也可能是rack-local(机架本地化的):和分割文件在同一个机架,但不在同一个节点。一些任务既不是数据本地化的,也不是机架本地化,从与他们自身运行的不同机架上检索数据。可以通过查看作业的计数器得知每种类型任务的比例。

  任务的执行

  现在,tasktracker已经被分配了任务,下一步是运行任务。首先,它本地化作业的JAR文件,将它从共享文件系统复制到tasktracker所在的文件系统。同时,将应用程序所需要的全部文件从分布式缓存复制到本地磁盘(步骤8)。然后,为任务新建一个本地工作目录,并把JAR文件中的内容解压到这个文件夹下。第三部,新建一个TaskRunner实例来运行任务。

  TaskRunner启动一个新的Java虚拟机(步骤9)来运行每个任务(步骤10),使得用户定义的map和reduce函数的任何缺陷都不会影响tasktracker(比如导致它奔溃或者挂起)。但在不同的任务之间重用JVM还是可能的。

  子进程通过umbilical接口与父进程进行通信。它每隔几秒便告知父进程它的进度,直到任务完成。

  流和管道

  流和管道都运行特殊的map和reduce任务,目的是运行用户提供的可执行程序,并于之通信。

  应用流时,流任务使用标准输入和输出流,与进程(可以用任何语言编写)进行通信。另一方面,管道任务则监听套接字,发送其环境中的一个端口号给C++进程,如此一来,在开始时,C++进程即可建立一个与其父java管道任务的持续连接套接字。

  在这两种情况下,Java进程都会把输入键/值对传给外部的进程,后者通过用户定义的map或者reduce函数来执行它并把输出的键/值对传回Java进程。从tasktracker的角度看,就像tasktracker的子进程在运行自己的map或者reduce代码。

技术分享图片

  进度和状态的更新 

   MapReduce作业是一个长时间运行批量作业,可以运行数秒甚至数小时。由于这是一个很长的时间段,所以对于用户而言,能够得知作业进展是很重要的。一个作业和它的每个任务都有一个状态,包括:作业或者任务的状态(比如,运行成功,失败);map和reduce的进度;作业计数器的值;状态消息或描述(可以由用户代码来设置)。这些状态信息在作业期间不断被改变,那么它们是如何与客户端通信的呢?

  任务正在运行时,对任务进度(即任务完成率)保持追踪。对于map任务,这是已处理完输入的百分比。对于reduce任务,则稍微有点复杂,但程序仍然会估计reduce输入已处理的百分比。整个过程分成三部分,与shuffle的三个阶段相对应。比如,如果任务已经执行reducer一半的输入,那么任务的进度便是5/6。因为已经完成拷贝和排序阶段(每个占1/3),并且已经完成reduce阶段的一半(1/6)。

  任务也有一组计数器可以对任务运行过程中各个事件进行计数,这些计数器要么内置于框架中,例如已写入的map输出记录数,要么由用户自己定义。

  如果任务报告了进度,便会设置一个标志以表明状态变化将被发送到tasktracker。在另一个线程中,每隔三秒检查此标志一次,如果已设置,则告知tasktracker当前任务状态。同时,tasktracker每隔5秒发送心跳到jobtracker(5秒间隔是最小值,因为心跳间隔是由集群的大小来决定的:对于一个更大的集群,间隔会更长一些),并且在此调用(指心跳调用)中,所有由tasktracker运行的任务,它们的状态都会被发送至jobtracker。计数器的发送间隔通常大于5秒,因为计数器占的带宽相对较高。

  jobtracker将这些更新合并起来,产生一个全局试图,表明正在运行的所有作业及其所含任务的状态。最后,正如前面提到的,JobClient通过每秒查看jobtracker来接收最新状态。客户端也可以使用JobClient的getJob()方法来得到一个RunningJob的实例,后者包含作业的所有状态信息。

  作业的完成

  jobtracker收到作业最后一个任务已完成的通知后,便把作业的状态设置为“成功”。然后,在JobClient查询状态时,它将得知任务已成功完成,所有便显示一条消息告知用户,然后从runJob()返回。状态更新在MapReduce系统中的传播如图。

技术分享图片

  如果jobtracker有相应的设置,也会发送一个HTTP作业通知。希望收到回调(指HTTP作业通知的回调)的客户端可以通过job.end.notification.url属性来设置。

  最后,jobtracker清空作业的工作状态,指示tasktracker也清空作业的工作状态(比如删除中间输出)。

 

Shuffle和排序

  MapReduce保证每个reduce输入都已按键排序。系统执行的过程——map输出传到reduce作为后者的输入——即称为shuffle(混洗或称洗牌)。

  map端

  map端开始产生输出结果时,并不是简单地将它写到磁盘。这个过程更复杂,它利用缓冲的方式写到内存,并出于效率的原因预先进行排序。如图

 技术分享图片

 

  每个map任务都有一个环形内存缓存区,任务会把输出写到此。默认情况下,缓冲区的大小为100MB,此值可以通过io.sort.mb属性来修改。当缓冲内容达到指定大小时(io.sort.spill.percent,默认为0.80,80%),一个后台线程便开始把内容溢写(spill)到磁盘中。在线程工作的同时,map输出继续被写到缓冲区,但如果在此期间缓冲区被填满,map会阻塞直到溢写过程结束。

  溢写将按轮询的方式写到mapred.local.dir属性指定的目录,在一个作业相关子目录中。在写到磁盘之前,线程首先根据数据最终被传送到的reducer,将数据划分成相应的分区。在每个分区中,后台线程按键进行内排序(in-memory sort)。此时如果有一个combiner,它将基于排序后输出运行。

  一旦内存缓冲区达到溢写阈值,就会新建一个溢写文件,因此在map任务写入其最后一个输出记录之后,会有若干个溢写文件。在任务完成之前,溢写文件被合并成一个已分区且已排序的输出文件。配置属性io.sort.factor控制着一次最多能合并多少流,默认值是10。

  如果已经指定combiner,并且溢写次数至少为3(min.num.spills.for.combiner属性的值)时,combiner就会输出文件被写之前执行。combiner会针对输入反复运行,但不会影响最终结果。运行combiner的意义在于使map输出更紧凑,从而只有较少数据被写到本地磁盘后传给reducer。

  map输出被写到磁盘时,对它进行压缩往往是个很好的主意,因为这样会让写入磁盘的速度更快,节约磁盘空间和减少传给reducer的数据量。默认情况下,输出是不压缩的,但是只要将mapred.compress.map.output设置为true,就可以弃用此功能。使用的压缩库由mapred.map.output.compression.codec定义。

  reducer通过HTTP得到输出文件的分区。用于服务于文件分区的工作线程,其数量由任务的tracker.http.threads属性来控制。此设置针对的是每个tasktracker,而不是针对每个map任务槽。默认是40,在运行大规模作业的大型集群上,此值可以根据需要而增加。

  reduce端 

   转到处理过程的reduce这一端。map输出文件位于运行map任务的tasktrack的本地磁盘(注意,尽管map输出经常写到map tasktracker的本地磁盘,但reduce输出并不这样),不过在现在,tasktracker需要它为分区文件运行reduce任务。而且,reduce任务可以在不同时间完成,因此只要一个任务结束,reduce任务就开始复制其输出。这就是reduce任务的复制阶段。reduce任务有少量复制线程,因此能够并行地取得map输出。默认是5个线程,但这个默认值可以通过设置mapred.reduce.parallel.copies属性来改变。

  如果map输出相当小,则会被复制到reduce tasktracker的内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,它说明用作此用途的堆空间的百分比);否则,被复制到磁盘。内存缓冲区达到阈值大小(由mapred.job.shuffle.merge.percent决定)或达到map输出阈值(由mapred.inmem.merge.threshold控制)时,会被合并,进而被溢写到磁盘中。

  随着磁盘上积累的副本增多,后台线程会将它们合并为一个更大的、排好序的文件。这会为后面的合并节省一些时间。注意,任何压缩的map输出(通过map任务)都必须在内存中被解压缩,以便于合并。

  所有map输出被复制期间,reduce任务进入排序阶段(更恰当的说法是合并阶段,因为排序是在map端进行的),这个阶段将合并map输出,维持其按顺序排序。这将循环进行。比如,如果有50个map输出,而合并系数是10(10是默认设置,右io.sort.factor属性设置,与map的合并类似),合并将进行5轮。每轮将10个文件合并成一个文件,因此最后有5个中间文件。

  最后阶段,即reduce阶段,合并直接把数据输入reduce函数,而不是最后还有一轮将5个文件合并成一个已排序的文件。此举省略了一次磁盘往返行程。最后的合并既可来自内存中,也可来自磁盘。

  在reduce阶段,对已排序输出中的每个键依次调用reduce函数。此阶段的输出直接写到输出文件系统,一般为hdfs。如果采用hdfs,由于tasktracker节点也运行数据节点,所有第一个块副本会被写到本地磁盘。

Hadoop小结

标签:work   sel   hdf   pts   简单   阻塞   http   cte   NPU   

原文地址:https://www.cnblogs.com/Mayny/p/9368103.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!