码迷,mamicode.com
首页 > 其他好文 > 详细

Rocketmq消息持久化

时间:2017-06-27 18:38:23      阅读:183      评论:0      收藏:0      [点我收藏+]

标签:inner   blog   bat   stp   row   dex   context   callback   cts   

本文编写,参考:https://my.oschina.net/bieber/blog/725646

producer Send()的Message最终将由broker处理,处理类为:SendMessageProcessor ,处理方法:processRequet.

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

private List<ConsumeMessageHook> consumeMessageHookList;

public SendMessageProcessor(final BrokerController brokerController) {
super(brokerController);
}

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {}
上述方法,并不是直接处理消息,而是交由MessageStore处理,相关代码如下:
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
//......
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

然而MessageStore也不直接持久化消息,转交给 CommitLog
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessages(messageExtBatch);

从MappedFileQueue中取出最新的一条:
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//写消息
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
//持久化到磁盘
handleDiskFlush(result, putMessageResult, messageExtBatch);

handleHA(result, putMessageResult, messageExtBatch);


cousumer 从broker读消息。


Rocketmq消息持久化

标签:inner   blog   bat   stp   row   dex   context   callback   cts   

原文地址:http://www.cnblogs.com/itdev/p/7086322.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!