【Flume】Writing a custom Kafka sink, building and packaging the jar, and solving the "unapproved license" problem
Tags: flume
Create a new Java project and edit its pom file. The pom content is shown below (the parent section has been removed here):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.apache.flume.flume-ng-sinks</groupId>
    <artifactId>flume-ng-kafka-sink</artifactId>
    <name>Flume Kafka Sink</name>
    <version>1.0.0</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
        </dependency>
    </dependencies>
</project>

The parent has been removed here, and so has the rat plugin; this avoids a common build failure, described in https://issues.apache.org/jira/browse/FLUME-1372. A custom sink implementation extends AbstractSink, implements the Configurable interface, and overrides a few methods, as follows:
package com.cmcc.chiwei.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;

public class CmccKafkaSink extends AbstractSink implements Configurable {

    private static final Logger log = LoggerFactory
            .getLogger(CmccKafkaSink.class);

    public static final String KEY_HDR = "key";
    public static final String TOPIC_HDR = "topic";
    private static final String CHARSET = "UTF-8";

    private Properties kafkaProps;
    private Producer<String, byte[]> producer;
    private String topic;
    private int batchSize; // number of events per transaction, committed as a whole
    private List<KeyedMessage<String, byte[]>> messageList;

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = null;
        Event event = null;
        String eventTopic = null;
        String eventKey = null;
        try {
            long processedEvent = 0;
            transaction = channel.getTransaction();
            transaction.begin(); // start the transaction
            messageList.clear();
            for (; processedEvent < batchSize; processedEvent++) {
                event = channel.take(); // take one event from the channel
                if (event == null) {
                    break;
                }
                // a Flume event consists of headers and a body
                Map<String, String> headers = event.getHeaders();
                byte[] eventBody = event.getBody();
                if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
                    // no topic header on the event, fall back to the configured topic
                    eventTopic = topic;
                }
                eventKey = headers.get(KEY_HDR);
                if (log.isDebugEnabled()) {
                    log.debug("{Event}" + eventTopic + ":" + eventKey + ":"
                            + new String(eventBody, CHARSET));
                    log.debug("event #{}", processedEvent);
                }
                KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(
                        eventTopic, eventKey, eventBody);
                messageList.add(data);
            }
            if (processedEvent > 0) {
                producer.send(messageList);
            } else {
                // the channel was empty: tell the sink runner to back off
                result = Status.BACKOFF;
            }
            transaction.commit(); // commit once the whole batch has been handled
        } catch (Exception e) {
            String errorMsg = "Failed to publish events !";
            log.error(errorMsg, e);
            result = Status.BACKOFF;
            if (transaction != null) {
                try {
                    transaction.rollback();
                    log.debug("transaction rollback success !");
                } catch (Exception ex) {
                    log.error(errorMsg, ex);
                    throw Throwables.propagate(ex);
                }
            }
            throw new EventDeliveryException(errorMsg, e);
        } finally {
            if (transaction != null) {
                transaction.close();
            }
        }
        return result;
    }

    @Override
    public synchronized void start() {
        ProducerConfig config = new ProducerConfig(kafkaProps);
        producer = new Producer<String, byte[]>(config);
        super.start();
    }

    @Override
    public synchronized void stop() {
        producer.close();
        super.stop();
    }

    @Override
    public void configure(Context context) {
        batchSize = context.getInteger(Constants.BATCH_SIZE,
                Constants.DEFAULT_BATCH_SIZE);
        messageList = new ArrayList<KeyedMessage<String, byte[]>>(batchSize);
        log.debug("Using batch size: {}", batchSize);
        topic = context.getString(Constants.TOPIC, Constants.DEFAULT_TOPIC);
        if (topic.equals(Constants.DEFAULT_TOPIC)) {
            log.warn("The property 'topic' is not set. Using the default topic name ["
                    + Constants.DEFAULT_TOPIC + "]");
        } else {
            log.info("Using the configured topic: [" + topic
                    + "]; this may be overridden by event headers");
        }
        kafkaProps = KafkaUtil.getKafkaConfig(context);
        if (log.isDebugEnabled()) {
            log.debug("Kafka producer properties : " + kafkaProps);
        }
    }
}
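The sink above references two helper classes, Constants and KafkaUtil, that are not shown in the post. Below is a minimal sketch of what they might look like; the key names and defaults are assumptions, except custom.topic.name, which matches the example configuration further down:

// Constants.java
package com.cmcc.chiwei.kafka;

public class Constants {
    // Assumed key names and defaults; only "custom.topic.name" is
    // confirmed by the example configuration in this post.
    public static final String BATCH_SIZE = "batchSize";
    public static final int DEFAULT_BATCH_SIZE = 100;
    public static final String TOPIC = "custom.topic.name";
    public static final String DEFAULT_TOPIC = "default-flume-topic";
}

// KafkaUtil.java
package com.cmcc.chiwei.kafka;

import java.util.Map;
import java.util.Properties;

import org.apache.flume.Context;

public class KafkaUtil {
    // One plausible implementation: forward every parameter configured on
    // the sink to the Kafka producer unchanged.
    public static Properties getKafkaConfig(Context context) {
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : context.getParameters().entrySet()) {
            props.put(entry.getKey(), entry.getValue());
        }
        return props;
    }
}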
Then run mvn clean install to compile and package the jar, and drop the resulting flume-ng-kafka-sink-1.0.0.jar into the lib directory of your Flume installation. All that remains is to edit the conf file.
Naturally, the property keys in the conf file correspond to the properties your custom sink reads: whatever key the sink reads in configure() is exactly the key you set in the configuration file, and the sink's type must be the fully qualified class name of your sink. For example:
producer.sinks.r.type = com.cmcc.chiwei.kafka.CmccKafkaSink
producer.sinks.r.metadata.broker.list = 127.0.0.1:9092
producer.sinks.r.partition.key = 0
producer.sinks.r.partitioner.class = kafka.producer.DefaultPartitioner
producer.sinks.r.serializer.class = kafka.serializer.DefaultEncoder
producer.sinks.r.key.serializer.class = kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks = 0
producer.sinks.r.max.message.size = 1000000
producer.sinks.r.producer.type = async
producer.sinks.r.custom.encoding = UTF-8
producer.sinks.r.custom.topic.name = testTopic
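A sink definition alone is not a runnable agent; it still needs a source and a channel. For reference, a minimal wiring might look like the following, where the agent name (producer), the source and channel names (s, c), and the netcat source are assumptions purely for illustration:

producer.sources = s
producer.channels = c
producer.sinks = r

producer.sources.s.type = netcat
producer.sources.s.bind = 127.0.0.1
producer.sources.s.port = 44444
producer.sources.s.channels = c

producer.channels.c.type = memory
producer.channels.c.capacity = 1000

producer.sinks.r.channel = c

The agent can then be started with something like: bin/flume-ng agent --conf conf --conf-file producer.conf --name producer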
Original article: http://blog.csdn.net/simonchi/article/details/42489885