【中】Nacos是如何热更新的
一、监听器的添加
在 Nacos配置文件如何初始化的 里面,讲到了 NacosConfigApplicationContextInitializer,其实这里不但会做配置文件加载的初始化,还会做监听器的初始化
public void initialize(ConfigurableApplicationContext context) {
singleton.setApplicationContext(context);
environment = context.getEnvironment();
nacosConfigProperties = NacosConfigPropertiesUtils.buildNacosConfigProperties(environment);
final NacosConfigLoader configLoader = NacosConfigLoaderFactory.getSingleton(nacosConfigProperties, environment, builder);
if (!enable()) {
logger.info("[Nacos Config Boot] : The preload configuration is not enabled");
}
else {
if (processor.enable()) {
processor.publishDeferService(context);
configLoader.addListenerIfAutoRefreshed(processor.getDeferPropertySources());
}
else {
// 配置文件的初始化
configLoader.loadConfig();
// 监听器的初始化
configLoader.addListenerIfAutoRefreshed();
}
}
final ConfigurableListableBeanFactory factory = context.getBeanFactory();
if (!factory
.containsSingleton(NacosBeanUtils.GLOBAL_NACOS_PROPERTIES_BEAN_NAME)) {
factory.registerSingleton(NacosBeanUtils.GLOBAL_NACOS_PROPERTIES_BEAN_NAME,
configLoader.getGlobalProperties());
}
}
一直点进去,就会看到生成 Listener 的地方,注意它也是匿名类
public static void addListenerIfAutoRefreshed(
final NacosPropertySource nacosPropertySource, final Properties properties,
final ConfigurableEnvironment environment) {
if (!nacosPropertySource.isAutoRefreshed()) { // Disable Auto-Refreshed
return;
}
final String dataId = nacosPropertySource.getDataId();
final String groupId = nacosPropertySource.getGroupId();
final String type = nacosPropertySource.getType();
final NacosServiceFactory nacosServiceFactory = getNacosServiceFactoryBean(beanFactory);
try {
ConfigService configService = nacosServiceFactory.createConfigService(properties);
// Listener 生成的地方
Listener listener = new AbstractListener() {
@Override
public void receiveConfigInfo(String config) {
String name = nacosPropertySource.getName();
NacosPropertySource newNacosPropertySource = new NacosPropertySource(dataId, groupId, name, config, type);
newNacosPropertySource.copy(nacosPropertySource);
MutablePropertySources propertySources = environment.getPropertySources();
propertySources.replace(name, newNacosPropertySource);
}
};
if (configService instanceof EventPublishingConfigService) {
// 添加 Listener
((EventPublishingConfigService) configService).addListener(dataId, groupId, type, listener);
}
else {
// ....
}
}
catch (NacosException e) {
throw new RuntimeException(
"ConfigService can't add Listener with properties : " + properties,
e);
}
}
下面是添加Listener的整个路径,最后会添加到 cacheMap里面去。 同时会添加到 listenExecutebell 里面去,这是一个阻塞队列,会有一个线程不断的扫描它
- com.alibaba.nacos.spring.context.event.config.EventPublishingConfigService#addListener(java.lang.String, java.lang.String, java.lang.String, com.alibaba.nacos.api.config.listener.Listener)
- com.alibaba.nacos.spring.context.event.config.EventPublishingConfigService#addListener(java.lang.String, java.lang.String, com.alibaba.nacos.api.config.listener.Listener)
- com.alibaba.nacos.client.config.NacosConfigService#addListener
- com.alibaba.nacos.client.config.impl.ClientWorker#addTenantListeners
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
group = blank2defaultGroup(group);
String tenant = agent.getTenant();
// 添加到本地缓存中去
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
synchronized (cache) {
for (Listener listener : listeners) {
cache.addListener(listener);
}
cache.setSyncWithServer(false);
// 添加到 阻塞队列里面去
agent.notifyListenConfig();
}
}
二、收集每个使用需要热更新的bean
通过Nacos后置处理器收集每一个需要热更新的bean,然后按照 key-value的形式存入到 Map中去
比如 @NacosValue(value = "${testOneKey}", autoRefreshed = true)
,key = testOneKey
Nacos热更新的注解是 @NacosValue
,这个注解对应的后置处理器是
@Override
public Object postProcessBeforeInitialization(Object bean, final String beanName) throws BeansException {
doWithFields(bean, beanName);
doWithMethods(bean, beanName);
return super.postProcessBeforeInitialization(bean, beanName);
}
一步步往下看
- com.alibaba.nacos.spring.context.annotation.config.NacosValueAnnotationBeanPostProcessor#doWithFields
- com.alibaba.nacos.spring.context.annotation.config.NacosValueAnnotationBeanPostProcessor#doWithAnnotation
private void doWithAnnotation(String beanName, Object bean, NacosValue annotation, int modifiers, Method method, Field field) {
if (annotation != null) {
if (Modifier.isStatic(modifiers)) {
return;
}
// 是否开启热更新
if (annotation.autoRefreshed()) {
// 解析变量
// @NacosValue(value = "${testOneKey}", autoRefreshed = true)
// 👇👇👇👇👇👇👇👇👇👇👇
// testOneKey
String placeholder = resolvePlaceholder(annotation.value());
if (placeholder == null) {
return;
}
NacosValueTarget nacosValueTarget = new NacosValueTarget(bean, beanName, method, field, annotation.value());
// 存入Map
put2ListMap(placeholderNacosValueTargetMap, placeholder, nacosValueTarget);
}
}
}
private <K, V> void put2ListMap(Map<K, List<V>> map, K key, V value) {
List<V> valueList = map.get(key);
if (valueList == null) {
valueList = new ArrayList<V>();
}
valueList.add(value);
map.put(key, valueList);
}
三、数更新入口
1、2 步骤里面已经做了热更新的监听(轮训获取变化的data-id),和热更新使用方的收集。那现在自然只要做热更新的解析了
在 第一步添加监听器的时候,会开启一个 ClientWorker中的一个定时任务,这个定时任务会一直轮训去配置中心里获取有变动的data-id
@Override
public void startInternal() {
executor.schedule(() -> {
while (!executor.isShutdown() && !executor.isTerminated()) {
try {
listenExecutebell.poll(5L, TimeUnit.SECONDS);
if (executor.isShutdown() || executor.isTerminated()) {
continue;
}
executeConfigListen();
} catch (Exception e) {
LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
}
}
}, 0L, TimeUnit.MILLISECONDS);
}
executeConfigListen
是轮训的核心方法,这个方法很长,下面给出核心的流程
@Override
public void executeConfigListen() {
Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
long now = System.currentTimeMillis();
boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
// ....
boolean hasChangedKeys = false;
if (!listenCachesMap.isEmpty()) {
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
String taskId = entry.getKey();
List<CacheData> listenCaches = entry.getValue();
// ...
// 构建请求的入参
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
configChangeListenRequest.setListen(true);
try {
RpcClient rpcClient = ensureRpcClient(taskId);
ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(rpcClient, configChangeListenRequest);
if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
Set<String> changeKeys = new HashSet<String>();
// 如果有变动的配置文件,就调用 refreshContentAndCheck 方法
if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
hasChangedKeys = true;
for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse
.getChangedConfigs()) {
String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), changeConfig.getTenant());
changeKeys.add(changeKey);
boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
// 通过md5校验是否有更新,有就去刷新配置文件
refreshContentAndCheck(changeKey, !isInitializing);
}
}
// ...
}
} catch (Exception e) {
// ...
}
}
}
// ...
if (needAllSync) {
lastAllSyncTime = now;
}
//If has changed keys,notify re sync md5.
if (hasChangedKeys) {
notifyListenConfig();
}
}
请求Nacos的入参,除了基本的参数,可以看到还有一个 md5,故而猜测:服务端是通过 md5来判断内容是否有变更
private ConfigBatchListenRequest buildConfigRequest(List<CacheData> caches) {
ConfigBatchListenRequest configChangeListenRequest = new ConfigBatchListenRequest();
for (CacheData cacheData : caches) {
configChangeListenRequest.addConfigListenContext(cacheData.group, cacheData.dataId, cacheData.tenant, cacheData.getMd5());
}
return configChangeListenRequest;
}
public void addConfigListenContext(String group, String dataId, String tenant, String md5) {
ConfigListenContext configListenContext = new ConfigListenContext();
configListenContext.dataId = dataId;
configListenContext.group = group;
configListenContext.md5 = md5;
configListenContext.tenant = tenant;
configListenContexts.add(configListenContext);
}
通过data-id,获取具体的content,并基于新的content计算新的md5
- com.alibaba.nacos.client.config.impl.ClientWorker#refreshContentAndCheck(java.lang.String, boolean)
- com.alibaba.nacos.client.config.impl.ClientWorker#refreshContentAndCheck(com.alibaba.nacos.client.config.impl.CacheData, boolean)
private void refreshContentAndCheck(String groupKey, boolean notify) {
if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) {
CacheData cache = cacheMap.get().get(groupKey);
refreshContentAndCheck(cache, notify);
}
}
private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
try {
ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L, notify);
// 设置新的 content和新的md5
cacheData.setContent(response.getContent());
cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
if (null != response.getConfigType()) {
cacheData.setType(response.getConfigType());
}
if (notify) {
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
ContentUtils.truncateContent(response.getContent()), response.getConfigType());
}
cacheData.checkListenerMd5();
} catch (Exception e) {
LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
cacheData.group, cacheData.tenant, e);
}
}
public void setContent(String content) {
this.content = content;
this.md5 = getMd5String(this.content);
}
public static String getMd5String(String config) {
return (null == config) ? Constants.NULL : MD5Utils.md5Hex(config, Constants.ENCODE);
}
校验内容是否真的有变动(md5对比,如果有就发送变动事件)
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
}
}
}
异步去执行通知处理的逻辑,不会阻塞轮训的线程
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;
if (listenerWrap.inNotifying) {
LOGGER.warn(
"[{}] [notify-currentSkip] dataId={}, group={}, md5={}, listener={}, listener is not finish yet,will try next time.",
name, dataId, group, md5, listener);
return;
}
Runnable job = () -> {
long start = System.currentTimeMillis();
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
if (listener instanceof AbstractSharedListener) {
AbstractSharedListener adapter = (AbstractSharedListener) listener;
adapter.fillContext(dataId, group);
LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
}
// Before executing the callback, set the thread classloader to the classloader of
// the specific webapp to avoid exceptions or misuses when calling the spi interface in
// the callback method (this problem occurs only in multi-application deployment).
Thread.currentThread().setContextClassLoader(appClassLoader);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
cr.setEncryptedDataKey(encryptedDataKey);
configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent();
listenerWrap.inNotifying = true;
// 发送通知的地方
listener.receiveConfigInfo(contentTmp);
// compare lastContent and content
if (listener instanceof AbstractConfigChangeListener) {
Map data = ConfigChangeHandler.getInstance()
.parseChangeData(listenerWrap.lastContent, content, type);
ConfigChangeEvent event = new ConfigChangeEvent(data);
((AbstractConfigChangeListener) listener).receiveConfigChange(event);
listenerWrap.lastContent = content;
}
// 赋值新的 lastCallMd5
listenerWrap.lastCallMd5 = md5;
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name, dataId,
group, md5, listener, (System.currentTimeMillis() - start));
} catch (NacosException ex) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
md5, listener, t.getCause());
} finally {
listenerWrap.inNotifying = false;
Thread.currentThread().setContextClassLoader(myClassLoader);
}
};
final long startNotify = System.currentTimeMillis();
try {
// 异步执行
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
try {
INTERNAL_NOTIFIER.submit(job);
} catch (RejectedExecutionException rejectedExecutionException) {
LOGGER.warn(
"[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, no available internal notifier,will sync notifier ",
name, dataId, group, md5, listener);
job.run();
} catch (Throwable throwable) {
LOGGER.error(
"[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, submit internal async task fail,throwable= ",
name, dataId, group, md5, listener, throwable);
job.run();
}
}
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
group, md5, listener, t.getCause());
}
final long finishNotify = System.currentTimeMillis();
LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
name, (finishNotify - startNotify), dataId, group, md5, listener);
}
从上面得知 listener的实现是 DelegatingEventPublishingListener,这里会发送一个事件 NacosConfigReceivedEvent
@Override
public void receiveConfigInfo(String content) {
onReceived(content);
publishEvent(content);
}
private void publishEvent(String content) {
NacosConfigReceivedEvent event = new NacosConfigReceivedEvent(configService,
dataId, groupId, content, configType);
applicationEventPublisher.publishEvent(event);
}
private void onReceived(String content) {
delegate.receiveConfigInfo(content);
}
四、热更新解析
事件解析入口
@Override
public void onApplicationEvent(NacosConfigReceivedEvent event) {
// 循环Map (参考: 二、收集每个使用热更新的地方)
for (Map.Entry<String, List<NacosValueTarget>> entry : placeholderNacosValueTargetMap.entrySet()) {
String key = environment.resolvePlaceholders(entry.getKey());
// 获取新的值
String newValue = environment.getProperty(key);
// null则跳过,项目启动后如果删掉热更新的值,项目其实还是用的之前的
if (newValue == null) {
continue;
}
List<NacosValueTarget> beanPropertyList = entry.getValue();
for (NacosValueTarget target : beanPropertyList) {
// 同样计算 新值的md5,进行对比,不一样就更新
String md5String = MD5Utils.md5Hex(newValue, "UTF-8");
boolean isUpdate = !target.lastMD5.equals(md5String);
if (isUpdate) {
target.updateLastMD5(md5String);
Object evaluatedValue = resolveNotifyValue(target.nacosValueExpr, key, newValue);
if (target.method == null) {
setField(target, evaluatedValue);
}
else {
setMethod(target, evaluatedValue);
}
}
}
}
}
private void setField(final NacosValueTarget nacosValueTarget,
final Object propertyValue) {
final Object bean = nacosValueTarget.bean;
Field field = nacosValueTarget.field;
String fieldName = field.getName();
try {
ReflectionUtils.makeAccessible(field);
field.set(bean, convertIfNecessary(field, propertyValue));
if (logger.isDebugEnabled()) {
logger.debug("Update value of the {}" + " (field) in {} (bean) with {}",
fieldName, nacosValueTarget.beanName, propertyValue);
}
}
catch (Throwable e) {
if (logger.isErrorEnabled()) {
logger.error("Can't update value of the " + fieldName + " (field) in "
+ nacosValueTarget.beanName + " (bean)", e);
}
}
}
五、总结
总结
- 在配置文件中开启热更新
nacos.config.autoRefresh=ture
,(大部分都是用yaml格式,这里为了展示方便用 properties格式) - 项目初始化的时候
NacosConfigApplicationContextInitializer#initialize
会为每一个data-id创建一个 Listener。最终会存入AtomicReference<Map<String, CacheData>> cacheMap
NacosValueAnnotationBeanPostProcessor#postProcessBeforeInitialization
会把每一个使用了@NacosValue(value = "${testOneKey}", autoRefreshed = true)
的地方都解析到,存入Map<String, List<NacosValueTarget>>
中,testOneKey 为key,value 是每一个使用方的bean
ClientWorker.ConfigRpcTransportClient#startInternal
会启动一个定时任务,轮训的去获取每一个 data-id的远程数据。如果发现内容有变化(通过md5对比),就发送处理事件NacosValueAnnotationBeanPostProcessor#onApplicationEvent
接受到事件之后,拿到最终的value,进行md5对比,有变动就通过反射去更新