标签:arrayblockingqueue concurrenthashmap
package com;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
/**
* MQ
* @author pengbo
*
*/
public class QueueManager {
public static final int MAX_QUEUE_SIZE = 200;
//创建一个Map缓存 用来存储 BlockingQueued队列
public static Map<QueueType, ArrayBlockingQueue<Object>> queueMap = new ConcurrentHashMap<QueueManager.QueueType, ArrayBlockingQueue<Object>>();
//创建一个endMap缓存 用来存储 读取文件的最后一段信息 用来标记 当前文档结束
public static Map<QueueType, ArrayBlockingQueue<Object>> queueEndMap = new ConcurrentHashMap<QueueManager.QueueType, ArrayBlockingQueue<Object>>();
//创建一个 状态缓存 value 存储的是 读取文件的数量
public static Map<QueueType, Integer> m = new HashMap<QueueType, Integer>();
public static enum QueueType {
INFO, //txt堆栈的Key
INFOEND, //txt文件 结束堆栈的Key
SET, //加入堆栈的状态的Key
GET //从堆栈取出信息的状态的Key
}
/**
* 初始化堆栈 将空的Q 和空的状态设置到缓存中
*/
public static void init() {
ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(MAX_QUEUE_SIZE);
ArrayBlockingQueue<Object> queueEnd = new ArrayBlockingQueue<Object>(MAX_QUEUE_SIZE);
queueMap.put(QueueType.INFO,queue);
queueEndMap.put(QueueType.INFOEND,queueEnd);
m.put(QueueType.SET,0);
m.put(QueueType.GET,0);
}
/**
* 将信息推送到MQ中 存在缓存中
* @param type Map的Key
* @param obj 需要推进去的信息
* @throws InterruptedException
*/
public static void startPut(QueueType type, Object obj) throws InterruptedException {
ArrayBlockingQueue<Object> queue = queueMap.get(type);
if (queue != null)
queue.put(obj);
}
/**
* 将获取文件最末位的信息推送到 endMQ中 存在缓存中
* @param type Map的Key
* @param obj 需要推进去的信息
* @throws InterruptedException
*/
public static void endPut(QueueType type, Object obj) throws InterruptedException {
ArrayBlockingQueue<Object> queue = queueEndMap.get(type);
if (queue != null)
queue.put(obj);
}
/**
* 从缓存中 取走 Q中首个信息 如果队列是空的 返回 null
* @param type
* @return
*/
public static Object startPoll(QueueType type) {
ArrayBlockingQueue<Object> queue = queueMap.get(type);
Object o = null;
if (queue != null) {
o = queue.poll();
}
return o;
}
/**
* 从缓存中 取走 endQ中首个信息 如果队列是空的 返回 null
* @param type
* @return
*/
public static Object endPoll(QueueType type) {
ArrayBlockingQueue<Object> queue = queueEndMap.get(type);
Object o = null;
if (queue != null) {
o = queue.poll();
}
return o;
}
/**
* 在缓存中 查看Q中首个信息 如果队列是空的 返回 null
* @param type
* @return
*/
public static Object startPeek(QueueType type) {
ArrayBlockingQueue<Object> queue = queueMap.get(type);
Object o = null;
if (queue != null) {
o = queue.peek();
}
return o;
}
/**
* 从缓存中 查看 endQ中首个信息 如果队列是空的 返回 null
* @param type
* @return
*/
public static Object endPeek(QueueType type) {
ArrayBlockingQueue<Object> queue = queueEndMap.get(type);
Object o = null;
if (queue != null) {
o = queue.peek();
}
return o;
}
/**
* 结束标识 true:结束 false 未结束
* @param i 需要操作的文件总数
* @return
*/
public static boolean over(int i){
boolean tf =false;
if (QueueManager.m.get(QueueType.SET) == i
&& QueueManager.m.get(QueueType.GET) == i
&& QueueManager.startPeek(QueueType.INFO) == null
&& QueueManager.endPeek(QueueType.INFOEND) == null) {
tf = true;
}
return tf;
}
}标签:arrayblockingqueue concurrenthashmap
原文地址:http://jueshizhanhun.blog.51cto.com/4372226/1441529