码迷,mamicode.com
首页 > 其他好文 > 详细

RabbitMQ - topic

时间:2014-11-15 21:39:10      阅读:299      评论:0      收藏:0      [点我收藏+]

标签:style   blog   http   io   color   ar   os   使用   sp   

在publish/subscribe模式中使用fanout类型有个缺陷,就是不能选择性接收的消息。
我们可以让consumer获得所有已发布的消息中指定的几个消息。


在之前的例子中我们这样绑定exchange和队列:

channel.queueBind(queueName, EXCHANGE_NAME, "");


暂且不论该代码中绑定的exchange类型,这里空着的参数就是routing key。
routing key的意义与exchange类型有关,比如使用fanout类型就会忽略掉routing key。


而解决这一问题的就是direct类型。
direct exchange并不复杂,只不过是producer和consumer双方的exchange对应时还需要对应routing key。
bubuko.com,布布扣

 

以下代码中,同一个exchange和两个队列进行绑定,两个队列分别和不同的binding key绑定。
(PS:当然,我们也可以将同一个routing key绑定给不同的队列也没有问题。)
另外,SERVERITY变量是rounting数组,假设将日志通过exchange发送出去,consumer根据自己的需要获取不同级别的日志:

final class ChannelFactory_{
    private final static ConnectionFactory connFactory = new ConnectionFactory();
 
    public final static String EXCHANGE_NAME = "direct_exchange";
    public final static String[] SEVERITY = {"info","warning","error"};
 
    static {
        Channel temp = getChannel();
        try {
            temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public static Channel getChannel(int channelNumber){
        try {
            Connection connection = connFactory.newConnection();
            return connection.createChannel(channelNumber);
        } catch (IOException e) {
            e.printStackTrace();
        }return null;
    }
 
    public static Channel getChannel(){
        try {
            Connection connection = connFactory.newConnection();
            return connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        }return null;
    }
 
    public static void  closeChannel(Channel channel) throws IOException {
        channel.close();
        channel.getConnection().close();
    }
 
}

 

确认定义:

bubuko.com,布布扣

 

consumer只需要warning和error级别(routing)的日志消息:

public static void main(String[] args) throws IOException, InterruptedException {
        Channel channel = ChannelFactory_.getChannel();
 
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"warning");
        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"error");
 
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName,true,consumer);
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();
 
            System.out.println(" [x] Received ‘" + routingKey + "‘:‘" + message + "‘");
        }
 
    }

 

producer将所有级别的日志都发送出去:

public static void main(String[] args) throws IOException {
        Channel channel = ChannelFactory_.getChannel();
        String content = "message "+new Date();
 
        for (int i = 0; i <ChannelFactory_.SEVERITY.length ; i++) {
            channel.basicPublish(EXCHANGE_NAME,ChannelFactory_.SEVERITY[i],null,content.getBytes());
        }
        ChannelFactory_.closeChannel(channel);
    }

 

运行结果:

bubuko.com,布布扣

 

direct exchange可以让我们有选择性地接受消息。
但这样做仍然有缺陷。
虽然我可以只要求error和warning级别的日志,但是我不能再进行细分。
比如我只想要数据库相关的error和warning级别的日志。


为了实现这一点,我们需要使用另一个exchange类型——Topic。
exchange类型为topic时,routing key是一组用"."隔开的词,但仅限255bytes。
比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"


topic和direct的不同点还有在consumer中定义routing key时我们可以使用通配符,比如:
符号‘*‘:可以匹配某一个词。
符号‘#‘:可以匹配0~N个词。

bubuko.com,布布扣

 

举个例子说明,假设我们用rounting key描述一个动物。
格式为: <性格>.<颜色>.<种类>
用符号‘*‘,我想要得到桔***的动物,即:"*.orange.*"
用符号‘#‘,我想要得到懒散的动物,即:"lazy.#"
如果使用过程中有人破坏了格式,即使rounting key为"lazy.orange.male.rabbit"也可以匹配"lazy.#"。


稍微修改上面的代码,首先定义一个topic exchange。

public  final static String EXCHANGE_NAME = "topic_exchange";
temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);

 

确认定义:

bubuko.com,布布扣

 

发送sql相关的log:

public static void main(String[] args) throws IOException {
        Channel channel = ChannelFactory_.getChannel();
        String content = "message #$#$#$#$#$#$";
 
        channel.basicPublish(EXCHANGE_NAME,"warning.sql.connection.close",null,content.getBytes());
        channel.basicPublish(EXCHANGE_NAME,"error.sql.syntax",null,content.getBytes());
 
        ChannelFactory_.closeChannel(channel);
    }

 

consumer接收所有sql相关的warning和所有error:

public static void main(String[] args) throws IOException, InterruptedException {
        Channel channel = ChannelFactory_.getChannel();
 
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"warning.sql.#");
        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"error.#");
 
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName,true,consumer);
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();
 
            System.out.println(" [x] Received ‘" + routingKey + "‘:‘" + message + "‘");
        }
 
    }

 

运行结果:

bubuko.com,布布扣

RabbitMQ - topic

标签:style   blog   http   io   color   ar   os   使用   sp   

原文地址:http://www.cnblogs.com/alvez/p/4100128.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!