标签:result 图片 message 文章 接收 ofo append 日志 分类

/// <summary>
/// Consumer abstraction: exposes a single typed <c>Consume</c> operation and is disposable.
/// </summary>
public interface IKafkaConsumer : IDisposable
{
/// <summary>
/// Consumes data and returns it as <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Reference type the consumed data is returned as.</typeparam>
/// <returns>
/// The consumed data as <typeparamref name="T"/>.
/// NOTE(review): the interface does not state what is returned when nothing is available;
/// the KafkaConsumer implementation in this file returns null in that case.
/// </returns>
T Consume<T>() where T : class;
}
/// <summary>
/// Producer abstraction: exposes a single typed <c>Produce</c> operation and is disposable.
/// </summary>
public interface IKafkaProducer : IDisposable
{
/// <summary>
/// Publishes a message.
/// </summary>
/// <typeparam name="T">Reference type of the payload.</typeparam>
/// <param name="key">Message key.</param>
/// <param name="data">Payload to publish.</param>
/// <param name="operateType">Operation type code (e.g. add / delete / update) sent along with the payload.</param>
/// <returns>
/// A success flag. The KafkaProducer implementation in this file returns <c>true</c> when the
/// message was delivered and <c>false</c> when delivery failed.
/// </returns>
bool Produce<T>(string key, T data, int operateType) where T : class;
}
实现方法
using Confluent.Kafka;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;
namespace Kafka
{
    /// <summary>
    /// Kafka consumer wrapper: subscribes to one topic at construction time and returns
    /// one deserialized message per <see cref="Consume{T}"/> call.
    /// </summary>
    public class KafkaConsumer : IKafkaConsumer
    {
        private bool disposeHasBeenCalled = false;
        private readonly object disposeHasBeenCalledLockObj = new object();
        private readonly IConsumer<string, string> _consumer;

        /// <summary>
        /// Builds the underlying consumer from <paramref name="config"/> and subscribes it to <paramref name="topic"/>.
        /// </summary>
        /// <param name="config">Consumer configuration.</param>
        /// <param name="topic">Name of the topic to subscribe to.</param>
        public KafkaConsumer(ConsumerConfig config, string topic)
        {
            _consumer = new ConsumerBuilder<string, string>(config).Build();
            try
            {
                _consumer.Subscribe(topic);
            }
            catch
            {
                // The object is never handed to the caller when the constructor throws,
                // so nobody else could release the already-built client.
                _consumer.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Polls for a single message, waiting at most one second.
        /// </summary>
        /// <typeparam name="T">
        /// Target type. <see cref="string"/> returns the raw message value; any other type is
        /// deserialized from the value as JSON.
        /// </typeparam>
        /// <returns>
        /// The message payload, or <c>null</c> when the poll timed out, the message carried no
        /// value, or an error occurred (errors are written to the console).
        /// </returns>
        public T Consume<T>() where T : class
        {
            try
            {
                var result = _consumer.Consume(TimeSpan.FromSeconds(1));

                // result is null on timeout; Message is null for a partition-EOF event;
                // Value is null for a message without a payload. None of these is an error.
                var payload = result?.Message?.Value;
                if (payload == null)
                {
                    return default;
                }

                if (typeof(T) == typeof(string))
                {
                    return (T)(object)payload;
                }

                return JsonConvert.DeserializeObject<T>(payload);
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"consume error: {e.Error.Reason}");
            }
            catch (Exception e)
            {
                Console.WriteLine($"consume error: {e.Message}");
            }
            return default;
        }

        /// <summary>
        /// Releases the underlying consumer.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Closes and disposes the underlying consumer exactly once.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            lock (disposeHasBeenCalledLockObj)
            {
                if (disposeHasBeenCalled) { return; }
                disposeHasBeenCalled = true;
            }

            if (!disposing)
            {
                return;
            }

            try
            {
                // Close first so the consumer leaves its group before the client is released.
                _consumer?.Close();
            }
            catch (KafkaException e)
            {
                // Dispose should not throw; report the failure the same way Consume does.
                Console.WriteLine($"close error: {e.Error.Reason}");
            }
            finally
            {
                _consumer?.Dispose();
            }
        }
    }
}
/// <summary>
/// Kafka producer wrapper bound to a single topic. Payloads are wrapped in a JSON
/// envelope together with an operation type before being sent.
/// </summary>
public class KafkaProducer : IKafkaProducer
{
    private readonly object _disposeGate = new object();
    private readonly IProducer<string, string> _producer;
    private readonly string _topic;
    private bool _disposed = false;

    /// <summary>
    /// Builds the underlying producer from <paramref name="config"/> and remembers the target topic.
    /// </summary>
    /// <param name="config">Producer configuration.</param>
    /// <param name="topic">Name of the topic every message is sent to.</param>
    public KafkaProducer(ProducerConfig config, string topic)
    {
        _topic = topic;
        _producer = new ProducerBuilder<string, string>(config).Build();
    }

    /// <summary>
    /// Publishes one message and blocks until the delivery outcome is known.
    /// </summary>
    /// <typeparam name="T">Payload type.</typeparam>
    /// <param name="key">Message key.</param>
    /// <param name="data">Payload; serialized as the <c>Data</c> member of the JSON envelope.</param>
    /// <param name="operateType">Operation type (add, delete, update, ...); serialized as the <c>Type</c> member.</param>
    /// <returns><c>true</c> when the message was delivered; <c>false</c> when delivery failed (the reason is written to the console).</returns>
    public bool Produce<T>(string key, T data, int operateType) where T : class
    {
        // Serialization happens outside the try block: a serialization failure propagates to the caller.
        var envelope = new { Type = operateType, Data = data };
        var json = JsonConvert.SerializeObject(envelope);

        try
        {
            var message = new Message<string, string> { Key = key, Value = json };
            var delivery = _producer
                .ProduceAsync(_topic, message)
                .ConfigureAwait(false)
                .GetAwaiter()
                .GetResult();
#if DEBUG
            Console.WriteLine($"Topic: {delivery.Topic} Partition: {delivery.Partition} Offset: {delivery.Offset}");
#endif
            return true;
        }
        catch (Exception e)
        {
            // Produce failures carry a Kafka error reason; anything else falls back to the exception message.
            var reason = e is ProduceException<string, string> produceFailure
                ? produceFailure.Error.Reason
                : e.Message;
            Console.WriteLine($"Delivery failed: {reason}");
            return false;
        }
    }

    /// <summary>
    /// Releases the underlying producer.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Disposes the underlying producer exactly once.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        bool alreadyDisposed;
        lock (_disposeGate)
        {
            alreadyDisposed = _disposed;
            _disposed = true;
        }

        if (alreadyDisposed || !disposing)
        {
            return;
        }

        _producer?.Dispose();
    }
}
/// <summary>
/// Producer demo: sends one message to "topic-d" on the local broker and reports the outcome.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    var config = new ProducerConfig
    {
        BootstrapServers = "localhost:9092",
        Acks = Acks.All
    };

    // Send the message
    bool delivered;
    using (var kafkaProducer = new KafkaProducer(config, "topic-d"))
    {
        delivered = kafkaProducer.Produce<object>("a", new { name = "猪八戒3" }, 1);
    }

    // Produce reports failure through its return value, so only claim success when it says so.
    Console.WriteLine(delivered ? "消息发送成功" : "消息发送失败");
}
/// <summary>
/// Consumer demo: polls "topic-d" once for every line entered on the console and prints
/// whatever was received. Entering "q" (or closing standard input) ends the program.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    var config = new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "test",
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    Console.WriteLine("接受中......");

    // One consumer for the whole session: building a new one per poll makes it join and
    // leave the consumer group every time, which the one-second poll is unlikely to outlast.
    using (var kafkaConsumer = new KafkaConsumer(config, "topic-d"))
    {
        string text;
        // ReadLine returns null at end of input; treat that like "q" instead of looping forever.
        while ((text = Console.ReadLine()) != null && text != "q")
        {
            // Receive a message
            var result = kafkaConsumer.Consume<object>();
            if (result != null)
            {
                Console.WriteLine(result.ToString());
            }
        }
    }
}
运行结果:

可以看到,消息已经收到了。在这个 demo 里,消费端必须一直处于正常运行状态,才能消费生产者的信息。
本文版权归作者和博客园共有,来源网址:欢迎各位转载,但是未经作者本人同意,转载文章之后必须在文章页面明显位置给出作者和原文链接,否则保留追究法律责任的权利。
标签:result 图片 message 文章 接收 ofo append 日志 分类
原文地址:https://www.cnblogs.com/DanielYao/p/12922745.html