【中】Nacos 热更新
最近有一个需求,需要把Nacos上某个配置文件,从A_data-id迁移到B_data-id上。
本质上就是看删除某个配置,会不会影响到正在运行的服务。我们知道Nacos有一个功能是热更新,在Nacos控制台上修改变量会实时被应用感知,那如果删除某个热更新变量,会不会导致我们正在运行的环境这个值为 null呢?
答案是不会
比如某个变量原本的值是 1,在Nacos删除这个变量发布后,未重启的服务读取的还是 1。
这虽然和理解上热更新有一点点差异(代码的值不应该和Nacos时刻保持一致吗?)但我个人觉得也合理,这是一种特殊的保护机制。
下面来从源码的角度来看看为何没有影响。
在Nacos上点了发布之后,Nacos就会把变更之后的内容推送到订阅了当前 data-id的客户端 (不管是否修改了内容,只要点了发布都推送)
客户端有一个定时轮询器,定时5s从一个 BlockingQueue 里面获取任务。每次Nacos推送给订阅者的请求都会是一个任务。
com.alibaba.nacos.client.config.impl.ClientWorker.ConfigRpcTransportClient#startInternal
private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue(1);
public void startInternal() {
this.executor.schedule(() -> {
while(!this.executor.isShutdown() && !this.executor.isTerminated()) {
try {
this.listenExecutebell.poll(5L, TimeUnit.SECONDS);
if (!this.executor.isShutdown() && !this.executor.isTerminated()) {
this.executeConfigListen();
}
} catch (Exception var2) {
ClientWorker.LOGGER.error("[ rpc listen execute ] [rpc listen] exception", var2);
}
}
}, 0L, TimeUnit.MILLISECONDS);
}
客户端的定时任务每5s循环一次,看看是否有新的配置文件变动。 (这里是循环获取本地的阻塞队列数据,至于Nacos如何把任务塞到队列的代码可以自行去看看,有了队列还会找不到塞的操作吗)
如果获取到任务就会执行 executeConfigListen
,这个方法会去判断当前data-id的内容是否有变动,如果有变动就会发送事件去处理。
判断的逻辑也很简单,拿配置文件里面的全部内容生成一个 md5,和上一次的md5进行对比,如果两个md5不一致,则说明有变化。
public void executeConfigListen() {
Map<String, List<CacheData>> listenCachesMap = new HashMap(16);
Map<String, List<CacheData>> removeListenCachesMap = new HashMap(16);
long now = System.currentTimeMillis();
boolean needAllSync = now - this.lastAllSyncTime >= 300000L;
Iterator var6 = ((Map)ClientWorker.this.cacheMap.get()).values().iterator();
while(var6.hasNext()) {
CacheData cache = (CacheData)var6.next();
synchronized(cache) {
if (cache.isSyncWithServer()) {
// md5判断的方法
cache.checkListenerMd5();
if (!needAllSync) {
continue;
}
}
// ...
// 省略很多的逻辑
}
}
// ...
// 省略很多的逻辑
}
com.alibaba.nacos.client.config.impl.CacheData#checkListenerMd5
void checkListenerMd5() {
Iterator var1 = this.listeners.iterator();
while(var1.hasNext()) {
ManagerListenerWrap wrap = (ManagerListenerWrap)var1.next();
// md5 对比
if (!this.md5.equals(wrap.lastCallMd5)) {
this.safeNotifyListener(this.dataId, this.group, this.content, this.type, this.md5, this.encryptedDataKey, wrap);
}
}
}
每一个data-id,都会被封装成一个 CacheData对象,它里面有很多内容:data-id的名称、内容、md5、group 等等
来看一下 CacheData 的构造方法,方便理解md5
// 默认为 true
static boolean initSnapshot = Boolean.valueOf(System.getProperty("nacos.cache.data.init.snapshot", "true"));
public CacheData(ConfigFilterChainManager configFilterChainManager, String name, String dataId, String group) {
if (null != dataId && null != group) {
this.name = name;
this.configFilterChainManager = configFilterChainManager;
this.dataId = dataId;
this.group = group;
this.tenant = TenantUtil.getUserTenantForAcm();
this.listeners = new CopyOnWriteArrayList();
this.isInitializing = true;
if (initSnapshot) {
this.content = this.loadCacheContentFromDiskLocal(name, dataId, group, this.tenant);
this.md5 = getMd5String(this.content);
}
this.encryptedDataKey = this.loadEncryptedDataKeyFromDiskLocal(name, dataId, group, this.tenant);
} else {
throw new IllegalArgumentException("dataId=" + dataId + ", group=" + group);
}
}
当md5不一致的时候就会执行 safeNotifyListener
这个方法里面会去解析配置文件里面的内容,然后发一个事件,这个事件就是热更新的核心了。
com.alibaba.nacos.client.config.impl.CacheData#safeNotifyListener
private void safeNotifyListener(String dataId, String group, String content, String type, String md5, String encryptedDataKey, ManagerListenerWrap listenerWrap) {
Listener listener = listenerWrap.listener;
if (listenerWrap.inNotifying) {
LOGGER.warn("[{}] [notify-currentSkip] dataId={}, group={}, md5={}, listener={}, listener is not finish yet,will try next time.", new Object[]{this.name, dataId, group, md5, listener});
} else {
Runnable job = () -> {
long start = System.currentTimeMillis();
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
if (listener instanceof AbstractSharedListener) {
AbstractSharedListener adapter = (AbstractSharedListener)listener;
adapter.fillContext(dataId, group);
LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", new Object[]{this.name, dataId, group, md5});
}
// ...
// 解析和发事件
listener.receiveConfigInfo(contentTmp);
// ...
listenerWrap.lastCallMd5 = md5;
} catch (NacosException var21) {
// ...
} catch (Throwable var22) {
// ...
} finally {
listenerWrap.inNotifying = false;
Thread.currentThread().setContextClassLoader(myClassLoader);
}
};
long startNotify = System.currentTimeMillis();
// ...
}
}
com.alibaba.nacos.spring.context.event.config.DelegatingEventPublishingListener#receiveConfigInfo
public void receiveConfigInfo(String content) {
// 解析 content
this.onReceived(content);
// 发送热更新事件
this.publishEvent(content);
}
private void publishEvent(String content) {
NacosConfigReceivedEvent event = new NacosConfigReceivedEvent(this.configService, this.dataId, this.groupId, content, this.configType);
this.applicationEventPublisher.publishEvent(event);
}
private void onReceived(String content) {
this.delegate.receiveConfigInfo(content);
}
事件消费
com.alibaba.nacos.spring.context.annotation.config.NacosValueAnnotationBeanPostProcessor#onApplicationEvent
public void onApplicationEvent(NacosConfigReceivedEvent event) {
// placeholderNacosValueTargetMap 里面存了每一个需要热更新的配置
Iterator var2 = this.placeholderNacosValueTargetMap.entrySet().iterator();
while(true) {
Map.Entry entry;
String key;
String newValue;
do {
if (!var2.hasNext()) {
return;
}
entry = (Map.Entry)var2.next();
key = this.environment.resolvePlaceholders((String)entry.getKey());
newValue = this.environment.getProperty(key);
// 如果新的值为 null, 则跳过当前循环,这也就是说明为什么 删了Nacos配置,未重启的系统读取的仍是之前的值
} while(newValue == null);
List<NacosValueTarget> beanPropertyList = (List)entry.getValue();
Iterator var7 = beanPropertyList.iterator();
while(var7.hasNext()) {
// 拿当前值的 md5 和 新的值md5去对比,如果不一样说明变了,就更新
NacosValueTarget target = (NacosValueTarget)var7.next();
String md5String = MD5Utils.md5Hex(newValue, "UTF-8");
boolean isUpdate = !target.lastMD5.equals(md5String);
if (isUpdate) {
target.updateLastMD5(md5String);
Object evaluatedValue = this.resolveNotifyValue(target.nacosValueExpr, key, newValue);
if (target.method == null) {
// 设置值
this.setField(target, evaluatedValue);
} else {
this.setMethod(target, evaluatedValue);
}
}
}
}
}