标签:
在有些情况下,运行于Hadoop集群上的一些mapreduce作业本身的数据量并不是很大,如果此时的任务分片很多,那么为每个map任务或者reduce任务频繁创建Container,势必会增加Hadoop集群的资源消耗,并且因为创建分配Container本身的开销,还会增加这些任务的运行时延。如果能将这些小任务都放入少量的Container中执行,将会解决这些问题。好在Hadoop本身已经提供了这种功能,只需要我们理解其原理,并应用它。
Uber运行模式就是解决此类问题的现成解决方案。本文旨在通过测试手段验证Uber运行模式的效果,在正式的生产环境下,还需要大家具体情况具体对待。
Uber运行模式对小作业进行优化,不会给每个任务分别申请分配Container资源,这些小任务将统一在一个Container中按照先执行map任务后执行reduce任务的顺序串行执行。那么什么样的任务,mapreduce框架会认为它是小任务呢?
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=30 /wordcount/input /wordcount/output/result1
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=30 -D mapreduce.job.ubertask.enable=true /wordcount/input /wordcount/output/result2
| 输出字段 | 描述 | 
| TOTAL_LAUNCHED_UBERTASKS | 启动的Uber任务数 | 
| NUM_UBER_SUBMAPS | Uber任务中的map任务数 | 
| NUM_UBER_SUBREDUCES | Uber任务中的reduce任务数 | 
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=20 -D mapreduce.job.ubertask.enable=true /wordcount/input /wordcount/output/result3
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=19 -D mapreduce.job.ubertask.enable=true /wordcount/input /wordcount/output/result4
  /**
   * Starts the service: builds the job, fires its {@code JOB_INIT} event and
   * then sets up speculation depending on whether the job is an uber job.
   * This is an excerpt; unrelated code is elided at the marked places.
   *
   * @throws Exception declared by the signature; the throwing code is not
   *         part of this excerpt
   */
  protected void serviceStart() throws Exception {
    // irrelevant code omitted
    job = createJob(getConfig(), forcedState, shutDownMessage);
    // irrelevant code omitted
    if (!errorHappenedShutDown) {
      // Hand JOB_INIT to jobEventDispatcher with a direct handle() call.
      JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
      jobEventDispatcher.handle(initJobEvent);
      // irrelevant code omitted
      if (job.isUber()) {
        // Uber job: switch speculation off.
        speculatorEventDispatcher.disableSpeculation();
      } else {
        // Otherwise post a SpeculatorEvent stamped with the current clock time.
        dispatcher.getEventHandler().handle(
            new SpeculatorEvent(job.getID(), clock.getTime()));
      }
    }  protected Job createJob(Configuration conf, JobStateInternal forcedState, 
      String diagnostic) {
    // create single job
    Job newJob =
        new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
            taskAttemptListener, jobTokenSecretManager, jobCredentials, clock,
            completedTasksFromPreviousRun, metrics,
            committer, newApiCommitter,
            currentUser.getUserName(), appSubmitTime, amInfos, context, 
            forcedState, diagnostic);
    ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
    dispatcher.register(JobFinishEvent.Type.class,
        createJobFinishEventHandler());     
    return newJob;
  }

  /**
   * Forwards each job event to the job it names, looked up through
   * {@code context}.
   */
  private class JobEventDispatcher implements EventHandler<JobEvent> {
    @SuppressWarnings("unchecked")
    @Override
    public void handle(JobEvent event) {
      // The job found for the event's id is cast to an event handler and
      // receives the event itself.
      final EventHandler<JobEvent> target =
          (EventHandler<JobEvent>) context.getJob(event.getJobId());
      target.handle(event);
    }
  }

    /**
     * Looks a job up by id in this context's {@code jobs} collection.
     *
     * @param jobID id of the job to fetch
     * @return whatever {@code jobs.get} yields for that id
     */
    @Override
    public Job getJob(JobId jobID) {
      final Job match = jobs.get(jobID);
      return match;
    }

  /**
   * Applies a job event to the state machine while holding the write lock.
   *
   * <p>If the state machine rejects the event, the error is logged, a
   * diagnostic is recorded and an {@code INTERNAL_ERROR} job event is sent to
   * {@code eventHandler}. If the internal state differs after the transition,
   * the change is logged and the previous state is passed to
   * {@code rememberLastNonFinalState}.
   *
   * @param event the job event to process
   */
  public void handle(JobEvent event) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Processing " + event.getJobId() + " of type "
          + event.getType());
    }
    // Acquire the lock before entering the try block: if lock() itself fails,
    // the finally clause must not unlock a lock that was never taken.
    writeLock.lock();
    try {
      JobStateInternal oldState = getInternalState();
      try {
        getStateMachine().doTransition(event.getType(), event);
      } catch (InvalidStateTransitonException e) {
        LOG.error("Can't handle this event at current state", e);
        addDiagnostic("Invalid event " + event.getType()
            + " on Job " + this.jobId);
        eventHandler.handle(new JobEvent(this.jobId,
            JobEventType.INTERNAL_ERROR));
      }
      // Log the state change and remember the state the job came from.
      if (oldState != getInternalState()) {
        LOG.info(jobId + "Job Transitioned from " + oldState + " to "
                 + getInternalState());
        rememberLastNonFinalState(oldState);
      }
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Returns the job's internal state, read under the read lock.
   *
   * @return {@code forcedState} when it is non-null, otherwise the state
   *         machine's current state
   */
  @Private
  public JobStateInternal getInternalState() {
    readLock.lock();
    try {
      // A forced state, when present, takes precedence over the state machine.
      return (forcedState != null)
          ? forcedState
          : getStateMachine().getCurrentState();
    } finally {
      readLock.unlock();
    }
    }    @Override
    public JobStateInternal transition(JobImpl job, JobEvent event) {
        // Excerpt: only the call that makes the uber decision is shown; the
        // rest of the transition (including its return) is elided.
        // irrelevant code omitted
        job.makeUberDecision(inputLength);
        
        // irrelevant code omitted
    }

  /**
   * Decides whether this job runs in uber mode and stores the verdict in
   * {@code isUber}.
   *
   * <p>The job is uberized only when uber mode is enabled, the map count,
   * reduce count and input size are within their configured limits, the
   * per-task memory and vcore requests fit the AM's settings, and the job is
   * not a chain job. On a positive verdict the configuration is adjusted for
   * sequential execution; otherwise every failed criterion is logged.
   *
   * @param dataInputLength input size in bytes, compared against the
   *        max-bytes limit
   */
  private void makeUberDecision(long dataInputLength) {
    // FIXME: a memory criterion for the uber decision is still needed (too
    // late to add it here; until AM resizing is supported, the job client
    // has to pass fat-slot needs).
    // These limits are not necessarily "system" settings any more; the user
    // may override them.
    final int maxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
    final int maxReduces =
        conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
    // FIXME: this fallback is wrong; the FS should come from the
    // [File?]InputFormat and the default block size from that FS.
    final long fallbackMaxBytes =
        fs.getDefaultBlockSize(this.remoteJobSubmitDir);
    final long maxBytes =
        conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, fallbackMaxBytes);
    final long amMemoryMb = conf.getInt(
        MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
    final long amVcores = conf.getInt(
        MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
    final boolean uberEnabled =
        conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

    // Per-task requests; overhead from the UberAM and statics is treated as
    // negligible here.
    final long mapMb = conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0);
    final long reduceMb = conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0);
    final int mapCores = conf.getInt(
        MRJobConfig.MAP_CPU_VCORES, MRJobConfig.DEFAULT_MAP_CPU_VCORES);
    final int reduceCores = conf.getInt(
        MRJobConfig.REDUCE_CPU_VCORES, MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);

    // With no reduces only the map request counts; otherwise take the larger
    // of the map and reduce requests.
    final boolean mapOnly = (numReduceTasks == 0);
    final long neededMb = mapOnly ? mapMb : Math.max(mapMb, reduceMb);
    final int neededCores =
        mapOnly ? mapCores : Math.max(mapCores, reduceCores);

    final boolean smallNumMapTasks = numMapTasks <= maxMaps;
    final boolean smallNumReduceTasks = numReduceTasks <= maxReduces;
    final boolean smallInput = dataInputLength <= maxBytes;
    final boolean smallMemory =
        (amMemoryMb == JobConf.DISABLED_MEMORY_LIMIT)
        || (neededMb <= amMemoryMb);
    final boolean smallCpu = neededCores <= amVcores;
    final boolean notChainJob = !isChainJob(conf);

    // The user has overall veto power over uberization, or can modify the
    // limits (overriding system settings and potentially shooting themselves
    // in the head). ChainMapper/Reducer are fundamentally incompatible with
    // MR-1220: they use a blocking queue between the maps/reduces and so
    // need parallel execution, whereas the "uber-AM" (MR AM +
    // LocalContainerLauncher) loops over tasks and so needs sequential
    // execution.
    isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
        && smallInput && smallMemory && smallCpu
        && notChainJob;

    if (!isUber) {
      // Spell out every criterion that blocked uberization.
      final StringBuilder reasons = new StringBuilder();
      reasons.append("Not uberizing ").append(jobId).append(" because:");
      if (!uberEnabled) {
        reasons.append(" not enabled;");
      }
      if (!smallNumMapTasks) {
        reasons.append(" too many maps;");
      }
      if (!smallNumReduceTasks) {
        reasons.append(" too many reduces;");
      }
      if (!smallInput) {
        reasons.append(" too much input;");
      }
      if (!smallCpu) {
        reasons.append(" too much CPU;");
      }
      if (!smallMemory) {
        reasons.append(" too much RAM;");
      }
      if (!notChainJob) {
        reasons.append(" chainjob;");
      }
      LOG.info(reasons.toString());
      return;
    }

    LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
        + numReduceTasks + "r tasks (" + dataInputLength
        + " input bytes) will run sequentially on single node.");
    // Reduces may only be scheduled once all maps have completed.
    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
    // All uber-subtask attempts launch on the same node; if one fails, the
    // retry should probably happen elsewhere (i.e. move the entire uber-AM),
    // so attempts are limited to 1 (or at most 2? probably not...).
    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
    // Speculation is disabled for uber jobs.
    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
  }

后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。

京东:http://item.jd.com/11846120.html
当当:http://product.dangdang.com/23838168.html
Hadoop2.6.0运行mapreduce之Uber模式验证
标签:
原文地址:http://blog.csdn.net/beliefer/article/details/51160494