
Kafka Producer and Consumer Examples

Date: 2016-08-01 15:43:58


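Kafka version used: 0.7.2

JDK version: 1.6.0_20

The example in the official quickstart (http://kafka.apache.org/07/quickstart.html) is not complete. The code below is my filled-in version, which compiles and runs.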

Architecture and design of Kafka, a distributed publish-subscribe messaging system: http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka code examples: http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka tutorial notes: http://www.linuxidc.com/Linux/2014-01/94682.htm

Getting started with Kafka: http://www.linuxidc.com/Linux/2014-07/104470.htm

Producer Code

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

public class ProducerSample {

 public static void main(String[] args) {
  // Producer settings: locate brokers through ZooKeeper and send String payloads
  Properties props = new Properties();
  props.put("zk.connect", "127.0.0.1:2181");
  props.put("serializer.class", "kafka.serializer.StringEncoder");

  ProducerConfig config = new ProducerConfig(props);
  Producer<String, String> producer = new Producer<String, String>(config);

  // Send a single message to "test-topic", then release the producer's resources
  ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message2");
  producer.send(data);
  producer.close();
 }
}
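
The producer above hard-codes the ZooKeeper address. A minimal sketch of moving the two settings into a small helper, so the address can come from the command line, might look like the following. The class name ProducerProps and its build method are hypothetical and not part of Kafka; only the keys zk.connect and serializer.class come from the example above. The sketch uses only the JDK, so the resulting Properties would still have to be passed to ProducerConfig as in ProducerSample.

```java
import java.util.Properties;

// Hypothetical helper (not a Kafka class): gathers the producer settings
// used in ProducerSample so the ZooKeeper address is not hard-coded.
final class ProducerProps {

    // Builds the same two properties that ProducerSample sets by hand.
    static Properties build(String zkConnect) {
        Properties props = new Properties();
        props.setProperty("zk.connect", zkConnect);
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        return props;
    }

    public static void main(String[] args) {
        // Fall back to the address used in the example when none is given.
        String zkConnect = args.length > 0 ? args[0] : "127.0.0.1:2181";
        Properties props = build(zkConnect);
        System.out.println("zk.connect = " + props.getProperty("zk.connect"));
        System.out.println("serializer.class = " + props.getProperty("serializer.class"));
    }
}
```

In ProducerSample, the call new ProducerConfig(props) could then take the result of ProducerProps.build(...) in place of the inline Properties block.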

Consumer Code

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class ConsumerSample {

 public static void main(String[] args) {
  // specify some consumer properties
  Properties props = new Properties();
  props.put("zk.connect", "localhost:2181");
  props.put("zk.connectiontimeout.ms", "1000000");
  props.put("groupid", "test_group");

  // create the connection to the cluster
  ConsumerConfig consumerConfig = new ConsumerConfig(props);
  ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

  // request 4 streams for topic "test-topic" so that 4 threads can consume in parallel
  HashMap<String, Integer> map = new HashMap<String, Integer>();
  map.put("test-topic", 4);
  Map<String, List<KafkaStream<Message>>> topicMessageStreams =
    consumerConnector.createMessageStreams(map);
  List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");

  // create a pool of 4 threads, one for each stream
  ExecutorService executor = Executors.newFixedThreadPool(4);

  // consume the messages in the threads
  for (final KafkaStream<Message> stream : streams) {
   executor.submit(new Runnable() {
    public void run() {
     for (MessageAndMetadata msgAndMetadata : stream) {
      // process message (msgAndMetadata.message())
      System.out.println("topic: " + msgAndMetadata.topic());
      Message message = (Message) msgAndMetadata.message();
      ByteBuffer buffer = message.payload();
      byte[] bytes = new byte[message.payloadSize()];
      buffer.get(bytes);
      String tmp = new String(bytes);
      System.out.println("message content: " + tmp);
     }
    }
   });
  }

 }
}
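
The consumer above copies the payload out of the ByteBuffer and decodes it with new String(bytes), which uses the platform default charset. A minimal sketch of the same step with an explicit charset is shown below. The class name PayloadDecoder is hypothetical, and UTF-8 is an assumption about how the messages were written; if the producer encoded them differently, the charset would need to match. The sketch uses only the JDK and works on any ByteBuffer, such as the one returned by message.payload().

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

// Hypothetical helper (not a Kafka class): decodes a message payload
// with an explicit charset instead of the platform default.
final class PayloadDecoder {

    // Assumption: the payload bytes are UTF-8 text.
    private static final Charset UTF8 = Charset.forName("UTF-8");

    static String decode(ByteBuffer payload) {
        // Work on a duplicate so the caller's buffer position is not moved.
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, UTF8);
    }

    public static void main(String[] args) {
        // Stand-in for message.payload(): the text sent by ProducerSample.
        ByteBuffer payload = ByteBuffer.wrap("test-message2".getBytes(UTF8));
        System.out.println("message content: " + decode(payload));
    }
}
```

Inside the consumer loop, the four lines that build the String could then be replaced by a single call such as PayloadDecoder.decode(message.payload()).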

 

Start ZooKeeper and then the Kafka server, then run the Producer and Consumer code in turn.

Running ProducerSample:

[Screenshot omitted]

Running ConsumerSample:

[Screenshot omitted]


Original article: http://www.cnblogs.com/nima/p/5725726.html
