一、linux下安装rabbitmq
1、安装erlang环境
wget http://erlang.org/download/otp_src_18.2.1.tar.gz
tar xvfz otp_src_18.2.1.tar.gz
cd otp_src_18.2.1
./configure
make install
2、安装RabbitMQ
wget http://www.rabbitmq.com/releases/rabbitmq-server/vx.x.x/rabbitmq-server-generic-unix-x.x.x.tar.xz
//xz文件压缩工具
yum install xz
//解压
xz -d rabbitmq-server-generic-unix-x.x.x.tar.xz
tar -xvf rabbitmq-server-generic-unix-x.x.x.tar
//将其复制至/usr/local/下 按自己习惯
cp -r rabbitmq_server-x.x.x /usr/local/rabbitmq
//改变环境变量
vi /etc/profile
export PATH=/usr/local/rabbitmq/sbin:$PATH
source /etc/profile
//启用MQ管理方式
rabbitmq-plugins enable rabbitmq_management #启动后台管理
rabbitmq-server -detached #后台运行rabbitmq
//设置端口号 可供外部使用
iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
3、添加用户和权限
//添加用户
rabbitmqctl add_user admin admin
//添加权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
//添加用户角色
rabbitmqctl set_user_tags admin administrator
二、Spring mvc整合RabbitMQ
1、添加pom.xml依赖jar包
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.7.5.RELEASE</version>
        </dependency>
2、添加配置applicationContext.xml
<!-- RabbitMQ configuration: start -->
    <!-- Connection factory for the broker. The host is supplied twice (constructor
         argument and "host" property) with the same value; credentials and the
         port are set as properties. -->
    <bean id="connectionFactoryMq" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="192.168.181.201"/>
        <property name="username" value="admin"/>
        <property name="password" value="admin"/>
        <property name="host" value="192.168.181.201"/>
        <property name="port" value="5672"/>
    </bean>
    <!-- RabbitAdmin built on the connection factory above. -->
    <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
        <constructor-arg ref="connectionFactoryMq"/>
    </bean>
    <!-- RabbitTemplate used to publish messages -->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="connectionFactoryMq"/>
    </bean>
   <!-- Message converter: SimpleMessageConverter -->
    <bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter">
    </bean>
    <!-- Queue "testQueue", described by the author as durable.
         NOTE(review): assuming the Spring AMQP constructor order
         Queue(name, durable, exclusive, autoDelete), index 3 = true makes the
         queue auto-delete, which sits oddly with a durable queue; confirm intent. -->
    <bean  id="queue" class="org.springframework.amqp.core.Queue">
        <constructor-arg index="0" value="testQueue"></constructor-arg>
        <constructor-arg index="1" value="true"></constructor-arg>
        <constructor-arg index="2" value="false"></constructor-arg>
        <constructor-arg index="3" value="true"></constructor-arg>
    </bean>
    <!-- Topic exchange "testExchange", described by the author as durable.
         Presumably the arguments are (name, durable, autoDelete); verify against
         the Spring AMQP version in use. -->
    <bean id="topicExchange" class="org.springframework.amqp.core.TopicExchange">
        <constructor-arg index="0" value="testExchange"></constructor-arg>
        <constructor-arg index="1" value="true"></constructor-arg>
        <constructor-arg index="2" value="false"></constructor-arg>
    </bean>
    <!-- Empty map handed to the binding below as its last constructor argument.
         The "util" prefix must be declared on the enclosing root element, which
         is not part of this excerpt. -->
    <util:map id="arguments">
    </util:map>
    <!-- Binding between the queue and the exchange: destination "testQueue" of
         type QUEUE, exchange "testExchange", routing key "testQueue". -->
    <bean id="binding" class="org.springframework.amqp.core.Binding">
        <constructor-arg index="0" value="testQueue"></constructor-arg>
        <constructor-arg index="1" value="QUEUE"></constructor-arg>
        <constructor-arg index="2" value="testExchange"></constructor-arg>
        <constructor-arg index="3" value="testQueue"></constructor-arg>
        <constructor-arg index="4" value="#{arguments}"></constructor-arg>
    </bean>
    <!-- Handler class that receives messages -->
    <bean id="rqmConsumer" class="com.slp.mq.RmqConsumer"></bean>
    <!-- Adapter wrapping rqmConsumer: the listener method is configured as
         "rmqProducerMessage" and the converter is serializerMessageConverter. -->
    <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="rqmConsumer" />
        <property name="defaultListenerMethod" value="rmqProducerMessage"></property>
        <property name="messageConverter" ref="serializerMessageConverter"></property>
    </bean>
    <!-- Listener container (SimpleMessageListenerContainer) that listens on the queue; "queues" may be given more than one queue -->
    <bean id="listenerContainer"  class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="queues" ref="queue"></property>
        <property name="connectionFactory" ref="connectionFactoryMq"></property>
        <property name="messageListener" ref="messageListenerAdapter"></property>
    </bean>
    <!-- Producer bean; its RabbitTemplate field is annotated with @Resource in the Java class. -->
    <bean id="rmqProducer" class="com.slp.mq.RmqProducer"></bean>
    <!-- RabbitMQ configuration: end -->
3、消息实体类
package com.slp.mq;
import java.io.*;
/**
 * Serializable message envelope carrying the target exchange, the routing key
 * and an arbitrary parameter array.
 *
 * <p>NOTE(review): no explicit {@code serialVersionUID} is declared, so the
 * serialized form relies on the JVM-computed default. One is deliberately not
 * added here because it would differ from the default already in use.
 *
 * @author sanglp
 * @create 2018-02-06 14:00
 * @desc RabbitMQ message class
 **/
public class RabbitMessage implements Serializable {
    /**
     * Runtime classes of {@link #params}. Only populated by the four-argument
     * constructor or {@link #setParamTypes(Class[])}; otherwise {@code null}.
     */
    private Class<?>[] paramTypes ;
    /**
     * Exchange the message is published to.
     */
    private String exchange;
    /**
     * Payload parameters; stored exactly as supplied (may be {@code null}).
     */
    private Object[] params;
    /**
     * Routing key.
     */
    private String routekey;

    /**
     * Creates an empty message; all fields are {@code null}.
     */
    public RabbitMessage() {
    }

    /**
     * Creates a message without recording parameter types.
     *
     * <p>NOTE(review): a call whose third argument is a {@code String} (for
     * example three plain strings) is resolved by the compiler to the
     * four-argument overload, which treats that string as {@code methodName}.
     *
     * @param exchange target exchange
     * @param routekey routing key
     * @param params   payload parameters, stored as given
     */
    public RabbitMessage(String exchange,  String routekey,Object...params) {
        this.exchange = exchange;
        this.params = params;
        this.routekey = routekey;
    }

    /**
     * Creates a message and records the runtime class of every parameter.
     *
     * <p>A {@code null} parameter yields a {@code null} entry at the same index
     * of {@link #getParamTypes()} (previously this threw
     * {@code NullPointerException}). A {@code null} parameter array yields an
     * empty type array, and {@link #getParams()} then returns {@code null}.
     *
     * @param exchange   target exchange
     * @param routeKey   routing key
     * @param methodName currently unused: it is accepted but not stored
     * @param params     payload parameters, stored as given
     */
    public RabbitMessage(String exchange,String routeKey,String methodName,Object...params)
    {
        this.params=params;
        this.exchange=exchange;
        this.routekey=routeKey;
        this.paramTypes=runtimeTypesOf(params);
    }

    /** Maps each value to its runtime class, tolerating a null array and null elements. */
    private static Class<?>[] runtimeTypesOf(Object[] values) {
        if (values == null) {
            return new Class<?>[0];
        }
        Class<?>[] types = new Class<?>[values.length];
        for (int i = 0; i < values.length; i++) {
            Object value = values[i];
            // The type of a null argument is unknown, so leave the slot null.
            types[i] = (value == null) ? null : value.getClass();
        }
        return types;
    }

    /**
     * Serializes this message with Java object serialization.
     *
     * @return the serialized bytes, or an empty array when serialization fails
     *         (for example because a parameter is not serializable); the
     *         failure is printed to standard error, as before
     */
    public byte[] getSerialBytes(){
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(this);
            oos.close();
            return baos.toByteArray();
        } catch (IOException e) {
            // Best-effort contract kept for existing callers: report and return no bytes.
            e.printStackTrace();
            return new byte[0];
        }
    }

    /** @return the recorded parameter types, possibly {@code null} */
    public Class<?>[] getParamTypes() {
        return paramTypes;
    }

    /** @param paramTypes parameter types to record */
    public void setParamTypes(Class<?>[] paramTypes) {
        this.paramTypes = paramTypes;
    }

    /** @return the target exchange */
    public String getExchange() {
        return exchange;
    }

    /** @param exchange the target exchange */
    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    /** @return the payload parameters, possibly {@code null} */
    public Object[] getParams() {
        return params;
    }

    /** @param params the payload parameters */
    public void setParams(Object[] params) {
        this.params = params;
    }

    /** @return the routing key */
    public String getRoutekey() {
        return routekey;
    }

    /** @param routekey the routing key */
    public void setRoutekey(String routekey) {
        this.routekey = routekey;
    }
}
4、生产者
package com.slp.mq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import javax.annotation.Resource;
/**
 * @author sanglp
 * @create 2018-02-06 14:19
 * @desc Message producer
 **/
public class RmqProducer {
    /** Template used for publishing; injected by name via {@code @Resource}. */
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes the given message to the exchange and routing key it carries,
     * printing the broker address before and a completion line after sending.
     *
     * @param msg message to publish; also supplies the exchange and routing key
     */
    public void sendMessage(RabbitMessage msg){
        printBrokerAddress();
        System.out.println("msg"+msg);
        final String targetExchange = msg.getExchange();
        final String routingKey = msg.getRoutekey();
        rabbitTemplate.convertAndSend(targetExchange, routingKey, msg);
        System.out.println("发送完成");
    }

    /** Prints the host and then the port of the template's connection factory. */
    private void printBrokerAddress() {
        System.out.println(rabbitTemplate.getConnectionFactory().getHost());
        System.out.println(rabbitTemplate.getConnectionFactory().getPort());
    }
}
5、消费者
package com.slp.mq;
/**
 * @author sanglp
 * @create 2018-02-06 14:23
 * @desc Message consumer
 **/
public class RmqConsumer {
    /**
     * Handles one received message by printing its exchange, routing key and
     * parameters to standard output.
     *
     * @param object the received payload; it is cast to {@link RabbitMessage}
     * @throws ClassCastException if {@code object} is not a {@code RabbitMessage}
     */
    public void rmqProducerMessage(Object object){
        System.out.println("消费前");
        RabbitMessage rabbitMessage = (RabbitMessage) object;
        System.out.println(rabbitMessage.getExchange());
        System.out.println(rabbitMessage.getRoutekey());
        // Object[].toString() only yields "[Ljava.lang.Object;@<hash>"; deepToString
        // prints the actual elements and returns "null" for a null array.
        System.out.println(java.util.Arrays.deepToString(rabbitMessage.getParams()));
    }
}
6、测试类
package com.slp;
import com.slp.mq.RabbitMessage;
import com.slp.mq.RmqConsumer;
import com.slp.mq.RmqProducer;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import java.util.HashMap;
import java.util.Map;
/**
 * @author sanglp
 * @create 2018-02-06 14:36
 * @desc RabbitMQ test class
 **/
public class MqTest {
    private RmqProducer rmqProducer ;
    private RmqConsumer rqmConsumer ;

    /**
     * Loads the Spring context from the file system and looks up the producer
     * and consumer beans by name.
     */
    @Before
    public void setUp() throws Exception {
        // The context is read from a path relative to the working directory.
        ApplicationContext springContext =
                new FileSystemXmlApplicationContext("web/WEB-INF/applicationContext.xml");
        Object producerBean = springContext.getBean("rmqProducer");
        rmqProducer = (RmqProducer) producerBean;
        Object consumerBean = springContext.getBean("rqmConsumer");
        rqmConsumer = (RmqConsumer) consumerBean;
    }

    /**
     * Sends ten messages, each carrying a fresh single-entry map, to
     * "testExchange" with routing key "testQueue".
     */
    @Test
    public void test(){
        final String exchange = "testExchange";
        final String routeKey = "testQueue";
        final String methodName = "test";
        int sent = 0;
        while (sent < 10) {
            // Message parameters
            Map<String,Object> payload = new HashMap<String, Object>();
            payload.put("data","hello");
            // Publish the message
            rmqProducer.sendMessage(new RabbitMessage(exchange, routeKey, methodName, payload));
            sent++;
        }
        // The consumer is not invoked directly here.
    }
}
运行结果:
没有开启消费者之前:
 