标签:
/**
 * Base class for a queue-backed batch processor.
 *
 * <p>Items handed to {@link #batch(Object[])} are stored in a concurrent queue.
 * {@link #start()} launches two threads: a worker (this object's {@link #run()})
 * that drains the queue through {@link #process(Object)}, and a scanner that
 * polls every {@link #interval} ms and wakes the worker when at least
 * {@link #recordNum} items are queued or more than {@link #idleTime} ms have
 * passed since the worker last finished a drain.
 *
 * @param <T> type of the items being processed
 */
public abstract class AbsBatchProcess<T> implements Runnable {

    /** Number of queued records at which the scanner wakes the worker (default 10). */
    protected int recordNum;

    /** Idle time in ms after which the scanner wakes the worker regardless of queue size (default 5000). */
    protected long idleTime;

    /** Scanner polling interval in ms (default 1000). */
    protected long interval;

    /** Stop flag for the worker loop; volatile because it is written by the caller of stop()/start(). */
    private volatile boolean stop;

    /** Pending items; this object is also the monitor used for wait/notify. */
    private Queue<T> data;

    /** Time (ms) at which the worker last finished draining; written by the worker, read by the scanner. */
    private volatile long lastTime;

    /** Scanner that periodically decides whether to wake the worker. */
    private SecScanf scanf;

    public AbsBatchProcess() {
        recordNum = 10;
        idleTime = 5000;
        interval = 1000;
        stop = false;
        data = new ConcurrentLinkedQueue<T>();
        lastTime = System.currentTimeMillis();
        scanf = new SecScanf();
    }

    /**
     * Starts processing: clears both stop flags and launches one worker thread
     * and one scanner thread.
     */
    public final void start() {
        stop = false;
        scanf.stop = false;
        new Thread(this).start();
        new Thread(scanf).start();
    }

    /**
     * Requests termination of the worker and the scanner.
     *
     * <p>The worker may be blocked waiting on the queue monitor, and the scanner
     * (the only other notifier) is being stopped as well, so the worker is woken
     * here explicitly; otherwise it would never observe the flag.
     */
    public final void stop() {
        scanf.stop = true;
        // Set the flag under the monitor so the worker either sees it before
        // deciding to wait, or is already waiting and receives the notification.
        synchronized (data) {
            stop = true;
            data.notifyAll();
        }
    }

    /**
     * Worker loop: waits while the queue is empty, then drains every queued item
     * through {@link #process(Object)} and records the completion time.
     *
     * <p>If the thread is interrupted while waiting, the interrupt flag is
     * restored and the loop exits. An unchecked exception thrown by
     * {@code process} propagates out of this method.
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public final void run() {
        while (!stop) {
            synchronized (data) {
                // Re-check stop inside the monitor to avoid missing stop()'s wakeup.
                // A single wait (not a loop) is intentional: any wakeup, including
                // the scanner's idle-timeout notify on an empty queue, falls through
                // to the drain below and refreshes lastTime.
                if (!stop && data.isEmpty()) {
                    try {
                        data.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
            T entity;
            while ((entity = data.poll()) != null) {
                process(entity);
            }
            lastTime = System.currentTimeMillis();
        }
    }

    /**
     * Processes a single item; called on the worker thread.
     *
     * @param entity the item taken from the queue
     */
    protected abstract void process(T entity);

    /**
     * Queues items for processing. A {@code null} array is ignored.
     *
     * @param entity the items to enqueue
     * @throws NullPointerException if any element is {@code null}
     *         (the backing {@link ConcurrentLinkedQueue} rejects null elements)
     */
    public final void batch(T... entity) {
        if (entity != null) {
            data.addAll(Arrays.asList(entity));
        }
    }

    /**
     * Scanner that polls the queue and wakes the worker when the batch size or
     * the idle time threshold has been reached.
     */
    private class SecScanf implements Runnable {

        /** Stop flag for the scanner loop; written by the enclosing class's start()/stop(). */
        volatile boolean stop = false;

        @Override
        public void run() {
            while (!stop) {
                long current = System.currentTimeMillis();
                if (data.size() >= recordNum || current - lastTime > idleTime) {
                    synchronized (data) {
                        data.notify();
                    }
                }
                try {
                    Thread.sleep(interval);
                } catch (InterruptedException e) {
                    // Restore the flag and stop polling instead of swallowing the interrupt.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
原文地址:http://www.cnblogs.com/xiongjianjunjava/p/4344911.html