.png)
/**
* 表示一个方法是否启用本地缓存,可以指定本地缓存的时间间隔,默认为一个小时
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface LocalCacheOperation {
long localCacheInterval() default 1000 * 60 * 60;
String localCacheKey() default "";
}//为缓存对象包上一层时间戳
public class CacheObject implements Serializable {
private static final long serialVersionUID = 4873268779348802945L;
private long timestamp;
private Object object;
.....
} public interface ConfigService {
public BloomFilter<String> getAllPassengerNames();
public BloomFilter<Long> getAllTrades();
}
public class ConfigServiceImpl implements ConfigService {
private PassengerManager passengerManager;
private TradeManager tradeManager;
@LocalCacheOperation(localCacheInterval=1000*60*60*24)
public BloomFilter<String> getAllPassengerNames() {
return passengerManager.getAllPassengerNames();
}
@LocalCacheOperation
public BloomFilter<Long> getAllTrades() {
return tradeManager.getAllTrades();
}
}/**
* 本地缓存切面实现
*/
@Aspect
public class LocalCacheAspect {
private static final Log log = LogFactory.getLog(LocalCacheAspect.class);
private final ConcurrentMap<String, Future<CacheObject>>
localCache = new ConcurrentHashMap<String, Future<CacheObject>>();
//控制台
private ConsoleBean consoleBean;
private final ConcurrentMap<String, SoftReference<CacheObject>>
concurrentLocalCache = new ConcurrentHashMap<String, SoftReference<CacheObject>>();
/**
* 对于执行时间较长的读数据操作,需要在这里相应的添加锁,对于操作添加锁后的函数的线程
* 如果本地缓存为空,且读数据的锁已被其他线程占据,将直接返回null
*/
private final static Map<String, Lock> localCacheLocks = new HashMap<String, Lock>();
static {
localCacheLocks.put("getAllTrades", new ReentrantLock());
}
/**
* Advice aound audit operations
*
* @param pjpParam
* @return
*/
@Around("execution(@LocalCacheOperation * *(..))")
public Object doCache(ProceedingJoinPoint pjpParam) throws Throwable {
if(!getConsoleBean().isLocalCacheSwitchOn()) {
log.warn("localCache not switch on, please pay attention");
return pjpParam.proceed(pjpParam.getArgs());
}
final ProceedingJoinPoint pjp = pjpParam;
Signature sig = pjp.getSignature();
if (sig instanceof MethodSignature) {
MethodSignature mSig = (MethodSignature) sig;
LocalCacheOperation co = mSig.getMethod().getAnnotation(
LocalCacheOperation.class);
long localCacheInterval = 0;
String localCacheKey = null;
/**
* AOP在拦截子类的Annotataion时,无法获取该Annotation,导致co可能为空
* @author chenlei.cl
*/
if( co == null ){
localCacheInterval = consoleBean.getLocalCacheInterval();
localCacheKey = mSig.getName();
} else {
localCacheInterval = co.localCacheInterval();
localCacheKey = StringUtils.isNotBlank(co.localCacheKey()) ?
co.localCacheKey() : mSig.getName();
}
if (localCacheLocks.containsKey(mSig.getMethod().getName())) { //使用本地互斥锁
return doConcurrentLocalCache(pjp, localCacheInterval, localCacheKey);
}
while (true) {// 等待某个线程将数据获取到本地缓存
Future<CacheObject> f = localCache.get(localCacheKey);
try {
long currentTime = System.currentTimeMillis();
if (f != null && f.get() != null && currentTime - f.get().getTimestamp()
> localCacheInterval) {
localCache.remove(localCacheKey, f);
f = null;
}
if (f == null) {
Callable<CacheObject> eval = new Callable<CacheObject>() {
public CacheObject call() throws InterruptedException {
Object res;
try {
res = pjp.proceed(pjp.getArgs());
}
catch (Throwable e) {
log.error("Fail to process method", e);
throw new ServiceException(e.getMessage());
}
CacheObject cacheObject = new CacheObject();
cacheObject.setObject(res);
cacheObject.setTimestamp(System.currentTimeMillis());
return cacheObject;
}
};
FutureTask<CacheObject> ft =
new FutureTask<CacheObject>(eval);
f = localCache.putIfAbsent(localCacheKey, ft);
if (f == null) {
f = ft;
ft.run();
}
}
CacheObject obj = f.get();
if (obj != null)
return obj.getObject();
}
catch (CancellationException e) {
localCache.remove(localCacheKey, f);
}
catch (ExecutionException e) {
throw new ServiceException(e.getMessage());
}
}
}
return pjp.proceed(pjp.getArgs());
}
@SuppressWarnings("static-access")
public Object doConcurrentLocalCache(ProceedingJoinPoint pjp,
long localCacheInterval, String localCacheKey) throws Throwable {
try {
long currentTime = System.currentTimeMillis();
SoftReference<CacheObject> weakRefCacheObj =
concurrentLocalCache.get(localCacheKey);
if (weakRefCacheObj != null && weakRefCacheObj.get() != null &&
currentTime - weakRefCacheObj.get().getTimestamp() > localCacheInterval) {
// 缓存过期
weakRefCacheObj.get().setObject(null); // 清空引用
concurrentLocalCache.remove(localCacheKey, weakRefCacheObj);
weakRefCacheObj = null;
} else if (weakRefCacheObj != null && weakRefCacheObj.get() != null) {
return weakRefCacheObj.get().getObject();
}
if (this.localCacheLocks.get(localCacheKey).tryLock()) {
weakRefCacheObj = concurrentLocalCache.get(localCacheKey);
if (weakRefCacheObj != null && weakRefCacheObj.get() != null) {
// double check
return weakRefCacheObj.get().getObject();
}
try {
Object res = pjp.proceed(pjp.getArgs());
CacheObject cacheObject = new CacheObject();
cacheObject.setObject(res);
cacheObject.setTimestamp(System.currentTimeMillis());
weakRefCacheObj = new SoftReference<CacheObject>(
cacheObject);
concurrentLocalCache.put(localCacheKey, weakRefCacheObj);
return res;
} finally {
this.localCacheLocks.get(localCacheKey).unlock();
}
} else {
return null; // make the other part wait
}
} catch (Exception e) {
throw new ServiceException(e.getMessage(), e);
}
}
}原文地址:http://blog.csdn.net/troy__/article/details/39320699