Flume interceptors

Date: 2021-03-09 13:52:58

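This post builds a user-defined Flume interceptor. Start by creating a Maven project named flume-demo.

Add the following dependency to the project's pom.xml: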

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>
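
Create the interceptor class:

package com.kpwong.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // No setup required.
    }

    // Intercept a single event: tag it with a "topic" header based on its body.
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        // Decode with an explicit charset rather than the platform default.
        String body = new String(event.getBody(), StandardCharsets.UTF_8);

        if (body.contains("hello")) {
            headers.put("topic", "letter");
        } else {
            headers.put("topic", "number");
        }

        return event;
    }

    // Intercept a batch of events by tagging each one in place.
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
        // No resources to release.
    }

    // Flume creates the interceptor through this Builder; the conf file
    // refers to it as CustomInterceptor$Builder.
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {
            // No configurable properties.
        }
    }
}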
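
The tagging rule can be tried without a running agent. The following is a minimal sketch, assuming a plain Map<String, String> and a byte[] stand in for the headers and body of a Flume Event; the class TopicTagger and its method tag are hypothetical names that are not part of Flume or of the project above.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for CustomInterceptor.intercept(Event): a Map and a
// byte[] replace the headers and body of a Flume Event.
final class TopicTagger {

    // Applies the same rule as the interceptor: "letter" if the body contains
    // "hello", otherwise "number". Returns the headers map that was passed in.
    static Map<String, String> tag(Map<String, String> headers, byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        headers.put("topic", text.contains("hello") ? "letter" : "number");
        return headers;
    }

    public static void main(String[] args) {
        for (String line : new String[] {"hello flume", "12345"}) {
            Map<String, String> headers =
                    tag(new HashMap<>(), line.getBytes(StandardCharsets.UTF_8));
            System.out.println(line + " -> topic=" + headers.get("topic"));
        }
    }
}
```

The match is case-sensitive, so a line such as "Hello" would be tagged number rather than letter.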

Package the project as a jar file and copy it into the /flume/lib directory.

Next, write the agent conf files. Prepare three machines (hadoop202, hadoop203, hadoop204).

On hadoop202, configure flume2.conf:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1 k2
a2.channels = c1 c2

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444

# Interceptor and channel selector
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.kpwong.flume.interceptor.CustomInterceptor$Builder
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = topic
a2.sources.r1.selector.mapping.letter = c1
a2.sources.r1.selector.mapping.number = c2

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop203
a2.sinks.k1.port = 4141

a2.sinks.k2.type = avro
a2.sinks.k2.hostname = hadoop204
a2.sinks.k2.port = 4142

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2

The part of this configuration that wires in the interceptor and routes events by the topic header is:

a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.kpwong.flume.interceptor.CustomInterceptor$Builder
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = topic
a2.sources.r1.selector.mapping.letter = c1
a2.sources.r1.selector.mapping.number = c2
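
To make the routing concrete: the multiplexing selector reads the topic header that the interceptor set and looks up the channel mapped to that value. The following is a rough sketch of that lookup in plain Java, not Flume's actual implementation; the class name ChannelRouter and its methods are hypothetical, and the empty-list result for an unmapped value is an assumption of this sketch, since the conf above configures no default channel.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the header-to-channel lookup configured by
// selector.header = topic and selector.mapping.<value> = <channel>.
final class ChannelRouter {
    private final String headerName;
    private final Map<String, List<String>> mapping = new LinkedHashMap<>();

    ChannelRouter(String headerName) {
        this.headerName = headerName;
    }

    // Registers one selector.mapping.<value> = <channel> entry.
    ChannelRouter map(String headerValue, String channel) {
        mapping.put(headerValue, Collections.singletonList(channel));
        return this;
    }

    // Returns the channels for an event's headers, or an empty list when the
    // header is missing or its value has no mapping (no default is modeled).
    List<String> route(Map<String, String> headers) {
        List<String> channels = mapping.get(headers.get(headerName));
        return channels != null ? channels : Collections.<String>emptyList();
    }

    public static void main(String[] args) {
        ChannelRouter router = new ChannelRouter("topic")
                .map("letter", "c1")
                .map("number", "c2");
        System.out.println(router.route(Collections.singletonMap("topic", "letter")));
        System.out.println(router.route(Collections.singletonMap("topic", "number")));
    }
}
```

With the conf above, c1 feeds sink k1 (hadoop203) and c2 feeds sink k2 (hadoop204), so the header value decides which downstream agent receives the event.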

On hadoop203, configure flume3.conf:

a3.sources = r1
a3.sinks = k1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop203
a3.sources.r1.port = 4141
a3.sinks.k1.type = logger
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1

On hadoop204, configure flume4.conf:

a4.sources = r1
a4.sinks = k1
a4.channels = c1
a4.sources.r1.type = avro
a4.sources.r1.bind = hadoop204
a4.sources.r1.port = 4142
a4.sinks.k1.type = logger
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100
a4.sinks.k1.channel = c1
a4.sources.r1.channels = c1

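Start the two downstream agents first, so that the Avro sinks on hadoop202 have a source to connect to. On hadoop204, run:

bin/flume-ng agent -c conf/ -f job/interceptor/flume4.conf -n a4 -Dflume.root.logger=INFO,console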

On hadoop203, run:

bin/flume-ng agent -c conf/ -f job/interceptor/flume3.conf -n a3 -Dflume.root.logger=INFO,console

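On hadoop202, run:

bin/flume-ng agent -c conf/ -f job/interceptor/flume2.conf -n a2

Then, in a second terminal on hadoop202, connect to the netcat source and type some test lines:

nc localhost 44444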
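
If nc is not available, test lines can also be sent from a small Java client. The following is a minimal sketch, assuming the netcat source accepts newline-terminated text on the host and port from flume2.conf; the class LineSender and its methods are hypothetical names, and the two-second read timeout is an arbitrary choice. The sketch drains whatever the source sends back without interpreting it.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that writes newline-terminated lines to a TCP socket.
final class LineSender {

    // Writes each line followed by '\n' as UTF-8 and flushes the stream.
    static void writeLines(OutputStream out, List<String> lines) {
        try {
            for (String line : lines) {
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Opens a connection, sends the lines, then waits for the server to
    // close the connection (or for the timeout) before closing the socket.
    static void send(String host, int port, List<String> lines) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            writeLines(socket.getOutputStream(), lines);
            socket.shutdownOutput();      // signal end of input to the server
            socket.setSoTimeout(2000);    // assumed timeout, in milliseconds
            InputStream in = socket.getInputStream();
            byte[] buffer = new byte[256];
            try {
                while (in.read(buffer) != -1) {
                    // Discard any replies from the server.
                }
            } catch (SocketTimeoutException ignored) {
                // Server kept the connection open; give up waiting.
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Host and port match a2.sources.r1.bind and a2.sources.r1.port.
        send("localhost", 44444, Arrays.asList("hello flume", "12345"));
    }
}
```

Because the source in flume2.conf binds to localhost, this client has to run on hadoop202, just like nc.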

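Result: lines that contain "hello" are tagged topic=letter and printed by the logger sink of a3 on hadoop203; all other lines are tagged topic=number and printed by a4 on hadoop204.

[Image: screenshot of the experiment result]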

Original article: https://www.cnblogs.com/kpwong/p/14504079.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!