码迷,mamicode.com
首页 > 编程语言 > 详细

【原创】Java并发编程系列17 | 读写锁八讲(上)

时间:2020-11-24 12:50:31      阅读:5      评论:0      收藏:0      [点我收藏+]

标签:getname   after   ali   next   调度   接口   alt   准备   rgs   

【原创】Java并发编程系列17 | 读写锁八讲(上)
收录于话题
#进阶架构师 | 并发编程专题
12个

点击上方“java进阶架构师”,选择右上角“置顶公众号”
20大进阶架构专题每日送达
技术图片
技术图片

写在前面


本文为何适原创并发编程系列第 17 篇,文末有本系列文章汇总。
通过以下几部分来分析Java提供的读写锁ReentrantReadWriteLock:
为什么需要读写锁
读写锁的使用Demo
ReentrantReadWriteLock类结构
记录读写锁状态
源码分析读锁的获取与释放
源码分析写锁的获取与释放
锁降级
读写锁应用
本文涉及到上下文联系较多,经常需要上下滑动查看,篇幅太多很不方便,而且文章太长阅读体验也不好,所以分成读写锁(上)和读写锁(下)两篇。本文是上篇,只写到“源码分析读锁的获取与释放”。

1. 为什么需要读写锁


在并发编程中解决线程安全的问题,通常使用的都是java提供的关键字synchronized或者重入锁ReentrantLock。它们都是独占式获取锁,也就是在同一时刻只有一个线程能够获取锁。
但是在大多数场景下,大部分时间都是读取共享资源,对共享资源的写操作很少。然而读服务不存在数据竞争问题,如果一个线程在读时禁止其他线程读势必会导致性能降低。
针对这种读多写少的情况,java还提供了另外一个实现Lock接口的ReentrantReadWriteLock(读写锁)。读写锁允许共享资源在同一时刻可以被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞。

2. 使用Demo


直接上代码:

public class ReadWriteLockTest {
    public static void main(String[] args) {
        final Data data = new Data();

        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        data.get();
                    }
                }
            }, "读锁线程-" + i).start();

        }

        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        data.put(new Random().nextInt(10000));
                    }
                }
            }, "写锁线程-" + i).start();

        }

    }
}

class Data {
    private Object data = 0;// 共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
    private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    public void get() {
        rwl.readLock().lock();// 上读锁,其他线程只能读不能写
        System.out.println(Thread.currentThread().getName() + " 开始读取数据");

        try {
            Thread.sleep((long) (Math.random() * 1000));
            System.out.println(Thread.currentThread().getName() + " 读取数据完成 " + data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rwl.readLock().unlock(); // 释放读锁
        }
    }

    public void put(Object data) {
        rwl.writeLock().lock();// 上写锁,不允许其他线程读也不允许写
        System.out.println(Thread.currentThread().getName() + " 开始写数据");

        try {
            Thread.sleep((long) (Math.random() * 1000));
            this.data = data;
            System.out.println(Thread.currentThread().getName() + " 写数据完成 " + data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rwl.writeLock().unlock();// 释放写锁
        }
    }
}

结果:
读锁线程-0 开始读取数据
读锁线程-1 开始读取数据
读锁线程-2 开始读取数据
读锁线程-0 读取数据完成 0
读锁线程-1 读取数据完成 0
读锁线程-2 读取数据完成 0
写锁线程-0 开始写数据
写锁线程-0 写数据完成 4306
...
写锁线程-1 开始写数据
写锁线程-1 写数据完成 9114
...
写锁线程-2 开始写数据
写锁线程-2 写数据完成 7709
Data类的共享数据data,get()方法上读锁读data,put()方法上写锁写data。启动3个线程读data,3个线程写data。
从结果可以看出,读锁是共享的,读锁的三个线程是同时读取共享数据data的;写锁是互斥的,写锁的三个线程是依次写共享数据data的。

3. 类结构


public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    // 属性
    private final ReentrantReadWriteLock.ReadLock readerLock; // 读锁
    private final ReentrantReadWriteLock.WriteLock writerLock; // 写锁
    final Sync sync; // 锁的主体AQS

    // 内部类
    abstract static class Sync extends AbstractQueuedSynchronizer {}
    static final class FairSync extends Sync {}
    static final class NonfairSync extends Sync {}
    public static class ReadLock implements Lock, java.io.Serializable {}
    public static class WriteLock implements Lock, java.io.Serializable {}

    // 构造
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
}

ReentrantReadWriteLock与ReentrantLock一样,其锁主体依然是Sync,读写锁其实就是两个属性:readerLock、writerLock。
一个ReentrantReadWriteLock对象都对应着读锁和写锁两个锁,而这两个锁是通过同一个sync(AQS)实现的。

4. 记录读写锁状态


我们知道AQS.state使用来表示同步状态的。ReentrantLock中,state=0表示没有线程占用锁,state>0时state表示线程的重入次数。但是读写锁ReentrantReadWriteLock内部维护着两个锁,需要用state这一个变量维护多种状态,应该怎么办呢?
读写锁采用“按位切割使用”的方式,将state这个int变量分为高16位和低16位,高16位记录读锁状态,低16位记录写锁状态,并通过位运算来快速获取当前的读写锁状态。

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 将state这个int变量分为高16位和低16位,高16位记录读锁状态,低16位记录写锁状态
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    /**
     * 获取读锁的状态,读锁的获取次数(包括重入)
     * c无符号补0右移16位,获得高16位
     */
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

    /**
     * 获取写锁的状态,写锁的重入次数
     * c & 0x0000FFFF,将高16位全部抹去,获得低16位
     */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}

记录获取锁的线程

线程获取写锁后,和重入锁一样,将AQS.exclusiveOwnerThread置为当前线程。但是读锁是共享的,可以多个线程同时获取读锁,那么如何记录获取读锁的多个线程以及每个线程的重入情况呢?
sycn中提供了一个HoldCounter类,类似计数器,用于记录一个线程读锁的重入次数。将HoldCounter通过ThreadLocal与线程绑定。
源码如下:

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 这个嵌套类的实例用来记录每个线程持有的读锁数量(读锁重入)
    static final class HoldCounter {
        int count = 0;// 读锁重入次数
        final long tid = getThreadId(Thread.currentThread());// 线程 id
    }

    // ThreadLocal 的子类
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

    // 组合使用上面两个类,用一个 ThreadLocal 来记录当前线程持有的读锁数量
    private transient ThreadLocalHoldCounter readHolds;

    private transient HoldCounter cachedHoldCounter;// 记录"最后一个获取读锁的线程"的读锁重入次数,用于缓存提高性能
    private transient Thread firstReader = null;// 第一个获取读锁的线程(并且其未释放读锁)
    private transient int firstReaderHoldCount;// 第一个获取读锁的线程重入的读锁数量
}
注:属性cachedHoldCounter、firstReader、firstReaderHoldCount都是为了提高性能,目前不用太关注。
(ThreadLocal在之后的文章中会专门讲解)

线程与HoldCounter的存储结构如下图:
技术图片

5. 读锁获取


查看使用示例中代码rwl.readLock().lock()的实现

/**
 * rwl.readLock().lock()-->ReadLock.lock()
 */
public void lock() {
    sync.acquireShared(1);
}

/**
 * ReadLock.lock()-->AQS.acquireShared(int)
 */
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

doAcquireShared():

doAcquireShared():尝试获取读锁,获取到锁返回1,获取不到返回-1。
首先来分析一下可以获取读锁的条件:
当前锁的状态
1)读锁写锁都没有被占用
2)只有读锁被占用
3)写锁被自己线程占用
简单总结,只有在其它线程持有写锁时,不能获取读锁,其它情况都可以去获取。
AQS队列中的情况,如果是公平锁,同步队列中有线程等锁时,当前线程是不可以先获取锁的,必须到队列中排队。
读锁的标志位只有16位,最多只能有2^16-1个线程获取读锁或重入
看源码:

/**
 * 尝试获取读锁,获取到锁返回1,获取不到返回-1
 */
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    /*
     * 根据锁的状态判断可以获取读锁的情况:
     * 1. 读锁写锁都没有被占用
     * 2. 只有读锁被占用
     * 3. 写锁被自己线程占用
     * 总结一下,只有在其它线程持有写锁时,不能获取读锁,其它情况都可以去获取。
     */
    if (exclusiveCount(c) != 0 && // 写锁被占用
        getExclusiveOwnerThread() != current) // 持有写锁的不是当前线程
        return -1;

    int r = sharedCount(c);
    if (!readerShouldBlock() && // 检查AQS队列中的情况,看是当前线程是否可以获取读锁,下文有详细讲解。
        r < MAX_COUNT &&        // 读锁的标志位只有16位,最多之能有2^16-1个线程获取读锁或重入
        compareAndSetState(c, c + SHARED_UNIT)) {// 在state的第17位加1,也就是将读锁标志位加1
        /*
         * 到这里已经获取到读锁了
         * 以下是修改记录获取读锁的线程和重入次数,以及缓存firstReader和cachedHoldCounter
         */
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }

    /*
     * 到这里
     * 没有获取到读锁,因为上面代码获取到读锁的话已经在上一个if里返回1了
     * 锁的状态是满足获取读锁的,因为不满足的上面返回-1了
     * 所以没有获取到读锁的原因:AQS队列不满足获取读锁条件,或者CAS失败,或者16位标志位满了
     * 像CAS失败这种原因,是一定要再尝试获取的,所以这里再次尝试获取读锁,fullTryAcquireShared()方法下文有详细讲解
     */
    return fullTryAcquireShared(current);
}

readerShouldBlock():

readerShouldBlock():检查AQS队列中的情况,看是当前线程是否可以获取读锁,返回true表示当前不能获取读锁。
分别看下公平锁和非公平锁的实现。
公平锁FairSync:
对于公平锁来说,如果队列中还有线程在等锁,就不允许新来的线程获得锁,必须进入队列排队。
hasQueuedPredecessors()方法在重入锁的文章中分析过,判断同步队列中是否还有等锁的线程,如果有其他线程等锁,返回true当前线程不能获取读锁。


final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}

非公平锁NonfairSync:
对于非公平锁来说,原本是不需要关心队列中的情况,有机会直接尝试抢锁就好了,这里问什么会限制获取锁呢?
这里给写锁定义了更高的优先级,如果队列中第一个等锁的线程请求的是写锁,那么当前线程就不能跟那个马上就要获取写锁的线程抢,这样做很好的避免了写锁饥饿。


/**
 * 队列中第一个等锁的线程请求的是写锁时,返回true,当前线程不能获取读锁
 */
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}
// 返回true-队列中第一个等锁的线程请求的是写锁
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         && // head后继节点线程请求写锁
        s.thread != null;

}
fullTryAcquireShared()

tryAcquireShared()方法中因为CAS抢锁失败等原因没有获取到读锁的,fullTryAcquireShared()再次尝试获取读锁。此外,fullTryAcquireShared()还处理了读锁重入的情况。

/**
 * 再次尝试获取读锁
 */
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {// 注意这里是循环
        int c = getState();
        if (exclusiveCount(c) != 0) {
            // 仍然是先检查锁状态:在其它线程持有写锁时,不能获取读锁,返回-1
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            /*
             * exclusiveCount(c) == 0 写锁没有被占用
             * readerShouldBlock() == true,AQS同步队列中的线程在等锁,当前线程不能抢读锁
             * 既然当前线程不能抢读锁,为什么没有直接返回呢?
             * 因为这里还有一种情况是可以获取读锁的,那就是读锁重入。
             * 以下代码就是检查如果不是重入的话,return -1,不能继续往下获取锁。
             */
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }

        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");

        // CAS修改读锁标志位,修改成功表示获取到读锁;CAS失败,则进入下一次for循环继续CAS抢锁
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            /*
             * 到这里已经获取到读锁了
             * 以下是修改记录获取读锁的线程和重入次数,以及缓存firstReader和cachedHoldCounter
             */
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

doAcquireShared()

再回到最开始的acquireShared(),tryAcquireShared()抢锁成功,直接返回,执行同步代码;如果tryAcquireShared()抢锁失败,调用doAcquireShared()。
doAcquireShared()应该比较熟悉了吧,类似AQS那篇中分析过acquireQueued():
将当前线程构成节点node
如果node是head的后继节点就可以继续尝试抢锁
如果node不是head的后继节点,将node加入队列的队尾,并将当前线程阻塞,等待node的前节点获取、释放锁之后唤醒node再次抢锁。
node抢到读锁之后执行setHeadAndPropagate()方法,setHeadAndPropagate()是获取读锁的特殊之处,下文分析。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);// 把当前线程构造成节点,Node.SHARED表示共享锁
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {// 前驱节点是head,node才能去抢锁
                int r = tryAcquireShared(arg);// 抢锁,上文分析了
                if (r >= 0) {// r>0表示抢锁成功
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 判断node前驱节点状态,将当前线程阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

setHeadAndPropagate()

试想一种情况:当线程1持有写锁时,线程2、线程3、线程4、线程5...来获取读锁是获取不到的,只能排进同步队列。当线程1释放写锁时,唤醒线程2来获取锁。因为读锁是共享锁,当线程2获取到读锁时,线程3也应该被唤醒来获取读锁。
setHeadAndPropagate()方法就是在一个线程获取读锁之后,唤醒它之后排队获取读锁的线程的。该方法可以保证线程2获取读锁后,唤醒线程3获取读锁,线程3获取读锁后,唤醒线程4获取读锁,直到遇到后继节点是要获取写锁时才结束。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);// 因为node获取到锁了,所以设置node为head
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())// node后继节点线程要获取读锁,此时node就是head
            doReleaseShared();// 唤醒head后继节点(也就是node.next)获取锁
    }
}

6. 读锁释放


理解了上文读锁的获取过程,读锁的释放过程不看源码也应该可以分析出来:
处理firstReader、cachedHoldCounter、readHolds获取读锁线程及读锁重入次数。
修改读锁标志位state的高16位。
释放读锁之后,如果队列中还有线程等锁,唤醒同步队列head后继节点等待写锁的线程。
这里为什么是写锁?因为线程持有读锁时会把它之后要获取读锁的线程全部唤醒直到遇到写锁。
使用示例中释放读锁代码 rwl.readLock().unlock()


/**
 * rwl.readLock().unlock()-->ReadLock.unlock()
 */
public void unlock() {
    sync.releaseShared(1);
}

/**
 * sync.releaseShared(1)-->AQS.releaseShared(int)
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {// 当前线程释放读锁,下文介绍
        /*
         * 到这里,已经没有任何线程占用锁,调用doReleaseShared()唤醒之后获取写锁的线程
         * 如果同步队列中还有线程在排队,head后继节点的线程一定是要获取写锁,因为线程持有读锁时会把它之后要获取读锁的线程全部唤醒
         */
        doReleaseShared();// 唤醒head后继节点获取锁
        return true;
    }
    return false;
}

/**
 * 释放读锁
 * 当前线程释放读锁之后,没有线程占用锁,返回true
 */
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 处理firstReader、cachedHoldCounter、readHolds获取读锁线程及读锁重入次数
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }

    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;// state第17位-1,也就是读锁状态标志位-1
        if (compareAndSetState(c, nextc))// CAS设置state,CAS失败自旋进入下一次for循环
            return nextc == 0;// state=0表示没有线程占用锁,返回true
    }
}

总结


大多数业务场景,都是读多写少的,采用互斥锁性能较差,所以提供了读写锁。读写锁允许共享资源在同一时刻可以被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞。
一个ReentrantReadWriteLock对象都对应着读锁和写锁两个锁,而这两个锁是通过同一个sync(AQS)实现的。
读写锁采用“按位切割使用”的方式,将state这个int变量分为高16位和低16位,高16位记录读锁状态,低16位记录写锁状态。
读锁获取时,需要判断当时的写锁没有被其他线程占用即可,锁处于的其他状态都可以获取读锁。
参考资料
《Java 并发编程之美》
《Java 并发编程实战》
《Java 并发编程的艺术》

并发系列文章汇总


【原创】01|开篇获奖感言
【原创】02|并发编程三大核心问题
【原创】03|重排序-可见性和有序性问题根源
【原创】04|Java 内存模型详解
【原创】05|深入理解 volatile
【原创】06|你不知道的 final
【原创】07|synchronized 原理
【原创】08|synchronized 锁优化
【原创】09|基础干货
【原创】10|线程状态
【原创】11|线程调度
【原创】13|LockSupport
【原创】14|AQS 源码分析
【原创】15|重入锁 ReentrantLock
【原创】16|公平锁与非公平锁

之前,给大家发过三份Java面试宝典,这次新增了一份,目前总共是四份面试宝典,相信在跳槽前一个月按照面试宝典准备准备,基本没大问题。
《java面试宝典5.0》(初中级)
《350道Java面试题:整理自100+公司》(中高级)
《资深java面试宝典-视频版》(资深)
《Java[BAT]面试必备》(资深)
分别适用于初中级,中高级,资深级工程师的面试复习。
内容包含java基础、javaweb、mysql性能优化、JVM、锁、百万并发、消息队列,高性能缓存、反射、Spring全家桶原理、微服务、Zookeeper、数据结构、限流熔断降级等等。
技术图片
获取方式:点“在看”,V信关注上述单号并回复 【面试】即可领取,更多精彩陆续奉上。

看到这里,证明有所收获
必须点个在看支持呀,喵

【原创】Java并发编程系列17 | 读写锁八讲(上)

标签:getname   after   ali   next   调度   接口   alt   准备   rgs   

原文地址:https://blog.51cto.com/15009303/2552741

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!