nacos配置中心
nacos配置中心
总结
2.x中,Nacos配置中心主要的改动在于引进长连接代替了短连接长轮询。
1.x中,Nacos配置中心通过长轮询的方式更新客户端配置,属于客户端主动拉取配置;
2.x中,Nacos配置中心支持客户端定时同步配置,服务端回调通知,所以2.x属于推拉结合的方式。
- 拉:客户端会对全量CacheData发起配置监听请求ConfigBatchListenRequest,如果配置md5发生变更,会同步收到变更配置项,发起ConfigQuery请求查询实时配置。
- 推:服务端配置变更,会发送ConfigChangeNotifyRequest请求给与当前节点建立长连接的客户端通知配置变更项。
由于2.x使用长连接代替长轮询,监听请求ConfigBatchListenRequest不会被服务端hold住,会立即返回。服务端只是将监听关系保存在内存中,方便后续通知。
groupKey和connectionId的映射关系,方便后续通过变更配置项找到对应客户端长连接;connectionId和groupKey的映射关系,只是为了控制台展示。这些关系保存在服务端的ConfigChangeListenContext单例中。
1.x需要通过groupKey找到仍然在进行长轮询的客户端AsyncContext;2.x是通过groupKey找到connectionId,再通过connectionId找到长连接,发送ConfigChangeNotifyRequest通知客户端配置变更。
概述
客户端程序启动拉取配置。
客户端注册配置变化监听器。
客户端查询配置。
客户端修改配置。
服务端程序启动保存配置到磁盘。
服务端注册监听到客户端信息。
服务端查询配置。
服务端修改配置。
服务端通知监听。
本文以nacos2.x分析。需要结合spring cloud alibaba、spring boot配置加载相关逻辑。
Nacos 配置管理一致性协议分为两个大部分,第一部分是 Server 间一致性协议,一个是 SDK 与 Server 的一致性协议,配置作为分布式系统中非强一致数据,在现脑裂的时候可用性高于一致性, 因此阿里配置中心是采用 AP 一致性协议。
Server 间的一致性协议
有 DB 模式(读写分离架构)
一致性的核心是 Server 与 DB 保持数据一致性,从而保证 Server 数据一致;Server 之间都是对 等的。数据写任何一个 Server,优先持久化,持久化成功后异步通知其他节点到数据库中拉取最新 配置值,并且通知写入成功。
无 DB 模式
Server 间采用 Raft 协议保证数据一致性,行业大部分产品采用此模式,因此不展开介绍。Nacos 提供此模式,是方便用户本机运行,降低对存储依赖。
SDK 与 Server 的一致性协议
SDK 与 Server 一致性协议的核心是通过 MD5 值是否一致,如果不一致就拉取最新值。
Nacos 1.X
Nacos 1.X 采用 Http 1.1 短链接模拟长链接,每 30s 发一个心跳跟 Server 对比 SDK 配置 MD 5 值是否跟 Server 保持一致,如果一致就 hold 住链接,如果有不一致配置,就把不一致的配置 返回,然后 SDK 获取最新配置值。
Nacos 2.X
Nacos 2.x 相比上面 30s 一次的长轮训,升级成长链接模式,配置变更,启动建立长链接,配置变更服务端推送变更配置列表,然后 SDK 拉取配置更新,因此通信效率大幅提升。
客户端
客户端程序启动拉取配置
这一部分在springbooot配置加载相关文章中分析过了。nacos基于springboot2.4以后提供的imports机制做了拓展。客户端代码最终都是通过NacosConfigService去操作配置的。
客户端配置监听器逻辑
- 客户端在启动的时候会轮训配置监听器,请求服务端看配置是否变化,配置变化则回调监听器方法,最终发布RefreshEvent,触发springcloud配置动态刷新的逻辑。
- 客户端注册ConfigChangeNotifyRequest的处理器,服务端配置变化的时候会发生ConfigChangeNotifyRequest,最终客户端执行的还是上述轮训逻辑。
NacosConfigService会创建ClientWorker和服务端通信。ClientWorker中初始化ConfigRpcTransportClient并且调用他的start方法。
public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
final Properties properties) throws NacosException {
...........
agent = new ConfigRpcTransportClient(properties, serverListManager);
int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);
ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM), r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker");
t.setDaemon(true);
return t;
});
agent.setExecutor(executorService);
agent.start();
}
ConfigRpcTransportClient的start方法会执行内部的监听任务即executeConfigListen方法。
ConfigRpcTransportClient:
public void start() throws NacosException {
.........
startInternal();
}
public void startInternal() throws NacosException {
executor.schedule(() -> {
while (!executor.isShutdown() && !executor.isTerminated()) {
try {
// 并发控制 典型的消费者生产者模型
listenExecutebell.poll(5L, TimeUnit.SECONDS);
if (executor.isShutdown() || executor.isTerminated()) {
continue;
}
// 执行内部监听
executeConfigListen();
} catch (Exception e) {
LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
}
}
}, 0L, TimeUnit.MILLISECONDS);
}
客户端会不断循环监听缓存,检查是否要更新缓存。关键的代码在checkListenCache方法。
/**
* 执行配置监听
*/
@Override
public void executeConfigListen() {
// 需要注册的监听
Map<String, List<CacheData>> listenCachesMap = new HashMap<>(16);
// 需要注销的监听
Map<String, List<CacheData>> removeListenCachesMap = new HashMap<>(16);
// 是否需要全量同步 根据时间间隔判断
long now = System.currentTimeMillis();
boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
// 遍历所有监听缓存
for (CacheData cache : cacheMap.get().values()) {
synchronized (cache) {
// 检查本地配置信息是否发生了变化 并更新CacheData信息
checkLocalConfig(cache);
// 本地缓存是否和服务器一致。
if (cache.isConsistentWithServer()) {
cache.checkListenerMd5();
if (!needAllSync) {
continue;
}
}
// 如果使用了本地配置信息,则直接跳过处理。
if (cache.isUseLocalConfigInfo()) {
continue;
}
// 判断缓存的信息是否是废弃的
if (!cache.isDiscard()) {
// 不是废弃的 则需要监听
List<CacheData> cacheDatas = listenCachesMap.computeIfAbsent(String.valueOf(cache.getTaskId()),
k -> new LinkedList<>());
cacheDatas.add(cache);
} else {
// 是废弃的 则需要移除监听
List<CacheData> cacheDatas = removeListenCachesMap.computeIfAbsent(
String.valueOf(cache.getTaskId()), k -> new LinkedList<>());
cacheDatas.add(cache);
}
}
}
// 执行check listen,如果有更改键,则返回true。
boolean hasChangedKeys = checkListenCache(listenCachesMap);
//execute check remove listen.
checkRemoveListenCache(removeListenCachesMap);
if (needAllSync) {
lastAllSyncTime = now;
}
//If has changed keys,notify re sync md5.
if (hasChangedKeys) {
notifyListenConfig();
}
}
checkListenCache方法通过构建ConfigBatchListenRequest请求,注册到服务端当前客户端配置的md5,获取配置变化可以直接通知客户端。如果配置变更则刷新变更的配置,同时通知监听器。
private boolean checkListenCache(Map<String, List<CacheData>> listenCachesMap) {
final AtomicBoolean hasChangedKeys = new AtomicBoolean(false);
if (!listenCachesMap.isEmpty()) {
List<Future> listenFutures = new ArrayList<>();
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
String taskId = entry.getKey();
ExecutorService executorService = ensureSyncExecutor(taskId);
Future future = executorService.submit(() -> {
List<CacheData> listenCaches = entry.getValue();
// 重置通知更改标志。
for (CacheData cacheData : listenCaches) {
cacheData.getReceiveNotifyChanged().set(false);
}
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
configChangeListenRequest.setListen(true);
try {
RpcClient rpcClient = ensureRpcClient(taskId);
// 请求服务端 配置批量监听请求
ConfigChangeBatchListenResponse listenResponse = (ConfigChangeBatchListenResponse) requestProxy(
rpcClient, configChangeListenRequest);
if (listenResponse != null && listenResponse.isSuccess()) {
Set<String> changeKeys = new HashSet<String>();
List<ConfigChangeBatchListenResponse.ConfigContext> changedConfigs = listenResponse.getChangedConfigs();
// 处理更改的配置,通知侦听器
if (!CollectionUtils.isEmpty(changedConfigs)) {
// 存在修改的文件
hasChangedKeys.set(true);
// 遍历发生更改的文件
for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : changedConfigs) {
String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(),
changeConfig.getGroup(), changeConfig.getTenant());
changeKeys.add(changeKey);
boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
// 刷新变更的配置 同时通知监听器
refreshContentAndCheck(changeKey, !isInitializing);
}
}
for (CacheData cacheData : listenCaches) {
if (cacheData.getReceiveNotifyChanged().get()) {
String changeKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group,
cacheData.getTenant());
if (!changeKeys.contains(changeKey)) {
boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
// 刷新变更的配置 同时通知监听器
refreshContentAndCheck(changeKey, !isInitializing);
}
}
}
//handler content configs
for (CacheData cacheData : listenCaches) {
cacheData.setInitializing(false);
String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group,
cacheData.getTenant());
if (!changeKeys.contains(groupKey)) {
synchronized (cacheData) {
if (!cacheData.getReceiveNotifyChanged().get()) {
cacheData.setConsistentWithServer(true);
}
}
}
}
}
} catch (Throwable e) {
LOGGER.error("Execute listen config change error ", e);
try {
Thread.sleep(50L);
} catch (InterruptedException interruptedException) {
//ignore
}
notifyListenConfig();
}
});
listenFutures.add(future);
}
for (Future future : listenFutures) {
try {
future.get();
} catch (Throwable throwable) {
LOGGER.error("Async listen config change error ", throwable);
}
}
}
return hasChangedKeys.get();
}
refreshContentAndCheck获取最新的配置,同时和监听器中文件md5比对是否发生变化。配置内容发生变化则回调监听器的receiveConfigInfo方法,同时更新监听器的配置信息。
private void refreshContentAndCheck(String groupKey, boolean notify) {
if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) {
CacheData cache = cacheMap.get().get(groupKey);
refreshContentAndCheck(cache, notify);
}
}
/**
* 获取最新的配置信息,并通知需要更新的监听器.
* @param cacheData
* @param notify
*/
private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
try {
// 请求服务端 获取最新的配置信息
ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,
notify);
cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
cacheData.setContent(response.getContent());
if (null != response.getConfigType()) {
cacheData.setType(response.getConfigType());
}
if (notify) {
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
ContentUtils.truncateContent(response.getContent()), response.getConfigType());
}
// 通知需要更新的监听器
cacheData.checkListenerMd5();
} catch (Exception e) {
LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
cacheData.group, cacheData.tenant, e);
}
}
客户端查询配置
入口都是NacosConfigService。最终构建ConfigQueryResponse请求服务端。
客户端修改配置。
以发布配置为例,客户端构建ConfigPublishRequest请求服务端。
服务端
服务端程序启动保存配置到磁盘
@Conditional(ConditionOnExternalStorage.class)
@Component
@DependsOn({"rpcConfigChangeNotifier"})
public class ExternalDumpService extends DumpService {
@PostConstruct
@Override
protected void init() throws Throwable {
dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);
}
@Override
protected boolean canExecute() {
return memberManager.isFirstIp();
}
}
protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,
DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
String dumpFileContext = "CONFIG_DUMP_TO_FILE";
TimerContext.start(dumpFileContext);
try {
LogUtil.DEFAULT_LOG.warn("DumpService start");
Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
Runnable dumpAllTag = () -> dumpAllTaskMgr.addTask(DumpAllTagTask.TASK_ID, new DumpAllTagTask());
Runnable clearConfigHistory = () -> {
LOGGER.warn("clearConfigHistory start");
if (canExecute()) {
try {
Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());
int pageSize = 1000;
LOGGER.warn("clearConfigHistory, getBeforeStamp:{}, pageSize:{}", startTime, pageSize);
historyConfigInfoPersistService.removeConfigHistory(startTime, pageSize);
} catch (Throwable e) {
LOGGER.error("clearConfigHistory error : {}", e.toString());
}
}
};
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
try {
// 转存配置 执行一次DumpAllTask
dumpConfigInfo(dumpAllProcessor);
// update Beta cache
LogUtil.DEFAULT_LOG.info("start clear all config-info-beta.");
DiskUtil.clearAllBeta();
if (namespacePersistService.isExistTable(BETA_TABLE_NAME)) {
dumpAllBetaProcessor.process(new DumpAllBetaTask());
}
// update Tag cache
LogUtil.DEFAULT_LOG.info("start clear all config-info-tag.");
DiskUtil.clearAllTag();
if (namespacePersistService.isExistTable(TAG_TABLE_NAME)) {
dumpAllTagProcessor.process(new DumpAllTagTask());
}
// add to dump aggr
List<ConfigInfoChanged> configList = configInfoAggrPersistService.findAllAggrGroup();
if (configList != null && !configList.isEmpty()) {
total = configList.size();
List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);
for (List<ConfigInfoChanged> list : splitList) {
MergeAllDataWorker work = new MergeAllDataWorker(list);
work.start();
}
LOGGER.info("server start, schedule merge end.");
}
} catch (Exception e) {
LogUtil.FATAL_LOG.error(
"Nacos Server did not start because dumpservice bean construction failure :\n" + e);
throw new NacosException(NacosException.SERVER_ERROR,
"Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
e);
}
if (!EnvUtil.getStandaloneMode()) {
Runnable heartbeat = () -> {
String heartBeatTime = TimeUtils.getCurrentTime().toString();
// write disk
try {
DiskUtil.saveHeartBeatToDisk(heartBeatTime);
} catch (IOException e) {
LogUtil.FATAL_LOG.error("save heartbeat fail" + e.getMessage());
}
};
ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);
long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay);
// 6个小时执行一次DumpAllTask
ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
ConfigExecutor.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
TimeUnit.MINUTES);
ConfigExecutor.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
TimeUnit.MINUTES);
ConfigExecutor.scheduleConfigTask(new DumpChangeConfigWorker(this, currentTime), 0,
DUMP_CHANGE_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
}
ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
} finally {
TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);
}
}
process()会分页查询出数据库的所有配置,然后一个一个调用ConfigCacheService.dump()。
public boolean process(NacosTask task) {
long currentMaxId = configInfoPersistService.findConfigMaxId();
long lastMaxId = 0;
while (lastMaxId < currentMaxId) {
Page<ConfigInfoWrapper> page = configInfoPersistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);
if (page == null || page.getPageItems() == null || page.getPageItems().isEmpty()) {
break;
}
for (ConfigInfoWrapper cf : page.getPageItems()) {
long id = cf.getId();
lastMaxId = Math.max(id, lastMaxId);
if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(cf.getContent());
}
if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
ClientIpWhiteList.load(cf.getContent());
}
if (cf.getDataId().equals(SwitchService.SWITCH_META_DATA_ID)) {
SwitchService.load(cf.getContent());
}
// dump为文件
ConfigCacheService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(),
cf.getLastModified(), cf.getType(), cf.getEncryptedDataKey());
final String content = cf.getContent();
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={}, md5={}",
GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(),
md5);
}
DEFAULT_LOG.info("[all-dump] {} / {}", lastMaxId, currentMaxId);
}
return true;
}
dump()就是将数据库的配置,保存到本地,一个配置对应一个文件,这样客户端来查询配置,直接查的本地文件,而不是查数据库。同时会发布LocalDataChangeEvent,触发回调客户端监听的逻辑。RpcConfigChangeNotifier监听LocalDataChangeEvent,向客户端发送ConfigChangeNotifyRequest。
服务端查询配置
如果不是内嵌式数据库,其实查的是本地文件,以此减轻数据库的压力。
ConfigQueryRequestHandler:
if (PropertyUtil.isDirectRead()) {
configInfoBase = configInfoBetaPersistService.findConfigInfo4Beta(dataId, group, tenant);
} else {
content = ConfigDiskServiceFactory.getInstance().getBetaContent(dataId, group, tenant);
}
服务端修改配置
入库,然后修改配置还会触发ConfigDataChangeEvent,最终通过AsyncNotifyService执行AsyncRpcTask,还是会dump为本地文件。