标签:rabbitmq
public class Consumer {
/**
 * Connects to the RabbitMQ broker on localhost and processes deliveries from
 * the queue "queue.lubby.hello" one at a time, printing each delivery and
 * handing its text to {@link #doWork(String)}.
 *
 * <p>The loop has no normal exit; it ends only when one of the declared
 * exceptions is thrown. The connection is closed on the way out so the
 * socket and the client's threads are not leaked.
 *
 * @param args unused
 * @throws IOException if connecting, creating the channel, subscribing or closing fails
 * @throws ShutdownSignalException if the connection is shut down while waiting for a delivery
 * @throws ConsumerCancelledException if the consumer is cancelled while waiting for a delivery
 * @throws InterruptedException if the thread is interrupted while waiting or working
 */
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
    ConnectionFactory connFactory = new ConnectionFactory();
    connFactory.setHost("localhost");
    Connection connection = connFactory.newConnection();
    try {
        Channel channel = connection.createChannel();
        // The queue is not declared here, so it must already exist on the broker.
        // NOTE(review): the Provider in this file publishes to "queue.lubby.hello2";
        // confirm that "queue.lubby.hello" is the queue this consumer should read.
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Second argument is autoAck: deliveries are acknowledged as soon as they are sent.
        channel.basicConsume("queue.lubby.hello", true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // Decode with an explicit charset instead of the platform default.
            String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("receive message is " + message);
            System.out.println("receive class is " + delivery.getClass());
            System.out.println("receive envelope is " + delivery.getEnvelope());
            System.out.println("receive properties is " + delivery.getProperties());
            doWork(message);
        }
    } finally {
        // Closing the connection also closes its channels. Skip the call when the
        // connection is already shut down so the original exception is not masked.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
/**
 * Simulates a slow task by sleeping 10 seconds for every '.' character
 * found in the task text.
 *
 * @param task the task text to scan
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
private static void doWork(String task) throws InterruptedException {
    final int length = task.length();
    for (int index = 0; index < length; index++) {
        if (task.charAt(index) != '.') {
            continue;
        }
        // One dot costs one 10-second pause.
        Thread.sleep(10000);
    }
}
}
启动: rabbitmq-server start
关闭:rabbitmqctl stop
2)生产者代码
public class Provider {
/**
 * Declares the durable queue "queue.lubby.hello2" on the RabbitMQ broker at
 * localhost and publishes one persistent text message to it through the
 * default exchange.
 *
 * <p>An {@link IOException} is printed rather than propagated. The connection
 * is closed in a {@code finally} block so it is released even when declaring
 * or publishing fails.
 *
 * @param args unused
 */
public static void main(String[] args){
    ConnectionFactory connFactory = new ConnectionFactory();
    connFactory.setHost("localhost");
    Connection connection = null;
    try {
        connection = connFactory.newConnection();
        Channel channel = connection.createChannel();
        String queueName = "queue.lubby.hello2";
        // Creates the queue if it does not exist; an existing queue's attributes
        // cannot be changed by redeclaring it. The second argument (durable) is true.
        channel.queueDeclare(queueName, true, false, false, null);
        String message = "3.";
        // Persistent delivery requires message properties that request it.
        // Encode with an explicit charset instead of the platform default.
        channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        System.out.println("sent : " + message);
        channel.close();
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        // Closing the connection also closes any channel still open on it.
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
3)消费者代码
标签:rabbitmq
原文地址:http://blog.csdn.net/liu00614/article/details/41091245