码迷,mamicode.com
首页 > 编程语言 > 详细

Java线程学习

时间:2020-07-14 16:20:51      阅读:63      评论:0      收藏:0      [点我收藏+]

标签:executors   throw   lock   add   find   public   get   string   提交   

//线程池创建,线程池提交任务使用Callable

// Number of core worker threads; also used below as the number of tasks to submit.
int corePoolSize = 3;
// ThreadPoolExecutor's constructor throws IllegalArgumentException when
// maximumPoolSize < corePoolSize, which the plain "CPUs * 2" formula hits on a single-CPU host.
int maximumPoolSize = Math.max(corePoolSize, Runtime.getRuntime().availableProcessors() * 2);
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
// Run rejected tasks on the submitting thread. DiscardPolicy would silently drop the task and
// leave the Future returned by submit() incomplete forever, so a caller waiting on it would hang.
RejectedExecutionHandler policy = new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, queue, policy);
try {
    submitTisks(gatherDate, hbaseConfig, corePoolSize, threadPoolExecutor);
} finally {
    // Core threads do not time out by default, so without an explicit shutdown the idle
    // workers keep the JVM alive. shutdown() still lets already-submitted tasks finish.
    threadPoolExecutor.shutdown();
}

/************************************使用Callable接口**************************************/
    /**
     * Submits {@code corePoolSize} {@link GPSTask} instances to the pool and blocks until
     * every one of them has finished.
     *
     * <p>A task that fails has its exception printed to standard error; waiting then continues
     * with the remaining tasks. If the calling thread is interrupted while waiting, the interrupt
     * flag is restored and the method returns early, leaving the submitted tasks running.
     *
     * @param gatherDate         record date passed through to each task
     * @param hbaseConfig        configuration object passed through to each task
     * @param corePoolSize       number of tasks to submit
     * @param threadPoolExecutor pool that executes the tasks
     */
    private static void submitTisks(String gatherDate, Configuration hbaseConfig, int corePoolSize, ExecutorService threadPoolExecutor) {
        List<Future<Boolean>> taskFutureList = new ArrayList<>(corePoolSize);
        for (int i = 0; i < corePoolSize; i++) {
            // The pool schedules and runs the task; the Future is kept so completion can be awaited below.
            taskFutureList.add(threadPoolExecutor.submit(new GPSTask(hbaseConfig, gatherDate, xikangTidList, rootPath)));
        }
        for (Future<Boolean> fut : taskFutureList) {
            try {
                // Blocking get() returns as soon as the task completes, so no sleep-based polling is needed.
                // The Boolean result is intentionally not unboxed: nothing consumes it here.
                fut.get();
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of swallowing it and waiting on.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // A failed (or cancelled) task must not prevent waiting for the others.
                e.printStackTrace();
            }
        }
    }

    /**
     * Callable unit of work for the GPS job: delegates to
     * {@code FindDataUtil.getData4HbaseGPSSQ} and returns its result.
     */
    static class GPSTask implements Callable<Boolean> {
        /** Thread that most recently entered {@link #call()}; unset until the task runs. */
        Thread currentThread;
        Configuration config;
        String recordDate;
        List<Long> xkDatanoList;
        String rootPath;

        /**
         * Stores the arguments that are later forwarded to the lookup call.
         *
         * @param config       configuration object forwarded to the lookup
         * @param recordDate   record date forwarded to the lookup
         * @param xkDatanoList list of ids forwarded to the lookup
         * @param rootPath     root path forwarded to the lookup
         */
        public GPSTask(Configuration config, String recordDate, List<Long> xkDatanoList, String rootPath) {
            this.rootPath = rootPath;
            this.xkDatanoList = xkDatanoList;
            this.recordDate = recordDate;
            this.config = config;
        }

        /**
         * Records the executing thread, then runs the lookup with the stored arguments.
         *
         * @return whatever {@code FindDataUtil.getData4HbaseGPSSQ} returns
         * @throws Exception if the lookup fails
         */
        @Override
        public Boolean call() throws Exception {
            this.currentThread = Thread.currentThread();
            final Boolean outcome = FindDataUtil.getData4HbaseGPSSQ(
                    this.config, this.recordDate, this.xkDatanoList, this.rootPath, this.currentThread);
            return outcome;
        }
    }

//线程池创建,线程池提交任务使用Thread

int corePoolSize = 3;
// ThreadPoolExecutor's constructor throws IllegalArgumentException when
// maximumPoolSize < corePoolSize, which the plain "CPUs * 2" formula hits on a single-CPU host.
int maximumPoolSize = Math.max(corePoolSize, Runtime.getRuntime().availableProcessors() * 2);
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy(); // silently drops tasks the pool cannot accept
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, queue, policy);
GPSRunTask gpsRunTask = new GPSRunTask(hbaseConfig, gatherDate, xikangTidList, rootPath);
threadPoolExecutor.execute(gpsRunTask);
// Stop accepting new tasks; work that was already submitted still runs to completion.
threadPoolExecutor.shutdown();
try {
    // Wait on the pool rather than gpsRunTask.join(): the pool calls run() on one of its own
    // worker threads, so the GPSRunTask Thread object is never started and join() on it
    // returns immediately without waiting for the work.
    threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
    // Keep the interrupt visible to the caller.
    Thread.currentThread().interrupt();
    e.printStackTrace();
}
 /**
  * Thread subclass for the GPS job: {@link #run()} delegates to
  * {@code FindDataUtil.getData4HbaseGPSSQ} and ignores its result.
  *
  * <p>Note: if an instance is handed to an executor instead of being started with
  * {@code start()}, {@code run()} executes on the executor's worker thread and
  * {@code join()} on this object does not wait for that work.
  */
  static class GPSRunTask extends Thread {
    /** Thread that most recently entered {@link #run()}; unset until the task runs. */
    Thread currentThread;
    Configuration config;
    String recordDate;
    List<Long> xkDatanoList;
    String rootPath;

    /**
     * Stores the arguments that are later forwarded to the lookup call.
     *
     * @param config       configuration object forwarded to the lookup
     * @param recordDate   record date forwarded to the lookup
     * @param xkDatanoList list of ids forwarded to the lookup
     * @param rootPath     root path forwarded to the lookup
     */
    public GPSRunTask(Configuration config, String recordDate, List<Long> xkDatanoList, String rootPath) {
        this.rootPath = rootPath;
        this.xkDatanoList = xkDatanoList;
        this.recordDate = recordDate;
        this.config = config;
    }

    /** Records the executing thread, then runs the lookup with the stored arguments. */
    @Override
    public void run() {
        this.currentThread = Thread.currentThread();
        // logger.info("gps thread: " + currentThread.getName());
        FindDataUtil.getData4HbaseGPSSQ(
                this.config, this.recordDate, this.xkDatanoList, this.rootPath, this.currentThread);
    }
  }

 

Java线程学习

标签:executors   throw   lock   add   find   public   get   string   提交   

原文地址:https://www.cnblogs.com/zyanrong/p/13299641.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!