标签:zha extends operation rds cti text resource str evel
主要的消息管理者对象:
package com.rynk.mugua.trading.biz.service.impl;
import java.util.concurrent.DelayQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rynk.commons.entity.QueueMessage;
import com.rynk.mugua.trading.biz.commons.RedisKeyResolver;
import com.rynk.mugua.trading.biz.commons.lock.DistributedLockHandler;
import com.rynk.mugua.trading.biz.eth.DelayedTack;
import lombok.extern.slf4j.Slf4j;
/**
* 延时消息管理员
* @author ZHANGYUKUNUP
*
*/
@Component
@Slf4j
public class QueueManger {
MessagePersistent messagePersistent;
/**
* 延时消息队列
*/
private DelayQueue<DelayedTack> dQueue = new DelayQueue<>();
/**
* 消息任务处理线程
*/
private Thread taskThread;
@Autowired
DistributedLockHandler lock;
public QueueManger() {
taskThread = new TaskThread();
taskThread.start();
}
/**
* 任务线程
* @author ZHANGYUKUNUP
*
*/
class TaskThread extends Thread{
@Override
public void run() {
while (true) {
try {
DelayedTack delayedTack = dQueue.take();
QueueMessage queueMessage = delayedTack.getQueueMessage();
if( queueMessage == null ) {
return ;
}
//简单的加个锁保证消息不被重复消费(需要保证解锁前 数据被提交到数据库,否者会出同步问题 ,也就是说不能有更加大的 事务范围 包裹当前方法 )
if( lock.tryLock( RedisKeyResolver.getMsgrKey( queueMessage.getId() ) ) ) {
//如果这个消息被正常消费,那么久标记消费成功,如果异常消费,那么久重试这个消息
try {
if( QueueManger.this.messageDispense(delayedTack.getQueueMessage()) ) {
messagePersistent.succeed( queueMessage );
}else {
QueueManger.this.reTry( queueMessage );
}
}catch (Exception e) {
e.printStackTrace();
QueueManger.this.reTry(queueMessage);
}finally {
lock.unLock( RedisKeyResolver.getMsgrKey( queueMessage.getId() ) );
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
* 重试
* @param queueMessage
*/
protected void reTry(QueueMessage queueMessage) {
messagePersistent.reTry(queueMessage);
}
/**
* 分发消息
* @param queueMessage
*/
protected boolean messageDispense(QueueMessage queueMessage) {
return messagePersistent.consume(queueMessage);
}
/**
* 添加一个延时消息
* @param delayedTack
*/
public void put(DelayedTack delayedTack) {
dQueue.put(delayedTack);
}
/**
* 查询未处理的延时消息数量
* @return
*/
public int size() {
return dQueue.size();
}
/**
* 消息处理线程存活状态
* @return
*/
public boolean isAlive() {
return taskThread.isAlive();
}
}
消息对象:
package com.rynk.commons.entity;
import java.util.Date;
import org.springframework.data.mongodb.core.mapping.Document;
import com.rynk.commons.entity.em.QueueMessageType;
import com.rynk.commons.entity.em.TransferRecordStatus;
import com.rynk.commons.util.SnGeneratorUtil;
import lombok.Data;
@Data
@Document(collection = "mg_queue_message")
public class QueueMessage extends BaseEntity {

    /** Time at which the message should be woken up. */
    private Date awakenDate;

    /** Processing status. */
    private TransferRecordStatus transferRecordStatus;

    /** Message body. */
    private String body;

    /** Type of the message body. */
    private QueueMessageType type;

    /** Number of attempts. */
    private Integer tryTimes;

    /** Time of the last check. */
    private Date lastCheckDate;

    /**
     * Builds a new message in {@link TransferRecordStatus#WAIT} state with a generated id,
     * {@code tryTimes} set to 1 and a wake-up time of "now + {@code delayed}".
     *
     * @param body    message body
     * @param type    message type
     * @param delayed delay in milliseconds, added to the current time to get the wake-up time
     * @return the populated message; its last-check date is the same instance as its wake-up date
     */
    public static QueueMessage newInstance(String body, QueueMessageType type, long delayed) {
        QueueMessage message = new QueueMessage();

        // Identity and bookkeeping fields.
        message.setId(SnGeneratorUtil.getId().toString());
        message.setCreateDate(new Date());
        message.setDr(false);
        message.setTransferRecordStatus(TransferRecordStatus.WAIT);
        message.setTryTimes(1);

        // Payload.
        message.setBody(body);
        message.setType(type);

        // Scheduling: the last-check date starts out as the wake-up date.
        Date wakeUpAt = new Date(System.currentTimeMillis() + delayed);
        message.setAwakenDate(wakeUpAt);
        message.setLastCheckDate(wakeUpAt);
        return message;
    }
}
基于redis 的 分布式锁对象:
package com.rynk.mugua.trading.biz.commons.lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
 * Distributed lock backed by Redis.
 *
 * <p>A lock is a Redis key created with "set if absent"; the stored value is the acquisition
 * timestamp and is informational only.
 *
 * <p>NOTE(review): {@link #unLock(String)} deletes the key without checking who created it, so
 * a holder that outlives the expiry can remove a lock that has since been taken by someone else.
 *
 * @author ZHANGYUKUN
 */
@Component
public class DistributedLockHandler {

    private static final Logger logger = LoggerFactory.getLogger(DistributedLockHandler.class);

    /**
     * Maximum time a lock is held before it expires (milliseconds).
     */
    private final static long LOCK_EXPIRE = 30 * 1000L;

    /**
     * Interval between acquisition attempts (milliseconds).
     */
    private final static long LOCK_TRY_INTERVAL = 30L;

    /**
     * Maximum time to wait for a lock (milliseconds).
     */
    private final static long LOCK_TRY_TIMEOUT = 20 * 1000L;

    @Resource // (name = "customRedisTemplate")
    private RedisTemplate<String, String> template;

    /**
     * Tries to acquire the lock using the default timeout, retry interval and expiry.
     *
     * @param lockKey lock name
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    public boolean tryLock(String lockKey) {
        return getLock(lockKey, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
    }

    /**
     * Tries to acquire the lock with no expiry (the lock is not released automatically).
     *
     * @param lockKey lock name
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    public boolean tryLockNotAutoRelease(String lockKey) {
        return getLock(lockKey, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, -1);
    }

    /**
     * Tries to acquire the lock, waiting at most {@code timeout}.
     *
     * @param lockKey lock name
     * @param timeout maximum time to wait for the lock, in milliseconds
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    public boolean tryLock(String lockKey, long timeout) {
        return getLock(lockKey, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
    }

    /**
     * Tries to acquire the lock with no expiry, waiting at most {@code timeout}.
     *
     * @param lockKey lock name
     * @param timeout maximum time to wait for the lock, in milliseconds
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    public boolean tryLockNotAutoRelease(String lockKey, long timeout) {
        return getLock(lockKey, timeout, LOCK_TRY_INTERVAL, -1);
    }

    /**
     * Tries to acquire the lock with a custom wait time and retry interval.
     *
     * @param lockKey     lock name
     * @param timeout     maximum time to wait for the lock, in milliseconds
     * @param tryInterval pause between attempts, in milliseconds
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    public boolean tryLock(String lockKey, long timeout, long tryInterval) {
        return getLock(lockKey, timeout, tryInterval, LOCK_EXPIRE);
    }

    /**
     * Tries to acquire the lock with no expiry, with a custom wait time and retry interval.
     *
     * @param lockKey     lock name
     * @param timeout     maximum time to wait for the lock, in milliseconds
     * @param tryInterval pause between attempts, in milliseconds
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    public boolean tryLockNotAutoRelease(String lockKey, long timeout, long tryInterval) {
        return getLock(lockKey, timeout, tryInterval, -1);
    }

    /**
     * Tries to acquire the lock with every setting supplied by the caller.
     *
     * @param lockKey        lock name
     * @param timeout        maximum time to wait for the lock, in milliseconds
     * @param tryInterval    pause between attempts, in milliseconds
     * @param lockExpireTime maximum time the lock is held, in milliseconds; values &lt;= 0 mean
     *                       no expiry is set
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    public boolean tryLock(String lockKey, long timeout, long tryInterval, long lockExpireTime) {
        return getLock(lockKey, timeout, tryInterval, lockExpireTime);
    }

    /**
     * Acquires the lock by repeatedly attempting "set if absent" on {@code lockKey} until it
     * succeeds or {@code timeout} has elapsed. At least one attempt is always made.
     *
     * <p>If the waiting thread is interrupted, its interrupt flag is restored and {@code false}
     * is returned.
     *
     * @param lockKey        lock name; an empty or {@code null} key is rejected with {@code false}
     * @param timeout        maximum time to wait for the lock, in milliseconds
     * @param tryInterval    pause between attempts, in milliseconds
     * @param lockExpireTime maximum time the lock is held, in milliseconds; values &lt;= 0 mean
     *                       no expiry is set
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    public boolean getLock(String lockKey, long timeout, long tryInterval, long lockExpireTime) {
        if (StringUtils.isEmpty(lockKey)) {
            return false;
        }
        ValueOperations<String, String> ops = template.opsForValue();
        // SimpleDateFormat is not thread-safe, so it stays local to this call.
        // "MM" is month-of-year; the lowercase "mm" is minute-of-hour.
        SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long startTime = System.currentTimeMillis();
        try {
            do {
                // setIfAbsent returns a boxed Boolean that may be null; never auto-unbox it.
                if (Boolean.TRUE.equals(ops.setIfAbsent(lockKey, sd.format(new Date())))) {
                    // NOTE(review): creating the key and setting its expiry are two separate
                    // calls; if the process dies in between, the key never expires. Consider an
                    // atomic set-with-expiry if the Spring Data Redis version in use offers one.
                    if (lockExpireTime > 0) {
                        template.expire(lockKey, lockExpireTime, TimeUnit.MILLISECONDS);
                    }
                    return true;
                }
                Thread.sleep(tryInterval);
            } while (System.currentTimeMillis() - startTime < timeout);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting for lock {}", lockKey, e);
            return false;
        }
        return false;
    }

    /**
     * Releases the lock by deleting its key. Empty or {@code null} keys are ignored.
     *
     * @param lockKey lock name
     */
    public void unLock(String lockKey) {
        if (!StringUtils.isEmpty(lockKey)) {
            template.delete(lockKey);
        }
    }
}
延时任务对象:用来封装延时消息的
package com.rynk.mugua.trading.biz.eth;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import com.rynk.commons.entity.QueueMessage;
import com.rynk.mugua.trading.biz.mqMessage.ChainMessageDelayTimeLevel;
/**
 * Delayed task: wraps a {@link QueueMessage} together with the absolute time at which it becomes
 * due, so it can be stored in a {@code java.util.concurrent.DelayQueue}.
 *
 * @author zhangyukun
 */
public class DelayedTack implements Delayed {

    /**
     * Time at which the task becomes due, in epoch milliseconds.
     */
    Long runTime;

    /**
     * The wrapped message.
     */
    QueueMessage queueMessage;

    public QueueMessage getQueueMessage() {
        return queueMessage;
    }

    /**
     * Replaces the wrapped message. The due time computed in the constructor is not recalculated.
     *
     * @param queueMessage the new message
     */
    public void setQueueMessage(QueueMessage queueMessage) {
        this.queueMessage = queueMessage;
    }

    /**
     * Creates a task whose due time is derived from the message.
     *
     * <p>On the first attempt ({@code tryTimes == 1}) the task is due at the message's awaken
     * date. On later attempts it is due "now + delay level for that attempt", where the level
     * returned by {@link ChainMessageDelayTimeLevel#getDelayTimeLevel} is multiplied by 1000
     * (presumably a value in seconds; confirm against that class).
     *
     * @param queueMessage the message to wrap; its {@code tryTimes} must not be {@code null}
     */
    public DelayedTack(QueueMessage queueMessage) {
        if (queueMessage.getTryTimes() == 1) {
            this.runTime = queueMessage.getAwakenDate().getTime();
        } else {
            // 1000L forces long arithmetic so a large delay level cannot overflow int.
            this.runTime = System.currentTimeMillis()
                    + ChainMessageDelayTimeLevel.getDelayTimeLevel(queueMessage.getTryTimes()) * 1000L;
        }
        this.queueMessage = queueMessage;
    }

    /**
     * Returns the remaining delay until the due time, converted to {@code unit}; zero or
     * negative once the task is due.
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(runTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Orders tasks by due time, earliest first.
     *
     * <p>Uses {@link Long#compare(long, long)} rather than casting a {@code long} difference to
     * {@code int}, which can truncate or overflow and flip the sign for widely separated times.
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        if (o instanceof DelayedTack) {
            // Comparing the fixed due times avoids reading the clock twice.
            return Long.compare(this.runTime, ((DelayedTack) o).runTime);
        }
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}
持久化消息的对象:三个方法按照自己的业务实现即可
package com.rynk.mugua.trading.biz.service.impl;
import com.rynk.commons.entity.QueueMessage;
public class MessagePersistent {

    /**
     * Business logic to run when this message is consumed.
     *
     * <p>This stub always reports failure.
     *
     * @param queueMessage the message being consumed
     * @return {@code false} in this stub
     */
    public boolean consume(QueueMessage queueMessage) {
        return false;
    }

    /**
     * Marks this message as having been consumed normally.
     *
     * <p>This stub does nothing.
     *
     * @param queueMessage the message that was consumed
     */
    public void succeed(QueueMessage queueMessage) {
    }

    /**
     * Retries the message: intended to mark the state in the database and then put the message
     * back into the delay queue.
     *
     * <p>This stub does nothing.
     *
     * @param queueMessage the message to retry
     */
    public void reTry(QueueMessage queueMessage) {
    }
}
一个简单的 支持延时消息 ,持久化消息的消息队列 的Java实现
标签:zha extends operation rds cti text resource str evel
原文地址:https://www.cnblogs.com/cxygg/p/12162878.html