【中】Dubbo消费者一次请求的过程 (Dubbo源码二)
开始
上篇学习了@DubboService、@DubboReference解析原理,原本想在这篇文章来讲解它们运行的过程。但遇到滑铁卢:Dubbo的源码是我目前看的最复杂的源码,链路跳转太多了,所以分开,先来看看消费者的一次请求经历了哪些过程
下面的这些内容我花了好几天学习,边看边断点,所以一下看不懂没关系,还是需要自己去断点慢慢调试,关键节点都给出来了
上篇讲@DubboReference解析的时候,只讲到使用了@DubboReference注解的最终会生成一个代理对象ReferenceBean。其实bean还会做很多初始化的工作,就先来看看它做了哪些重要的初始化动作。
回忆一下,生成代理对象的时候用的是 LazyTargetInvocationHandler
private void generateFromJavassistFirst(List<Class<?>> interfaces) {
try {
this.lazyProxy = Proxy.getProxy(interfaces.toArray(new Class[0])).newInstance(new LazyTargetInvocationHandler(new DubboReferenceLazyInitTargetSource()));
} catch (Throwable fromJavassist) {
// ...
}
}
org.apache.dubbo.config.spring.util.LazyTargetInvocationHandler#invoke
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// ...
if (target == null) {
synchronized (this) {
if (target == null) {
target = lazyTargetSource.getTarget();
}
}
}
// ...
return method.invoke(target, args);
}
正如它的名字一样,它里面只是做了一个懒加载的动作,在第一次执行的时候会对 target 初始化赋值
一、对代理对象进一步初始化,为 target 赋值
1-1、代理对象生成的入口
所谓入口就是从 lazyTargetSource.getTarget()
一步步点进去看看最终是在什么地方初始化的
org.apache.dubbo.config.spring.ReferenceBean.DubboReferenceLazyInitTargetSource
private class DubboReferenceLazyInitTargetSource implements LazyTargetSource {
@Override
public Object getTarget() throws Exception {
return getCallProxy();
}
}
org.apache.dubbo.config.spring.ReferenceBean#getCallProxy
private Object getCallProxy() throws Exception {
if (referenceConfig == null) {
throw new IllegalStateException("ReferenceBean is not ready yet, please make sure to call reference interface method after dubbo is started.");
}
// ReferenceBean 初始化完成之后这个就已经准备好了,所以这里直接走这个get(感兴趣的可以这里断点,然后启动服务)
if (referenceConfig.configInitialized()) {
return referenceConfig.get();
}
// ...
}
org.apache.dubbo.config.ReferenceConfig#get
public T get(boolean check) {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
// ensure start module, compatible with old api usage
getScopeModel().getDeployer().start();
init(check);
}
return ref;
}
这个方法就是消费者初始化的核心方法,如果以后你想知道消费者的某个配置什么时候初始化的,都应该来看这个方法。
这里为简化了,只看创建Proxy的地方
org.apache.dubbo.config.ReferenceConfig#init(boolean)
protected synchronized void init(boolean check) {
// ...
ref = createProxy(referenceParameters);
// ...
initialized = true;
}
1-2、创建代理对象
org.apache.dubbo.config.ReferenceConfig#createProxy
private T createProxy(Map<String, String> referenceParameters) {
// 创建 invoker, invoker的创建很复杂,也很重要,下面再讲
createInvoker();
// 创建 proxy
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper#getProxy
@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
T proxy = proxyFactory.getProxy(invoker, generic);
// ...
return proxy;
}
org.apache.dubbo.rpc.proxy.AbstractProxyFactory#getProxy
@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
// ...
return getProxy(invoker, interfaces.toArray(new Class<?>[0]));
// ...
}
org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getProxy
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
代理对象执行的是 InvokerInvocationHandler (参看setp 3)
1-3、创建 Invoker
invoker的创建是最难,同时也是最重要的
如果把所有的代码都贴出来那会非常多、且杂乱,不利于理解,简单来说这里的创建生成了2个 invoker
- MigrationInvoker
- CallbackRegistrationInvoker
这两个Invoker不太好理解的原因有3点
- 生成的过程很复杂,我没办法给出全貌,只能给出几个关键点,有兴趣的小伙伴自己去看看
- 执行不是线性的,都是过滤器类型,一层层执行,这大大的加深了理解的难度
- 大部分invoker就是直接去干嘛了,它们不是,它们是 invoker里面嵌套invoker
- MigrationInvoker 可以理解为一个系统的invoker,它里面有八个过滤器(暂时还没研究是干嘛的),最后一个过滤器就转到执行CallbackRegistrationInvoker
- CallbackRegistrationInvoker 里面包含了 DubboInvoker这个是发起请求的invoker,同时它包含了我们自定义的过滤器,如果有就会在执行请求之前执行我们的过滤器(关于自定义过滤器后面会单独抽一篇文章来讲,因为我们的大部分实践就是基于过滤器的)
1-3-1、MigrationInvoker、CallbackRegistrationInvoker 对象
先来看看这两个对象的内容,一下理解不了没关系,先看看它们长啥样


1-3-2、MigrationInvoker 生成、链路组装
这里的链路不打算给出代码,太复杂了,但给出关键的触发点,这样就可以打断点慢慢看了,下面并不全部的过程,只是一些核心点
- org.apache.dubbo.config.ReferenceConfig#createProxy
- org.apache.dubbo.config.ReferenceConfig#createInvoker (invoker = protocolSPI.refer(interfaceClass, curUrl))
- org.apache.dubbo.rpc.Protocol#refer
- org.apache.dubbo.rpc.protocol.ProtocolSerializationWrapper#refer
- org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper#refer
- org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#refer
- org.apache.dubbo.qos.protocol.QosProtocolWrapper#refer
- org.apache.dubbo.registry.integration.RegistryProtocol#doRefer (生成 MigrationInvoker)
- org.apache.dubbo.registry.client.migration.MigrationInvoker#refreshInterfaceInvoker
- org.apache.dubbo.registry.integration.RegistryProtocol#doCreateInvoker (链路的生成,这里就是上面看到的 nextNode组装)
- org.apache.dubbo.qos.protocol.QosProtocolWrapper#refer
1-3-2、CallbackRegistrationInvoker 生成、链路组装
CallbackRegistrationInvoker 的生成也是上面的一部分,这里同样也只给出核心的链路
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#protocolBindingRefer
- org.apache.dubbo.rpc.cluster.filter.DefaultFilterChainBuilder#buildInvokerChain (自定义过滤器就是在这里组装的)
提示
MigrationInvoker 可以理解为是系统,Dubbo的系统过滤器都是在这里,CallbackRegistrationInvoker 自定义过滤器是放在这个里面的
buildInvokerChain 这个方法就是组装自定义过滤器的,生产者过滤器也是在这里组装的,后面会将
二、执行过程
提示
- 执行过程就是一步步走,从LazyTargetInvocationHandler 的 invoker开始,走到MigrationInvoker执行里面的所有nextNode节点(系统过滤),再到CallbackRegistrationInvoker 执行自定义的过滤器,最后再发起请求
- 如果理解了上面的invoker,这里就还好,不然这里的链路调用还是很复杂
先来看看核心执行流程图

2-1、入口
org.apache.dubbo.config.spring.util.LazyTargetInvocationHandler#invoke
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// ...
if (target == null) {
synchronized (this) {
if (target == null) {
target = lazyTargetSource.getTarget();
}
}
}
// ...
return method.invoke(target, args);
// ...
}
org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// ...
RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);
if (serviceModel instanceof ConsumerModel) {
rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
}
return InvocationUtil.invoke(invoker, rpcInvocation);
}
org.apache.dubbo.rpc.proxy.InvocationUtil
public static Object invoke(Invoker<?> invoker, RpcInvocation rpcInvocation) throws Throwable {
RpcContext.RestoreServiceContext originServiceContext = RpcContext.storeServiceContext();
try {
// ...
if (ProfilerSwitch.isEnableSimpleProfiler()) {
// ...
rpcInvocation.put(Profiler.PROFILER_KEY, bizProfiler);
try {
return invoker.invoke(rpcInvocation).recreate();
} finally {
// ...
}
}
return invoker.invoke(rpcInvocation).recreate();
} finally {
RpcContext.restoreServiceContext(originServiceContext);
}
}
org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker#invoke
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result;
String value = getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (ConfigUtils.isEmpty(value)) {
//no mock
result = this.invoker.invoke(invocation);
}
// ...
return result;
}
org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster.ClusterFilterInvoker#invoke
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filterInvoker.invoke(invocation);
}
org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder.CallbackRegistrationInvoker#invoke
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = filterInvoker.invoke(invocation);
asyncResult.whenCompleteWithContext((r, t) -> {
RuntimeException filterRuntimeException = null;
for (int i = filters.size() - 1; i >= 0; i--) {
// ...
}
if (filterRuntimeException != null) {
throw filterRuntimeException;
}
});
return asyncResult;
}
开启过滤链,把 nextNode传递下去,不断的递归执行 nextNode
org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder.CopyOfFilterChainNode#invoke
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Filter " + filter.getClass().getName() + " invoke.");
// 开启过滤链,把 nextNode传递下去,不断的递归执行 nextNode
asyncResult = filter.invoke(nextNode, invocation);
// ...
} catch (Exception e) {
// ...
} finally {
}
return asyncResult;
}
下图是step 7
的 debug图,可以看到最后一个nextNode是 FailoverClusterInvoker

2-2、CallbackRegistrationInvoker 执行

和上面一样,它一样会一层层执行 nextNode,最后到 DubboInvoker
自定义过滤器就是在这个步骤进行执行的 DistinctDubboConsumerFilter 就是自定义过滤器
2-3、发起请求
org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
// ...
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = RpcUtils.calculateTimeout(getUrl(), invocation, methodName, DEFAULT_TIMEOUT);
if (timeout <= 0) {
return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " + invocation.getServiceName() + "."
+ RpcUtils.getMethodName(invocation) + ", terminate directly."), invocation);
}
invocation.setAttachment(TIMEOUT_KEY, String.valueOf(timeout));
Integer payload = getUrl().getParameter(PAYLOAD, Integer.class);
Request request = new Request();
if (payload != null) {
request.setPayload(payload);
}
request.setData(inv);
request.setVersion(Version.getProtocolVersion());
if (isOneway) {
// 走下面的 else
} else {
request.setTwoWay(true);
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
// 发送netty 请求
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(request, timeout, executor).thenApply(AppResponse.class::cast);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
if (setFutureWhenSync || ((RpcInvocation) invocation).getInvokeMode() != InvokeMode.SYNC) {
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
}
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
// ...
} catch (RemotingException e) {
// ...
}
}
三、总结
- 基于@EnableDubbo > @EnableDubboConfig 找到每一个@DubboReference 生成代理对象
- 生产的代理对象是一个嵌套对象,里面有两个重要的对象分别是 MigrationInvoker、CallbackRegistrationInvoker
- MigrationInvoker 和 CallbackRegistrationInvoker 里面都有很多Filter,MigrationInvoker 主要是系统过滤器,自定义过滤器是 CallbackRegistrationInvoker
- MigrationInvoker 里面有很多 invoker,最后一个是 FailoverClusterInvoker,从这里转执行 CallbackRegistrationInvoker
- 在执行 CallbackRegistrationInvoker 的时候,会先执行自定义过滤器,最后执行 DubboInvoker 发起请求