码迷,mamicode.com
首页 > 编程语言 > 详细

java 点对点实例

时间:2018-05-27 20:56:22      阅读:202      评论:0      收藏:0      [点我收藏+]

标签:nal   lse   mqc   状态   system   producer   tran   tor   set   

1.创建一个抽象类定义发送消息和接受消息的抽象方法

package cn.base.jms;  
  
import javax.jms.*;  
  
/** 
 * @author gu.fei 
 * @version 2017-03-24 9:20 
 */  
public abstract class Queuehandler {  
  
    //默认队列名称queue  
    private String queue = "queue";  
  
    //连接工厂  
    private Connection connection;  
  
    private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;  
  
    private boolean transacted = false;  
  
    public Queuehandler() {  
    }  
  
    /** 
     * 发送消息 
     * @return 
     */  
    public abstract Object sendMessage(MessageProducer producer,Session session);  
  
    /** 
     * 接收消息 
     * @return 
     */  
    public abstract Object reciveMessage(Message message);  
  
    /** 
     * 执行发送 
     */  
    public void doSend() {  
        Session session = null;  
        try {  
            session = connection.createSession(transacted,acknowledgeMode);  
            Destination destination = session.createQueue(queue);  
            MessageProducer producer = session.createProducer(destination);  
            sendMessage(producer,session);  
        } catch (JMSException e) {  
            e.printStackTrace();  
        } finally {  
            if(null != session) {  
                try {  
                    session.close();  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
    }  
  
    /** 
     * 执行发送 
     */  
    public void doRecive() {  
        Session session = null;  
        try {  
            session = connection.createSession(transacted,acknowledgeMode);  
            Destination destination = session.createQueue(queue);  
            MessageConsumer consumer = session.createConsumer(destination);  
            consumer.setMessageListener(new MessageListener() {  
                @Override  
                public void onMessage(Message message) {  
                    reciveMessage(message);  
                }  
            });  
  
            //保持进程启动状态  
            while (true) {}  
        } catch (JMSException e) {  
            e.printStackTrace();  
        } finally {  
            if(null != session) {  
                try {  
                    session.close();  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
    }  
  
    private void init() {  
  
    }  
  
    public String getQueue() {  
        return queue;  
    }  
  
    public void setQueue(String queue) {  
        this.queue = queue;  
    }  
  
    public Connection getConnection() {  
        return connection;  
    }  
  
    public void setConnection(Connection connection) {  
        this.connection = connection;  
    }  
  
    public int getAcknowledgeMode() {  
        return acknowledgeMode;  
    }  
  
    public void setAcknowledgeMode(int acknowledgeMode) {  
        this.acknowledgeMode = acknowledgeMode;  
    }  
  
    public boolean isTransacted() {  
        return transacted;  
    }  
  
    public void setTransacted(boolean transacted) {  
        this.transacted = transacted;  
    }  
} 

2.定义一个发送类集成上面抽象方法

package cn.base.jms;  
  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
import javax.jms.*;  
  
/** 
 * @author gu.fei 
 * @version 2017-03-24 9:50 
 */  
public class OneProducer extends Queuehandler {  
    static ConnectionFactory connectionFactory = null;  
    static Connection connection = null;  
    static {  
        connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        try {  
            connection = connectionFactory.createConnection();  
            connection.start();  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }  
  
    public OneProducer() {  
    }  
  
    @Override  
    public Object sendMessage(MessageProducer producer, Session session) {  
        try {  
            for (int i = 0; i <10 ; i++) {  
                Message message = session.createTextMessage("hello,world!" + i);  
                producer.send(message);  
            }  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
        return null;  
    }  
  
    @Override  
    public Object reciveMessage(Message message) {  
        return null;  
    }  
  
    public static void main(String[] args) {  
        OneProducer oneProducer = new OneProducer();  
        oneProducer.setConnection(connection);  
        oneProducer.doSend();  
        try {  
            connection.close();  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }  
}  

3.定义两个消费者

package cn.base.jms;  
  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
import javax.jms.*;  
  
/** 
 * @author gu.fei 
 * @version 2017-03-24 9:50 
 */  
public class OneCustomer extends Queuehandler {  
  
    static ConnectionFactory connectionFactory = null;  
    static Connection connection = null;  
    static {  
        connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        try {  
            connection = connectionFactory.createConnection();  
            connection.start();  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }  
  
    public OneCustomer() {  
    }  
  
    @Override  
    public Object sendMessage(MessageProducer producer, Session session) {  
        return null;  
    }  
  
    @Override  
    public Object reciveMessage(Message message) {  
        TextMessage text = (TextMessage)message;  
        try {  
            Thread.sleep(1000);  
            System.out.println("One接受消息:" + text.getText());  
        } catch (JMSException e) {  
            e.printStackTrace();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        return null;  
    }  
  
    public static void main(String[] args) {  
        OneCustomer customer = new OneCustomer();  
        customer.setConnection(connection);  
        customer.doRecive();  
        try {  
            connection.close();  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }  
}  
package cn.base.jms;  
  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
import javax.jms.*;  
  
/** 
 * @author gu.fei 
 * @version 2017-03-24 9:50 
 */  
public class TwoCustomer extends Queuehandler {  
  
    static ConnectionFactory connectionFactory = null;  
    static Connection connection = null;  
    static {  
        connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        try {  
            connection = connectionFactory.createConnection();  
            connection.start();  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }  
  
    public TwoCustomer() {  
    }  
  
    @Override  
    public Object sendMessage(MessageProducer producer, Session session) {  
        return null;  
    }  
  
    @Override  
    public Object reciveMessage(Message message) {  
        TextMessage text = (TextMessage)message;  
        try {  
            Thread.sleep(1000);  
            System.out.println("Two接受消息:" + text.getText());  
        } catch (JMSException e) {  
            e.printStackTrace();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        return null;  
    }  
  
    public static void main(String[] args) {  
        TwoCustomer customer = new TwoCustomer();  
        customer.setConnection(connection);  
        customer.doRecive();  
        try {  
            connection.close();  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }  
}  

先启动两个消费者,然后启动生产者

结果如下:

one:

One接受消息:hello,world!1
One接受消息:hello,world!3
One接受消息:hello,world!5
One接受消息:hello,world!7
One接受消息:hello,world!9

 

two:

Two接受消息:hello,world!0
Two接受消息:hello,world!2
Two接受消息:hello,world!4
Two接受消息:hello,world!6
Two接受消息:hello,world!8

 

java 点对点实例

标签:nal   lse   mqc   状态   system   producer   tran   tor   set   

原文地址:https://www.cnblogs.com/shouhutian/p/9097240.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!