码迷,mamicode.com
首页 > 编程语言 > 详细

activitemq整合spring

时间:2019-01-23 01:30:52      阅读:158      评论:0      收藏:0      [点我收藏+]

标签:snap   ogr   beans   ice   mdt   listener   code   键盘输入   cee   

技术分享图片

activitemq整合spring

一.activmq的点对点模型

技术分享图片

pom.xml:
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demo</groupId>
    <artifactId>aq-test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>war</packaging>

    <name>aq-test Maven Webapp</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>jms-api</artifactId>
            <version>1.1-rev-1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.34</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-aop -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>5.1.3.RELEASE</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.6.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-dbcp2</artifactId>
            <version>2.1.1</version>
        </dependency>

    </dependencies>

    <build>
        <finalName>aq-test</finalName>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-war-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
ActiviteMq.class:(发送端)
package com.demo;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;

public class ActiviteMq {

    @Test
    public void testQueueProducer() throws JMSException {
        //1.创建connectinfactory对象,需要指定服务的IP以及端口号
        //brokerURL服务器的ip以及端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.1.20:61616"
        );

        //2.使用ConnectionFactory创建
        Connection connection = connectionFactory.createConnection();

        //3.开启链接,调用connection对象的start的方法
        connection.start();

        //4.使用connection对创建一个session对象
        //[4.1] 第一参数:是否开启事务
        //[4.2] 第二参数:当第一个参数为false的时候 才有意义 消息的应答模式
        //1.自动应答2.手动应答 一般为自动

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //第五步:使用session对象创建一个destination对象(topic,queue) 此处创建一个queue对象
        //参数:队列名称

        Queue queue = session.createQueue("test-queue2");

        //第六步 使用session创建一个producer对象
        MessageProducer producer = session.createProducer(queue);

        //第七步 创建一个message对象 创建一个textmessage对象
        TextMessage textMessage = session.createTextMessage("风风光光");

        //第八步 使用producer对象发送消息
        producer.send(textMessage);

        //第九步 关闭资源
         producer.close();
         session.close();
         connection.close();

    }
}
ReceiveMsf.class:(接收端)
package com.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;
import java.io.IOException;

public class ReceiveMsf {

    @Test
    public void testQueueConsumer() throws JMSException, IOException {

        //1.创建connectinfactory对象,需要指定服务的IP以及端口号
        //brokerURL服务器的ip以及端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.1.20:61616"
        );

        //2.使用ConnectionFactory创建
        Connection connection = connectionFactory.createConnection();

        //3.开启链接,调用connection对象的start的方法
        connection.start();

        //4.使用connection对创建一个session对象
        //[4.1] 第一参数:是否开启事务
        //[4.2] 第二参数:当第一个参数为false的时候 才有意义 消息的应答模式
        //1.自动应答2.手动应答 一般为自动
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //第五步:使用session对象创建一个destination对象(topic,queue) 此处创建一个queue对象
        //参数:队列名称

        Queue queue = session.createQueue("test-queue2");
        // 第六步:使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(queue);

        consumer.setMessageListener(
                new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        try {
                            TextMessage textMessage = (TextMessage) message;
                            String text = null;
                            //取消的内容
                            text = textMessage.getText();
                            //第八步 打印消息
                            System.out.println(text);
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
        );

        //等待键盘输入 阻塞
        System.in.read();

        //第九步 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

二.activmq的发布订阅模型

技术分享图片

TopicProducer.class
package com.demo.dingyue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;

public class TopicProducer {

    @Test
    public void testTopicProducer() throws JMSException {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.1.20:61616"
        );

        Connection connection = connectionFactory.createConnection();

        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("huaYuanBaoBao");

        MessageProducer producer = session.createProducer(topic);

        TextMessage textMessage = session.createTextMessage("这个是发布订阅的");

        producer.send(textMessage);

        producer.close();
        session.close();
        connection.close();
    }

}
TopicCustomer.class:
package com.demo.dingyue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;
import java.io.IOException;

public class TopicCustomer {

    @Test
    public void testTopicCustomer() throws JMSException, IOException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.1.20:61616"
        );


        Connection connection = connectionFactory.createConnection();

        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("huaYuanBaoBao");


        MessageConsumer consumer = session.createConsumer(topic);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try{

               TextMessage textMessage = (TextMessage) message;
               String text = null;
               //取出消息的内容
             text= textMessage.getText();

                    System.out.println(text);


                }catch (Exception e){

                    e.printStackTrace();
                }
            }
        });

        System.out.println("消费端03");
        System.in.read();

        //关闭资源
        connection.close();
        consumer.close();
        session.close();

    }
}

和Spring整合:

spring-amq.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <context:component-scan base-package="com.demo.spring"/>

    <bean id="amqSenderService" class="com.demo.spring.AMQSenderServiceImpl">
    <!--<bean id="user" class="com.demo.spring.User">-->
    </bean>

    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="trustAllPackages" value="true"/>
                <property name="brokerURL">
                    <value>tcp://192.168.1.20:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!--使用缓存可以提升效率-->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory"/>
        <property name="sessionCacheSize" value="1"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>

    <!--测试Queue,队列的名字是spring-queue-->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <!--<constructor-arg index="0" value="spring-queue"/>-->
        <constructor-arg name="name" value="spring-queue"/>
    </bean>

    <!--测试Topic-->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-topic"/>
    </bean>

</beans>
AMQSenderServiceImpl:
package com.demo.spring;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

@Service
public class AMQSenderServiceImpl  {

    private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    //目的地队列的明证,我们要向这个队列发送消息
    @Resource(name = "destinationQueue")
    private Destination destination;

    //向特定的队列发送消息
    public void sendMsg(final User user) {
//        final String msg = JSON.toJSONString(mqParamDto);
        user.setEmail("javaceshi@aa.com");
        user.setPassword("123456");
        user.setPhone("123456");
        user.setSex("M");
        user.setUsername("javaceshi");

        try {
            logger.info("将要向队列{}发送的消息msg:{}", destination, user);
            jmsTemplate.send(destination, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
//                    return session.createObjectMessage(user);
                    return  session.createTextMessage("2019/1/18message");
                }
            });

        } catch (Exception ex) {
            logger.error("向队列{}发送消息失败,消息为:{}", destination, user);
        }

    }
}
AMQReceiverServiceImpl:
package com.demo.spring;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

@Service
public class AMQReceiverServiceImpl {
    private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    //目的地队列的明证,我们要向这个队列接收消息
    @Resource(name = "destinationQueue")
    private Destination destination;

    //向特定的队列接收消息
    public void receiverMsg(final User user) {
//

        try {
            Object object = jmsTemplate.receive(destination);
            User msg = (User) object;
            System.out.println(msg);

        } catch (Exception ex) {
            ex.printStackTrace();
        }

    }
}

测试类:App

package com.demo.spring;

import com.demo.spring.User;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 主发送类
 *
 */
public class App
{
    public static void main( String[] args )
    {
        final  User user = new User();
        user.setEmail("javaceshi@aa.com");
        user.setPassword("123456");
        user.setPhone("123456");
        user.setSex("M");
        user.setUsername("javaceshi");

        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-amq.xml");
        AMQSenderServiceImpl sendService = (AMQSenderServiceImpl)context.getBean("amqSenderService");
        sendService.sendMsg(user);
//        sendService.send(user);
        System.out.println("send successfully, please visit http://192.168.1.20:8161/admin to see it");
    }
}

activitemq整合spring

标签:snap   ogr   beans   ice   mdt   listener   code   键盘输入   cee   

原文地址:https://www.cnblogs.com/charlypage/p/10306801.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!