标签:多线程 blockingqueue arrayblockingqueue xmpp
在研究Smack的源代码的时候,我对它的连接Connection以及派生类XMPPConnection的关注是最多的,因为一个即时通信程序,它的网络模块必是它的核心。而我很在乎它是如何实现的。
在收发数据包的时候,我看到了队列的身影。BlockingQueue和ArrayBlockingQueue。所以,我觉得用到什么然后去查阅,去记录,这种方法是比较高效率的。
BlockingQueue是在Java的新的Concurrent包中的。
Reference:
http://www.cnblogs.com/jackyuj/archive/2010/11/24/1886553.html
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。

如上图所示:当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author jackyuj
*/
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
// 声明一个容量为10的缓存队列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue);
// 借助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 启动线程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer);
// 执行10s
Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(2000);
// 退出Executor
service.shutdown();
}
}import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 消费者线程
*
* @author jackyuj
*/
public class Consumer implements Runnable {
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
System.out.println("启动消费者线程!");
Random r = new Random();
boolean isRunning = true;
try {
while (isRunning) {
System.out.println("正从队列获取数据...");
String data = queue.poll(2, TimeUnit.SECONDS);
if (null != data) {
System.out.println("拿到数据:" + data);
System.out.println("正在消费数据:" + data);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
} else {
// 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
isRunning = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消费者线程!");
}
}
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
}
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 生产者线程
*
* @author jackyuj
*/
public class Producer implements Runnable {
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
String data = null;
Random r = new Random();
System.out.println("启动生产者线程!");
try {
while (isRunning) {
System.out.println("正在生产数据...");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
data = "data:" + count.incrementAndGet();
System.out.println("将数据:" + data + "放入队列...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.out.println("放入数据失败:" + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出生产者线程!");
}
}
public void stop() {
isRunning = false;
}
private volatile boolean isRunning = true;
private BlockingQueue queue;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
}package org.jivesoftware.smack;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.packet.Packet;
/**
* Provides a mechanism to collect packets into a result queue that pass a
* specified filter. The collector lets you perform blocking and polling
* operations on the result queue. So, a PacketCollector is more suitable to
* use than a {@link PacketListener} when you need to wait for a specific
* result.<p>
*
* Each packet collector will queue up a configured number of packets for processing before
* older packets are automatically dropped. The default number is retrieved by
* {@link SmackConfiguration#getPacketCollectorSize()}.
*
* @see Connection#createPacketCollector(PacketFilter)
* @author Matt Tucker
*/
public class PacketCollector {
private PacketFilter packetFilter;
private ArrayBlockingQueue<Packet> resultQueue;
private Connection connection;
private boolean cancelled = false;
/**
* Creates a new packet collector. If the packet filter is <tt>null</tt>, then
* all packets will match this collector.
*
* @param conection the connection the collector is tied to.
* @param packetFilter determines which packets will be returned by this collector.
*/
protected PacketCollector(Connection conection, PacketFilter packetFilter) {
this(conection, packetFilter, SmackConfiguration.getPacketCollectorSize());
}
/**
* Creates a new packet collector. If the packet filter is <tt>null</tt>, then
* all packets will match this collector.
*
* @param conection the connection the collector is tied to.
* @param packetFilter determines which packets will be returned by this collector.
* @param maxSize the maximum number of packets that will be stored in the collector.
*/
protected PacketCollector(Connection conection, PacketFilter packetFilter, int maxSize) {
this.connection = conection;
this.packetFilter = packetFilter;
this.resultQueue = new ArrayBlockingQueue<Packet>(maxSize);
}
/**
* Explicitly cancels the packet collector so that no more results are
* queued up. Once a packet collector has been cancelled, it cannot be
* re-enabled. Instead, a new packet collector must be created.
*/
public void cancel() {
// If the packet collector has already been cancelled, do nothing.
if (!cancelled) {
cancelled = true;
connection.removePacketCollector(this);
}
}
/**
* Returns the packet filter associated with this packet collector. The packet
* filter is used to determine what packets are queued as results.
*
* @return the packet filter.
*/
public PacketFilter getPacketFilter() {
return packetFilter;
}
/**
* Polls to see if a packet is currently available and returns it, or
* immediately returns <tt>null</tt> if no packets are currently in the
* result queue.
*
* @return the next packet result, or <tt>null</tt> if there are no more
* results.
*/
public Packet pollResult() {
return resultQueue.poll();
}
/**
* Returns the next available packet. The method call will block (not return)
* until a packet is available.
*
* @return the next available packet.
*/
public Packet nextResult() {
try {
return resultQueue.take();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* Returns the next available packet. The method call will block (not return)
* until a packet is available or the <tt>timeout</tt> has elapased. If the
* timeout elapses without a result, <tt>null</tt> will be returned.
*
* @param timeout the amount of time to wait for the next packet (in milleseconds).
* @return the next available packet.
*/
public Packet nextResult(long timeout) {
try {
return resultQueue.poll(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* Processes a packet to see if it meets the criteria for this packet collector.
* If so, the packet is added to the result queue.
*
* @param packet the packet to process.
*/
protected void processPacket(Packet packet) {
if (packet == null) {
return;
}
if (packetFilter == null || packetFilter.accept(packet)) {
while (!resultQueue.offer(packet)) {
// Since we know the queue is full, this poll should never actually block.
resultQueue.poll();
}
}
}
}
标签:多线程 blockingqueue arrayblockingqueue xmpp
原文地址:http://blog.csdn.net/minimicall/article/details/39028045