标签:thread 语法 mat java out pre com serial buffer
既然Kafka使用Scala写的,最近也在慢慢学习Scala的语法,虽然还比较生疏,但是还是想尝试下用Scala实现Producer和Consumer,并且用HashPartitioner实现消息根据key路由到指定的partition。
Producer:
import java.util.Properties
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.producer.KeyedMessage
object ProducerDemo {
def main(args: Array[String]): Unit = {
val brokers = "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092"
val topic = "ScalaTopic";
val props = new Properties()
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("partitioner.class", classOf[HashPartitioner].getName)
props.put("producer.type", "sync")
props.put("batch.num.messages", "1")
props.put("queue.buffering.max.messages", "1000000")
props.put("queue.enqueue.timeout.ms", "20000000")
val config = new ProducerConfig(props)
val producer = new Producer[String, String](config);
val sleepFlag = false;
val message1 = new KeyedMessage[String, String](topic, "1", "test 0");
producer.send(message1);
if(sleepFlag) Thread.sleep(5000);
val message2 = new KeyedMessage[String, String](topic, "1", "test 1");
producer.send(message2);
if(sleepFlag) Thread.sleep(5000);
val message3 = new KeyedMessage[String, String](topic, "1", "test 2");
producer.send(message3);
if(sleepFlag) Thread.sleep(5000);
val message4 = new KeyedMessage[String, String](topic, "4", "test 3");
producer.send(message4);
if(sleepFlag) Thread.sleep(5000);
val message5 = new KeyedMessage[String, String](topic, "4", "test 4");
producer.send(message5);
if(sleepFlag) Thread.sleep(5000);
val message6 = new KeyedMessage[String, String](topic, "4", "test 4");
producer.send(message6);
if(sleepFlag) Thread.sleep(5000);
}
}
Consumer:
import java.util.Properties
import kafka.consumer.ConsumerConfig
import kafka.consumer.Consumer
import kafka.message.MessageAndMetadata
object ConsumerDemo {
def main(args: Array[String]): Unit = {
var groupid = ""
var consumerid = ""
var topic = ""
args match {
case Array(arg1, arg2, arg3) => topic = arg1; groupid = arg2; consumerid = arg3
}
val props = new Properties()
props.put("zookeeper.connect", "192.168.1.151:2181,192.168.1.152:2181,192.168.1.153:2181")
props.put("group.id", groupid)
props.put("client.id", "test")
props.put("consumer.id", consumerid)
props.put("auto.offset.reset", "smallest")
props.put("auto.commit.enable", "true")
props.put("auto.commit.interval.ms", "100")
val consumerConfig = new ConsumerConfig(props)
val consumer = Consumer.create(consumerConfig)
val topicCountMap = Map(topic -> 1)
val consumerMap = consumer.createMessageStreams(topicCountMap)
val streams = consumerMap.get(topic).get
for (stream <- streams) {
val it = stream.iterator()
while (it.hasNext()) {
val messageAndMetadata = it.next()
val message = s"Topic:${messageAndMetadata.topic}, GroupID:$groupid, Consumer ID:$consumerid, PartitionID:${messageAndMetadata.partition}, " +
s"Offset:${messageAndMetadata.offset}, Message Key:${new String(messageAndMetadata.key())}, Message Payload: ${new String(messageAndMetadata.message())}"
System.out.println(message);
}
}
}
}
HashPartitioner:
import kafka.producer.Partitioner
import scala.math._
import kafka.utils.VerifiableProperties
class HashPartitioner extends Partitioner {
def this(verifiableProperties: VerifiableProperties) { this }
override def partition(key: Any, numPartitions: Int): Int = {
if (key.isInstanceOf[Int]) {
abs(key.toString().toInt) % numPartitions
}
key.hashCode() % numPartitions
}
}
运行结果:
所有消息都被路由到了Partition1,测试成功!

Kafka 学习笔记之 Producer/Consumer (Scala)
标签:thread 语法 mat java out pre com serial buffer
原文地址:http://www.cnblogs.com/AK47Sonic/p/7260577.html