码迷,mamicode.com
首页 > 编程语言 > 详细

Java之多线程中的Master-Worker模式

时间:2016-08-20 16:02:18      阅读:204      评论:0      收藏:0      [点我收藏+]

标签:

该模式的好处是,将大任务拆解成若干小任务并并行执行,从而提高系统吞吐量。

定义Worker进程,负责处理实际任务。
/*具体工作对象*/
static abstract class Worker<T, R> implements Runnable {
private static final UtilsLog lg = UtilsLog.getLogger(Worker.class);
protected Queue<T> workQueue;//持有Master的任务队列
protected Map<String, R> resultMap;//用于存储结果集,key为任务对应的唯一标识符

public void setWorkQueue(Queue<T> workQueue) {
this.workQueue = workQueue;
}

public void setResultMap(Map<String, R> resultMap) {
this.resultMap = resultMap;
}

public abstract R handler(T entity);

@Override
public void run() {
while (true) {
T childWork = workQueue.poll();
if (childWork == null) {
lg.e("已经没有任务在队列中等待执行");
break;
}
//处理子任务
R result = handler(childWork);
resultMap.put(Integer.toString(childWork.hashCode()), result);
}
}
}
义Master进程,负责接收和分配任务。Master会在提交任务的同时立即返回结果集,由于此处的getResultMap属于引用传递,因此属性resultMap的修改会同步至业务层。
public static class Master<T, R> {
private static final UtilsLog lg = UtilsLog.getLogger(Master.class);
protected Queue<T> workQueue;//用于存储任务集
protected Map<String, Thread> threadMap;//存储执行任务的线程集
protected Map<String, R> resultMap;//存储相关结果

@TargetApi(Build.VERSION_CODES.LOLLIPOP)
public Master(Worker<T, R> work, int threadCount) {
workQueue = new ConcurrentLinkedDeque<T>();
threadMap = new HashMap<>();
resultMap = new HashMap<>();

work.setWorkQueue(workQueue);
work.setResultMap(resultMap);
for (int i = 0; i < threadCount; i++) {
threadMap.put(Integer.toString(i), new Thread(work, "thread tag with " + Integer.toString(i)));
}
}

//是否所有的子任务都结束了
public boolean isComplete() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
if (entry.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
}

public Map<String, R> getResultMap() {
return resultMap;
}

public Master addJob(T job) {
workQueue.add(job);
return this;
}

public void execute() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
entry.getValue().start();
}
}
}
业务层调用方案如下,
        Master<Integer, Integer> master = new Master<>(new Worker<Integer, Integer>() {
@Override
public Integer handler(Integer entity) {
int max = 50, min = 0;
UtilsThread.sleepIgnoreInteruptedException(new Random().nextInt(max) % (max - min + 1) + min);//随机模拟耗时操作
lg.e("执行handler程序 with value:" + entity);
return entity * entity;
}
}, 3);
int jobCount = 10;//任务数
for (int i = 0; i < jobCount; i++) {
master.addJob(i);
}
master.execute();

Map<String, Integer> resultMap = master.getResultMap();
while (true) {
int resultMapSize = resultMap.size();
// lg.e("并行执行结果集中已有数据量:" + resultMapSize);//此处resultMap持有Master中结果集的引用,因此在线程不断执行的过程中不断刷新结果姐,会连带导致这里值的改变
if (master.isComplete()) {
break;
}
}
执行结果如下,
技术分享
另外,也应注意到,在执行handler方式,到结果集中写入数据是有延时的,这在开发中需要格外注意,务必使用master.isComplete判断任务完成状况
技术分享






Java之多线程中的Master-Worker模式

标签:

原文地址:http://www.cnblogs.com/linux007/p/5790490.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!