码迷,mamicode.com
首页 > 其他好文 > 详细

并发编程实践六:ReentrantReadWriteLock

时间:2014-05-21 16:09:03      阅读:304      评论:0      收藏:0      [点我收藏+]

标签:java   并发   线程   算法   

ReentrantReadWriteLock是一个读写锁,它提供了一个读锁和一个写锁,读锁用于只读操作,而写锁用于写入操作,读操作可以并行进行,而写操作则是互斥的。读锁和写锁的分离在一些写少读多的应用中可以带来性能上的提升,例如:一个hashmap在构造之后很少修改,却经常进行查找操作,这样查找操作就可以并发进行从而提高性能。这篇文章首先为你介绍读写锁的基本特性,在具体应用中需要解决的问题,然后介绍ReentrantReadWriteLock的功能和具体实现。

读写锁

在某些应用场合,大量的读者只会读取数据而不会修改数据,只有少量写者会修改数据,在这种情况下,可以考虑使用读写锁,读写锁可以为读者提供读锁而为写着提供写锁,定义如下:

public interface ReadWriteLock {
    Lock readLock();

    Lock writeLock();
}

读锁和写锁之间满足如下的约束:
 1)当任一线程持有写锁或读锁时,其他线程不能获得写锁;
 2)当任一线程持有写锁时,其他线程不能获取读锁;
 3)多个线程可以同时持有读锁。
这样,应用就可以做到互斥的写和并发的读,由于读的需求大大超过写,就可以达到让很多读操作并行进行的目地,从而提高性能。当然,这只有在多处理器的环境中才能有效。
满足上面约束的锁理论上就可以称为读写锁了,但在实际应用中,还有许多问题需要解决:
 1)当一个线程获取了读锁后,其它请求读锁的线程也可以获取到读锁,如果一直存在请求读锁的线程,请求写锁的线程就会一直等待,因此,我们需要考虑一个策略来避免这个问题;
 2)通常,我们都要求锁能够支持重入,那么,在读写锁中,获取了写锁的线程能够再获取读锁吗?获取了读锁的线程能够获取写锁吗?
 3)如果读写锁支持重入,那么,可以直接将一个写锁降级为读锁吗?可以直接将读锁升级为写锁吗?还是说需要考虑等待队列中的其它线程?
带着这些问题,我们开始ReentrantReadWriteLock的学习。

ReentrantReadWriteLock介绍

ReentrantReadWriteLock实现了接口ReadWriteLock,首先通过一个例子对ReentrantReadWriteLock有一个宏观的了解:

public class ReentrantReadWriteLockTest {
	private final Map<String, Object> m = new TreeMap<String, Object>();
	private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
	private final Lock r = rwl.readLock();
	private final Lock w = rwl.writeLock();

	public Object get(String key) {
		r.lock();
		try {
			return m.get(key);
		} finally {
			r.unlock();
		}
	}

	public String[] allKeys() {
		r.lock();
		try {
			return (String[]) m.keySet().toArray();
		} finally {
			r.unlock();
		}
	}

	public Object put(String key, Object value) {
		w.lock();
		try {
			return m.put(key, value);
		} finally {
			w.unlock();
		}
	}

	public void clear() {
		w.lock();
		try {
			m.clear();
		} finally {
			w.unlock();
		}
	}
}

ReentrantReadWriteLock提供了读锁r和写锁w,做读取操作的方法使用读锁r,而修改数据的操作则使用写锁w。下面具体看看ReentrantReadWriteLock具有的属性:

锁获取顺序

ReentrantReadWriteLock支持两种公平策略:非公平模式(默认)和公平模式。
非公平模式不按照线程请求锁的顺序分配锁,而是当前请求锁的线程和等待队列中的线程一起竞争锁。连续竞争的非公平锁可能会导致等待队列中的线程长时间等待,但吞吐量要高于公平锁。非公平模式中,如果等待队列的头节点是写者线程(当前执行的可能是一个写者线程或者是多个读者线程),那么新到的读者线程将进入等待队列中阻塞,写者线程在当前正在执行的一个写者线程或者多个读者线程执行完成后就会得到执行(如果存在新到写者线程,则需要竞争),防止等待队列中的写者线程一直等待。
公平模式采用近似FIFO的策略获取锁(与ReentrantLock存在同样的问题,线程调度可能导致后到的线程先获取锁),当一个线程释放了锁后,等待队列中等待时间最长的线程(单个写线程或者多个读线程)将获取锁(写锁或者读锁)。由于公平模式采用FIFO的策略获取锁,因此不存在写线程一直等待的问题。

重入

ReentrantReadWriteLock允许写者和读者按照ReentrantLock的方式多次获取读锁或写锁,但读锁和写锁的混用则有一些限制:获取写锁的线程可以再次获取读锁,但获取读锁的线程不能再次获取写锁。也就是说,如果你先获取写锁,然后获取读锁,可以成功:

//可以这样做
w.lock();
try {
	r.lock();
	try {
		// do something
	} finally {
		r.unlock();
	}
} finally {
	w.unlock();
}

而如果你先获取读锁,再获取写锁,你的线程将永远无法成功:

//线程将永远阻塞,无法完成
r.lock();
try {
	w.lock();
	try {
		// do something
	} finally {
		w.unlock();
	}
} finally {
	r.unlock();
}

由于写锁可以获取读锁,因此ReentrantReadWriteLock支持锁降级,即线程获取了写锁后,再获取读锁,然后释放写锁,线程的锁就从写锁降级为了读锁:

w.lock();
try {
	// 做写入操作
} catch (Exception e) {
	// process exception
}
r.lock();
try {
	w.unlock();
	// 任然持有读锁
} finally {
	r.unlock();
}

但由于读锁不能获取写锁,因此读锁是无法升级到写锁的。

锁的数量

ReentrantReadWriteLock支持65535个递归写入锁和65535个读取锁,我们知道AQS中使用了一个整型变量来保存同步状态,后面我们将看到ReentrantReadWriteLock是怎么通过一个整型变量来保存写入锁和读取锁的。

condition支持

写入锁提供了一个Condition实现,对于写入锁来说,该实现的行为与 ReentrantLock.newCondition()提供的Condition 实现对ReentrantLock所做的行为相同,但是此Condition只能用于写入锁。
读取锁不支持 Condition,readLock().newCondition()会抛出UnsupportedOperationException。

监测

ReentrantReadWriteLock提供了一些方法用与监视系统状态,在检测和监视时很有帮组。

了解了ReentrantReadWriteLock的特性后,我们下面就深入到它的内部去看看这些功能都是怎么实现的。

ReentrantReadWriteLock的实现

同步状态

在AQS中,同步状态是使用一个整型变量来保存,而在ReentrantReadWriteLock中,由于锁分化为了读锁和写锁,就需要两个同步状态,一个保存读锁的状态,一个保存写锁的状态,ReentrantReadWriteLock是将AQS中同步状态的整型变量分为了两个部分来实现的,低位的16位用于保存写锁状态,而高位的16位用于保存读锁状态,具体定义如下:

	static final int SHARED_SHIFT   = 16;
	static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
	static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
	static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

	/** 返回共享锁(读锁)的数量 */
	static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
	/** 返回排它锁(写锁)的数量 */
	static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

SHARED_SHIFT表示读锁和写锁占用的位数;SHARED_UNIT表示每增加一个读锁需要增加的值;MAX_COUNT表述读锁和写锁的最大值,即65535;EXCLUSIVE_MASK用于写锁占用位。
sharedCount用于获取共享锁(读锁)数量,因为读锁是共享锁;
exclusiveCount用于获取排它锁(写锁)数量,因为写锁是排它锁。

读锁和写锁

ReentrantReadWriteLock实现了ReadWriteLock接口,提供了两个方法,一个用于获取读锁,一个用于获取写锁:

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

在ReentrantReadWriteLock中分别定义了读锁和写锁,读锁的定义如下:

public static class ReadLock implements Lock, java.io.Serializable {
	private final Sync sync;

	protected ReadLock(ReentrantReadWriteLock lock) {
		sync = lock.sync;
	}

	public void lock() {
		sync.acquireShared(1);
	}

	public void unlock() {
		sync.releaseShared(1);
	}

	public Condition newCondition() {
		throw new UnsupportedOperationException();
	}
	。。。。。。
}

lock.sync是AQS在ReentrantReadWriteLock中的实现类,lock方法调用了AQS的acquireShared方法,而unlock方法调用了AQS的releaseShared方法,因此ReadLock使用的是AQS的共享模式(参考“并发编程实践二:AbstractQueuedSynchronizer”),且ReadLock不支持newCondition操作。
下面是写锁的定义:

public static class WriteLock implements Lock, java.io.Serializable {
	private final Sync sync;

	protected WriteLock(ReentrantReadWriteLock lock) {
		sync = lock.sync;
	}

	public void lock() {
		sync.acquire(1);
	}

	public void unlock() {
		sync.release(1);
	}

	public Condition newCondition() {
		return sync.newCondition();
	}
	。。。。。。
}

WriteLock的lock方法调用了AQS的acquire方法,而unlock调用了AQS的release方法,因此WriteLock使用了AQS的排它模式(参考“并发编程实践五:ReentrantLock”),WriteLock支持newCondition操作。

非公平模式

ReentrantReadWriteLock默认就是非公平模式,下面我们从写锁开始。

写锁

写锁使用的是AQS的互斥模式,线程通过WriteLock的lock方法请求锁,WriteLock的lock将调用AQS的acquire方法,在AQS的acquire中将调用tryAcquire来尝试获取锁,下面就是tryAcquire的实现:

protected final boolean tryAcquire(int acquires) {
	Thread current = Thread.currentThread();
	int c = getState();		//1
	int w = exclusiveCount(c);		//2
	if (c != 0) {		//3
		if (w == 0 || current != getExclusiveOwnerThread())	//4
			return false;
		if (w + exclusiveCount(acquires) > MAX_COUNT)	//5
			throw new Error("Maximum lock count exceeded");
		setState(c + acquires);		//6
		return true;
	}
	if (writerShouldBlock() ||		//7
		!compareAndSetState(c, c + acquires))	//8
		return false;
	setExclusiveOwnerThread(current);	//9
	return true;
}

在步骤1中,获取到当前的同步状态c,步骤2从同步状态c中获取到写锁数量;如果c不为0,则表示锁被占用,进入步骤4;步骤4中如果写锁数量w为0,则表示当前已经存在读者线程获取了锁,写者线程获取锁失败(不允许获取了读锁的线程再次获取写锁),否则,则表示已经有写者线程获取了锁,如果获取锁的线程为当前线程,则可以再次获取锁(获取了写锁的线程可以再次获取写锁),否则,写者线程获取锁失败;步骤5判断写锁的数量是否已经超出限制(只有c大于0后才会出现,为什么?);步骤6表示写锁获取成功,修改锁状态(这里只会有一个线程能够进入,为什么?),返回true。
如果c为0(表示锁空闲),则进入步骤7,writerShouldBlock判断写者线程是否应该阻塞:

final boolean writerShouldBlock() {
	return false; // writers can always barge
}

具体实现是直接返回false,也就是写者总是不阻塞,然后在步骤8中修改锁状态,成功后在步骤9设置当前的owner线程为自己(只会有一个线程能到达这里,为什么?),返回true。
如果tryAcquire返回false,接下去的执行流程就和ReentrantLock中描述的是一致的了,接下来我们看看unlock操作。
线程通过调用WriteLock的unlock方法释放锁,WriteLock的unlock方法调用AQS的release方法,AQS的release方法将使用tryRelease来释放锁:

protected final boolean tryRelease(int releases) {
	if (!isHeldExclusively())	//1
		throw new IllegalMonitorStateException();
	int nextc = getState() - releases;	//2
	boolean free = exclusiveCount(nextc) == 0;	//3
	if (free)
		setExclusiveOwnerThread(null);	//4
	setState(nextc);	//5
	return free;
}

步骤1中isHeldExclusively将判断当前线程是否是锁的owner:

protected final boolean isHeldExclusively() {
	return getExclusiveOwnerThread() == Thread.currentThread();
}

只有锁的owner才能进入下面的步骤,因此步骤2只会有一个线程能够到达,步骤2获取释放后的锁数量,步骤3判断判断当前的写锁数量是否已经为0,为0则表示写锁已经完全释放(由于锁重入,同一个线程需要多次unlock才能释放完锁),进入步骤4将当前owner设置为空,然后在步骤5设置锁状态,返回free(锁是否释放完的标志)。
tryRelease结束后的处理就和ReentrantLock中的处理一致了。

读锁

读锁使用的AQS中的共享模式,线程通过ReadLock的lock方法请求锁,ReadLock的lock方法调用AQS的acquireShared方法,在AQS的acquireShared方法中通过tryAcquireShared请求锁:

protected final int tryAcquireShared(int unused) {
	Thread current = Thread.currentThread();
	int c = getState();		//1
	if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)	//2
		return -1;
	int r = sharedCount(c);		//3
	if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {//4
		if (r == 0) {
			firstReader = current;	
			firstReaderHoldCount = 1;
		} else if (firstReader == current) {
			firstReaderHoldCount++;	
		} else {
			HoldCounter rh = cachedHoldCounter;		//10
			if (rh == null || rh.tid != getThreadId(current))
				cachedHoldCounter = rh = readHolds.get();
			else if (rh.count == 0)
				readHolds.set(rh);	//11
			rh.count++;
		}
		return 1;
	}
	return fullTryAcquireShared(current);
}

首先步骤1获取锁状态,步骤2判断如果写锁数量不为0且当前线程不是写锁的owner(获取了写锁的线程可以再获取读锁,因此如果当前线程是写锁的owner,则允许成功),则失败。
步骤2通过后,在步骤3中将获取读锁数量,在步骤4中首先通过readerShouldBlock判断读锁是否应该阻塞:

final boolean readerShouldBlock() {
	return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
	Node h, s;
	return (h = head) != null &&
		(s = h.next)  != null &&
		!s.isShared()         &&
		s.thread != null;
}

由于读锁不应该让写锁始终等待,因此apparentlyFirstQueuedIsExclusive判断等待队列中的第一个节点如果存在且是写锁(!s.isShared())(是否可能存在且是读锁?如果等待队列中存在写锁但不在队列首,会出现什么情况?),则readerShouldBlock返回true(表示请求读锁的线程请求失败)。
再思考这样的问题:如果apparentlyFirstQueuedIsExclusive判断的过程中,h发生了变化(h已经出队列),会发生什么?(提示:结合AQS中的doAcquireShared方法来分析,整个共享模式的流程参考“并发编程实践二:AbstractQueuedSynchronizer”)。
如果readerShouldBlock返回false,且读锁数量小于MAX_COUNT,就可以尝试将读锁数量修改到c + SHARED_UNIT(为什么加SHARED_UNIT?SHARED_UNIT值在“同步状态”有描述)。如果步骤4中的条件都通过了,就表示读锁获取成功了,接下来的操作就是设置计数信息了,这些计数信息表示线程获取到的读锁数量。
首先当r为0的时候(会否出现两个线程都获取到r为0的情况?),设置firstReader为自己,且设置firstReaderHoldCount为1,看这两个变量的定义:

private transient Thread firstReader = null; //第一个读者线程
private transient int firstReaderHoldCount;  //第一个读者线程持有的读锁数量

这里有个问题,为什么这两个变量不用申明为volatile?主要原因是如果是firstReader线程访问这两个变量,总能取到正确的值,而其它线程只会访问firstReader,判断firstReader是否等于自己,即使取到不正确的值判断的结果任然是正确的。
而下面需要注意的就是HoldCounter和ThreadLocalHoldCounter:

static final class HoldCounter {
	int count = 0;
	final long tid = getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
	public HoldCounter initialValue() {
		return new HoldCounter();
	}
}

HoldCounter用于保存线程的锁计数,getThreadId是获取线程的id,每个线程的id都是唯一的(即Thread的tid字段的值);而ThreadLocalHoldCounter是一个ThreadLocal,ThreadLocal为每个线程提供一个对应值,initialValue用于当线程第一次调用get方法时调用,返回的值将作为线程的对应值保存在线程中,以后每次这个线程调用ThreadLocal的get方法,都会返回这个值。
tryAcquireShared的步骤10就是为除了firstReader外的所有线程分配一个HoldCounter保存它们的读锁数量,而步骤11之所以存在是因为在对应的release操作中,如果rh.count为0,rh就会被从ThreadLocal中remove掉。计数信息设置完成后线程就返回成功了。
我们再次回到步骤4,如果步骤4失败,则进入fullTryAcquireShared操作:

final int fullTryAcquireShared(Thread current) {
	HoldCounter rh = null;
	for (;;) {
		int c = getState();
		if (exclusiveCount(c) != 0) {  //1
			if (getExclusiveOwnerThread() != current)  //2
				return -1;
		} else if (readerShouldBlock()) {  //3
			//当前线程是firstReader,已经获取过读锁
			if (firstReader == current) {  //4
			} else {
				//取当前线程的读锁计数
				if (rh == null) {  //5
					rh = cachedHoldCounter;
					if (rh == null || rh.tid != getThreadId(current)) {
						rh = readHolds.get();
						if (rh.count == 0)
							readHolds.remove();
					}
				}
				//计数为0,则失败,否则表示已经获取过读锁,允许再次获取
				if (rh.count == 0)  //6
					return -1;
			}
		}
		if (sharedCount(c) == MAX_COUNT) //7
			throw new Error("Maximum lock count exceeded");
		if (compareAndSetState(c, c + SHARED_UNIT)) { //8
			if (sharedCount(c) == 0) {
				firstReader = current;
				firstReaderHoldCount = 1;
			} else if (firstReader == current) {
				firstReaderHoldCount++;
			} else {
				if (rh == null)
					rh = cachedHoldCounter;
				if (rh == null || rh.tid != getThreadId(current))
					rh = readHolds.get();
				else if (rh.count == 0)
					readHolds.set(rh);
				rh.count++;
				cachedHoldCounter = rh; // cache for release
			}
			return 1;
		}
	}
}

在fullTryAcquireShared中是一个循环,直到某些判定条件满足后才退出。首先线程做一些预判条件看是否能够获取读锁(步骤1-6):
 1)写锁数量不为0(步骤1),且当前线程不是写锁owner(步骤2),则失败;
 2)readerShouldBlock返回true(步骤3),若线程没有获取过读锁,则失败。
通过上面的预判断后,步骤7将确保锁的数量没有超过限制。
步骤8为线程分配锁,如果步骤8成功,则开始设置线程读锁的计数信息,这里同tryAcquireShared中是一致的;步骤8如果失败,则会导致循环重试。
当一个线程要释放读锁,则调用ReadLock的unlock方法,ReadLock的unlock方法将调用AQS的releaseShared,而给方法使用tryReleaseShared来释放锁:

protected final boolean tryReleaseShared(int unused) {
	Thread current = Thread.currentThread();
	if (firstReader == current) {
		if (firstReaderHoldCount == 1)
			firstReader = null;
		else
			firstReaderHoldCount--;
	} else {
		HoldCounter rh = cachedHoldCounter;
		if (rh == null || rh.tid != getThreadId(current))
			rh = readHolds.get();
		int count = rh.count;
		if (count <= 1) {
			readHolds.remove();
			if (count <= 0) //1
				throw unmatchedUnlockException();
		}
		--rh.count;
	}
	for (;;) {
		int c = getState();
		int nextc = c - SHARED_UNIT;
		if (compareAndSetState(c, nextc))
			return nextc == 0;
	}
}

tryReleaseShared中首先修改线程对应的读锁计数信息,如果线程未获取过读锁,那么在步骤1的地方判断将成功,然后抛出unmatchedUnlockException异常,由于每个线程只会修改自己的计数信息,因此整个过程是安全的。
修改完计数信息后,线程将进入一个循环中尝试修改锁状态,直到成功,然后返回读锁是否已经全部释放(nextc == 0),如果读锁全部释放,则等待队列首位的写者线程(如果存在)将被唤醒(为什么是写者线程?)。
整个非公平锁的流程到这里就分析完成了,接下来我们来看看公平锁与非公平锁的差异。

公平模式

写锁

公平锁的写锁流程和非公平锁的写锁流程是一致的,唯一的差异就在于在tryAcquire中对writerShouldBlock的判断:

final boolean writerShouldBlock() {
	return hasQueuedPredecessors();
}
public final boolean hasQueuedPredecessors() {
	Node t = tail; 
	Node h = head;
	Node s;
	return h != t &&
		((s = h.next) == null || s.thread != Thread.currentThread());
}

hasQueuedPredecessors方法在ReentrantLock已经讲过,用于判断等待队列中是否存在等待线程,如果存在等待线程,则厚道的线程将进入等待队列中阻塞等待。

读锁

公平锁的读锁流程和非公平锁的读锁流程中唯一的差别就在于tryAcquireShared中对readerShouldBlock的判断:

final boolean readerShouldBlock() {
	return hasQueuedPredecessors();
}

和公平锁的写锁的writerShouldBlock实现一致,如果等待队列中存在等待线程,则自己进入等待队列中阻塞等待。
ReentrantReadWriteLock的公平锁也和ReentrantLock的公平锁一样,线程的调度可能导致后来的请求线程比先到的请求线程先获取到锁。

结束语

读写锁中提供了读锁和写锁,它们之间需要满足一定的约束关系,这篇文章通过对ReentrantReadWriteLock的源码分析介绍了读写锁实现中会遇到的问题,以及解决办法。
文章中提到了一些问题,帮助你阅读过程中思考,通过仔细分析应该都可以解决。

并发编程实践六:ReentrantReadWriteLock,布布扣,bubuko.com

并发编程实践六:ReentrantReadWriteLock

标签:java   并发   线程   算法   

原文地址:http://blog.csdn.net/tomato__/article/details/26373605

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!