码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ 路由、主题模式

时间:2021-02-04 11:41:42      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:订阅   通配   代码   exce   主题模式   队列模式   业务   背景   oid   

一、RabbitMQ的路由模式和应用场景

1、什么是rabbitmq的路由模式

  • ?档: https://www.rabbitmq.com/tutorials/tutorial-four-java.html
  • 交换机类型是Direct
  • 队列和交换机绑定,需要指定?个路由key( 也叫 Bingding Key)
  • 消息?产者发送消息给交换机,需要指定routingKey
  • 交换机根据消息的路由key,转发给对应的队列

技术图片

2、举例

?志采集系统 ELK

  • ?个队列收集error信息-》告警
  • ?个队列收集全部信息-》?常使用

二、路由模式代码实战

发送端

public class Send {
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.130");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //JDK7语法,自动关闭,创建连接
        try (Connection connection = factory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()) {

            //绑定交换机,直连交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            String error = "我是订单服务的error日志";
            String info = "我是订单服务的info日志";
            String debug = "我是订单服务的debug日志";

            channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8));

            System.out.println("direct消息发送成功");
        }
    }
}

发送端1,注意和下面的发送端2不一样

public class Recv1 {
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.130");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //绑定交换机,fanout扇形,即广播
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //获取队列
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机和队列, direct交换机需要指定routingkey
        channel.queueBind(queueName, EXCHANGE_NAME, "errorRoutingKey");
        channel.queueBind(queueName, EXCHANGE_NAME, "infoRoutingKey");
        channel.queueBind(queueName, EXCHANGE_NAME, "debugRoutingKey");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body=" + new String(body, "utf-8"));

                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(), false);

            }
        };

        //消费,关闭消息消息自动确认,重要
        channel.basicConsume(queueName, false, consumer);
    }
}

发送端2

public class Recv2 {

    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.130");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //绑定交换机,fanout扇形,即广播
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);

        //获取队列
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机和队列, direct交换机需要指定routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body="+new String(body,"utf-8"));

                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //消费,关闭消息消息自动确认,重要
        channel.basicConsume(queueName,false,consumer);
    }
}

技术图片

技术图片

三、topic主题通配符模式和应用场景

1、背景

  • 如果业务很多路由key,怎么维护??
  • topic交换机,?持通配符匹配模式,更加强大
  • 工作基本都是用这个topic模式

2、什么是rabbitmq的主题模式

  • ?档 https://www.rabbitmq.com/tutorials/tutorial-five-java.html
  • 交换机是 topic,可以实现发布订阅模式fanout和路由模Direct的功能,更加灵活,?持模式匹配,通配符等
  • 交换机通过通配符进?转发到对应的队列,* 代表?个词, #代表1个或多个词,?般用#作为通配符居多,?#.order,会匹配 info.order 、 sys.error.order, 而 *.order ,只会匹配 info.order,之间是使用. 点进?分割多个词的; 如果是 ., 则info.ordererror.order会匹配

注意

  • 交换机和队列绑定时用的binding使用通配符的路由
  • ?产者发送消息时需要使用具体的路由键

测试,下?的匹配规则是怎样的

quick.orange.rabbit 只会匹配 *.orange.* 和*.*.rabbit ,进到Q1和Q2
lazy.orange.elephant 只会匹配 *.orange.* 和lazy.#,进到Q1和Q2
quick.orange.fox 只会匹配 *.orange.*,进入Q1
lazy.brown.fox 只会匹配azy.#,进入Q2
lazy.pink.rabbit 只会匹配 lazy.#和*.*.rabbit ,同个队列进入Q2(消息只会发一次)
quick.brown.fox 没有匹配,默认会被丢弃,可以通过回调监听二次处理
lazy.orange.male.rabbit,只会匹配 lazy.#,进入Q2

四、topic主题模式代码实战

例?:?志采集系统

?个队列收集订单系统的error?志信息,order.log.error

?个队列收集全部系统的全部级别?志信息, * .log. *

生产端

public class Send {
    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.130");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //JDK7语法,自动关闭,创建连接
        try (Connection connection = factory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()) {

            //绑定交换机,topic交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            String error = "我是订单服务的error日志";
            String info = "我是订单服务的info日志";
            String debug = "我是商品服务的debug日志";

            channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8));

            System.out.println("TOPIC消息发送成功");
        }
    }
}

消费端1

public class Recv1 {
    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.130");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();


        //绑定交换机,
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        //获取队列
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机和队列, 需要指定routingkey
        channel.queueBind(queueName, EXCHANGE_NAME, "order.log.error");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body=" + new String(body, "utf-8"));

                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(), false);

            }
        };

        //消费,关闭消息消息自动确认,重要
        channel.basicConsume(queueName, false, consumer);
    }
}

消费端2

public class Recv2 {
    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.130");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //绑定交换机,
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);

        //获取队列
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机和队列, 需要指定routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body="+new String(body,"utf-8"));

                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);

            }
        };

        //消费,关闭消息消息自动确认,重要
        channel.basicConsume(queueName,false,consumer);
    }
}

技术图片

五、总结

简单模式

  • ?个?产、?个消费,不用指定交换机,使用默认交换

?作队列模式

  • ?个?产,多个消费,可以有轮训和公平策略,不用指定交换机,使用默认交换机

发布订阅模式

  • fanout类型交换机,通过交换机和队列绑定,不用指定绑定的路由键,?产者发送消息到交换机, fanout交换机直接进?转发,消息不用指定routingkey路由键

路由模式

  • direct类型交换机,通过交换机和队列绑定,指定绑定的路由键,?产者发送消息到交换机,交换机根据消息的路由key进?转发到对应的队列,消息要指定routingkey路由键

通配符模式

  • topic交换机,通过交换机和队列绑定,指定绑定的【通配符路由键】,?产者发送消息到交换机,交换机根据消息的路由key进?转发到对应的队列,消息要指定routingkey路由键

RabbitMQ 路由、主题模式

标签:订阅   通配   代码   exce   主题模式   队列模式   业务   背景   oid   

原文地址:https://www.cnblogs.com/jwen1994/p/14362584.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!