码迷,mamicode.com
首页 > Web开发 > 详细

基于EasyNetQ的RabbitMQ封装类

时间:2017-06-06 13:00:58      阅读:649      评论:0      收藏:0      [点我收藏+]

标签:top   queue   res   open   efi   figure   mqc   pos   receive   

最近在捣鼓RabbitMQ,为了方便使用,自己基于EasyNetQ封装了一个类,现在贴出来还望各路大佬神明指点,共同学习。

技术分享
  1     /// <summary>
  2     /// RabbitMQ客户端封装类,基于EasyNetQ,使用时需要从nuget安装EasyNetQ。
  3     /// <para>
  4     /// <example>
  5     /// 使用方法:
  6     /// <code>
  7     /// using(var mq = new RabbitMqClient(‘rabbitmq连接字符串‘))
  8     /// { ...
  9     /// }
 10     /// </code>
 11     /// </example>
 12     /// </para>
 13     /// </summary>
 14     public class RabbitMqClient : IDisposable
 15     {
 16         private readonly IBus bus;
 17 
 18         /// <summary>
 19         /// 构造函数
 20         /// </summary>
 21         /// <param name="connectionString">rabbitmq连接字符串</param>
 22         public RabbitMqClient(string connectionString)
 23         {
 24             if (string.IsNullOrEmpty(connectionString))
 25                 throw new ArgumentNullException(nameof(connectionString));
 26             bus = RabbitHutch.CreateBus(connectionString);
 27         }
 28         /// <summary>
 29         /// 发布一条消息(广播)
 30         /// </summary>
 31         /// <param name="message"></param>
 32         public void Publish<TMessage>(TMessage message) where TMessage:class 
 33         {
 34             bus.PublishAsync(message);
 35         }
 36 
 37         /// <summary>
 38         /// 指定Topic,发布一条消息
 39         /// </summary>
 40         /// <param name="message"></param>
 41         /// <param name="topic"></param>
 42         public void PublishWithTopic<TMessage>(TMessage message, string topic) where TMessage : class
 43         {
 44             if(string.IsNullOrEmpty(topic))
 45                 Publish(message);
 46             else
 47                 bus.PublishAsync(message, x=>x.WithTopic(topic));
 48         }
 49 
 50         /// <summary>
 51         /// 发布消息。一次性发布多条
 52         /// </summary>
 53         /// <param name="messages"></param>
 54         public void PublishMany<TMessage>(List<TMessage> messages) where TMessage : class 
 55         {
 56             foreach (var message in messages)
 57             {
 58                 Publish(message);
 59                 Thread.Sleep(50);//必须加上,以防消息阻塞
 60             }
 61         }
 62 
 63         /// <summary>
 64         /// 发布消息。一次性发布多条
 65         /// </summary>
 66         /// <param name="messages"></param>
 67         /// <param name="topic"></param>
 68         public void PublishManyWithTopic<TMessage>(List<TMessage> messages, string topic) where TMessage : class
 69         {
 70             foreach (var message in messages)
 71             {
 72                 PublishWithTopic(message, topic);
 73                 Thread.Sleep(50);//必须加上,以防消息阻塞
 74             }
 75         }
 76 
 77         /// <summary>
 78         /// 给指定队列发送一条信息
 79         /// </summary>
 80         /// <param name="queue">队列名称</param>
 81         /// <param name="message">消息</param>
 82         public void Send<TMessage>(string queue, TMessage message) where TMessage : class
 83         {
 84             bus.Send(queue, message);
 85         }
 86 
 87         /// <summary>
 88         /// 给指定队列批量发送信息
 89         /// </summary>
 90         /// <param name="queue">队列名称</param>
 91         /// <param name="messages">消息</param>
 92         public void SendMany<TMessage>(string queue, IList<TMessage> messages) where TMessage : class
 93         {
 94             foreach (var message in messages)
 95             {
 96                 SendAsync(queue, message);
 97                 Thread.Sleep(50);//必须加上,以防消息阻塞
 98             }
 99         }
100 
101         /// <summary>
102         /// 给指定队列发送一条信息(异步)
103         /// </summary>
104         /// <param name="queue">队列名称</param>
105         /// <param name="message">消息</param>
106         /// <returns></returns>
107         public async void SendAsync<TMessage>(string queue, TMessage message) where TMessage:class 
108         {
109             await bus.SendAsync(queue, message);
110         }
111 
112         /// <summary>
113         /// 从指定队列接收一天信息,并做相关处理。
114         /// </summary>
115         /// <param name="queue">队列名称</param>
116         /// <param name="process">
117         /// 消息处理委托方法
118         /// <para>
119         /// <example>
120         /// 例如:
121         /// <code>
122         /// message=>Task.Factory.StartNew(()=>{
123         ///     Console.WriteLine(message);
124         /// })
125         /// </code>
126         /// </example>
127         /// </para>
128         /// </param>
129         public void Receive<TMessage>(string queue, Func<TMessage, Task> process) where TMessage:class 
130         {
131             bus.Receive(queue, process);
132         }
133 
134         /// <summary>
135         /// 消息订阅
136         /// </summary>
137         /// <param name="subscriptionId">消息订阅标识</param>
138         /// <param name="process">
139         /// 消息处理委托方法
140         /// <para>
141         /// <example>
142         /// 例如:
143         /// <code>
144         /// message=>Task.Factory.StartNew(()=>{
145         ///     Console.WriteLine(message);
146         /// })
147         /// </code>
148         /// </example>
149         /// </para>
150         /// </param>
151         public void Subscribe<TMessage>(string subscriptionId, Func<TMessage, Task> process) where TMessage:class 
152         {
153             bus.Subscribe<TMessage>(subscriptionId, message => process(message));
154         }
155 
156         /// <summary>
157         /// 消息订阅
158         /// </summary>
159         /// <param name="subscriptionId">消息订阅标识</param>
160         /// <param name="process">
161         /// 消息处理委托方法
162         /// <para>
163         /// <example>
164         /// 例如:
165         /// <code>
166         /// message=>Task.Factory.StartNew(()=>{
167         ///     Console.WriteLine(message);
168         /// })
169         /// </code>
170         /// </example>
171         /// </para>
172         /// </param>
173         /// <param name="topic">topic</param>
174         public void SubscribeWithTopic<TMessage>(string subscriptionId, Func<TMessage, Task> process, string topic) where TMessage:class 
175         {
176             bus.Subscribe<TMessage>(subscriptionId, message => process(message), x=>x.WithTopic(topic));
177         }
178 
179         /// <summary>
180         /// 自动订阅
181         /// </summary>
182         /// <param name="assemblyName"></param>
183         /// <param name="subscriptionIdPrefix"></param>
184         /// <param name="topic"></param>
185         public void AutoSubscribe(string assemblyName, string subscriptionIdPrefix, string topic)
186         {
187             var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
188             if (!string.IsNullOrEmpty(topic))
189                 subscriber.ConfigureSubscriptionConfiguration = x => x.WithTopic(topic);
190             subscriber.Subscribe(Assembly.Load(assemblyName));
191         }
192 
193         /// <summary>
194         /// 资源释放
195         /// </summary>
196         public void Dispose()
197         {
198             if (bus != null) bus.Dispose();
199         }
200     }
View Code

 

基于EasyNetQ的RabbitMQ封装类

标签:top   queue   res   open   efi   figure   mqc   pos   receive   

原文地址:http://www.cnblogs.com/tongyinaocan/p/6950772.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!