标签:ace gets 创建 void rod group 订阅 scribe list
先在nuget里安装RdKafka
using RdKafka;
using System;
using System.Text;
namespace KafkaApp
{
    class Program
    {
        static void Main(string[] args)
        {
            //ProducterTest();
            ConsumerTest();
        }
        private static async void ProducterTest()
        {
            // Producer 接受一个或多个 BrokerList
            using (Producer producer = new Producer("192.168.1.4:9092"))
            //发送到一个名为 testtopic 的Topic,如果没有就会创建一个
            using (Topic topic = producer.Topic("666"))
            {
                //将message转为一个 byte[]
                byte[] data = Encoding.UTF8.GetBytes("Hello RdKafka");
                DeliveryReport deliveryReport = await topic.Produce(data);
                Console.WriteLine($"发送到分区:{deliveryReport.Partition}, Offset 为: {deliveryReport.Offset}");
            }
        }
        public static void ConsumerTest()
        {
            //配置消费者组
            var config = new Config() { GroupId = "example-csharp-consumer" };
            using (var consumer = new EventConsumer(config, "192.168.1.4:9092"))
            {
                //注册一个事件
                consumer.OnMessage += (obj, msg) =>
                {
                    string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
                    Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
                };
                //订阅一个或者多个Topic
                consumer.Subscribe(new[] { "666" });
                //启动
                consumer.Start();
                Console.WriteLine("Started consumer, press enter to stop consuming");
                Console.ReadLine();
            }
        }
    }
}
标签:ace gets 创建 void rod group 订阅 scribe list
原文地址:https://www.cnblogs.com/bigmangotree/p/10560307.html