Summary:
1. Kafka consumers can be organized into different groups; the same message can be consumed multiple times, once by each group.
2. Inspecting the Kafka metadata stored in ZooKeeper:
[zk: air00:2181(CONNECTED) 8] ls /
[consumers, config, controller, admin, brokers, zookeeper, controller_epoch]
[zk: air00:2181(CONNECTED) 9] ls /consumers
[test01, test02]
[zk: air00:2181(CONNECTED) 10] ls /consumers/test01
[offsets, owners, ids]
[zk: air00:2181(CONNECTED) 11] ls /consumers/test01/offsets
[test]
[zk: air00:2181(CONNECTED) 12] ls /consumers/test01/offsets/test
[1, 0]
[zk: air00:2181(CONNECTED) 13]
3. A newly started consumer does not receive data that was produced before it joined (see the configuration sketch after this summary).
As the listing above shows, consumer group information (offsets, owners, ids) is kept in ZooKeeper znodes.
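A minimal sketch for point 3 (an assumption based on the old 0.8.x high-level consumer API used below, not shown in the original): a group with no committed offsets in ZooKeeper starts from the latest offset by default, so setting auto.offset.reset to "smallest" lets a brand-new group read the topic from the earliest retained message.
    Properties props = new Properties();
    props.put("zookeeper.connect", "air00:2181,air01:2181,air02:2181");
    props.put("group.id", "test03");             // hypothetical new group with no offsets in ZooKeeper yet
    props.put("auto.offset.reset", "smallest");  // start from the earliest available offset instead of the latest
    // build the ConsumerConfig and message streams exactly as in the consumer example below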
Producer:
package com.kafka.test;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer01 {
    public static void main(String[] args) {
        String topic = "test";
        Properties props = new Properties();
        // Broker list (port 9092) and serializer for the old (0.8.x) producer API
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "air00:9092");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        // Send a single message to the topic, then close the producer
        producer.send(new KeyedMessage<String, String>(topic, "test"));
        producer.close();
    }
}
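If messages need to be routed by key (so that all messages with the same key land in the same partition), the old producer API also accepts a key. A minimal sketch reusing the topic and config from Producer01 above; the loop and key values are illustrative, not from the original:
    Producer<String, String> producer = new Producer<String, String>(config);
    for (int i = 0; i < 10; i++) {
        // KeyedMessage(topic, key, message): the key is hashed to pick the partition
        producer.send(new KeyedMessage<String, String>(topic, "key-" + i, "message-" + i));
    }
    producer.close();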
Consumer:
package com.kafka.test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class Consumer01 {
    static String groupId = "test01";
    static String topic = "test";

    public static void main(String[] args) {
        Properties props = new Properties();
        // ZooKeeper ensemble and consumer group for the old (0.8.x) high-level consumer
        props.put("zookeeper.connect", "air00:2181,air01:2181,air02:2181");
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        kafka.javaapi.consumer.ConsumerConnector consumer =
                kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // Request one stream (thread) for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // Block on the iterator and print each message as it arrives
        while (it.hasNext())
            System.out.println(new String(it.next().message()));
    }
}
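To illustrate point 1 of the summary, a second consumer that differs from Consumer01 only in its group.id (for example test02, which also appears under /consumers in the ZooKeeper listing above) receives the same messages independently. A minimal sketch with the same imports as Consumer01; the shutdown() call is part of the old ConsumerConnector API:
    Properties props = new Properties();
    props.put("zookeeper.connect", "air00:2181,air01:2181,air02:2181");
    props.put("group.id", "test02");             // different group => its own offsets under /consumers/test02
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    kafka.javaapi.consumer.ConsumerConnector consumer =
            kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    // create the message streams and iterate exactly as in Consumer01 ...
    consumer.shutdown();                         // deregister from ZooKeeper when done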
Original article: http://my.oschina.net/u/1388024/blog/318517