标签:删除 fir highlight and add string throw 结果 consumer
消息的确认:是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。
生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障。

生产端
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//创建连接
Connection connection = connectionFactory.newConnection();
//通过连接创建一个Channel
Channel channel = connection.createChannel();
//指定消息的投递模式:消息的确认模式
channel.confirmSelect();
//通过Channel发送数据
channel.basicPublish("","hello",null,"hello world".getBytes());
//添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("----handleAck---");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("----handleNack---");
}
});
}
消费端:
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//创建连接
Connection connection = connectionFactory.newConnection();
//通过连接创建一个Channel
Channel channel = connection.createChannel();
//创建一个队列
String queueName = "hello";
channel.queueDeclare(queueName,true,false,false,null);
//创建一个消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//设置Channel
channel.basicConsume(queueName,true,consumer);
//获取消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端:"+msg);
}
}
运行结果:
消费端:

生产端:

某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这时候如果我们需要监听这种不可达的消息,就需要使用Return Listener
在API中有个一重要配置项:
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,则broker端自动删除该消息。
Return消息机制流程:

消费端跟上文一样,
生产端:
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//创建连接
Connection connection = connectionFactory.newConnection();
//通过连接创建一个Channel
Channel channel = connection.createChannel();
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
System.out.println(properties);
System.out.println(Arrays.toString(body));
}
});
//通过Channel发送数据
// 在这里要设置Mandatory(第三个参数)为true,否则broker会自动删除消息
channel.basicPublish("","return",true,null,"hello world".getBytes());
}
打印结果:

RabbitMQ:Confirm确认消息 Return返回消息
标签:删除 fir highlight and add string throw 结果 consumer
原文地址:https://www.cnblogs.com/wwjj4811/p/12986683.html