Broker是如何接受网络请求,并且如何处理网络请求的?
Broker是如何接受网络请求,并且如何处理网络请求的?
因为rmq的底层通信是使用netty完成的,所以要研究broker和produer的通信交互逻辑,就应该看broker在收到网络请求的时候是如何读取、处理的。
而要找到处理逻辑,就需要先找到rmq的netty server hander,看一下 netty handler 是如何处理请求或者将请求委托给谁处理的。
1.寻找server handler
直接搜索ServerBootstrap看哪里使用
可以看到在
NettyRemotingServer
类中使用了,所以可以知道,这里是入口
1.1 netty 的handler
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
1.2 服务器启动堆栈跟踪
org.apache.rocketmq.broker.BrokerStartup#main
org.apache.rocketmq.broker.BrokerStartup#start
org.apache.rocketmq.broker.BrokerController#start
org.apache.rocketmq.remoting.netty.NettyRemotingServer#start
1.3 NettyRemotingServer 的启动
初始化线程组
创建handler
handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode); // 握手包 encoder = new NettyEncoder(); // 编码器 connectionManageHandler = new NettyConnectManageHandler(); // 连接管理 serverHandler = new NettyServerHandler(); // 实际业务处理
使用netty的模板方法启动服务器
ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) // 是否使用epoll,只有在linux环境下才可以使用epoll .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024)// tcp 握手队列 .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) // tcp 发送缓存区大小 .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) // 接收缓存区大小 .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) // 握手handler .addLast(defaultEventExecutorGroup, encoder, // 编码 new NettyDecoder(), // 解码 // 空闲检测 new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), // 链接管理 connectionManageHandler, // 业务处理 serverHandler ); } });
1.4 服务端请求处理分析
1.4.1 HandshakeHandler
org.apache.rocketmq.remoting.netty.NettyRemotingServer.HandshakeHandler
主要是监听消息是否是tls加密,如果加密的话,就添加加密处理器,没有其他功效
1.4.2 NettyEncoder
org.apache.rocketmq.remoting.netty.NettyEncoder
用于进行编码的工具
1.4.3 NettyDecoder
解码工具
org.apache.rocketmq.remoting.netty.NettyDecoder
1.4.4 IdleStateHandler
netty自带提供的handler,空闲检测
1.4.5 NettyConnectManageHandler
监听netty事件,将其转换成 rocketMq事件发布
1.4.6 *NettyServerHandler
业务消息处理器,只不过他并不直接处理业务消息,而是将业务消息转发给 NettyRemotingAbstract#processMessageReceived
进行处理
1.5 processMessageReceived
也不做事件的处理逻辑,根据传入的消息类型,request
、response
调用响应的处理函数
1.5.1 处理请求消息 REQUEST_COMMAND
使用函数processRequestCommand
来处理
该函数用于处理从远程对等方接收到的请求命令。它首先根据请求命令的代码从处理器表中获取对应的处理器和执行器,然后使用执行器执行处理器的请求处理方法。如果请求处理器存在,则根据请求类型选择同步或异步处理方式,并在处理完成后发送响应命令。如果请求处理器不存在,则发送请求代码不支持的响应命令
分析处理流程:
- 根据请求的编码寻找处理器和线程池的键值对
- 如果找不到处理器就使用默认的处理器
- 将当前请求对象封装成任务对象,提交到处理的线程池中
- 如果没有找到处理器就写回不支持的消息类型
- 异步执行消息的时候,先执行前置拦截
- 在根据处理器是同步或者异步不同去处理请求
- 执行后置拦截器
- 如果不是oneway消息类型,将响应结果写回到客户端
源码:
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// 1. 根据请求的编码 获取对应的处理器和线程池
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
// 2. 如果找不到合适的处理器 就使用默认的处理器来处理这个请求
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
// request id
final int opaque = cmd.getOpaque();
/**
* 如果找到了对应的处理器 就使用对应的处理器来处理这个请求
* 如果找不到对应的处理器且也不存在默认的消息处理器,就发送响应 无法处理这种类型的消息、
*/
if (pair != null) {
// 1. 将当前请求封装成一个 线程对象
Runnable run = new Runnable() {
@Override
public void run() {
/*
执行客户端请求命令
分为几步:
1. 执行前置拦截 doBeforeRpcHooks
2. 如果是异步请求,就让处理器异步处理
3. 如果是同步请求,就让同力气同步处理
4. 执行callback 以及 后置拦截 将响应结果写回给客户端
*/
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
doBeforeRpcHooks(remoteAddr, cmd);
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
// 后置拦截
doAfterRpcHooks(remoteAddr, cmd, response);
// 写回请求
if (!cmd.isOnewayRPC()) {
response.setOpaque(opaque);
response.markResponseType();
ctx.writeAndFlush(response);
}
}
};
// 异步处理器
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
// 同步处理器
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
}
};
// 2. 如果当前线程池的线程池拒绝服务,比如太忙,就发送拒绝服务的响应
if (pair.getObject1().rejectRequest()) {
ctx.writeAndFlush("[REJECTREQUEST]system busy, start flow control for a while");
return;
}
// 3. 将当前请求封装成一个任务对象 request Task
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
// 4. 将任务提交到线程池
pair.getObject2().submit(requestTask);
} else {
// 5. 如果找不到合适的处理器 发送无法处理的响应给客户端
ctx.writeAndFlush( " request type " + cmd.getCode() + " not supported");
}
}
总结
RocketMq会将收过来的请求,根据消息类型的不同,交给各自的处理器、各自的线程池去单独处理。
同时,还会根据是否配置了拦截器,去执行前置、后置拦截器。
2. NettyRemotingAbstract分析
2.1 处理不同消息的处理器
从上面知道,rocketMq会通过消息类型,寻找不同的消息处理器的时候,是从processorTable
这个map中获取的,那他是什么时候将处理器注册进去的呢?
子类也就是NettyRemotingServer
中有一个函数registerProcessor
提供了处理器的注册功能。
如下:
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
// 建立键值对,前面是code,即消息类型,后面是对应的消息处理器
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair);
}
在brokerController初始化的时候 ,会调用一个注册函数org.apache.rocketmq.broker.BrokerController#registerProcessor
,将处理器注册进来
他将所有的他支持的消息类型,以及对应的处理器和线程池 pair 都注册进来了
End 流程图
