码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ——工作队列模式

时间:2020-08-18 13:51:39      阅读:63      评论:0      收藏:0      [点我收藏+]

标签:object   发送   关闭   live   ble   code   rop   虚拟   throw   

1.模式说明

技术图片

 

 应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度
只有3个角色:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分

2.应用举例

Producer:

技术图片
 1 /**
 2  * 发送消息
 3  */
 4 public class Producer_WorkQueues {
 5     public static void main(String[] args) throws IOException, TimeoutException {
 6 
 7         //1.创建连接工厂
 8         ConnectionFactory factory = new ConnectionFactory();
 9         //2. 设置参数
10         factory.setHost("172.16.98.133");//ip  默认值 localhost
11         factory.setPort(5672); //端口  默认值 5672
12         factory.setVirtualHost("/it");//虚拟机 默认值/
13         factory.setUsername("jingdong");//用户名 默认 guest
14         factory.setPassword("jingdong");//密码 默认值 guest
15         //3. 创建连接 Connection
16         Connection connection = factory.newConnection();
17         //4. 创建Channel
18         Channel channel = connection.createChannel();
19         //5. 创建队列Queue
20         /*
21         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
22         参数:
23             1. queue:队列名称
24             2. durable:是否持久化,当mq重启之后,还在
25             3. exclusive:
26                 * 是否独占。只能有一个消费者监听这队列
27                 * 当Connection关闭时,是否删除队列
28                 *
29             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
30             5. arguments:参数。
31 
32          */
33         //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
34         channel.queueDeclare("work_queues",true,false,false,null);
35         /*
36         basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
37         参数:
38             1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
39             2. routingKey:路由名称
40             3. props:配置信息
41             4. body:发送消息数据
42 
43          */
44         for (int i = 1; i <= 10; i++) {
45             String body = i+"hello rabbitmq~~~";
46 
47             //6. 发送消息
48             channel.basicPublish("","work_queues",null,body.getBytes());
49         }
50         
51         //7.释放资源
52       channel.close();
53       connection.close();
54 
55     }
56 }
View Code

Customer:

技术图片
 1 public class Consumer_WorkQueues1 {
 2     public static void main(String[] args) throws IOException, TimeoutException {
 3 
 4         //1.创建连接工厂
 5         ConnectionFactory factory = new ConnectionFactory();
 6         //2. 设置参数
 7         factory.setHost("172.16.98.133");//ip  默认值 localhost
 8         factory.setPort(5672); //端口  默认值 5672
 9         factory.setVirtualHost("/it");//虚拟机 默认值/
10         factory.setUsername("jingdong");//用户名 默认 guest
11         factory.setPassword("jingdong");//密码 默认值 guest
12         //3. 创建连接 Connection
13         Connection connection = factory.newConnection();
14         //4. 创建Channel
15         Channel channel = connection.createChannel();
16         //5. 创建队列Queue
17         /*
18         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
19         参数:
20             1. queue:队列名称
21             2. durable:是否持久化,当mq重启之后,还在
22             3. exclusive:
23                 * 是否独占。只能有一个消费者监听这队列
24                 * 当Connection关闭时,是否删除队列
25                 *
26             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
27             5. arguments:参数。
28 
29          */
30         //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
31         channel.queueDeclare("work_queues",true,false,false,null);
32 
33         /*
34         basicConsume(String queue, boolean autoAck, Consumer callback)
35         参数:
36             1. queue:队列名称
37             2. autoAck:是否自动确认
38             3. callback:回调对象
39 
40          */
41         // 接收消息
42         Consumer consumer = new DefaultConsumer(channel){
43             /*
44                 回调方法,当收到消息后,会自动执行该方法
45 
46                 1. consumerTag:标识
47                 2. envelope:获取一些信息,交换机,路由key...
48                 3. properties:配置信息
49                 4. body:数据
50 
51              */
52             @Override
53             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
54               /*  System.out.println("consumerTag:"+consumerTag);
55                 System.out.println("Exchange:"+envelope.getExchange());
56                 System.out.println("RoutingKey:"+envelope.getRoutingKey());
57                 System.out.println("properties:"+properties);*/
58                 System.out.println("body:"+new String(body));
59             }
60         };
61         channel.basicConsume("work_queues",true,consumer);
62 
63 
64         //关闭资源?不要
65 
66     }
67 }
View Code

 

RabbitMQ——工作队列模式

标签:object   发送   关闭   live   ble   code   rop   虚拟   throw   

原文地址:https://www.cnblogs.com/aaaazzzz/p/13511717.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!