码迷,mamicode.com
首页 > 移动开发 > 详细

【Flume】从入口Application来分析flume的source和sink是如何与channel交互的

时间:2015-01-21 18:22:15      阅读:330      评论:0      收藏:0      [点我收藏+]

标签:flume   源码   

大家在启动flume的时候,输入的命令就可以看出flume的启动入口了

[root@com21 apache-flume-1.5.2-bin]# sh bin/flume-ng agent -c conf -f conf/server.conf -n a1
Info: Sourcing environment configuration script /home/flume/apache-flume-1.5.2-bin/conf/flume-env.sh
+ exec /home/flume/jdk1.7.0_71/bin/java -server -Xms2048m -Xmx2048m -Xss256K -XX:PermSize=32M -XX:MaxPermSize=512M -XX:+UseConcMarkSweepGC -XX:+DisableExplicitGC -XX:+UseParNewGC -XX:+CMSClassUnloadingEnabled -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+UseCompressedOops -XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError -XX:SurvivorRatio=8 -cp '/home/flume/apache-flume-1.5.2-bin/conf:/home/flume/apache-flume-1.5.2-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/server.conf -n a1

从这里可以看出flume的启动入口是:org.apache.flume.node.Application

下面我们就来看该入口程序是如何来运行的

找到main函数

附:flume每次启动都会先判断有没有与当前配置的三大组件同名的组件存在,存在的话先停掉该组件,顺序为source,sink,channel

其次是启动所有当前配置的组件,启动顺序为channel,sink,source

通过这个启动停止的顺序可以看出flume也是对数据一致性做了保证的。

if(reload) {
        EventBus eventBus = new EventBus(agentName + "-event-bus");
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30);
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);
      } else {
        PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName,
                configurationFile);
        application = new Application();
        application.handleConfigurationEvent(configurationProvider.getConfiguration());
      }
这个if的作用就是是否30秒读一下配置,判断是否有更新

主要看一下对于配置内容的处理,两个分支虽然从代码上看逻辑不一样,但是处理的逻辑是一样的

我们看else分支的代码吧:

看configurationProvider.getConfiguration()

public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    FlumeConfiguration fconfig = getFlumeConfiguration();
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        loadChannels(agentConf, channelComponentMap);
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        Set<String> channelNames =
            new HashSet<String>(channelComponentMap.keySet());
        for(String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.
              get(channelName);
          if(channelComponent.components.isEmpty()) {
            LOGGER.warn(String.format("Channel %s has no components connected" +
                " and has been removed.", channelName));
            channelComponentMap.remove(channelName);
            Map<String, Channel> nameChannelMap = channelCache.
                get(channelComponent.channel.getClass());
            if(nameChannelMap != null) {
              nameChannelMap.remove(channelName);
            }
          } else {
            LOGGER.info(String.format("Channel %s connected to %s",
                channelName, channelComponent.components.toString()));
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    } else {
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
    }
    return conf;
  }

我们看在载入source组件的时候有个方法: SourceRunner.forSource(source)

public static SourceRunner forSource(Source source) {
    SourceRunner runner = null;

    if (source instanceof PollableSource) {
      runner = new PollableSourceRunner();
      ((PollableSourceRunner) runner).setSource((PollableSource) source);
    } else if (source instanceof EventDrivenSource) {
      runner = new EventDrivenSourceRunner();
      ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
    } else {
      throw new IllegalArgumentException("No known runner type for source "
          + source);
    }

    return runner;
  }
这个方法里面通过对source的类型判断来选择使用哪种SourceRunner

我们来看一个具体例子吧AvroSource,它是事件驱动类型的source——EventDrivenSourceRunner

  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();
    lifecycleState = LifecycleState.START;
  }

这个方法MonitorRunnable类会来调的,这个类就是负责监控flume的所有组件的

那么什么时候来调呢?一旦调用这个方法,source与channel的交互就开始了

 switch (supervisoree.status.desiredState) {
              case START:
                try {
                  lifecycleAware.start();
上面的代码出现在LifecycleSupervisor类中的内部静态类MonitorRunnable的run方法中,再来看这个线程类谁来调用?

  MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware;
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);

    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);
    monitorFutures.put(lifecycleAware, future);
在LifecycleSupervisor类中supervise方法

从这里我们终于看到核心中的核心了,也就是每隔3秒,source会和channel交互一次。

 Supervisoree process = new Supervisoree();
    process.status = new Status();

    process.policy = policy;
    process.status.desiredState = desiredState;
    process.status.error = false;
那么上面的代码所在方法又是被谁调用的呢?

是Application

public synchronized void start() {
    for(LifecycleAware component : components) {
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }
  }
这样的话,整个链就串起来了
所以从这里看出来source和channel的交互频率是3秒


看完source和channel的交互,再来看sink和channel的交互

到这里再看sink就很简单了,因为flume中三大组件都实现自接口LifecycleAware

所以从flume的入口Application来看,从start开始最终都是到LifecycleSupervisor类的supervise方法,而该方法同样:

MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware;
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);

    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);
    monitorFutures.put(lifecycleAware, future);
这串逻辑,不分具体的source,sink,同样是3秒执行一次。


至此,flume中三大组件的交互以及交互频率就说完了,望各位网友不吝指教!!





【Flume】从入口Application来分析flume的source和sink是如何与channel交互的

标签:flume   源码   

原文地址:http://blog.csdn.net/simonchi/article/details/42970373

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!