码迷,mamicode.com
首页 > 其他好文 > 详细

rabbitmq演示代码

时间:2018-08-23 19:20:57      阅读:155      评论:0      收藏:0      [点我收藏+]

标签:byte   sicp   bool   factory   设置   author   work   persist   rod   

简单使用:

package com.imooc.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* Producer代码 服务端-生产者
* @author cxsz-hp16
* @Title: Sender
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2315:02
*/
public class Sender {
//消息名
private final static String QUEUE_NAME = "MyQueue";

public static void main(String[] args) {
send();
}
//发送消息
public static void send(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
//设置参数
factory.setHost("localhost");
//创建连接
connection = factory.newConnection();
//创建管道
channel = connection.createChannel();
/**
* 队列声明
* 参数:queue:队列名、durable:是否持久化、exclusive:是否排外、arguments:设置队列消息什么时候被删除
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//设置消息内容
String message = "my first message";
/**
*
* 参数:
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("已发消息:"+message);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
}

package com.imooc.cusumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* 客户端-消费者
* @author cxsz-hp16
* @Title: Receiver
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2315:31
*/
public class Receiver {
private final static String QUEUE_NAME = "MyQueue";

public static void main(String[] args) {
receiver();
}
public static void receiver(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//回调消费消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + message + "‘");
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
}

package com.imooc.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
* @author cxsz-hp16
* @Title: NewTask
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:30
*/
public class NewTask {
//消息名
private final static String QUEUE_NAME = "newTask";

public static void main(String[] args) {
send();
}
//发送消息
public static void send(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
//设置参数
factory.setHost("localhost");
//创建连接
connection = factory.newConnection();
//创建管道
channel = connection.createChannel();

boolean durable = true;
channel.queueDeclare(QUEUE_NAME,durable,true,false,null);
//设置消息内容
String message = "2.";
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
System.out.println(" [x] Sent ‘" + message + "‘");
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
}

package com.imooc.cusumer;

import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;

import java.io.IOException;

/**
* @author cxsz-hp16
* @Title: Worker
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:31
*/
public class Worker {
private final static String QUEUE_NAME = "newTask";

public static void main(String[] args) {
receiver();
}
public static void receiver(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//回调消费消息
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + message + "‘");
try {
doWork(message);//设置一个任务
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[x] Done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,false,consumer);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}

/**
* 任务
* @param task
* @throws InterruptedException
*/
private static void doWork(String task) throws InterruptedException {
//将字符串转换为字符数组
for (char ch: task.toCharArray()) {
//当值为.时,阻塞线程来达到耗时的目的
if (ch == ‘.‘){
Thread.sleep(1000);
}
}
}


}



package com.imooc.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
* @author cxsz-hp16
* @Title: NewTask
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:30
*/
public class NewTask23 {
//消息名
private final static String QUEUE_NAME = "task_queue";

public static void main(String[] args) {
send();
}
//发送消息
public static void send(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
//设置参数
factory.setHost("localhost");
//创建连接
connection = factory.newConnection();
//创建管道
channel = connection.createChannel();

boolean durable = true;
channel.queueDeclare(QUEUE_NAME,true,true,false,null);
//设置消息内容
String message = "2.";
channel.basicPublish("",QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN
,message.getBytes("UTF-8"));
System.out.println(" [x] Sent ‘" + message + "‘");
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
}

package com.imooc.cusumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author cxsz-hp16
* @Title: Worker
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:31
*/
public class Worker3 {
private final static String QUEUE_NAME = "task_queue";

public static void main(String[] args) {
receiver();
}
public static void receiver(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,true,false,false,null);

channel.basicQos(1);
//回调消费消息
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");

System.out.println(" [x] Received ‘" + message + "‘");
try {
doWork(message);//设置一个任务
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[x] Done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,false,consumer);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}

/**
* 任务
* @param task
* @throws InterruptedException
*/
private static void doWork(String task) throws InterruptedException {
//将字符串转换为字符数组
for (char ch: task.toCharArray()) {
//当值为.时,阻塞线程来达到耗时的目的
if (ch == ‘.‘){
Thread.sleep(1000);
}
}
}


}





rabbitmq演示代码

标签:byte   sicp   bool   factory   设置   author   work   persist   rod   

原文地址:https://www.cnblogs.com/zhangbLearn/p/9525201.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!