
[Original] Big Data Fundamentals: Flume (2) Sink Code Analysis

Published: 2019-03-21 12:04:24


Core class structure of Flume sinks

1 Core interface: Sink

org.apache.flume.Sink

  /**
   * <p>Requests the sink to attempt to consume data from attached channel</p>
   * <p><strong>Note</strong>: This method should be consuming from the channel
   * within the bounds of a Transaction. On successful delivery, the transaction
   * should be committed, and on failure it should be rolled back.
   * @return READY if 1 or more Events were successfully delivered, BACKOFF if
   * no data could be retrieved from the channel feeding this sink
   * @throws EventDeliveryException In case of any kind of failure to
   * deliver data to the next hop destination.
   */
  public Status process() throws EventDeliveryException;

  public static enum Status {
    READY, BACKOFF
  }

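process is the core method. It returns a status that has only two possible values, READY and BACKOFF, and the caller reacts to the returned value, as we will see later.
This is also the interface you implement when extending Flume with your own sink, for example KuduSink.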
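
To make the transaction contract from the Javadoc concrete, here is a minimal, self-contained sketch. It deliberately does not use the real Flume classes: Status, EventDeliveryException, ToyChannel and ToySink are hypothetical stand-ins so that the example compiles without Flume on the classpath, and the sink takes a single event per call, whereas real sinks usually take a batch. In an actual Flume sink the same steps map onto Channel.getTransaction(), Transaction.begin()/commit()/rollback()/close() and Channel.take().

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

public class SinkContractSketch {

  // Hypothetical stand-in for Sink.Status
  enum Status { READY, BACKOFF }

  // Hypothetical stand-in for org.apache.flume.EventDeliveryException
  static class EventDeliveryException extends Exception {
    EventDeliveryException(String msg, Throwable cause) { super(msg, cause); }
  }

  // Hypothetical in-memory channel: a take() is tentative until commit();
  // rollback() puts the taken event back at the head of the queue.
  static class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>();
    private String taken; // event taken in the current transaction, if any

    void put(String event) { queue.addLast(event); }
    void begin() { taken = null; }
    String take() { taken = queue.pollFirst(); return taken; }
    void commit() { taken = null; }
    void rollback() {
      if (taken != null) { queue.addFirst(taken); taken = null; }
    }
    int size() { return queue.size(); }
  }

  // Hypothetical sink that follows the process() contract from the Javadoc
  static class ToySink {
    private final ToyChannel channel;
    private final Consumer<String> destination; // the "next hop"

    ToySink(ToyChannel channel, Consumer<String> destination) {
      this.channel = channel;
      this.destination = destination;
    }

    Status process() throws EventDeliveryException {
      channel.begin();
      try {
        String event = channel.take();
        if (event == null) {
          channel.commit();        // nothing was available
          return Status.BACKOFF;   // caller should wait before polling again
        }
        destination.accept(event); // may throw if delivery fails
        channel.commit();          // delivered: make the take permanent
        return Status.READY;
      } catch (RuntimeException e) {
        channel.rollback();        // failed: the event stays in the channel
        throw new EventDeliveryException("Failed to deliver event", e);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ToyChannel channel = new ToyChannel();
    channel.put("e1");
    ToySink sink = new ToySink(channel, e -> System.out.println("delivered " + e));
    System.out.println(sink.process()); // delivers e1, returns READY
    System.out.println(sink.process()); // channel now empty, returns BACKOFF
  }
}
```

The rollback path is what makes a failed delivery safe to retry: the event goes back into the channel instead of being lost.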

 

2 Sink wrapper: SinkProcessor

org.apache.flume.SinkProcessor

/**
 * <p>
 * Interface for a device that allows abstraction of the behavior of multiple
 * sinks, always assigned to a SinkRunner
 * </p>
 * <p>
 * A sink processors {@link SinkProcessor#process()} method will only be
 * accessed by a single runner thread. However configuration methods
 * such as {@link Configurable#configure} may be concurrently accessed.
 *
 * @see org.apache.flume.Sink
 * @see org.apache.flume.SinkRunner
 * @see org.apache.flume.sink.SinkGroup
 */
public interface SinkProcessor extends LifecycleAware, Configurable {
  /**
   * <p>Handle a request to poll the owned sinks.</p>
   *
   * <p>The processor is expected to call {@linkplain Sink#process()} on
   *  whatever sink(s) appropriate, handling failures as appropriate and
   *  throwing {@link EventDeliveryException} when there is a failure to
   *  deliver any events according to the delivery policy defined by the
   *  sink processor implementation. See specific implementations of this
   *  interface for delivery behavior and policies.</p>
   *
   * @return Returns {@code READY} if events were successfully consumed,
   * or {@code BACKOFF} if no events were available in the channel to consume.
   * @throws EventDeliveryException if the behavior guaranteed by the processor
   * couldn't be carried out.
   */
  Status process() throws EventDeliveryException;

This interface wraps the processing of either a single sink or a sink group. Commonly used implementations are:

1) A single sink

org.apache.flume.sink.DefaultSinkProcessor

  @Override
  public Status process() throws EventDeliveryException {
    return sink.process();
  }

DefaultSinkProcessor.process simply delegates to the process method of the single sink it wraps.

2) A sink group

org.apache.flume.sink.LoadBalancingSinkProcessor
org.apache.flume.sink.FailoverSinkProcessor
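
A group processor decides which of several sinks to call on each poll. As a rough illustration only, the following self-contained sketch rotates the starting sink on every call (round robin) and fails over to the next sink when one throws, giving up only when every sink in the group has failed. SinkLike, RoundRobinGroup and AllSinksFailedException are hypothetical names; this is a simplification in the spirit of a load-balancing group, not the code of LoadBalancingSinkProcessor or FailoverSinkProcessor, which add features such as sink priorities (failover) and temporarily backing off failed sinks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkGroupSketch {

  // Hypothetical stand-in for Sink.Status
  enum Status { READY, BACKOFF }

  // Hypothetical stand-in for a sink: same shape as Sink.process()
  interface SinkLike {
    Status process() throws Exception;
  }

  // Hypothetical stand-in for the EventDeliveryException a group would throw
  static class AllSinksFailedException extends Exception {
    AllSinksFailedException(String msg) { super(msg); }
  }

  // Hypothetical group processor: rotate the starting sink, fail over on exception
  static class RoundRobinGroup {
    private final List<SinkLike> sinks;
    private int next = 0; // index of the sink tried first on the next call

    RoundRobinGroup(List<SinkLike> sinks) {
      if (sinks.isEmpty()) {
        throw new IllegalArgumentException("a sink group needs at least one sink");
      }
      this.sinks = new ArrayList<>(sinks);
    }

    Status process() throws AllSinksFailedException {
      int n = sinks.size();
      int start = next;
      next = (next + 1) % n; // the next call starts one sink further along
      for (int i = 0; i < n; i++) {
        SinkLike sink = sinks.get((start + i) % n);
        try {
          return sink.process(); // the first sink that does not throw answers
        } catch (Exception e) {
          // this sink failed: fall through and fail over to the next one
        }
      }
      throw new AllSinksFailedException("all " + n + " sinks failed");
    }
  }

  public static void main(String[] args) throws Exception {
    SinkLike healthy = () -> Status.READY;
    SinkLike broken = () -> { throw new IllegalStateException("down"); };
    RoundRobinGroup group = new RoundRobinGroup(Arrays.asList(broken, healthy));
    System.out.println(group.process()); // broken throws, healthy returns READY
    System.out.println(group.process()); // this call starts at healthy
  }
}
```

From the SinkRunner's point of view the whole group still looks like one SinkProcessor: it only sees the returned Status or an exception.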

 

3 The caller of a sink: SinkRunner

org.apache.flume.SinkRunner

/**
 * <p>
 * A driver for {@linkplain Sink sinks} that polls them, attempting to
 * {@linkplain Sink#process() process} events if any are available in the
 * {@link Channel}.
 * </p>
 *
 * <p>
 * Note that, unlike {@linkplain Source sources}, all sinks are polled.
 * </p>
 *
 * @see org.apache.flume.Sink
 * @see org.apache.flume.SourceRunner
 */
public class SinkRunner implements LifecycleAware {
...
  private static final long backoffSleepIncrement = 1000;
  private static final long maxBackoffSleep = 5000;

org.apache.flume.SinkRunner.PollingRunner

  public static class PollingRunner implements Runnable {

    private SinkProcessor policy;
    private AtomicBoolean shouldStop;
    private CounterGroup counterGroup;

    @Override
    public void run() {
      logger.debug("Polling sink runner starting");

      while (!shouldStop.get()) {
        try {
          if (policy.process().equals(Sink.Status.BACKOFF)) {
            counterGroup.incrementAndGet("runner.backoffs");

            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * backoffSleepIncrement, maxBackoffSleep));
          } else {
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        } catch (InterruptedException e) {
          logger.debug("Interrupted while processing an event. Exiting.");
          counterGroup.incrementAndGet("runner.interruptions");
        } catch (Exception e) {
          logger.error("Unable to deliver event. Exception follows.", e);
          if (e instanceof EventDeliveryException) {
            counterGroup.incrementAndGet("runner.deliveryErrors");
          } else {
            counterGroup.incrementAndGet("runner.errors");
          }
          try {
            Thread.sleep(maxBackoffSleep);
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }
      }
      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
    }

  }

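Whether process returns BACKOFF or throws an exception, the runner sleeps before it polls again. On BACKOFF it sleeps min(consecutive backoffs × backoffSleepIncrement, maxBackoffSleep), that is 1 s, 2 s, 3 s and so on up to a 5 s cap, and the consecutive counter is reset only when process returns READY; on an exception it always sleeps the full maxBackoffSleep of 5 s. As a result, once a Flume sink runs into a large amount of bad data (so that process keeps throwing), or a custom sink returns BACKOFF, the sink becomes very slow.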
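
The delays follow directly from the two constants in SinkRunner, backoffSleepIncrement = 1000 ms and maxBackoffSleep = 5000 ms. The small sketch below is a hypothetical helper, not part of Flume (the class BackoffSchedule and its methods are made up for illustration); it only reproduces the sleep formulas from PollingRunner and prints the per-poll and cumulative sleep for a run of consecutive BACKOFF results.

```java
public class BackoffSchedule {
  static final long BACKOFF_SLEEP_INCREMENT = 1000; // ms, same value as SinkRunner
  static final long MAX_BACKOFF_SLEEP = 5000;       // ms, same value as SinkRunner

  // Sleep after the n-th consecutive BACKOFF (n starts at 1)
  static long backoffSleep(long consecutive) {
    return Math.min(consecutive * BACKOFF_SLEEP_INCREMENT, MAX_BACKOFF_SLEEP);
  }

  // Sleep after process() throws, independent of earlier failures
  static long exceptionSleep() {
    return MAX_BACKOFF_SLEEP;
  }

  public static void main(String[] args) {
    long total = 0;
    for (long n = 1; n <= 7; n++) {
      long sleep = backoffSleep(n);
      total += sleep;
      System.out.println("backoff #" + n + ": sleep " + sleep + " ms, total " + total + " ms");
    }
    System.out.println("after an exception: sleep " + exceptionSleep() + " ms");
  }
}
```

Seven consecutive BACKOFF results therefore add up to 25 s of sleep (1 + 2 + 3 + 4 + 5 + 5 + 5), and a sink whose process call throws every time is polled at most once every 5 s, roughly 12 calls per minute. This is why a custom sink should return BACKOFF only when, as the Sink Javadoc puts it, no data could be retrieved from the channel.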

 


Original post: https://www.cnblogs.com/barneywill/p/10570545.html
