码迷,mamicode.com
首页 > 其他好文 > 详细

rabbitmq-direct路由订阅模型

时间:2021-06-10 18:34:25      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:ann   queue   sage   err   org   out   junit   highlight   live   

生产者:

package com.gavin.mq.direct;

import com.gavin.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Producer for the RabbitMQ routing (direct exchange) subscription model.
 *
 * <p>Declares the {@code logs_direct} exchange of type {@code direct} and publishes one
 * message to it with the routing key {@code info}.
 */
public class WorkProvider {
    /** Name of the direct exchange the message is published to. */
    private static final String EXCHANGE_NAME = "logs_direct";

    /** Routing key attached to the published message. */
    private static final String ROUTING_KEY = "info";

    /**
     * Publishes the text {@code hello direct} to {@code logs_direct} with routing key
     * {@code info}, then hands the channel and connection to {@code RabbitMQUtils.close}.
     *
     * @throws IOException if declaring the exchange or publishing fails
     * @throws TimeoutException declared for the connection helper calls
     */
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        try {
            // Declare the exchange: first argument is its name, second its type.
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            // Encode explicitly as UTF-8 so the bytes do not depend on the platform charset.
            byte[] payload = "hello direct".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            channel.basicPublish(
                    EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_BASIC, payload);
        } finally {
            // Release the channel and connection even when declare/publish throws.
            RabbitMQUtils.close(channel, connection);
        }
    }
}

  消费者1:

package com.gavin.mq.direct;

import com.gavin.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer 1 for the RabbitMQ routing (direct exchange) subscription model.
 *
 * <p>Binds a broker-named queue to the {@code logs_direct} exchange under the routing keys
 * {@code error} and {@code warning}, and prints every message body it receives.
 */
public class WorkConsumer {
    /** Name of the direct exchange this consumer subscribes to. */
    private static final String EXCHANGE_NAME = "logs_direct";

    /**
     * Sets up the exchange, queue and bindings, then registers the printing consumer.
     *
     * @param args unused
     * @throws IOException if any broker operation fails
     * @throws TimeoutException declared for the connection helper call
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // The no-argument queueDeclare() lets the broker pick the queue name.
        String tempQueue = channel.queueDeclare().getQueue();

        // One binding per routing key this consumer is interested in.
        channel.queueBind(tempQueue, EXCHANGE_NAME, "error");
        channel.queueBind(tempQueue, EXCHANGE_NAME, "warning");

        // Second argument (true) enables automatic acknowledgement.
        channel.basicConsume(tempQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(
                    String consumerTag,
                    Envelope envelope,
                    AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform charset.
                System.out.println(new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        });
    }
}

  

 

      消费者2:

package com.gavin.mq.direct;

import com.gavin.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer 2 for the RabbitMQ routing (direct exchange) subscription model.
 *
 * <p>Binds a broker-named queue to the {@code logs_direct} exchange under the routing key
 * {@code info}, and prints every message body it receives prefixed with {@code consumer2:}.
 */
public class WorkConsumer2 {
    /** Name of the direct exchange this consumer subscribes to. */
    private static final String EXCHANGE_NAME = "logs_direct";

    /**
     * Sets up the exchange, queue and binding, then registers the printing consumer.
     *
     * @param args unused
     * @throws IOException if any broker operation fails
     * @throws TimeoutException declared for the connection helper call
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // The no-argument queueDeclare() lets the broker pick the queue name.
        String tempQueue = channel.queueDeclare().getQueue();
        channel.queueBind(tempQueue, EXCHANGE_NAME, "info");

        // Second argument (true) enables automatic acknowledgement.
        channel.basicConsume(tempQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(
                    String consumerTag,
                    Envelope envelope,
                    AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform charset.
                String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("consumer2:" + text);
            }
        });
    }
}

  

 

rabbitmq-direct路由订阅模型

标签:ann   queue   sage   err   org   out   junit   highlight   live   

原文地址:https://www.cnblogs.com/gavinmiao/p/14870598.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!