【中】Dubbo生产者处理一次请求的过程 (Dubbo源码三)
开始
相较于Dubbo消费者一次请求的过程,生产者的流程相对复杂一些,主要是因为触发点不好找。
这篇文章通过解决以下三个问题来学习源码
- 请求的触发点(消费者发送一个请求,生产者如何收到并解析?如何找到对应的服务)
- 再详细了解一下@DubboService生成的代理对象
- 基于生成的代理对象,看看执行流程是怎样的
一、请求的触发点
Dubbo底层通讯是基于Netty,请求第一步肯定是从Netty收到消息开始的。Netty收到消息也肯定是不是明文的,这一节要解决如下问题
- Netty接收消息的入口
- 如何把消息解析成明文
- 怎么通过消息找到对应的服务
1-1、接收消息的入口
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
暂且把这里理解为入口(它前面还有一些流程,个人认为这里好点),从方法名和参数,很容易理解,这里的代码很简单,循环从管道去读取数据,并处理
可以看到msg此时并非一个明文的数据

重要
这里并不是线性执行的,从上面的图片可以看到有一个循环(next不停的去执行下一个操作),每一个节点只做一件事。和Dubbo消费者一次请求的过程和invoker的执行流程类似,一层一层的
1-2、解码入口
上面循环处理操作很多,有一个是解码

调用的链路大致如下,可以看到从Netty到Dubbo
- io.netty.handler.codec.ByteToMessageDecoder#channelRead
- org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalDecoder#decode
- org.apache.dubbo.rpc.protocol.dubbo.DubboCountCodec#decode
- org.apache.dubbo.remoting.exchange.codec.ExchangeCodec#decodeBody
- org.apache.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody
org.apache.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody 方法很长参数是流,这里面会把流解析成明文数据,下一步仔细看如何解析
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
// ...
req.setData(data);
// ...
}
1-2-1、如何把消息解析成明文
解析的本质就是反序列化,Dubbo中支持很多序列化(Hessian2、Java、FastJson2等),消费者会把序列化方式放在header中,这样生产者就知道该用哪种序列化了
org.apache.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
// 获取序列化id
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// 获取 请求id
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
} else {
// decode request.
Request req;
try {
Object data;
if ((flag & FLAG_EVENT) != 0) {
// ...
} else {
req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
// get data length.
int len = Bytes.bytes2int(header, 12);
req.setPayload(len);
DecodeableRpcInvocation inv;
if (isDecodeDataInIoThread(channel)) {
if (customByteAccessor != null) {
// ...
} else {
// 这里会把 proto(这个就是指定序列化的方式) 传递过去,赋值给 serializationType, 下一步可以看到
inv = new DecodeableRpcInvocation(frameworkModel, channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
// 开始序列化的
inv.decode();
} else {
// ...
}
data = inv;
}
// 设置序列化的结果
req.setData(data);
} catch (Throwable t) {
// ...
}
return req;
}
}
org.apache.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#decode(org.apache.dubbo.remoting.Channel, java.io.InputStream)
public Object decode(Channel channel, InputStream input) throws IOException {
int contentLength = input.available();
getAttributes().put(Constants.CONTENT_LENGTH_KEY, contentLength);
// 找到序列化的方式,然后反序列化数据
ObjectInput in = CodecSupport.getSerialization(serializationType)
.deserialize(channel.getUrl(), input);
this.put(SERIALIZATION_ID_KEY, serializationType);
// ... 这下面就是反序列化各种数据
return this;
}
org.apache.dubbo.remoting.transport.CodecSupport#getSerialization(java.lang.Byte)
public static Serialization getSerialization(Byte id) throws IOException {
Serialization result = getSerializationById(id);
if (result == null) {
throw new IOException("Unrecognized serialize type from consumer: " + id);
}
return result;
}
// 这个Map里面存了各种的序列化方式
public static Serialization getSerializationById(Byte id) {
return ID_SERIALIZATION_MAP.get(id);
}
序列化方式的加载是通过SPI的方式
感兴趣的可以参看 Dubbo自定义过滤器,过滤器源码详解 里面介绍了SPI
org.apache.dubbo.common.serialize.Serialization
通过上面的一系列解析操作,可以得到请求的明文 【Request】
- 包含这次请求的方法,参数等等
- 但request里面的 invoker还是为空,真正执行的肯定是这个 invoker

1-3、找到此次请求的 invoker
当解析完请求的Request的时候,又回到了最开始解析数据的时候,这时候会发现有一个 NettyServerHandler,在这里会通过此次请求的参数找到对应的 invoker

大致流程如下
- io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
- org.apache.dubbo.remoting.transport.netty4.NettyServerHandler#channelRead
- org.apache.dubbo.remoting.transport.AbstractPeer#received
- org.apache.dubbo.remoting.transport.MultiMessageHandler#received
- org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler#received
- org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
- org.apache.dubbo.rpc.protocol.dubbo.DubboIsolationExecutorSupport#getProviderModel
- org.apache.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#fillInvoker
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getInvoker
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getInvoker
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
// ...
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
// ...
Invoker<?> invoker = exporter.getInvoker();
inv.setServiceModel(invoker.getUrl().getServiceModel());
return invoker;
}
可以看到此次请求组装了一个唯一serviceKey。exporterMap 里面存了每个服务的数据,和每个服务的 metadata 的数据。

1-3-1、invoker的执行点
大概节点如下:
org.apache.dubbo.remoting.transport.DecodeHandler#received org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter#reply
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#DubboProtocol#reply
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
// ...
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}

二、@DubboService 代理对象的生成
上面的流程大概是:
- Netty收到请求
- 反序列化请求,得到明文数据
- 基于请求找到invoker (从exporterMap 中获取)
- 执行invoker
2-1、代理对象生成入口
既如此就来看看 exporterMap 中的数据是何时存入的? 通过@DubboService、@DubboReference解析原理 得知生成invoker的方法如下
org.apache.dubbo.config.ServiceConfig#doExportUrl
private void doExportUrl(URL url, boolean withMetaData, RegisterTypeEnum registerType) {
// ...
// 使用 javassist 生成代理对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
// 一个@DubboService会生成两个代理对象,一个正常的,一个 MetaData
if (withMetaData) {
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
}
// 对 invoker进行包装处理,在这一步存入 exporterMap
Exporter<?> exporter = protocolSPI.export(invoker);
exporters.computeIfAbsent(registerType, k -> new CopyOnWriteArrayList<>()).add(exporter);
}
2-2、使用Filter包装代理对象
step 3 很重要
消费者和生产者都一样,最终的invoker都被封装到CallbackRegistrationInvoker里面,而CallbackRegistrationInvoker包含了自定义过滤器,下一节要讲的自定义过滤器这里就是起点。
org.apache.dubbo.config.ServiceConfig#doExportUrl
private void doExportUrl(URL url, boolean withMetaData, RegisterTypeEnum registerType) {
// ...
Exporter<?> exporter = protocolSPI.export(invoker);
// ...
}
org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
org.apache.dubbo.rpc.cluster.filter.DefaultFilterChainBuilder#buildInvokerChain
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
Invoker<T> last = originalInvoker;
URL url = originalInvoker.getUrl();
List<ModuleModel> moduleModels = getModuleModelsFromUrl(url);
List<Filter> filters;
// ... 找到合适的 filters
// 开启地狱嵌套包装Filter
if (!CollectionUtils.isEmpty(filters)) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);
}
return new CallbackRegistrationInvoker<>(last, filters);
}
return last;
}
2-3、存入 exporterMap
默认使用的是Dubbo协议,所以走的是 DubboProtocol
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
checkDestroyed();
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// ...
openServer(url);
optimizeSerialization(url);
return exporter;
}
public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
// 存入了
exporterMap.put(key, this);
}
三、 invoker的执行
它的执行可以参看 Dubbo消费者一次请求的过程,一样的,都是先走完过滤器,最终再执行对应的方法