码迷,mamicode.com
首页 > 编程语言 > 详细

javase_并发库

时间:2015-09-16 20:05:56      阅读:231      评论:0      收藏:0      [点我收藏+]

标签:

一、传统线程技术

public static void main(String[] args) {
    Thread thread = new Thread(){
        @Override
        public void run() {
            while(true){
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("1:" + Thread.currentThread().getName());
            }
        }
    };
    thread.start();
    
    Thread thread2 = new Thread(new Runnable(){
        @Override
        public void run() {
            while(true){
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("1:" + Thread.currentThread().getName());
            }                
        }
    });
    thread2.start();
}

二、传统定时器技术

一种工具,线程用其安排以后在后台线程中执行任务,可安排任务执行一次,或者定期重复执行,与每个 Timer 对象相对应的是单个后台线程,用于顺序地执行所有计时器任务.

public static void main(String[] args) throws InterruptedException {//下面两个会同时执行,Timer也是一个线程
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("bombing!");
            }
        }, 10000,1000);      //10s之后开始执行任务,之后每一秒执行一次
        new Thread(){
            @Override
            public void run() {
                while(true){
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {}
                    System.out.println("wangwei");
                }
            }
        }.start();
}

三、传统线程互斥技术

class Outputer{               //共享资源封装成类
    public  void output(String name){
        int len = name.length();
        synchronized (Outputer.class) {
            for(int i=0;i<len;i++){
                System.out.print(name.charAt(i));
            }
            System.out.println();
        }
    }
    public  synchronized void output2(String name){
        int len = name.length();
        for(int i=0;i<len;i++){
            System.out.print(name.charAt(i));
        }
        System.out.println();
    }
}

public class TraditionalThreadSynchronized {
    Outputer outputer = new Outputer();
    
    public static void main(String[] args) {
        new TraditionalThreadSynchronized().init();
    }
    
    private void init(){
        new Thread(new Runnable(){
            public void run() {
                while(true)
                    outputer.output("zhangxiaoxiang");
            }
        }).start();
        
        new Thread(new Runnable(){
            public void run() {
                while(true)
                    outputer.output("lihuoming");
            }
        }).start();
    }
}

四、传统线程同步通信技术

例:一个线程打印打印100次,另一个打印10次,两个线程交替打印50次.

public class TraditionalThreadCommunication {
    public static void main(String[] args) {
        final Business business = new Business();
        new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int i=1;i<=50;i++){
                        business.sub(i);
                    }
                }
        }).start();
        for(int i=1;i<=50;i++){
            business.main(i);
        }
    }
}
  class Business {
      private boolean bShouldSub = true;
      public synchronized void sub(int i){
          while(!bShouldSub){
              try {
                this.wait();
            } catch (InterruptedException e) {}
          }
            for(int j=1;j<=10;j++){
                System.out.println("sub thread sequence of " + j + ",loop of " + i);
            }
          bShouldSub = false;
          this.notify();
      }
      
      public synchronized void main(int i){
              while(bShouldSub){
                  try {
                    this.wait();
                } catch (InterruptedException e) {}
              }
            for(int j=1;j<=100;j++){
                System.out.println("main thread sequence of " + j + ",loop of " + i);
            }
            bShouldSub = true;
            this.notify();
      }
  }

五、线程范围内共享变量的概念与作用

如果每个线程执行的代码相同,可以使用同一个Runnable对象,这个Runnable对象中有那个共享数据,例如,买票系统就可以这么做. 如果每个线程执行的代码不同,这时候需要用不同的Runnable对象,如下方式来实现这些Runnable对象之间的数据共享: 将共享数据封装在另外一个对象中,然后将这个对象逐一传递给各个Runnable对象.每个线程对共享数据的操作方法也分配到那个对象身上去完成,这样容易实现针对该数据进行的各个操作的互斥和通信. 总之,要同步互斥的几段代码最好是分别放在几个独立的方法中,这些方法再放在同一个类中,这样比较容易实现它们之间的同步互斥和通信. 极端且简单的方式,即在任意一个类中定义一个static的变量,这将被所有线程共享.

public class MultiThreadShareData {
    private static ShareData data1 = new ShareData();   //被所有线程共享
    public static void main(String[] args) {
        ShareData data = new ShareData();
        new Thread(new MyRunnable1(data)).start();
        new Thread(new MyRunnable2(data)).start();
    }
}
class MyRunnable1 implements Runnable{
    private ShareData data;
    public MyRunnable1(ShareData data){
        this.data = data;
    }
    public void run() {
        data.decrement();
    }
}
class MyRunnable2 implements Runnable{
    private ShareData data;
    public MyRunnable2(ShareData data){
        this.data = data;
    }
    public void run() {
        data.increment();
    }
}
//共享数据类
class ShareData{ /*implements Runnable 这种代码不好,没有将共享数据和线程分开,不利于维护 private int count = 100; @Override public void run() { // TODO Auto-generated method stub while(true){ count--; } }*/ private int j = 0; public synchronized void increment(){ j++; } public synchronized void decrement(){ j--; } }

六、ThreadLocal类及应用技巧

在同步机制中,通过对象的锁机制保证同一时间只有一个线程访问变量.这时该变量是多个线程共享的,使用同步机制要求程序慎密地分析什么时候对变量进行读写,什么时候需要锁定某个对象,什么时候释放对象锁等繁杂的问题,程序设计和编写难度相对较大.而ThreadLocal则从另一个角度来解决多线程的并发访问.ThreadLocal会为每一个线程提供一个独立的变量副本,从而隔离了多个线程对数据的访问冲突.因为每一个线程都拥有自己的变量副本,从而也就没有必要对该变量进行同步了.ThreadLocal提供了线程安全的共享对象,在编写多线程代码时,可以把不安全的变量封装进ThreadLocal.

当然ThreadLocal并不能替代同步机制,两者面向的问题领域不同.同步机制是为了同步多个线程对相同资源的并发访问,是为了多个线程之间进行通信 的有效方式;而ThreadLocal是隔离多个线程的数据共享,从根本上就不在多个线程之间共享资源(变量),这样当然不需要对多个线程进行同步了.所以,如果你需要进行多个线程之间进行通信,则使用同步机制;如果需要隔离多个线程之间的共享冲突,可以使用ThreadLocal,这将极大地简化你的程序,使程序更加易读、简洁.

public class ThreadLocalTest {
    private static ThreadLocal<Integer> x=new ThreadLocal<Integer>();
    private static ThreadLocal<MyThreadScopeData> myThreadScopeData=
            new ThreadLocal<MyThreadScopeData>(); public static void main(String[] args) { for(int i=0;i<2;i++){ new Thread(new Runnable(){ @Override public void run() { int data = new Random().nextInt(); System.out.println(Thread.currentThread().getName() + " has put data :" + data); x.set(data); MyThreadScopeData myData = new MyThreadScopeData(); myData.setName("name" + data); myData.setAge(data); myThreadScopeData.set(myData); int data1 = x.get(); System.out.println("A from " + getName()+" get data :" + data1); MyThreadScopeData myData1 = myThreadScopeData.get();; System.out.println("A from " + getName()+" getMyData: " + myData1.getName()); } }).start(); } } } class MyThreadScopeData{ private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }

ThreadLocal底层实现

public class ThreadScopeShareData {

    private static int data = 0;
    private static Map<Thread, Integer> threadData = new HashMap<Thread, Integer>();
    public static void main(String[] args) {
        for(int i=0;i<2;i++){
            new Thread(new Runnable(){
                @Override
                public void run() {
                    data = new Random().nextInt();
                    System.out.println(Thread.currentThread().getName() + " has put data :" + data);
                    threadData.put(Thread.currentThread(), data);
                    new A().get();
                    new B().get();
                }
            }).start();
        }
    }
    
    static class A{
        public void get(){
            int data = threadData.get(Thread.currentThread());
            System.out.println("A from " + Thread.currentThread().getName() + " get data :" + data);
        }
    }
    
    static class B{
        public void get(){
            int data = threadData.get(Thread.currentThread());        
            System.out.println("B from " + Thread.currentThread().getName() + " get data :" + data);
        }        
    }
}

ThreadLocal用于session管理

private static final ThreadLocal threadSession = new ThreadLocal();
 
public static Session getSession() throws InfrastructureException {
    Session s = (Session) threadSession.get();
    try {
        if (s == null) {
            s = getSessionFactory().openSession();
            threadSession.set(s);
        }
    } catch (HibernateException ex) {
        throw new InfrastructureException(ex);
    }
    return s;
}

七、接口 Executor

执行已提交的 Runnable对象的任务.此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法.通常使用Executor而不是显式地创建线程.例如下所示,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start().

import java.util.concurrent.Executor;

class TestRunnable implements Runnable{
    @Override
    public void run() {
        for(int i=0;i<10;i++)
            System.out.println("ww");
    }
}
class MyExecutor implements Executor{
    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}
public class TestExecutor extends Thread{
    public static void main(String[] args) {
        TestRunnable t = new TestRunnable();
        Executor executor = new MyExecutor();
        executor.execute(t);
    }
}

八、线程池使用

    大多数服务器应用程序(如 Web 服务器、POP 服务器、数据库服务器或文件服务器)代表远程客户机处理请求,这些客户机通常使用 socket 连接到服务器.对于每个请求,通常要进行少量处理(获得该文件的代码块,并将其发送回 socket),但是可能会有大量(且不受限制)的客户机请求服务.

  用于构建服务器应用程序的简单化模型会为每个请求创建新的线程.下列代码段实现简单的 Web 服务器,它接受端口 80 的 socket 连接,并创建新的线程来处理请求.不幸的是,该代码不是实现 Web 服务器的好方法,因为在重负载条件下它将失败,停止整台服务器.

class UnreliableWebServer {
public static void main(String[] args) throws IOException {
    ServerSocket socket = new ServerSocket(80);
    while (true) {
      final Socket connection = socket.accept();
          Runnable r = new Runnable() {
          public void run() {
              handleRequest(connection);
          }
        };
      new Thread(r).start();
    }
  }
}

无论如何,这样使用资源可能会损害性能.如果创建过多线程,其中每个线程都将占用一些 CPU 时间,结果将使用许多内存来支持大量线程,每个线程都运行得很慢。这样就无法很好地使用计算资源,管理一大组小任务的标准机制是组合工作队列和线程池.工作队列就是要处理的任务的队列,前面描述的 Queue 类完全适合.线程池是线程的集合,每个线程都提取公用工作队列.当一个工作线程完成任务处理后,它会返回队列,查看是否有其他任务需要处理.如果有,它会转移到下一个任务,并开始处理,如下是一个使用线程池的简单网络服务:

class NetworkService implements Runnable {
    private final ServerSocket serverSocket;
    private final ExecutorService pool;

    public NetworkService(int port, int poolSize)
        throws IOException {
      serverSocket = new ServerSocket(port);
      pool = Executors.newFixedThreadPool(poolSize);
    }
 
    public void run() { // run the service
      try {
        for (;;) {
          pool.execute(new Handler(serverSocket.accept()));
        }
      } catch (IOException ex) {
        pool.shutdown();
      }
    }
  }

  class Handler implements Runnable {
    private final Socket socket;
    Handler(Socket socket) { this.socket = socket; }
    public void run() {
      // read and service request on socket
    }
 }

几种类型的线程池

public class ThreadPoolTest {
    public static void main(String[] args) {
        //可重用固定线程数的线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(3);   
        //可根据需要创建新线程的缓存线程池
        //ExecutorService threadPool = Executors.newCachedThreadPool(); 
        //单个线程的线程池
        //ExecutorService threadPool = Executors.newSingleThreadExecutor(); 
        //一个可以在给定延迟后运行命令或者定期地执行的线程池
        ScheduledExecutorService  ScheduledThreadPool = Executors.newScheduledThreadPool(3);
        for(int i=1;i<=10;i++){
            final int task = i;
            threadPool.execute(new Runnable(){
                @Override
                public void run() {
                    for(int j=1;j<=10;j++){
                        System.out.println(Thread.currentThread().getName() + " is looping of " + j 
              + " for task of " + task); } } }); } ScheduledThreadPool.scheduleAtFixedRate(new Runnable(){ public void run(){ System.out.println("bombing!"); } }, 10, 1, TimeUnit.SECONDS); } }

九、Callable与Future的应用,返回结果并且可能抛出异常的任务,略.

十、互斥锁

public class LockTest {
    public static void main(String[] args) {
        new LockTest().init();
    }
    private void init(){
        final Outputer outputer = new Outputer();
        new Thread(new Runnable(){
            @Override
            public void run() {
                while(true){
                    outputer.output("zhangxiaoxiang");
                }
            }
        }).start();
        
        new Thread(new Runnable(){
            @Override
            public void run() {
                while(true){
                    outputer.output("lihuoming");
                }
            }
        }).start();
    }
    
}
class Outputer{
    Lock lock = new ReentrantLock();
    public void output(String name){
        int len = name.length();
        lock.lock();
        try{
            for(int i=0;i<len;i++){
                System.out.print(name.charAt(i));
            }
            System.out.println();
        }finally{
            lock.unlock();
        }
    }
}

十、读写锁

与互斥锁相比,读-写锁允许对共享数据进行更高级别的并发访问.虽然一次只有一个线程(writer 线程)可以修改共享数据,但在许多情况下,任何数量的线程可以同时读取共享数据(reader 线程),读-写锁利用了这一点.使用读-写锁所允许的并发性增强将带来更大的性能提高.

public class ReadWriteLockTest {
    public static void main(String[] args) {
        final Queue q = new Queue();
        for(int i=0;i<3;i++){
            new Thread(){
                public void run(){
                    while(true){
                        q.get();                        
                    }
                }
            }.start();

            new Thread(){
                public void run(){
                    while(true){
                        q.put(new Random().nextInt(10000));
                    }
                }            
            }.start();
        }
    }
}

class Queue{
    private Object data = null;//共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据.
    ReadWriteLock rwl = new ReentrantReadWriteLock();
    public void get(){
        rwl.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "be ready to read data!");
            Thread.sleep((long)(Math.random()*100));
            System.out.println(Thread.currentThread().getName() + "have read data :" + data);            
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            rwl.readLock().unlock();
        }
    }
    
    public void put(Object data){
        rwl.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " be ready to write data!");                    
            Thread.sleep((long)(Math.random()*1000));
            this.data = data;        
            System.out.println(Thread.currentThread().getName() + " have write data: " + data);                    
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            rwl.writeLock().unlock();
        }
    }
}

 十一、Condition

ConditionObject监视器方法wait,notify,notifyll分解成截然不同的对象,以便通过将这些对象与任意lock实现组合使用.

public class TestConditon {
    public static void main(String[] args) {
        final Business business = new Business();
        new Thread(new Runnable() {
            public void run() {
                for (int i = 1; i <= 50; i++) {
                    business.sub(i);
                }
            }
        }).start();
        for (int i = 1; i <= 50; i++) {
            business.main(i);
        }
    }
}

class Business {
    Lock lock = new ReentrantLock();
    Condition notFull = lock.newCondition();
    Condition notEmpty = lock.newCondition();

    private boolean bShouldSub = true;

    public void sub(int i) {
        try {
            lock.lock();
            while (!bShouldSub) {
                try {
                    notFull.await();
                } catch (InterruptedException e) {
                }
            }
            for (int j = 1; j <= 10; j++) {
                System.out.println("sub thread sequence of " + j + ",loop of " + i);
            }
            bShouldSub = false;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public void main(int i) {
        try {
            lock.lock();
            while (bShouldSub) {
                try {
                    notEmpty.await();
                } catch (InterruptedException e) {
                }
            }
            for (int j = 1; j <= 100; j++) {
                System.out.println("main thread sequence of " + j + ",loop of " + i);
            }
            bShouldSub = true;
            notFull.signal();
        } finally {
            lock.unlock();
        }
    }
}

改写生产者消费者模式

class Product {
    private int id;

    Product(int id) {
        this.id = id;
    }

    public String toString() {
        return "Product [id=" + id + "]";
    }
}

class Box {
    Lock lock = new ReentrantLock();
    Condition putConditon = lock.newCondition();
    Condition popConditon = lock.newCondition();
    Product[] p = new Product[10]; // 此处可以定义为一个队列
    int index = 0;

    public void put(Product pro) {
        try {
            lock.lock();
            while (index == p.length) {
                try {
                    putConditon.await();
                } catch (InterruptedException e) {
                }
            }
            p[index] = pro;
            index++;
            popConditon.signal();
        } finally {
            lock.unlock();
        }
    }

    public Product pop() {
        try {
            lock.lock();
            while (index == 0) {
                try {
                    popConditon.await();
                } catch (InterruptedException e) {
                }
            }
            putConditon.signal();
            index--;
            return p[index];
        } finally {
            lock.unlock();
        }
    }
}

class Producter implements Runnable {
    Box box = null;

    Producter(Box box) {
        this.box = box;
    }

    public void run() {
        for (int i = 0; i < 20; i++) { // 每个生产者生产20个
            Product pro = new Product(i);
            box.put(pro);
            System.out.println(Thread.currentThread().getName() + "生产了:" + pro);
        }
    }
}

class Customer implements Runnable {
    Box box = null;

    Customer(Box box) {
        this.box = box;
    }

    public void run() {
        while (true) {
            Product pro = box.pop();
            System.out.println(Thread.currentThread().getName() + "消费了:" + pro);
        }
    }
}

public class TestCondions {
    public static void main(String[] args) {
        Box box = new Box();
        Producter p = new Producter(box);
        Customer c = new Customer(box);
        new Thread(p).start();
        new Thread(p).start();
        new Thread(p).start();
        new Thread(c).start();
        new Thread(c).start();
        new Thread(c).start();
    }
}

十二、Semaphore

通常用于限制可以访问某些资源(物理或逻辑的)的线程数目,可以和线程池组合使用.

public class SemaphoreTest {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final  Semaphore sp = new Semaphore(3);
        for(int i=0;i<10;i++){
            Runnable runnable = new Runnable(){
                    public void run(){
                    try {
                        sp.acquire();
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    try {
                        //处理业务
                        Thread.sleep((long)(Math.random()*10000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    sp.release();
                }
            };
            service.execute(runnable);            
        }
    }
}

十二、CyclicBarrier、Exchanger、CountDownLatch、阻塞队列,略。

javase_并发库

标签:

原文地址:http://www.cnblogs.com/wangweiNB/p/4813896.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!