码迷,mamicode.com
首页 > 编程语言 > 详细

Java多线程之JUC

时间:2021-01-02 10:50:33      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:@param   系统   lan   multi   定时   manager   无法   影响   string   

1.JUC

JUC:java.util.concurrent

  • 在并发编程中使用的工具类
  • java.util.concurrent 并发包
  • java.util.concurrent.atomic 并发原子包
  • java.util.concurrent.locks 并发lock包

2.多线程编程

  • 模板

    • 线程 操作 资源类(资源类不实现Runnable接口)
      • 操作:资源对外暴露的调用方法
    • 高内聚低耦合
  • 步骤

    • 创建资源类
    • 资源类里创建同步方法、同步代码块
  • synchronized

    class X {
       public synchronized void m() {
         //todo
       }
     }
    
/**
 * 线程 操作 资源类
 */
public class SaleTicket {
    public static void main(String[] args) {
        //资源
        Ticket ticket = new Ticket();
        //线程
        new Thread(() -> { for (int i = 0; i < 100; i++) ticket.sellTicket(); }, "A").start();
        new Thread(() -> { for (int i = 0; i < 100; i++) ticket.sellTicket(); }, "B").start();
        new Thread(() -> { for (int i = 0; i < 100; i++) ticket.sellTicket(); }, "C").start();

    }
}

//资源类
class Ticket {

    private Integer number = 100;

    //操作:对外暴露操作资源的方法
    public synchronized void sellTicket() {
        if (number > 0) {
            System.out.println(Thread.currentThread().getName() + "\t 卖出第" + number-- + "张票,还剩"
                    + number + "张");
        }
    }
}

3.Lock

3.1 Lock

技术图片

  • java.util.concurrent.locks.Lock

    Lock implementations provide more extensive locking operations than can be obtained using synchronized methods and statements. They allow more flexible structuring, may have quite different properties, and may support multiple associated Condition objects.

    锁实现提供了比使用同步方法和同步代码块更广泛的锁操作。它们允许更灵活的结构,可能具有完全不同的属性,并且可能支持多个关联的条件对象。

3.2 ReentrantLock可重入锁

  • 可重入锁:某个线程已经获得某个锁,可以再次获取锁而不会出现死锁,获取和释放的次数需要一致

  • Lock接口的实现类,java.util.concurrent.locks.ReentrantLock

  • 使用

    class X {
       private final Lock lock = new ReentrantLock();
       // ...
       public void m() {
         lock.lock();  // block until condition holds
         try {
           // ... method body
         } finally {
           lock.unlock()
         }
       }
     }
    
  • 构造方法

    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();//非公平锁
    }
    
    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {//FairSync 公平锁
        sync = fair ? new FairSync() : new NonfairSync();
    }
    

    公平锁:十分公平,按先来后到顺序
    非公平锁:十分不公平,可以插队(默认,如线程A在前,需要1小时,线程B在后,需要3秒,非公平锁允许线程B插队,先完成,而不需要等待线程A完成后再执行)

3.3 synchronized&&Lock的区别

  • synchronized是java内置关键字,在jvm层面,Lock是个java类;
  • synchronized无法判断是否获取锁的状态,Lock可以判断是否获取到锁;
  • synchronized会自动释放锁(线程执行完同步代码会释放锁、线程执行过程中发生异常会释放锁),Lock需在finally中手工释放锁(unlock()方法释放锁),否则容易造成线程死锁;
  • 用synchronized关键字的两个线程1和线程2,如果当前线程1获得锁,线程2线程等待。如果线程1阻塞,线程2则会一直等待下去,而Lock锁就不一定会等待下去,如果尝试获取不到锁,线程可以不用一直等待就结束了;
  • synchronized的锁可重入、不可中断、非公平,而Lock锁可重入、可判断、可公平(公平非公平两者皆可)
  • Lock锁适合大量同步的代码的同步问题,synchronized锁适合代码少量的同步问题。

4. 线程间的通信

  • 线程间通信

    • 生产者+消费者
    • 通知等待唤醒机制
  • 多线程编程模板

    • 判断 干活 通知
    • 判断需使用while,以防止中断和虚假唤醒(见java.lang.Object的API)

    A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup. While this will rarely occur in practice, applications must guard against it by testing for the condition that should have caused the thread to be awakened, and continuing to wait if the condition is not satisfied. In other words, waits should always occur in loops, like this one

       synchronized (obj) {
          	while (<condition does not hold>)
          		obj.wait(timeout);
          	... // Perform action appropriate to condition
       }
    

    线程也可以在没有通知、中断或超时的情况下被唤醒,这就是所谓的假唤醒。虽然这种情况在实践中很少发生,但应用程序必须通过测试导致线程被唤醒的条件来防止这种情况发生,如果条件不满足,则继续等待。换句话说,等待应该总是出现在循环中,就像这个循环一样

4.1 synchronized版

  • synchronized实现

    • 先2个线程操作资源类,资源中的操作判断使用if,如线程A和线程B,可以正常运行1 0 1 0 1 0...
    • 增加2个线程C和D,模拟虚假唤醒,判断依旧是if,运行的结果数字不是全部为1、0
      • 原因:在java多线程判断时,不能用if,程序出事出在了判断上面,突然有一添加的线程进到if了,突然中断了交出控制权,没有进行验证,而是直接走下去了,加了两次,甚至多次
    • 在4线程中,将资源类的if判断改为while判断,while是只要唤醒就要拉回来再判断一次
    package juc.demo;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * @Description:
     *  现在两个线程,
     *  可以操作初始值为零的一个变量,
     *  实现一个线程对该变量加1,一个线程对该变量减1,
     *  交替,来10轮。
     * @Package: juc.demo
     * @ClassName NotifyWaitDemo
     * @author: wuwb
     * @date: 2020/10/19 13:30
     */
    public class NotifyWaitDemo {
        public static void main(String[] args) {
            int turn = 1000;
            //资源类
            ShareData data = new ShareData();
            new Thread(() -> {
                for (int i = 0; i < turn; i++) {
                    try {
                        data.increment();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "A").start();
    
            new Thread(() -> {
                for (int i = 0; i < turn; i++) {
                    try {
                        data.decrement();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "B").start();
    
            new Thread(() -> {
                for (int i = 0; i < turn; i++) {
                    try {
                        data.increment();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "C").start();
    
            new Thread(() -> {
                for (int i = 0; i < turn; i++) {
                    try {
                        data.decrement();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "D").start();
        }
    }
    
    //资源类
    class ShareData{
        private int number = 0;
    
        public synchronized void increment() throws InterruptedException {
            //判断  if换while
            while (number != 0) {
                this.wait();
            }
            //干活
            number++;
            System.out.println(Thread.currentThread().getName() + ":" + number);
            //通知
            this.notifyAll();
        }
    
        public synchronized void decrement() throws InterruptedException {
            while (number == 0) {
                this.wait();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + ":" + number);
            this.notifyAll();
        }
    
    }
    
    

4.2 JUC版

技术图片

  • Lock 及 Condition

    package juc.demo;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * @Description:
     *  现在两个线程,
     *  可以操作初始值为零的一个变量,
     *  实现一个线程对该变量加1,一个线程对该变量减1,
     *  交替,来10轮。
     * @Package: juc.demo
     * @ClassName NotifyWaitDemo
     * @author: wuwb
     * @date: 2020/10/19 13:30
     */
    public class NotifyWaitDemo {
        public static void main(String[] args) {
            int turn = 1000;
            //资源类
            ShareData data = new ShareData();
            new Thread(() -> {
                for (int i = 0; i < turn; i++) {
                    try {
                        data.increment();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "A").start();
    
            new Thread(() -> {
                for (int i = 0; i < turn; i++) {
                    try {
                        data.decrement();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "B").start();
    
            new Thread(() -> {
                for (int i = 0; i < turn; i++) {
                    try {
                        data.increment();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "C").start();
    
            new Thread(() -> {
                for (int i = 0; i < turn; i++) {
                    try {
                        data.decrement();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "D").start();
        }
    }
    
    //资源类
    class ShareData{
        private int number = 0;
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
    
        public void increment() {
            lock.lock();
            try {
                //判断
                while (number != 0) {
                    condition.await();//this.wait();
                }
                //干活
                number++;
                System.out.println(Thread.currentThread().getName() + ":" + number);
                //通知
                condition.signalAll();//this.notifyAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void decrement() {
            lock.lock();
            try {
                while (number == 0) {
                    condition.await();
                }
                number--;
                System.out.println(Thread.currentThread().getName() + ":" + number);
                condition.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
    }
    
    

4.3 定制化调用通信

  • 使用Lock、Condition指定调用顺序,指定唤醒哪个线程
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Description
 * 多线程之间按顺序调用,实现A->B->C
 * ......来10轮
 */
public class ThreadOrderAccess {
    public static void main(String[] args) {
        ShareResource resource = new ShareResource();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                resource.printA();
            }
        }, "A").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                resource.printB();
            }
        }, "B").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                resource.printC();
            }
        }, "C").start();
    }
}

class ShareResource{
    /**标志位*/
    private int number = 1;
    private Lock lock = new ReentrantLock();
    /**3把钥匙*/
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();
   
    public void printA() {
        lock.lock();
        try {
            while (number != 1) {
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName()+"==>AAAAAAAAAA");
            number = 2;
            condition2.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printB() {
        lock.lock();
        try {
            while (number != 2) {
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName()+"==>BBBBBBBBBB");
            number = 3;
            condition3.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printC() {
        lock.lock();
        try {
            while (number != 3) {
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName()+"==>CCCCCCCCCC");
            number = 1;
            condition1.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

5. 多线程锁

5.1 八锁现象

  • 问题一:标准访问,先打印短信还是邮件?
public class Lock_8_1 {
    public static void main(String[] args) throws Exception {
        Phone phone = new Phone();
        new Thread(() -> { phone.sendSMS(); }, "AA").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> { phone.sendEmail(); }, "BB").start();
    }
}

class Phone {
    public synchronized void sendSMS() {
        System.out.println("------sendSMS");
    }
    public synchronized void sendEmail() {
        System.out.println("------sendEmail");
    }
}/*sendSMS*/
  • 问题二:停4秒在短信方法内,先打印短信还是邮件?
public class Lock_8_2 {
    public static void main(String[] args) throws Exception {
        Phone phone = new Phone();
        new Thread(() -> { phone.sendSMS(); }, "AA").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> { phone.sendEmail(); }, "BB").start();
    }
}

class Phone {
    public synchronized void sendSMS() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("------sendSMS");
    }
    public synchronized void sendEmail() {
        System.out.println("------sendEmail");
    }
}/*sendSMS*/
  • 问题三:新增普通的getHello方法,是先打印短信还是hello?
public class Lock_8_3 {
    public static void main(String[] args) throws Exception {
        Phone phone = new Phone();
        new Thread(() -> { phone.sendSMS(); }, "AA").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> { phone.getHello(); }, "BB").start();
    }
}

class Phone {
    public synchronized void sendSMS() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("------sendSMS");
    }
    public synchronized void sendEmail() {
        System.out.println("------sendEmail");
    }
    public void getHello() {
        System.out.println("------getHello");
    }
}/*getHello*/
  • 问题四:现在有两部手机,先打印短信还是邮件?
public class Lock_8_4 {
    public static void main(String[] args) throws Exception {
        Phone phone1 = new Phone();
        Phone phone2 = new Phone();
        new Thread(() -> { phone1.sendSMS(); }, "AA").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> { phone2.sendEmail(); }, "BB").start();
    }
}

class Phone {
    public synchronized void sendSMS() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("------sendSMS");
    }
    public synchronized void sendEmail() {
        System.out.println("------sendEmail");
    }
}/*sendEmail*/
  • 问题五:两个静态同步方法,1部手机,先打印短信还是邮件?
public class Lock_8_5 {
    public static void main(String[] args) throws Exception {
        Phone phone = new Phone();
        new Thread(() -> { phone.sendSMS(); }, "AA").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> { phone.sendEmail(); }, "BB").start();
    }
}

class Phone {
    public static synchronized void sendSMS() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("------sendSMS");
    }
    public static synchronized void sendEmail() {
        System.out.println("------sendEmail");
    }
}/*sendSMS*/
  • 问题六:两个静态同步方法,2部手机,先打印短信还是邮件?
public class Lock_8_6 {
    public static void main(String[] args) throws Exception {
        Phone phone = new Phone();
        Phone phone2 = new Phone();
        new Thread(() -> { phone.sendSMS(); }, "AA").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> { phone2.sendEmail(); }, "BB").start();
    }
}

class Phone {
    public static synchronized void sendSMS() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("------sendSMS");
    }
    public static synchronized void sendEmail() {
        System.out.println("------sendEmail");
    }
}/*sendSMS*/
  • 问题七:1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件?
public class Lock_8_7 {
    public static void main(String[] args) throws Exception {
        Phone phone = new Phone();
        new Thread(() -> { phone.sendSMS(); }, "AA").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> { phone.getHello(); }, "BB").start();
    }
}

class Phone {
    public static synchronized void sendSMS() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("------sendSMS");
    }
    public synchronized void sendEmail() {
        System.out.println("------sendEmail");
    }
}/*sendEmail*/
  • 问题八:1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件
public class Lock_8_8 {
    public static void main(String[] args) throws Exception {
        Phone phone = new Phone();
        Phone phone2 = new Phone();
        new Thread(() -> { phone.sendSMS(); }, "AA").start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> { phone2.sendEmail(); }, "BB").start();
    }
}

class Phone {
    public static synchronized void sendSMS() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("------sendSMS");
    }
    public synchronized void sendEmail() {
        System.out.println("------sendEmail");
    }
}/*sendEmail*/

5.2 多线程锁

  • synchronized实现同步的基础:Java中的每一个对象都可以作为锁。具体表现为以下3种形式。
    • 对于普通同步方法,锁是当前实例对象。
    • 对于静态同步方法,锁是当前类的Class对象。
    • 对于同步方法块,锁是Synchonized括号里配置的对象
  • 当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。
  • 如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁
    • 一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法,锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法(问题一、问题二)
  • 不同的实例对象的非静态同步方法因为锁的是各自的实例对象,所以互不影响(问题四)
  • 加个普通方法后发现和同步锁无关(问题三)
  • 所有的静态同步方法用的也是同一把锁——类对象本身,一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,它们只有同一个类的实例对象class!(问题五、问题六)
  • 实例对象锁与类的class对象锁,是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。(问题七、问题八)

6. 集合类不安全

6.1 List不安全

  • 集合线程的不安全性,如多线程操作ArrayList时,ArrayList在迭代的时候如果同时对其进行修改就会抛出java.util.ConcurrentModificationException异常,并发修改异常

    List<String> list = new ArrayList<>();
    for (int i = 0; i < 30; i++) {
        new Thread(() -> {
            list.add(UUID.randomUUID().toString().substring(0,8));
            System.out.println(list);
        }, String.valueOf(i)).start();
    }
    
  • 解决方案

    • Vector
      • List<String> list = new Vector<>();
      • 加锁,安全,但性能差
    • Collections
      • List<String> list = Collections.synchronizedList(new ArrayList<>());
        Collections提供了方法synchronizedList保证list是同步线程安全的
      • HashSet 与 HashMap 也是非线程安全的,Collections也提供了对应的方法
    • 写时复制CopyOnWriteArrayList
      • 类似读写分离的思想

6.2 写时复制

  • CopyOnWrite理论

    • CopyOnWrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器Object[]添加,而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,然后向新的容器Object[] newElements里添加元素。添加元素后,再将原容器的引用指向新的容器setArray(newElements)。这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
  • CopyOnWriteArrayList 源码

     /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
    

6.3 Set不安全

  • HashSet线程不安全,底层结构是HashMap,set添加时调用的是map的put方法,值作为key,而value为固定值

    public HashSet() {
        map = new HashMap<>();
    }
    private static final Object PRESENT = new Object(); 
    public boolean add(E e) {
        return map.put(e, PRESENT) == null;
    }
    
  • Set<String> set = new HashSet<>();//线程不安全

  • Set<String> set = Collections.synchronizedSet(new HashSet<>());//线程安全

  • Set<String> set = new CopyOnWriteArraySet<>();//线程安全

6.4 Map不安全

6.4.1 HashMap

  • 初始容器 16 / 0.75
  • 根据键的hashCode值存储数据
  • 允许一个键为null,允许多个值为null
  • HashMap线程不安全,线程安全Collections.synchronizedMap或ConcurrentHashMap
  • Java7,数组+单向链表,每个Entry包含4个值:key、value、hash值和用于单向链表的next
    扩容:当前的2倍,负载因子:0.75
  • Java8,数组+链表+红黑树,链表节点数超过8后,链表转为红黑树,减少查找链表数据的时间
  • map.put(k,v)实现原理
    • 第一步首先将k,v封装到Node对象当中(节点)。
    • 第二步它的底层会调用K的hashCode()方法得出hash值。
    • 第三步通过哈希表函数/哈希算法,将hash值转换成数组的下标,下标位置上如果没有任何元素,就把Node添加到这个位置上。如果说下标对应的位置上有链表。此时,就会拿着k和链表上每个节点的k进行equal。如果所有的equals方法返回都是false,那么这个新的节点将被添加到链表的表头,next指向之前的表头节点。如其中有一个equals返回了true,那么这个节点的value将会被覆盖。
  • JDK8之后,如果哈希表单向链表中元素超过8个,那么单向链表这种数据结构会变成红黑树数据结构。当红黑树上的节点数量小于6个,会重新把红黑树变成单向链表数据结构。

6.4.2 ConcurrentHashMap

  • 整个 ConcurrentHashMap 由一个个 Segment 组成,ConcurrentHashMap 是一个 Segment 数组
  • 线程安全,Segment 通过继承ReentrantLock 来进行加锁,加锁锁住的是Segment
  • JDK1.7分段锁,1.8CAS(compare and swap的缩写,即我们所说的比较交换)
  • JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树

7. Callable接口

7.1 Callable接口

  • java.util.concurrent包下的函数式接口

  • 与Runnable()对比

    • 方法是否有返回值

    • 是否抛出异常

    • 落地方法不同,call() / run()

      //创建新类MyThread实现runnable接口
      class MyThread implements Runnable{
      	@Override
      	public void run() {
       
      	}
      }
      //新类MyThread2实现callable接口
      class MyThread2 implements Callable<Integer>{
      	@Override
      	public Integer call() throws Exception {
      		return 200;
      	} 
      }
      

7.2 FutureTask

  • thread构造方法中只能接受Runnable,寻找中间桥梁

    ![1603262924681](juc.assets/1603262924681.png)
      
    ![1603262989939](juc.assets/1603262989939.png)
    

A cancellable asynchronous computation. This class provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation. The result can only be retrieved when the computation has completed; the get methods will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked using runAndReset()).

A FutureTask can be used to wrap a Callable or Runnable object. Because FutureTask implements Runnable, a FutureTask can be submitted to an Executor for execution.

  • 通过FutureTask的有参构造方法将Callable传入

  • 运行成功后,通过futureTask.get()获取返回值

  • 在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法,直到任务转入完成状态,然后会返回结果或者抛出异常。一旦计算完成,就不能再重新开始或取消计算

    只计算一次,get方法放到最后

public class FutureTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> futureTask = new FutureTask<Integer>(() -> {
            System.out.println(Thread.currentThread().getName()+"******");
            return 1024;
        });
        FutureTask<Integer> futureTask1 = new FutureTask<>(new MyThread());
        new Thread(futureTask,"A").start();// 结果会被缓存,效率高
        new Thread(futureTask,"B").start();
        new Thread(futureTask1,"C").start();
        /*while(!futureTask.isDone()){
            System.out.println("***wait");
        }*/
        System.out.println(futureTask.get());
        System.out.println(futureTask1.get());
    }
}

class MyThread implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println(Thread.currentThread().getName()+"******");
        return 200;
    }
}
/*
执行结果:
A******
C******
1024
200
*/

  • 有缓存

8. JUC辅助类

8.1 CountDownLatch

  • 减少计数
  • 方法
    • await() / await(long timeout, TimeUnit unit) (unit 时间单位)
      • 阻塞,直到count为0,或线程被中断,或达到给定的时间
    • countDown()
      • 计数减一,直到为0,释放等待线程
  • 当一个或多个线程调用await方法时,这些线程会阻塞,其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch count = new CountDownLatch(6);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName()+"\t 离开");
                count.countDown();
            }, String.valueOf(i)).start();
        }
        count.await();//阻塞,直到count=0,main线程才继续向下执行
        System.out.println("全部离开");
    }
}

8.2 CyclicBarrier

  • 可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。线程进入屏障通过CyclicBarrier的await()方法
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() -> {
            System.out.println("召唤神龙");
        });

        for (int i = 1; i <= 7; i++) {
            new Thread(() -> {
                System.out.println("获得第"+Thread.currentThread().getName()+"颗龙珠!");
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("线程"+Thread.currentThread().getName()+"====>"+
                        Thread.currentThread().getState());
            }, String.valueOf(i)).start();
        }

    }
}
/*
获得第1颗龙珠!
获得第4颗龙珠!
获得第5颗龙珠!
获得第3颗龙珠!
获得第2颗龙珠!
获得第6颗龙珠!
获得第7颗龙珠!
召唤神龙
线程5====>RUNNABLE
线程3====>RUNNABLE
线程2====>RUNNABLE
线程1====>RUNNABLE
线程6====>RUNNABLE
线程4====>RUNNABLE
线程7====>RUNNABLE
*/

8.3 Semaphore

  • 在信号量上我们定义两种操作:
    • acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。
    • release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
  • 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "\t抢到车位!");
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                    System.out.println(Thread.currentThread().getName() + "\t离开");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }, String.valueOf(i)).start();
        }
    }
}
/*
2	抢到车位!
1	抢到车位!
6	抢到车位!
1	离开
6	离开
4	抢到车位!
3	抢到车位!
3	离开
5	抢到车位!
2	离开
4	离开
5	离开
*/

9. ReentrantReadWriteLock读写锁

  • ReadWriteLock读写锁(java.util.concurrent.locks包)
    • 读锁:共享锁,多个线程可以同时占有,读的时候可以被多个线程同时读
    • 写锁:独占锁,一次只能被一个线程占有,写的时候只能是一个线程写,写锁为排他锁
  • ReentrantReadWriteLock 实现 ReadWriteLock接口
    • 读锁:ReentrantReadWriteLock.ReadLock
    • 写锁:ReentrantReadWriteLock.WriteLock
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @Description 读写锁
 * @Package: juc.demo
 * @ClassName ReadWriteLockDemo
 * @author: wuwb
 * @date: 2020/10/21 17:42
 */
public class ReadWriteLockDemo {
    public static void main(String[] args) {
        CacheData cacheData = new CacheData();

        for (int i = 1; i <= 5; i++) {
            final int num = i;
            new Thread(() -> {
                cacheData.put(num + "", num + "");
            }, String.valueOf(i)).start();
        }

        for (int i = 1; i <= 5; i++) {
            final int num = i;
            new Thread(() -> {
                cacheData.get(num + "");
            }, String.valueOf(i)).start();
        }
    }
}

class CacheData{
    private volatile Map<String, Object> map = new HashMap<>();
    private ReadWriteLock lock = new ReentrantReadWriteLock();

    public void put(String key, Object value) {
        lock.writeLock().lock();//写锁
        try {
            System.out.println(Thread.currentThread().getName()+"\t 开始写入"+key);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName()+"\t 写入完成");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.writeLock().unlock();
        }
    }

    public void get(String key) {
        lock.readLock().lock();//读锁
        try {
            System.out.println(Thread.currentThread().getName()+"\t 开始读取"+key);
            Object result = map.get(key);
            System.out.println(Thread.currentThread().getName()+"\t 读取完成"+result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.readLock().unlock();
        }
    }
}

10. 阻塞队列

10.1 BlockingQueue阻塞队列

  • 栈与队列

    • 栈:先进后出,后进先出
    • 队列:先进先出FIFO
  • BlockingQueue阻塞队列

    • 阻塞:必须要阻塞/不得不阻塞
    • 是一个队列

    技术图片

    • 当队列是空的,从队列中获取元素的操作将会被阻塞
      • 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
    • 当队列是满的,从队列中添加元素的操作将会被阻塞
      • 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
  • 架构

技术图片

技术图片

  • BlockingQueue核心方法

    添加、移除

    方法类型 抛出异常 不抛异常 阻塞等待 超时等待
    插入 add(e) offer(e) put(e) offer(e,time,unit)
    移除 remove() poll() take() poll(time,unit)
    检查队首元素 element() peek() - -

    *检查队首元素,可以获取到队首元素,但不是移除

    • 抛出异常
      • 当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException:Queue full
      • 当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException
      • 当阻塞队列空时,再往队列里element检查队首元素会抛NoSuchElementException
    • 有返回值,不抛异常
      • 插入方法,成功ture失败false
      • 移除方法,成功返回出队列的元素,队列里没有就返回null
    • 阻塞等待
      • 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断退出
      • 当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用
    • 超时等待
      • 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出
      • 当阻塞队列空时,队列会阻塞消费者线程一定时间,超过限时后消费者线程会退出
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Description 阻塞队列
     * @Package: juc.juc
     * @ClassName BlockingQueueDemo
     * @author: wuwb
     * @date: 2020/12/17 10:41
     */
    public class BlockingQueueDemo {
    
        public static void main(String[] args) throws InterruptedException {
            //testOne();
    
            //testTwo();
    
            //testThree();
    
            testFour();
        }
    
        /**
         * 超时等待
         */
        private static void testFour() throws InterruptedException {
            BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
            System.out.println(blockingQueue.offer("a"));
            System.out.println(blockingQueue.offer("b"));
            System.out.println(blockingQueue.offer("c"));
            System.out.println(blockingQueue.offer("a",3L, TimeUnit.SECONDS));
    
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll(3L, TimeUnit.SECONDS));
        }
    
        /**
         * 阻塞等待
         * @throws InterruptedException
         */
        private static void testThree() throws InterruptedException {
            BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
            blockingQueue.put("a");
            blockingQueue.put("b");
            blockingQueue.put("c");
            //blockingQueue.put("x");
            System.out.println(blockingQueue.take());
            System.out.println(blockingQueue.take());
            System.out.println(blockingQueue.take());
            System.out.println(blockingQueue.take());
    
        }
    
        /**
         * 不抛出异常
         */
        private static void testTwo() {
            BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
            System.out.println(blockingQueue.offer("a"));
            System.out.println(blockingQueue.offer("b"));
            System.out.println(blockingQueue.offer("c"));
            System.out.println(blockingQueue.offer("d"));
            System.out.println(blockingQueue.peek());
    
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            /*true/true/true/false/a/a/b/c/null*/
        }
    
        /**
         * 抛出异常
         */
        private static void testOne() {
            BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
            System.out.println(blockingQueue.add("a"));
            System.out.println(blockingQueue.add("b"));
            System.out.println(blockingQueue.add("c"));
            //System.out.println(blockingQueue.add("d"));
            System.out.println(blockingQueue.element());
    
            System.out.println(blockingQueue.remove());
            System.out.println(blockingQueue.remove());
            System.out.println(blockingQueue.remove());
            System.out.println(blockingQueue.remove());
        }
    }
    

10.2 SynchronousQueue 同步队列

  • 没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
* 同步队列
* 和其他的BlockingQueue 不一样, SynchronousQueue 不存储元素
* put了一个元素,必须从里面先take取出来,否则不能在put进去值!
*/
public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+" put 1");
                blockingQueue.put("1");
                System.out.println(Thread.currentThread().getName()+" put 2");
                blockingQueue.put("2");
                System.out.println(Thread.currentThread().getName()+" put 3");
                blockingQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "AA").start();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "BB").start();
    }
}

11. ThreadPool线程池

11.1 线程池的使用

  • 线程复用、控制最大并发数、管理线程

    • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
    • 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
    • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
  • 线程池架构

    技术图片

    ? Executor,Executors,ExecutorService,ThreadPoolExecutor

11.1 三大方法

  • Executors.newFixedThreadPool(int)

    • 执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    • newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的是LinkedBlockingQueue
  • Executors.newSingleThreadExecutor()

    • 一个任务一个任务的执行,一池一线程
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    • newSingleThreadExecutor 创建的线程池corePoolSize和maximumPoolSize值都是1,它使用的是LinkedBlockingQueue
  • Executors.newCachedThreadPool()

    • 执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    • newCachedThreadPool创建的线程池将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,它使用的是SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
  • 从以上源码可以看出,三大方法底层均是使用ThreadPoolExecutor()来创建线程池

  • 代码

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 线程池
 * Arrays
 * Collections
 * Executors
 */
public class MyThreadPoolDemo {

    public static void main(String[] args) {
        //List list = new ArrayList();
        //List list = Arrays.asList("a","b");
        //固定数的线程池,一池五线程
		//一个银行网点,5个受理业务的窗口
		//ExecutorService threadPool =  Executors.newFixedThreadPool(5); 
        //一个银行网点,1个受理业务的窗口
		//ExecutorService threadPool =  Executors.newSingleThreadExecutor(); 
        //一个银行网点,可扩展受理业务的窗口
        ExecutorService threadPool =  Executors.newCachedThreadPool(); 

        //10个顾客请求
        try {
            for (int i = 1; i <=10; i++) {
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"\t 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}
  • 工作中均不使用以上三大方法来创建线程池,而是直接使用ThreadPoolExecutor()来创建线程池

    技术图片

public static void main(String[] args) {
    ExecutorService threadPool = new ThreadPoolExecutor(
        2,
        5,
        2L,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(3),
        Executors.defaultThreadFactory(),
        //new ThreadPoolExecutor.AbortPolicy()
        //new ThreadPoolExecutor.CallerRunsPolicy()
        //new ThreadPoolExecutor.DiscardOldestPolicy()
        new ThreadPoolExecutor.DiscardOldestPolicy()
    );
    //10个顾客请求
    try {
        for (int i = 1; i <= 10; i++) {
            threadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "\t 办理业务");
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        threadPool.shutdown();
    }
}

11.2 七大参数

  • ThreadPoolExecutor()源码
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • 七大参数
    • corePoolSize:线程池中的常驻核心线程数
    • maximumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须大于等于1
    • keepAliveTime:多余的空闲线程的存活时间,当前池中线程数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余线程会被销毁直到只剩下corePoolSize个线程为止
    • unit:keepAliveTime的单位
    • workQueue:任务队列,被提交但尚未被执行的任务
    • threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认的即可
    • handler:拒绝策略,表示当队列满了,并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何来拒绝请求执行的runnable的策略

11.3 四种拒绝策略

  • 拒绝策略:等待队列已经排满了,再也塞不下新任务了,同时,线程池中的max线程也达到了,无法继续为新任务服务。这个是时候我们就需要拒绝策略机制合理的处理这个问题。

  • JDK内置四种拒绝策略

    技术图片

    • AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
    • CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
    • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
    • DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。
    /**
    * new ThreadPoolExecutor.AbortPolicy() // 银行满了,还有人进来,不处理这个人的,抛出异常
    * new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的去哪里!
    * new ThreadPoolExecutor.DiscardPolicy() //队列满了,丢掉任务,不会抛出异常!
    * new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,尝试去和最早的竞争,也不会抛出异常!
    */
    
  • 以上内置拒绝策略均实现了RejectedExecutionHandle接口

11.4 线程池底层运行原理

技术图片

技术图片

  • 在创建了线程池后,开始等待请求。

  • 当调用execute()方法添加一个请求任务时,线程池会做出如下判断:

    • 1如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
    • 2如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
    • 3如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务(队列中的依旧在等待);
    • 4如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
  • 当一个线程完成任务时,它会从队列中取下一个任务来执行。

  • 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:

    • 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
  • 举例:

    • 银行有5个窗口(maximumPoolSize),2个启用(corePoolSize),3个暂停服务,且等待区有5张椅子供等待使用(workQueue),开始时前面进来的2个客户直接到启用的2个窗口办理业务,后面来的客户,先到等待区椅子上等待,当等待区5张椅子坐满后,又有人进来办业务,于是银行就启用另外3个窗口进行服务,办理完业务的窗口,直接喊等待区的人去那个窗口办理,当5个窗口都在服务,且等待区也满的时候,银行只能让保安在门口堵着(RejectedExecutionHandler),拒绝后面的人员进入(毕竟疫情期间不能挤在一起嘛!!!)
    import java.util.concurrent.*;
    
    /**
     * @Description TODO
     * @Package: juc.juc
     * @ClassName ThreadPoolDemo
     * @author: wuwb
     * @date: 2020/12/18 9:12
     */
    public class ThreadPoolDemo {
        public static void main(String[] args) {
            ExecutorService threadPool = new ThreadPoolExecutor(
                    2,
                    5,
                    2L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(5),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.CallerRunsPolicy()
            );
            try {
                for (int i = 1; i <= 10; i++) {
                    final int j = i;
                    threadPool.execute(()->{
                        System.out.println(Thread.currentThread().getName() + " run " + j);
                        try {
                            TimeUnit.SECONDS.sleep(2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                threadPool.shutdown();
            }
        }
    }
    /*
    pool-1-thread-1 run 1
    pool-1-thread-2 run 2
    pool-1-thread-3 run 8
    pool-1-thread-4 run 9
    pool-1-thread-5 run 10
    pool-1-thread-1 run 3
    pool-1-thread-4 run 4
    pool-1-thread-2 run 5
    pool-1-thread-3 run 6
    pool-1-thread-5 run 7
    
    1、2进核心线程,3、4、5、6、7进队列等待,8、9、10启用非核心线程先于队列中任务运行
    */
    

    *CPU密集型 最大线程数为CPU核数,CPU核数Runtime.getRuntime().availableProcessors();

12. 函数式接口

  • 位置:java.util.function包下

  • 接口中只能有一个抽象方法的接口,称函数式接口

  • java内置核心四大函数式接口:
    | 函数式接口 | 参数类型 | 返回类型 | 用途 |
    | --------------------- | -------- | -------- | ------------------------------------------------------------ |
    | Consumer 消费型 | T | void | 对类型为T的对象应用操作,包含方法:void accept(T t) |
    | Supplier 供给型 | 无 | T | 返回类型为T的对象,包含方法:T get(); |
    | Function<T, R> 函数型 | T | R | 对类型为T的对象应用操作,并返回结果;
    结果是R类型的对象,包含方法:R apply(T t) |
    | Predicate 断定型 | T | boolean | 确定类型为T的对象是否满足某约束,并返回boolean值,包含方法:boolean test(T t) |

    • 消费型Consumer

      • 对类型为T的对象应用操作

      技术图片

          /**
           * 消费型
           */
          public static void testConsumer() {
              /*Consumer consumer = new Consumer<String>() {
                  @Override
                  public void accept(String s) {
                      System.out.println("输入的值是:" + s);
                  }
              };*/
              Consumer consumer = (s) -> {
                  System.out.println("输入的值是:" + s);
              };
              consumer.accept("abc");
          }
      
    • 供给型Supplier

      • 返回类型为T的对象

      技术图片

          /**
           * 供给型
           */
          public static void testSupplier() {
              /*Supplier<String> supplier = new Supplier<String>() {
                  @Override
                  public String get() {
                      return "供给型接口";
                  }
              };*/
              Supplier<String> supplier = () -> {
                  return "供给型接口lambda";
              };
      
              System.out.println(supplier.get());
          }
      
  • 函数型Function<T, R>

    • 对类型为T的对象应用操作,并返回结果,结果是R类型的对象

    技术图片

        /**
         * 函数型
         */
        public static void testFunction() {
            /*Function<String, String> function = new Function<String, String>() {
                @Override
                public String apply(String s) {
                    s += "123";
                    return s;
                }
            };*/
            Function<String, String> function = (s) -> {
                s += "123lambda";
                return s;
            };
            System.out.println(function.apply("abc"));
        }
    
  • 断定型Predicate

    • 确定类型为T的对象是否满足某约束,并返回boolean值

    技术图片

        /**
         * 断定型
         */
        public static void testPredicate() {
            /*Predicate<String> predicate = new Predicate<String>() {
                @Override
                public boolean test(String s) {
                    return "1024".equals(s);
                }
            };*/
            Predicate<String> predicate = (s) -> {
                return "1024".equals(s);
            };
            System.out.println(predicate.test("1024"));
        }
    

Java多线程之JUC

标签:@param   系统   lan   multi   定时   manager   无法   影响   string   

原文地址:https://www.cnblogs.com/wuweibincqu/p/14204702.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!