哪个处理器来处理生产消息的请求
思考一个问题,既然弄清楚了服务端是如何接收网络请求,并且他是如果根据请求类型去匹配消息处理器的,那么生产者生产消息的时候,他是如何具体处理这个消息的呢?是使用那个处理器,哪个线程池来处理这个消息的?
具体的处理流程是什么?
哪个处理器来处理生产消息的请求
1. 确定消息类型
根据生产者发送的消息类型来确定生产者客户端发送的消息的类型是什么?
所以需要追踪源码来确认。
堆栈
org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message)
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message, long)
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage
org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
可见,如果是批量消息 MessageBatch
,消息类型是:RequestCode.SEND_BATCH_MESSAGE
否则就是:RequestCode.SEND_MESSAGE_V2
code=310
2. 确认处理消息的处理器是哪个?
broker 获取处理器的位置:
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
在broker获取处理器的位置打上条件断点,可以看到:


code 310 对应的处理器是 SendMessageProcessor
3. SendMessageProcessor 如何处理请求的?
SendMessageProcessor的类图

broker选择处理器是异步还是同步处理:
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
根据上面类继承图可以看到,SendMessageProcessor 继承了 AsyncNettyRequestProcessor ,因此他是异步处理的。
4. SendMessageProcessor异步处理
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
}
上面代码是SendMessageProcessor的处理函数,可以看出他进行了异步任务的编排
- 先进行
asyncProcessRequest
处理业务逻辑 - 然后
thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor())
在线程池中执行回调函数
4.1 处理逻辑
比较复杂
4.1.1 解析请求头
org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#parseRequestHeader
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
4.1.2 前置拦截
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
4.1.3 执行实际逻辑
批量消息:
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
非批量消息:
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
5. 非批量消息的处理
org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncSendMessage
5.1 将消息转换成内部消息格式:MessageExtBrokerInner
转换过程没必要写了吧
5.2 是否死信或者延迟消息?
是的话就直接处理,然后结束,不是的话就继续往下执行。。。
TODO: 等待以后在看这个
5.3 是否事务消息?以及事务消息的处理
事务消息的处理见:this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
5.4 普通消息的处理
见代码:this.brokerController.getMessageStore().asyncPutMessage(msgInner)
因为我们的消息是普通消息,因此肯定走的是普通消息流程
即通过DefaultMessageStore将消息存储到commitLog中,下面再说如何存储消息
6. 消息存储
org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
defaultMessageStore也是委托commitLog对象处理消息的追加
即:this.commitLog.asyncPutMessage(msg);
6.1 commitLog 对消息的存储
org.apache.rocketmq.store.CommitLog#asyncPutMessage
6.1.1 写文件
- 先加写锁
- 获取最新的mapFile
- 如果mapedFile是空的就创建一个新的文件,如果mapedFile满了就在创建一个文件
- 将消息追加到mapedFile上
6.1.2 mapedFile 消息追加
org.apache.rocketmq.store.MappedFile#appendMessage(org.apache.rocketmq.store.MessageExtBrokerInner, org.apache.rocketmq.store.AppendMessageCallback, org.apache.rocketmq.store.CommitLog.PutMessageContext)
org.apache.rocketmq.store.MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
// 获取 mappedFile的buffer对象、
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
// 位置标记
byteBuffer.position(currentPos);
// 普通消息
AppendMessageResult result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
return result;
}
可见,mappedFile的追加函数 实际追加逻辑委托给了 cb 对象,通过代码回溯,知道cb 是:DefaultAppendMessageCallback
因此,需要想要知道消息是如何写入到了mappedFile中,需要看 CommitLog.DefaultAppendMessageCallback#doAppend()
将消息写入了 MappedByteBuffer 中,等待 page cache 刷新 或者 force 才会将数据写入到磁盘中。