nacos配置中心

fxz大约 18 分钟

nacos配置中心

总结

2.x中,Nacos配置中心主要的改动在于引进长连接代替了短连接长轮询

1.x中,Nacos配置中心通过长轮询的方式更新客户端配置,属于客户端主动拉取配置;

2.x中,Nacos配置中心支持客户端定时同步配置,服务端回调通知,所以2.x属于推拉结合的方式。

  • 拉:客户端会对全量CacheData发起配置监听请求ConfigBatchListenRequest,如果配置md5发生变更,会同步收到变更配置项,发起ConfigQuery请求查询实时配置。
  • :服务端配置变更,会发送ConfigChangeNotifyRequest请求给与当前节点建立长连接的客户端通知配置变更项。

由于2.x使用长连接代替长轮询,监听请求ConfigBatchListenRequest不会被服务端hold住,会立即返回。服务端只是将监听关系保存在内存中,方便后续通知。

groupKey和connectionId的映射关系,方便后续通过变更配置项找到对应客户端长连接;connectionId和groupKey的映射关系,只是为了控制台展示。这些关系保存在服务端的ConfigChangeListenContext单例中。

1.x需要通过groupKey找到仍然在进行长轮询的客户端AsyncContext;2.x是通过groupKey找到connectionId,再通过connectionId找到长连接,发送ConfigChangeNotifyRequest通知客户端配置变更。

概述

  • 客户端程序启动拉取配置。

  • 客户端注册配置变化监听器。

  • 客户端查询配置。

  • 客户端修改配置。

  • 服务端程序启动保存配置到磁盘。

  • 服务端注册监听到客户端信息。

  • 服务端查询配置。

  • 服务端修改配置。

  • 服务端通知监听。

本文以nacos2.x分析。需要结合spring cloud alibaba、spring boot配置加载相关逻辑。

Nacos 配置管理一致性协议分为两个大部分,第一部分是 Server 间一致性协议,一个是 SDK 与 Server 的一致性协议,配置作为分布式系统中非强一致数据,在现脑裂的时候可用性高于一致性, 因此阿里配置中心是采用 AP 一致性协议。

Server 间的一致性协议

有 DB 模式(读写分离架构)

一致性的核心是 Server 与 DB 保持数据一致性,从而保证 Server 数据一致;Server 之间都是对 等的。数据写任何一个 Server,优先持久化,持久化成功后异步通知其他节点到数据库中拉取最新 配置值,并且通知写入成功。

无 DB 模式

Server 间采用 Raft 协议保证数据一致性,行业大部分产品采用此模式,因此不展开介绍。Nacos 提供此模式,是方便用户本机运行,降低对存储依赖。

SDK 与 Server 的一致性协议

SDK 与 Server 一致性协议的核心是通过 MD5 值是否一致,如果不一致就拉取最新值。

Nacos 1.X

Nacos 1.X 采用 Http 1.1 短链接模拟长链接,每 30s 发一个心跳跟 Server 对比 SDK 配置 MD 5 值是否跟 Server 保持一致,如果一致就 hold 住链接,如果有不一致配置,就把不一致的配置 返回,然后 SDK 获取最新配置值。

Nacos 2.X

Nacos 2.x 相比上面 30s 一次的长轮训,升级成长链接模式,配置变更,启动建立长链接,配置变更服务端推送变更配置列表,然后 SDK 拉取配置更新,因此通信效率大幅提升。

客户端

客户端程序启动拉取配置

这一部分在springbooot配置加载相关文章中分析过了。nacos基于springboot2.4以后提供的imports机制做了拓展。客户端代码最终都是通过NacosConfigService去操作配置的。

Spring Boot配置加载流程open in new window

客户端配置监听器逻辑

  • 客户端在启动的时候会轮训配置监听器,请求服务端看配置是否变化,配置变化则回调监听器方法,最终发布RefreshEvent,触发springcloud配置动态刷新的逻辑。
  • 客户端注册ConfigChangeNotifyRequest的处理器,服务端配置变化的时候会发生ConfigChangeNotifyRequest,最终客户端执行的还是上述轮训逻辑。

NacosConfigService会创建ClientWorker和服务端通信。ClientWorker中初始化ConfigRpcTransportClient并且调用他的start方法。

public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
        final Properties properties) throws NacosException {
		
  	...........
      
    agent = new ConfigRpcTransportClient(properties, serverListManager);
    int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);
    ScheduledExecutorService executorService = Executors
            .newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM), r -> {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker");
                t.setDaemon(true);
                return t;
            });
    agent.setExecutor(executorService);
    agent.start();
    
}

ConfigRpcTransportClient的start方法会执行内部的监听任务即executeConfigListen方法。

ConfigRpcTransportClientpublic void start() throws NacosException {
   	......... 
    startInternal();
}

public void startInternal() throws NacosException {
    executor.schedule(() -> {
        while (!executor.isShutdown() && !executor.isTerminated()) {
            try {
              	// 并发控制 典型的消费者生产者模型
                listenExecutebell.poll(5L, TimeUnit.SECONDS);
                if (executor.isShutdown() || executor.isTerminated()) {
                    continue;
                }
              	// 执行内部监听
                executeConfigListen();
            } catch (Exception e) {
                LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
            }
        }
    }, 0L, TimeUnit.MILLISECONDS);
    
}

客户端会不断循环监听缓存,检查是否要更新缓存。关键的代码在checkListenCache方法。

  			/**
         * 执行配置监听
         */
        @Override
        public void executeConfigListen() {
						// 需要注册的监听
            Map<String, List<CacheData>> listenCachesMap = new HashMap<>(16);
          	// 需要注销的监听
            Map<String, List<CacheData>> removeListenCachesMap = new HashMap<>(16);

            // 是否需要全量同步 根据时间间隔判断
            long now = System.currentTimeMillis();
            boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;

          	// 遍历所有监听缓存
            for (CacheData cache : cacheMap.get().values()) {

                synchronized (cache) {

                    // 检查本地配置信息是否发生了变化 并更新CacheData信息
                    checkLocalConfig(cache);

                    // 本地缓存是否和服务器一致。
                    if (cache.isConsistentWithServer()) {
                        cache.checkListenerMd5();
                        if (!needAllSync) {
                            continue;
                        }
                    }

                    // 如果使用了本地配置信息,则直接跳过处理。
                    if (cache.isUseLocalConfigInfo()) {
                        continue;
                    }

                    // 判断缓存的信息是否是废弃的
                    if (!cache.isDiscard()) {
                        // 不是废弃的 则需要监听
                        List<CacheData> cacheDatas = listenCachesMap.computeIfAbsent(String.valueOf(cache.getTaskId()),
                                k -> new LinkedList<>());
                        cacheDatas.add(cache);
                    } else {
                        // 是废弃的 则需要移除监听
                        List<CacheData> cacheDatas = removeListenCachesMap.computeIfAbsent(
                                String.valueOf(cache.getTaskId()), k -> new LinkedList<>());
                        cacheDatas.add(cache);
                    }
                }

            }

            // 执行check listen,如果有更改键,则返回true。
            boolean hasChangedKeys = checkListenCache(listenCachesMap);

            //execute check remove listen.
            checkRemoveListenCache(removeListenCachesMap);

            if (needAllSync) {
                lastAllSyncTime = now;
            }
            //If has changed keys,notify re sync md5.
            if (hasChangedKeys) {
                notifyListenConfig();
            }

        }

checkListenCache方法通过构建ConfigBatchListenRequest请求,注册到服务端当前客户端配置的md5,获取配置变化可以直接通知客户端。如果配置变更则刷新变更的配置,同时通知监听器。

private boolean checkListenCache(Map<String, List<CacheData>> listenCachesMap) {

    final AtomicBoolean hasChangedKeys = new AtomicBoolean(false);
    if (!listenCachesMap.isEmpty()) {
        List<Future> listenFutures = new ArrayList<>();
        for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
            String taskId = entry.getKey();
            ExecutorService executorService = ensureSyncExecutor(taskId);
            Future future = executorService.submit(() -> {
                List<CacheData> listenCaches = entry.getValue();
                // 重置通知更改标志。
                for (CacheData cacheData : listenCaches) {
                    cacheData.getReceiveNotifyChanged().set(false);
                }

                ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
                configChangeListenRequest.setListen(true);
                try {
                    RpcClient rpcClient = ensureRpcClient(taskId);
                    // 请求服务端 配置批量监听请求
                    ConfigChangeBatchListenResponse listenResponse = (ConfigChangeBatchListenResponse) requestProxy(
                            rpcClient, configChangeListenRequest);
                    if (listenResponse != null && listenResponse.isSuccess()) {

                        Set<String> changeKeys = new HashSet<String>();

                        List<ConfigChangeBatchListenResponse.ConfigContext> changedConfigs = listenResponse.getChangedConfigs();
                        // 处理更改的配置,通知侦听器
                        if (!CollectionUtils.isEmpty(changedConfigs)) {
                            // 存在修改的文件
                            hasChangedKeys.set(true);
                          	// 遍历发生更改的文件
                            for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : changedConfigs) {
                                String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(),
                                        changeConfig.getGroup(), changeConfig.getTenant());
                                changeKeys.add(changeKey);
                                boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
                                // 刷新变更的配置 同时通知监听器
                                refreshContentAndCheck(changeKey, !isInitializing);
                            }

                        }

                        for (CacheData cacheData : listenCaches) {
                            if (cacheData.getReceiveNotifyChanged().get()) {
                                String changeKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group,
                                        cacheData.getTenant());
                                if (!changeKeys.contains(changeKey)) {
                                    boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
                                    // 刷新变更的配置 同时通知监听器
                                    refreshContentAndCheck(changeKey, !isInitializing);
                                }
                            }
                        }

                        //handler content configs
                        for (CacheData cacheData : listenCaches) {
                            cacheData.setInitializing(false);
                            String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group,
                                    cacheData.getTenant());
                            if (!changeKeys.contains(groupKey)) {
                                synchronized (cacheData) {
                                    if (!cacheData.getReceiveNotifyChanged().get()) {
                                        cacheData.setConsistentWithServer(true);
                                    }
                                }
                            }
                        }

                    }
                } catch (Throwable e) {
                    LOGGER.error("Execute listen config change error ", e);
                    try {
                        Thread.sleep(50L);
                    } catch (InterruptedException interruptedException) {
                        //ignore
                    }
                    notifyListenConfig();
                }
            });
            listenFutures.add(future);

        }
        for (Future future : listenFutures) {
            try {
                future.get();
            } catch (Throwable throwable) {
                LOGGER.error("Async listen config change error ", throwable);
            }
        }

    }
    return hasChangedKeys.get();
}

refreshContentAndCheck获取最新的配置,同时和监听器中文件md5比对是否发生变化。配置内容发生变化则回调监听器的receiveConfigInfo方法,同时更新监听器的配置信息。

private void refreshContentAndCheck(String groupKey, boolean notify) {
    if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) {
        CacheData cache = cacheMap.get().get(groupKey);
        refreshContentAndCheck(cache, notify);
    }
}

    /**
     * 获取最新的配置信息,并通知需要更新的监听器.
     * @param cacheData
     * @param notify
     */
    private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
        try {
            // 请求服务端 获取最新的配置信息
            ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,
                    notify);
            cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
            cacheData.setContent(response.getContent());
            if (null != response.getConfigType()) {
                cacheData.setType(response.getConfigType());
            }
            if (notify) {
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                        agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
                        ContentUtils.truncateContent(response.getContent()), response.getConfigType());
            }
            // 通知需要更新的监听器
            cacheData.checkListenerMd5();
        } catch (Exception e) {
            LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
                    cacheData.group, cacheData.tenant, e);
        }
    }

客户端查询配置

入口都是NacosConfigService。最终构建ConfigQueryResponse请求服务端。

客户端修改配置。

以发布配置为例,客户端构建ConfigPublishRequest请求服务端。

服务端

服务端程序启动保存配置到磁盘

@Conditional(ConditionOnExternalStorage.class)
@Component
@DependsOn({"rpcConfigChangeNotifier"})
public class ExternalDumpService extends DumpService {
   
    
    @PostConstruct
    @Override
    protected void init() throws Throwable {
        dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);
    }
    
    @Override
    protected boolean canExecute() {
        return memberManager.isFirstIp();
    }
}

   protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,
            DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
        String dumpFileContext = "CONFIG_DUMP_TO_FILE";
        TimerContext.start(dumpFileContext);
        try {
            LogUtil.DEFAULT_LOG.warn("DumpService start");
            
            Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
            
            Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
            
            Runnable dumpAllTag = () -> dumpAllTaskMgr.addTask(DumpAllTagTask.TASK_ID, new DumpAllTagTask());
            
            Runnable clearConfigHistory = () -> {
                LOGGER.warn("clearConfigHistory start");
                if (canExecute()) {
                    try {
                        Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());
                        int pageSize = 1000;
                        LOGGER.warn("clearConfigHistory, getBeforeStamp:{}, pageSize:{}", startTime, pageSize);
                        historyConfigInfoPersistService.removeConfigHistory(startTime, pageSize);
                    } catch (Throwable e) {
                        LOGGER.error("clearConfigHistory error : {}", e.toString());
                    }
                }
            };
            
            Timestamp currentTime = new Timestamp(System.currentTimeMillis());
            
            try {
              // 转存配置 执行一次DumpAllTask
                dumpConfigInfo(dumpAllProcessor);
                
                // update Beta cache
                LogUtil.DEFAULT_LOG.info("start clear all config-info-beta.");
                DiskUtil.clearAllBeta();
                if (namespacePersistService.isExistTable(BETA_TABLE_NAME)) {
                    dumpAllBetaProcessor.process(new DumpAllBetaTask());
                }
                // update Tag cache
                LogUtil.DEFAULT_LOG.info("start clear all config-info-tag.");
                DiskUtil.clearAllTag();
                if (namespacePersistService.isExistTable(TAG_TABLE_NAME)) {
                    dumpAllTagProcessor.process(new DumpAllTagTask());
                }
                
                // add to dump aggr
                List<ConfigInfoChanged> configList = configInfoAggrPersistService.findAllAggrGroup();
                if (configList != null && !configList.isEmpty()) {
                    total = configList.size();
                    List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);
                    for (List<ConfigInfoChanged> list : splitList) {
                        MergeAllDataWorker work = new MergeAllDataWorker(list);
                        work.start();
                    }
                    LOGGER.info("server start, schedule merge end.");
                }
            } catch (Exception e) {
                LogUtil.FATAL_LOG.error(
                        "Nacos Server did not start because dumpservice bean construction failure :\n" + e);
                throw new NacosException(NacosException.SERVER_ERROR,
                        "Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
                        e);
            }
            if (!EnvUtil.getStandaloneMode()) {
                Runnable heartbeat = () -> {
                    String heartBeatTime = TimeUtils.getCurrentTime().toString();
                    // write disk
                    try {
                        DiskUtil.saveHeartBeatToDisk(heartBeatTime);
                    } catch (IOException e) {
                        LogUtil.FATAL_LOG.error("save heartbeat fail" + e.getMessage());
                    }
                };
                
                ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);
                
                long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
                LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay);
              
                // 6个小时执行一次DumpAllTask
                ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
                ConfigExecutor.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
                        TimeUnit.MINUTES);
                ConfigExecutor.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
                        TimeUnit.MINUTES);
                ConfigExecutor.scheduleConfigTask(new DumpChangeConfigWorker(this, currentTime), 0,
                        DUMP_CHANGE_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
                
            }
            
            ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
        } finally {
            TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);
        }
        
    }
    

process()会分页查询出数据库的所有配置,然后一个一个调用ConfigCacheService.dump()。

public boolean process(NacosTask task) {
    long currentMaxId = configInfoPersistService.findConfigMaxId();
    long lastMaxId = 0;
    while (lastMaxId < currentMaxId) {
        Page<ConfigInfoWrapper> page = configInfoPersistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);
        if (page == null || page.getPageItems() == null || page.getPageItems().isEmpty()) {
            break;
        }
      
        for (ConfigInfoWrapper cf : page.getPageItems()) {
            long id = cf.getId();
            lastMaxId = Math.max(id, lastMaxId);
            if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
                AggrWhitelist.load(cf.getContent());
            }
            
            if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
                ClientIpWhiteList.load(cf.getContent());
            }
            
            if (cf.getDataId().equals(SwitchService.SWITCH_META_DATA_ID)) {
                SwitchService.load(cf.getContent());
            }

            // dump为文件
            ConfigCacheService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(),
                    cf.getLastModified(), cf.getType(), cf.getEncryptedDataKey());
            
            final String content = cf.getContent();
            final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
            LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={}, md5={}",
                    GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(),
                    md5);
        }
        DEFAULT_LOG.info("[all-dump] {} / {}", lastMaxId, currentMaxId);
    }
    return true;
}

dump()就是将数据库的配置,保存到本地,一个配置对应一个文件,这样客户端来查询配置,直接查的本地文件,而不是查数据库。同时会发布LocalDataChangeEvent,触发回调客户端监听的逻辑。RpcConfigChangeNotifier监听LocalDataChangeEvent,向客户端发送ConfigChangeNotifyRequest。

服务端查询配置

如果不是内嵌式数据库,其实查的是本地文件,以此减轻数据库的压力。

ConfigQueryRequestHandlerif (PropertyUtil.isDirectRead()) {
    configInfoBase = configInfoBetaPersistService.findConfigInfo4Beta(dataId, group, tenant);
} else {
    content = ConfigDiskServiceFactory.getInstance().getBetaContent(dataId, group, tenant);
}

服务端修改配置

入库,然后修改配置还会触发ConfigDataChangeEvent,最终通过AsyncNotifyService执行AsyncRpcTask,还是会dump为本地文件。