码迷,mamicode.com
首页 > 其他好文 > 详细

ActiveMQ使用示例之Topic

时间:2017-04-12 01:49:36      阅读:369      评论:0      收藏:0      [点我收藏+]

标签:string   efault   tor   无法   name   send   分享   技术分享   持久化消息   

非持久的Topic消息示例 

对于非持久的Topic消息的发送基本跟前面发送队列信息是一样的,只是把创建Destination的地方,由创建队列替换成创建Topic,例如:

Destination destination = session.createTopic("MyTopic");

对于非持久的Topic消息的接收
1:必须要接收方在线,然后客户端再发送信息,接收方才能接收到消息
2:同样把创建Destination的地方,由创建队列替换成创建Topic,例如:
Destination destination = session.createTopic("MyTopic");
3:由于不知道客户端发送多少信息,因此改成while循环的方式了,例如:

Message message = consumer.receive();
while(message!=null) {
  TextMessage txtMsg = (TextMessage)message;
  System.out.println("收到消 息:" + txtMsg.getText());
  message = consumer.receive(1000L);
} 

生产者代码:

public class NoPersistenceSender {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = "tcp://192.168.0.101:61616";
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //连接工厂
        ConnectionFactory connectionFactory;
        //连接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session;
        //消息的目的地
        Destination destination;
        //消息生产者
        MessageProducer messageProducer;
        //实例化连接工厂(连接到ActiveMQ服务器)
        connectionFactory = new ActiveMQConnectionFactory(NoPersistenceSender.USERNAME, NoPersistenceSender.PASSWORD, NoPersistenceSender.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //创建一个名称为MyTopic的消息队列(生产者生成的消息放在哪)
            destination = session.createTopic("MyTopic");
            //创建消息生产者
            messageProducer = session.createProducer(destination);
            //发送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    /**
     * 发送消息
     *
     * @param session
     * @param messageProducer 消息生产者
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < NoPersistenceSender.SENDNUM; i++) {
            //创建一条文本消息
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
            System.out.println("发送消息:Activemq 发送消息" + i);
            //通过消息生产者发出消息
            messageProducer.send(message);
        }

    }
}

消费者代码:

public class NoPersistenceReceiver {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = "tcp://192.168.0.101:61616";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接

        Session session;//会话 接受或者发送消息的线程
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消费者

        //实例化连接工厂(连接到ActiveMQ服务器)
        connectionFactory = new ActiveMQConnectionFactory(NoPersistenceReceiver.USERNAME, NoPersistenceReceiver.PASSWORD, NoPersistenceReceiver.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //生产者将消息发送到MyTopic,所以消费者要到MyTopic去取
            destination = session.createTopic("MyTopic");
            //创建消息消费者
            messageConsumer = session.createConsumer(destination);

            Message message = messageConsumer.receive();
            while (message != null) {
                TextMessage txtMsg = (TextMessage) message;
                System.out.println("收到消息:" + txtMsg.getText());
                message = messageConsumer.receive(1000L);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

如果运行生产者的时候没有启动消费者,也就是先运行生产者后运行消费者,那么运行效果是这样的

技术分享

消费者阻塞

技术分享

查看一下控制台

技术分享

队列中有消息,但是无法消费~

在消费者运行的情况下再运行生产者

看下控制台

技术分享

持久的Topic消息示例

生产者:

1:要用持久化订阅,发送消息者要用 DeliveryMode.PERSISTENT 模式发现,在连接之前设定
2:一定要设置完成后,再start 这个 connection

public class PersistenceSender {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = "tcp://192.168.0.101:61616";
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //连接工厂
        ConnectionFactory connectionFactory;
        //连接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session;
        //消息的目的地
        Destination destination;
        //消息生产者
        MessageProducer messageProducer;
        //实例化连接工厂(连接到ActiveMQ服务器)
        connectionFactory = new ActiveMQConnectionFactory(PersistenceSender.USERNAME, PersistenceSender.PASSWORD, PersistenceSender.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //创建session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //创建一个名称为MyTopic的消息队列(生产者生成的消息放在哪)
            destination = session.createTopic("MyTopic");
            //创建消息生产者
            messageProducer = session.createProducer(destination);
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //启动连接
            connection.start();
            //发送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    /**
     * 发送消息
     *
     * @param session
     * @param messageProducer 消息生产者
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < PersistenceSender.SENDNUM; i++) {
            //创建一条文本消息
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
            System.out.println("发送消息:Activemq 发送消息" + i);
            //通过消息生产者发出消息
            messageProducer.send(message);
        }

    }
}

消费者:

1:需要在连接上设置消费者id,用来识别消费者

2:需要创建TopicSubscriber来订阅

3:要设置好了过后再start 这个connection

4:一定要先运行一次,等于向消息服务中间件注册这个消费者,然后再运行客户端发送信息,这个时候,无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。

public class PersistenceReceiver {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = "tcp://192.168.0.101:61616";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接

        Session session;//会话 接受或者发送消息的线程
        Topic topic;//消息的目的地

        //实例化连接工厂(连接到ActiveMQ服务器)
        connectionFactory = new ActiveMQConnectionFactory(PersistenceReceiver.USERNAME, PersistenceReceiver.PASSWORD, PersistenceReceiver.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            connection.setClientID("winner_0715");
            //创建session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //生产者将消息发送到MyTopic,所以消费者要到MyTopic去取
            topic = session.createTopic("MyTopic");
            //创建消息消费者
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "t1");

            //启动连接
            connection.start();

            Message message = consumer.receive();
            while (message != null) {
                TextMessage txtMsg = (TextMessage) message;
                System.out.println("收到消 息:" + txtMsg.getText());
                //没这句有错
                message = consumer.receive(1000L);
            }
            session.commit();
            session.close();
            connection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

}

控制台:

技术分享

技术分享

关于持久化和非持久化消息

持久化消息
这是 ActiveMQ 的默认传送模式,此模式保证这些消息只被传送一次和成 功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。

非持久化消息
保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。 此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。 有两种方法指定传送模式:
1.使用setDeliveryMode 方法,这样所有的消息都采用此传送模式; 如:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
2.使用send 方法为每一条消息设置传送模式

 

ActiveMQ使用示例之Topic

标签:string   efault   tor   无法   name   send   分享   技术分享   持久化消息   

原文地址:http://www.cnblogs.com/winner-0715/p/6697102.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!