标签:none exchange argument tco integer publish ice art eth
1.依赖
<!--rabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dep
2.配置信息
#设置端口 server.port=80 #安装的RabbitMq的服务器IP spring.rabbitmq.host=192.168.***.** #安装的RabbitMq的服务器端口 spring.rabbitmq.port=5672 #安装的RabbitMq的用户名 spring.rabbitmq.username=xxx #安装的RabbitMq的密码 spring.rabbitmq.password=xxx #消息确认机制 spring.rabbitmq.publisher-confirms=true #与消息确认机制联合使用,保证能够收到回调 spring.rabbitmq.publisher-returns=true #消息确认模式 MANUAL:手动确认 NONE:不确认 AUTO:自动确认 spring.rabbitmq.listener.simple.acknowledge-mode=auto #消费者 spring.rabbitmq.listener.simple.concurrency=10 spring.rabbitmq.listener.simple.max-concurrency=10 #发布后重试 spring.rabbitmq.listener.simple.retry.enabled=true spring.rabbitmq.listener.simple.retry.initial-interval=5000 spring.rabbitmq.listener.simple.retry.max-attempts=5 #每隔多久进行重试 spring.rabbitmq.template.retry.multiplier=1.0 #消费失败后重新消费 spring.rabbitmq.listener.simple.default-requeue-rejected=false #自定义的vhost spring.rabbitmq.dev-virtual-host=devVir spring.rabbitmq.test-virtual-host=testVir
3.配置信息:此处为多个Vhost配置,单个可直接使用,无需另外配置,只需声明队列信息即可
package com.rabbit.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
* 2019年7月7日15:43:38 Joelan整合 RabbitConfig 概念介绍:
* 1.Queue:队列,是RabbitMq的内部对象,用于存储消息,RabbitMq的多个消费者可以订阅同一个队列,此时队列会以轮询的方式给多个消费者消费,而非多个消费者都收到所有的消息进行消费
* 注意:RabbitMQ不支持队列层面的广播消费,如果需要广播消费,可以采用一个交换器通过路由Key绑定多个队列,由多个消费者来订阅这些队列的方式。
* 2.Exchange:交换器,在RabbitMq中,生产者并非直接将消息投递到队列中。真实情况是,生产者将消息发送到Exchange(交换器),由交换器将消息路由到一个或多个队列中。
* 注意:如果路由不到,或返回给生产者,或直接丢弃,或做其它处理。
* 3.RoutingKey:路由Key,生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。这个路由Key需要与交换器类型和绑定键(BindingKey)联合使用才能
* 最终生效。在交换器类型和绑定键固定的情况下,生产者可以在发送消息给交换器时通过指定RoutingKey来决定消息流向哪里。
* 4.Binding:RabbitMQ通过绑定将交换器和队列关联起来,在绑定的时候一般会指定一个绑定键,这样RabbitMQ就可以指定如何正确的路由到队列了。
*/
@Configuration
public class RabbitConfig {
/**
* RabbitMq的主机地址
*/
@Value("${spring.rabbitmq.host}")
private String host;
/**
* RabbitMq的端口
*/
@Value("${spring.rabbitmq.port}")
private Integer port;
/**
* 用户账号
*/
@Value("${spring.rabbitmq.username}")
private String username;
/**
* 用户密码
*/
@Value("${spring.rabbitmq.password}")
private String password;
/**
* 消息确认,回调机制
*/
@Value("${spring.rabbitmq.publisher-confirms}")
private boolean confirms;
@Value("${spring.rabbitmq.publisher-returns}")
private boolean returns;
/**
* vhost:dev
*/
@Value("${spring.rabbitmq.dev-virtual-host}")
private String hrmDevVirtualHost;
/**
* vhost:test
*/
@Value("${spring.rabbitmq.test-virtual-host}")
private String hrmTestVirtualHost;
/**
* 若一个项目只使用一个virtualHost的话,默认只需要在配置文件中配置其属性即可
* 若项目中使用到多个virtualHost,那么可以以通过创建ConnectionFactory的方式指定不同的virtualHost
*/
public ConnectionFactory createConnectionFactory(String host, Integer port, String username, String password,
String vHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setSimplePublisherConfirms(confirms);
connectionFactory.setPublisherReturns(returns);
connectionFactory.setVirtualHost(vHost);
return connectionFactory;
}
// ----------------------------------------------------------------------------------------第一步,创建消息连接,第一个VirtualHost
/**
* 创建指定vhost:dev的连接工厂
*/
@Primary
@Bean(name = "devConnectionFactory")
public ConnectionFactory devConnectionFactory() {
return createConnectionFactory(host, port, username, password, hrmDevVirtualHost);
}
/**
* 若有多个vhost则自定义RabbitMqTemplate 通过名称指定对应的vhost
*/
@Primary
@Bean(name = "devRabbitTemplate")
public RabbitTemplate devRabbitTemplate(
@Qualifier(value = "devConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 消息确认机制,ConnectionFactory中必须设置回调机制(publisher-confirms,publisher-returns)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息id为: " + correlationData + "的消息,已经被ack成功");
} else {
System.out.println("消息id为: " + correlationData + "的消息,消息nack,失败原因是:" + cause);
}
}
});
return rabbitTemplate;
}
// ----------------------------------------------------------------------------------------第二个VirtualHost,以此类推
/**
* 创建指定vhost:test的连接工厂
*/
@Bean(name = "testConnectionFactory")
public ConnectionFactory testConnectionFactory() {
return createConnectionFactory(host, port, username, password, hrmTestVirtualHost);
}
/**
* 若有多个vhost则自定义RabbitMqTemplate 通过名称指定对应的vhost,此处未使用回调
*/
@Bean(name = "testRabbitTemplate")
public RabbitTemplate testRabbitTemplate(
@Qualifier(value = "testConnectionFactory") ConnectionFactory connectionFactory) {
// RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// Map<String, Object> args = new HashMap<String, Object>();
// args.put("x-dead-letter-exchange","test_topic_exchange");
// Queue queue = new Queue("test_topic_queue");
return new RabbitTemplate(connectionFactory);
}
// ----------------------------------------------------------------------------------------第二步,声明队列信息,Fanout模式
/**
* 创建队列 参数name:队列的名称,不能为空;设置为“”以使代理生成该名称。
* 参数durable:true表示为持久队列,该队列将在服务器重新启动后继续存在
* 参数exclusive:如果声明独占队列,则为true,该队列将仅由声明者的连接使用
* 参数autoDelete:如果服务器不再使用队列时应将其删除,则自动删除为true 参数arguments:用于声明队列的参数
*/
@Bean
public Queue testFanoutQueue() {
/*
* 1.new Queue(name); return new Queue("test_fanout_queue");
*/
/*
* 2.new Queue(name,durable);
*/
return new Queue("test_fanout_queue", true, false, true);
/*
* 3.new Queue(name,durable,exclusive,autoDelete); return new
* Queue("test_fanout_queue", true, false, false);
*/
/*
* 4.new Queue(name,durable,exclusive,autoDelete,arguments); return new
* Queue("test_fanout_queue", true, true, true, null);
*/
}
/**
* 创建交换机 1.fanout:扇形交换器,它会把发送到该交换器的消息路由到所有与该交换器绑定的队列中,如果使用扇形交换器,则不会匹配路由Key
* 白话:一个交换机可以绑定N个队列,此模式会将写入的队列发送到一个交换机,由此交换机发送到N个队列中,那么监听该队列的消费者都能收到对应的消息
*/
@Bean
@Primary
public Exchange testFanoutExchange() {
return new FanoutExchange("test_fanout_exchange");
}
/**
* 绑定队列到交换机 Fanout模式不需要RoutingKey
*/
@Bean
public Binding testFanoutBinding() {
return BindingBuilder.bind(testFanoutQueue()).to(testFanoutExchange()).with("").noargs();
}
// ----------------------------------------------------------------------------------------Direct模式
/**
* 创建队列
*/
@Bean
public Queue testDirectQueue() {
return new Queue("test_direct_queue", true, false, true);
}
/**
* 创建交换机 2.direct交换器 直连模式,会把消息路由到RoutingKey与BindingKey完全匹配的队列中。
* 白话:直连模式在绑定队列到交换机的时候,RoutingKey与发送队列的RoutingKey要完全保持一致
*/
@Bean
public Exchange testDirectExchange() {
return new TopicExchange("test_direct_exchange");
}
/**
* 绑定队列到交换机并指定一个路由,此处的RoutingKey为test,发送队列时也必须使用test
*/
@Bean
public Binding testDirectBinding() {
return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("test").noargs();
}
// ----------------------------------------------------------------------------------------Topic模式
/**
* 创建队列
*/
@Bean
public Queue testQueue() {
return new Queue("test_topic_queue", true, false, true);
}
/**
* 创建交换机 2.topic 匹配模式(个人)与直连模式区别:RoutingKey可以模糊匹配,两种匹配风格: *匹配 #匹配
* 我们的RoutingKey和BindKey为一个点分隔的字符串,例:test.routing.client
* 那么我们的模糊匹配,*可以匹配一个单词,即:*.routing.* 可以匹配到 test.routing.client,
* #可以匹配多个单词,即:#.client 可以匹配到 test.routing.client,以此类推
*/
@Bean
public Exchange topicExchange() {
return new TopicExchange("test_topic_exchange");
}
/**
* 绑定队列到交换机并指定一个路由
*/
@Bean
public Binding testBinding() {
return BindingBuilder.bind(testQueue()).to(topicExchange()).with("test.*").noargs();
}
// ----------------------结束强调:第一步创建连接,第二步声明队列,交换器,路由Key信息,第三步发送队列,第四步监听队列
}
4.发送队列
package com.rabbit.send;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
* RabbitSend
*/
@Component
public class RabbitSend {
@Autowired
@Qualifier(value = "devRabbitTemplate")
private RabbitTemplate rabbitTemplate;
/**
* 发送一条Fanout扇形队列
*/
public void sendTestFanoutMsg(String msg) {
rabbitTemplate.convertAndSend("test_fanout_exchange", "", msg);
}
/**
* 发送一条Direct直连队列
* 若有开启回调机制,必须传此参数new CorrelationData("1"),用于声明ID
*/
public void sendTestDirectMsg(String msg) {
rabbitTemplate.convertAndSend("test_direct_exchange", "test", msg, new CorrelationData("1"));
}
/**
* 发送一条Topic消息队列
*/
public void sendTestMsg(String msg) {
rabbitTemplate.convertAndSend("test_topic_exchange", "test.mq", msg);
}
}
5.监听队列
package com.rabbit.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* RabbitReceiver
*/
@Component
public class RabbitReceiver {
@RabbitHandler
@RabbitListener(queues = "test_fanout_queue")
public void handlerFanout(String msg) {
System.out.println("RabbitReceiver:" + msg + "test_fanout_queue");
}
@RabbitHandler
@RabbitListener(queues = "test_direct_queue")
public void handlerDirect(String msg) {
System.out.println("RabbitReceiver:" + msg + "test_direct_queue");
}
@RabbitHandler
@RabbitListener(queues = "test_topic_queue")
public void handlerTopic(String msg) {
System.out.println("RabbitReceiver:" + msg);
}
}
标签:none exchange argument tco integer publish ice art eth
原文地址:https://www.cnblogs.com/joelan0927/p/11143408.html