Nacos service discovery and subscription mechanism

fxz, about 27 minutes


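Summary

Since Nacos 2.0, clients and servers communicate over gRPC. Service discovery queries are answered from the mappings and indexes that the server saves when services register.

After a client subscribes to a service, it keeps the service information in a local cache. When the service changes on the server, the server actively pushes the change to the client, and the client updates its cache.

When a subscribed service changes, the listeners registered by the client are called back.

The long-winded details

From the earlier article on Spring Cloud service registration and discovery, we know that client-side service discovery in Spring Cloud Alibaba ultimately goes through NamingService.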

    @SneakyThrows
    public static void main(String[] args) {
       ConfigurableApplicationContext context = SpringApplication.run(PigAdminApplication.class, args);

       DiscoveryClient discoveryClient = context.getBean(DiscoveryClient.class);
       NamingService namingService = context.getBean(NacosServiceManager.class).getNamingService();

       // Get all service names
       List<String> services = discoveryClient.getServices();
       // Get the instances of each service
       services.forEach(s -> discoveryClient.getInstances(s).forEach(System.out::println));
       // Subscribe to each service
       services.forEach(s -> {
          try {
             namingService.subscribe(s, e -> {
                System.out.println("Service changed: " + ((NamingEvent) e).getServiceName() + " -> " + ((NamingEvent) e).getInstances());
             });
          } catch (NacosException e) {
             throw new RuntimeException(e);
          }
       });
    }

The logic in NacosDiscoveryClient is simple: on top of serviceDiscovery, it caches the information returned by the server locally.

public class NacosDiscoveryClient implements DiscoveryClient {
	
  	.........

    // Get all instances of a service
    @Override
    public List<ServiceInstance> getInstances(String serviceId) {
       try {
          return Optional.of(serviceDiscovery.getInstances(serviceId))
                .map(instances -> {
                   // Cache the service instances
                   ServiceCache.setInstances(serviceId, instances);
                   return instances;
                }).get();
       }
       catch (Exception e) {
          if (failureToleranceEnabled) {
             return ServiceCache.getInstances(serviceId);
          }
          throw new RuntimeException(
                "Can not get hosts from nacos server. serviceId: " + serviceId, e);
       }
    }

    // Get all service ids
    @Override
    public List<String> getServices() {
       try {
          return Optional.of(serviceDiscovery.getServices()).map(services -> {
             // Cache the service ids that were fetched
             ServiceCache.setServiceIds(services);
             return services;
          }).get();
       }
       catch (Exception e) {
          log.error("get service name from nacos server failed.", e);
          return failureToleranceEnabled ? ServiceCache.getServiceIds()
                : Collections.emptyList();
       }
    }

}
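
The failure-tolerance branch above boils down to "fetch, remember the last good answer, and fall back to it when the fetch fails". Here is a minimal sketch of that pattern using only the JDK. The class FailureTolerantLookup and the remote function are hypothetical stand-ins for NacosDiscoveryClient and serviceDiscovery, and returning an empty list when nothing was cached is an assumption of this sketch.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for the fetch-then-cache logic; not the real NacosDiscoveryClient.
class FailureTolerantLookup {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    // Stands in for serviceDiscovery.getInstances(serviceId).
    private final Function<String, List<String>> remote;
    private final boolean failureToleranceEnabled;

    FailureTolerantLookup(Function<String, List<String>> remote, boolean failureToleranceEnabled) {
        this.remote = remote;
        this.failureToleranceEnabled = failureToleranceEnabled;
    }

    List<String> getInstances(String serviceId) {
        try {
            List<String> instances = remote.apply(serviceId);
            // Remember the last successful answer.
            cache.put(serviceId, instances);
            return instances;
        } catch (RuntimeException e) {
            if (failureToleranceEnabled) {
                // Assumption: an empty list is returned when nothing was cached yet.
                return cache.getOrDefault(serviceId, Collections.emptyList());
            }
            throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, e);
        }
    }
}
```

With failure tolerance enabled, a caller keeps seeing the last known instances while the server is unreachable. With it disabled, the exception propagates.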

NacosServiceDiscovery ultimately delegates to namingService.

public class NacosServiceDiscovery {
	
  	.........
   
    // Get the healthy instances of a service (selectInstances is called with healthy = true)
    public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
       String group = discoveryProperties.getGroup();
       List<Instance> instances = namingService().selectInstances(serviceId, group,
             true);
       return hostToServiceInstanceList(instances, serviceId);
    }

    // Get all service ids in the configured group
    public List<String> getServices() throws NacosException {
       String group = discoveryProperties.getGroup();
       ListView<String> services = namingService().getServicesOfServer(1,
             Integer.MAX_VALUE, group);
       return services.getData();
    }
}
NacosNamingService:

// Select the instances that match the given conditions
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
            boolean subscribe) throws NacosException {
        
        ServiceInfo serviceInfo;
        String clusterString = StringUtils.join(clusters, ",");
        if (subscribe) {
            // If subscribing, look in the local cache first
            serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
            if (null == serviceInfo) {
                // Not in the local cache: send a subscribe request to the server
                serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
            }
        } else {
            // Not subscribing: query the server directly
            serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
        }
  
        // Filter the returned instances by health status
        return selectInstances(serviceInfo, healthy);
    }

// Get service ids
public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector)
        throws NacosException {
    // Ask the server for all service ids
    return clientProxy.getServiceList(pageNo, pageSize, groupName, selector);
}
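
The subscribe flag in selectInstances decides between "cache first, server only on a miss" and "always ask the server". A minimal sketch of that branching, assuming a hypothetical CacheFirstSelector class in which a plain function plays the role of clientProxy and a HashMap plays the role of serviceInfoHolder:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the subscribe / non-subscribe branches; not Nacos code.
class CacheFirstSelector {
    private final Map<String, List<String>> localCache = new HashMap<>();
    // Stands in for both clientProxy.subscribe and clientProxy.queryInstancesOfService.
    private final Function<String, List<String>> server;
    private int serverCalls = 0;

    CacheFirstSelector(Function<String, List<String>> server) {
        this.server = server;
    }

    List<String> select(String serviceKey, boolean subscribe) {
        if (!subscribe) {
            // Non-subscribing callers hit the server on every call.
            return callServer(serviceKey);
        }
        List<String> cached = localCache.get(serviceKey);
        if (cached == null) {
            // Cache miss: ask the server once and remember the answer.
            cached = callServer(serviceKey);
            localCache.put(serviceKey, cached);
        }
        return cached;
    }

    int serverCalls() {
        return serverCalls;
    }

    private List<String> callServer(String serviceKey) {
        serverCalls++;
        return server.apply(serviceKey);
    }
}
```

In this sketch the cache is only filled on a miss. In the real client the cache is also refreshed by server pushes and by the scheduled update task, which are covered later in the article.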

Client: fetching the service list

NacosNamingService calls NamingClientProxyDelegate, which ultimately calls NamingGrpcClientProxy.

NamingClientProxyDelegate:

public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)
        throws NacosException {
    return grpcClientProxy.getServiceList(pageNo, pageSize, groupName, selector);
}
NamingGrpcClientProxy:

public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)
        throws NacosException {
    ServiceListRequest request = new ServiceListRequest(namespaceId, groupName, pageNo, pageSize);
    if (selector != null) {
        if (SelectorType.valueOf(selector.getType()) == SelectorType.label) {
            request.setSelector(JacksonUtils.toJson(selector));
        }
    }
  
    // Request the service list from the server over gRPC
    ServiceListResponse response = requestToServer(request, ServiceListResponse.class);
  
    ListView<String> result = new ListView<>();
    result.setCount(response.getCount());
    result.setData(response.getServiceNames());
  
    return result;
}

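Server: returning the service list

The entry point for all requests on the server is the GrpcRequestAcceptor class. Requests of type ServiceListRequest are handled by ServiceListRequestHandler. Its logic is simple: it returns the mappings that were saved when services registered.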

@Component
public class ServiceListRequestHandler extends RequestHandler<ServiceListRequest, ServiceListResponse> {
    
    @Override
    @Secured(action = ActionTypes.READ)
    public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException {
        // Get the services registered under the namespace
        Collection<Service> serviceSet = ServiceManager.getInstance().getSingletons(request.getNamespace());
        // Create the response object
        ServiceListResponse result = ServiceListResponse.buildSuccessResponse(0, new LinkedList<>());
        if (!serviceSet.isEmpty()) {
            // Filter the services by the requested group
            Collection<String> serviceNameSet = selectServiceWithGroupName(serviceSet, request.getGroupName());
            List<String> serviceNameList = ServiceUtil
                    .pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet);
            result.setCount(serviceNameSet.size());
            result.setServiceNames(serviceNameList);
        }
        return result;
    }
    
    // Filter services by group
    private Collection<String> selectServiceWithGroupName(Collection<Service> serviceSet, String groupName) {
        Collection<String> result = new HashSet<>(serviceSet.size());
        for (Service each : serviceSet) {
            if (Objects.equals(groupName, each.getGroup())) {
                result.add(each.getGroupedServiceName());
            }
        }
        return result;
    }
    
}
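
The handler above does two things: it keeps only the services in the requested group, then returns one page of names. The sketch below reproduces that shape with plain JDK collections. ServiceListPager and Svc are hypothetical names, the group@@name format for grouped names is an assumption, and sorting before paging is a choice made here for deterministic output rather than something taken from ServiceUtil.pageServiceName.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical model of "filter by group, then paginate"; not the Nacos implementation.
class ServiceListPager {
    static final class Svc {
        final String group;
        final String name;

        Svc(String group, String name) {
            this.group = group;
            this.name = name;
        }

        // Assumption: grouped names look like group@@name.
        String groupedName() {
            return group + "@@" + name;
        }
    }

    static Set<String> selectByGroup(Collection<Svc> services, String group) {
        Set<String> result = new HashSet<>();
        for (Svc each : services) {
            if (Objects.equals(group, each.group)) {
                result.add(each.groupedName());
            }
        }
        return result;
    }

    // pageNo is 1-based, matching the client call getServicesOfServer(1, Integer.MAX_VALUE, group).
    static List<String> page(int pageNo, int pageSize, Collection<String> names) {
        List<String> sorted = new ArrayList<>(names);
        Collections.sort(sorted);
        long start = (long) (pageNo - 1) * pageSize;
        if (pageNo < 1 || pageSize < 1 || start >= sorted.size()) {
            return new ArrayList<>();
        }
        // long arithmetic avoids overflow when pageSize is Integer.MAX_VALUE.
        int end = (int) Math.min(start + pageSize, sorted.size());
        return new ArrayList<>(sorted.subList((int) start, end));
    }
}
```

Note that the count in the response is the size of the filtered set, not the size of the returned page, so a client can tell how many pages exist.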

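Client: subscribing to a service

The subscribe method first looks up the cached service information, that is, the data in serviceInfoHolder. If nothing is cached, it sends a subscribe request to the server.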

NamingClientProxyDelegate:

// Subscribe to a service
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
  
    // Schedule a task that queries the server after a one-second delay and refreshes the locally cached subscription
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
  
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    // No cached subscription info: send the request to the server
    if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
  
    // Update the local cache
    serviceInfoHolder.processServiceInfo(result);
  
    return result;
}
ServiceInfoUpdateService:

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    	..........
      // Run the task after a one-second delay
      ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
}

 private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
UpdateTask:

public void run() {
    long delayTime = DEFAULT_DELAY;
    
    try {
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
                serviceKey)) {
            NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
            isCancel = true;
            return;
        }
        
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        if (serviceObj == null) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            serviceInfoHolder.processServiceInfo(serviceObj);
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }
        
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
      
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
      
        // TODO multiple time can be configured.
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        resetFailCount();
    } catch (NacosException e) {
        handleNacosException(e);
    } catch (Throwable e) {
        handleUnknownException(e);
    } finally {
        if (!isCancel) {
            executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),
                    TimeUnit.MILLISECONDS);
        }
    }
}
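
The finally block of UpdateTask reschedules itself with Math.min(delayTime << failCount, DEFAULT_DELAY * 60), which is an exponential backoff capped at 60 times the default delay. A small worked example, assuming DEFAULT_DELAY is 1000 ms as the one-second delay above suggests (UpdateBackoff is a hypothetical helper, not a Nacos class):

```java
// Hypothetical helper that isolates the rescheduling arithmetic of UpdateTask.
class UpdateBackoff {
    // Assumption: 1000 ms, matching the one-second initial delay described in the article.
    static final long DEFAULT_DELAY = 1000L;

    // Each consecutive failure doubles the delay; the result never exceeds 60 * DEFAULT_DELAY.
    static long nextDelay(long delayTime, int failCount) {
        return Math.min(delayTime << failCount, DEFAULT_DELAY * 60);
    }

    public static void main(String[] args) {
        for (int failCount = 0; failCount <= 6; failCount++) {
            System.out.println("failCount=" + failCount + " -> " + nextDelay(DEFAULT_DELAY, failCount) + " ms");
        }
    }
}
```

With a 1000 ms base, three failures give 8000 ms, and six failures would give 64000 ms, which the cap reduces to 60000 ms. After a successful refresh, resetFailCount brings the delay back to the cache-time-based value.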

Service query request

NamingGrpcClientProxy:

public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
        boolean healthyOnly) throws NacosException {
    ServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);
    request.setCluster(clusters);
    request.setHealthyOnly(healthyOnly);
    request.setUdpPort(udpPort);
    QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);
    return response.getServiceInfo();
}

The server-side handler is ServiceQueryRequestHandler:

@Component
public class ServiceQueryRequestHandler extends RequestHandler<ServiceQueryRequest, QueryServiceResponse> {
    
    ......
    
    @Override
    @Secured(action = ActionTypes.READ)
    public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {
        String namespaceId = request.getNamespace();
        String groupName = request.getGroupName();
        String serviceName = request.getServiceName();
        Service service = Service.newService(namespaceId, groupName, serviceName);
        String cluster = null == request.getCluster() ? "" : request.getCluster();
        boolean healthyOnly = request.isHealthyOnly();
      
        // Get the service information
        ServiceInfo result = serviceStorage.getData(service);
        ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
        result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true,
                meta.getClientIp());
        return QueryServiceResponse.buildSuccessResponse(result);
    }
}

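ServiceStorage caches service information. When the cache has no entry for a service, the entry is first built from the index and then written back to the cache.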

ServiceStorage:

private final ClientServiceIndexesManager serviceIndexesManager;
private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;

// Get the data of a service
public ServiceInfo getData(Service service) {
    // If serviceDataIndexes has no entry, build it from the index
    return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
}

public ServiceInfo getPushData(Service service) {
    // Start with an empty service info
    ServiceInfo result = emptyServiceInfo(service);
    // If the service was never registered, return it as is
    if (!ServiceManager.getInstance().containSingleton(service)) {
        return result;
    }
    Service singleton = ServiceManager.getInstance().getSingleton(service);
  
    // Collect all instances through the index
    result.setHosts(getAllInstancesFromIndex(singleton));
    // Cache the result so the next lookup does not need the index
    serviceDataIndexes.put(singleton, result);
  
    return result;
}

private List<Instance> getAllInstancesFromIndex(Service service) {
    Set<Instance> result = new HashSet<>();
    Set<String> clusters = new HashSet<>();
  	
    // Look up every client that published this service and iterate over them
    for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
        // Get the instance this client registered for the service
        Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
        if (instancePublishInfo.isPresent()) {
            InstancePublishInfo publishInfo = instancePublishInfo.get();
            //If it is a BatchInstancePublishInfo type, it will be processed manually and added to the instance list
            if (publishInfo instanceof BatchInstancePublishInfo) {
                BatchInstancePublishInfo batchInstancePublishInfo = (BatchInstancePublishInfo) publishInfo;
                List<Instance> batchInstance = parseBatchInstance(service, batchInstancePublishInfo, clusters);
                result.addAll(batchInstance);
            } else {
                Instance instance = parseInstance(service, instancePublishInfo.get());
                result.add(instance);
                clusters.add(instance.getClusterName());
            }
        }
    }
  
    // cache clusters of this service
    serviceClusterIndex.put(service, clusters);
    return new LinkedList<>(result);
}


private ServiceInfo emptyServiceInfo(Service service) {
    ServiceInfo result = new ServiceInfo();
    result.setName(service.getName());
    result.setGroupName(service.getGroup());
    result.setLastRefTime(System.currentTimeMillis());
    result.setCacheMillis(switchDomain.getDefaultPushCacheMillis());
    return result;
}
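
To make the "service to clients to instances" lookup concrete, here is a small sketch of a lazily built cache over two indexes. LazyServiceStorage and its fields are hypothetical simplifications: services and instances are plain strings, and cache invalidation on service changes is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of ServiceStorage; not the Nacos implementation.
class LazyServiceStorage {
    // service -> ids of the clients that registered an instance of it
    private final Map<String, Set<String>> publisherIndex = new ConcurrentHashMap<>();
    // clientId -> (service -> instance address)
    private final Map<String, Map<String, String>> clientInstances = new ConcurrentHashMap<>();
    // service -> instance list, filled on first lookup
    private final Map<String, List<String>> serviceDataCache = new ConcurrentHashMap<>();
    private int indexScans = 0;

    void register(String clientId, String service, String address) {
        publisherIndex.computeIfAbsent(service, k -> ConcurrentHashMap.newKeySet()).add(clientId);
        clientInstances.computeIfAbsent(clientId, k -> new ConcurrentHashMap<>()).put(service, address);
    }

    List<String> getData(String service) {
        List<String> cached = serviceDataCache.get(service);
        return cached != null ? cached : getPushData(service);
    }

    List<String> getPushData(String service) {
        if (!publisherIndex.containsKey(service)) {
            // Never registered: return an empty result without caching it.
            return Collections.emptyList();
        }
        indexScans++;
        List<String> result = new ArrayList<>();
        for (String clientId : publisherIndex.get(service)) {
            String address = clientInstances.getOrDefault(clientId, Collections.emptyMap()).get(service);
            if (address != null) {
                result.add(address);
            }
        }
        Collections.sort(result);
        serviceDataCache.put(service, result);
        return result;
    }

    int indexScans() {
        return indexScans;
    }
}
```

The second getData call for the same service is served from the cache, so the index is scanned only once.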

Handling the local cache

The client updates the service information in ServiceInfoHolder:

ServiceInfoHolder:
 
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) {
        return null;
    }
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (isEmptyOrErrorPush(serviceInfo)) {
        //empty or error push, just ignore
        return oldService;
    }
  
    // Update the service information
    serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

    // Check whether the service information changed
    boolean changed = isChangedServiceInfo(oldService, serviceInfo);
    if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
        serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
    }
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    if (changed) {
        NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
                JacksonUtils.toJson(serviceInfo.getHosts()));
        // If the service changed, publish InstancesChangeEvent, which triggers the client's subscribed listeners. The Nacos event implementation is analyzed in a later article.
        NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}
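
The core of processServiceInfo is "ignore bad pushes, replace the cached entry, and notify only if something changed". A compact sketch of that sequence follows. MiniServiceInfoHolder is hypothetical, hosts are plain strings, and treating a null or empty host list as a push to ignore is a simplification of isEmptyOrErrorPush.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified model of ServiceInfoHolder.processServiceInfo.
class MiniServiceInfoHolder {
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    private final List<Consumer<String>> changeListeners = new CopyOnWriteArrayList<>();

    void onChange(Consumer<String> listener) {
        changeListeners.add(listener);
    }

    List<String> process(String serviceKey, List<String> hosts) {
        List<String> old = serviceInfoMap.get(serviceKey);
        if (hosts == null || hosts.isEmpty()) {
            // Simplification: an empty push is ignored and the old data is kept.
            return old;
        }
        serviceInfoMap.put(serviceKey, hosts);
        // Compare as sets so that a reordered host list does not count as a change.
        boolean changed = old == null || !new HashSet<>(old).equals(new HashSet<>(hosts));
        if (changed) {
            // Stands in for NotifyCenter.publishEvent(new InstancesChangeEvent(...)).
            for (Consumer<String> listener : changeListeners) {
                listener.accept(serviceKey);
            }
        }
        return hosts;
    }
}
```

Pushing the same host list twice produces a single notification, which is what keeps subscribers from being called back on every periodic refresh.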

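The local cache has two parts. First, instance information obtained from the registry is cached in memory, held in a Map. Second, it is periodically written to disk files as a fallback.

Failover also has two parts. First, the failover switch is marked by a file. Second, once failover is enabled, service instance information can be read from the failover backup files when a failure occurs.

The ServiceInfoHolder constructor also initializes a FailoverReactor, which is likewise a member field of ServiceInfoHolder. FailoverReactor is responsible for handling failover.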

public ServiceInfoHolder(String namespace, Properties properties) {
 ....
 // this is the current ServiceInfoHolder; the two objects hold references to each other
 this.failoverReactor = new FailoverReactor(this, cacheDir);
 .....
}

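The FailoverReactor constructor:

  1. Holds a reference to ServiceInfoHolder
  2. Builds the failover directory path: ${user.home}/nacos/naming/public/failover, where public may be another custom namespace
  3. Initializes executorService
  4. Calls init, which starts several scheduled tasks on executorService
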
public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
    // Hold a reference to ServiceInfoHolder
    this.serviceInfoHolder = serviceInfoHolder;
    // Build the failover directory path: ${user.home}/nacos/naming/public/failover
    this.failoverDir = cacheDir + FAILOVER_DIR;
    // Initialize executorService
    this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            // Run as a daemon thread
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.failover");
            return thread;
        }
    });
    // Remaining initialization: start several scheduled tasks on executorService
    this.init();
}

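The init method

This method starts three tasks, all built on inner classes of FailoverReactor:

  1. SwitchRefresher: runs immediately on initialization, then repeats with a 5-second delay between runs
  2. DiskFileWriter: first runs 30 minutes after initialization, then repeats every 24 hours
  3. A one-shot startup backup: runs once, 10 seconds after initialization, and calls DiskFileWriter only if the failover directory is empty
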
public void init() {
    // Runs immediately on initialization, then every 5 seconds: SwitchRefresher
    executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
    // First runs after 30 minutes, then every 24 hours: DiskFileWriter
    executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);

    // backup file on startup if failover directory is empty.
    // If the failover directory is empty, back up the files at startup
    // One-shot task: runs once after a 10-second delay and delegates to DiskFileWriter
    executorService.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                File cacheDir = new File(failoverDir);

                if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                    throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                }

                File[] files = cacheDir.listFiles();
                if (files == null || files.length <= 0) {
                    new DiskFileWriter().run();
                }
            } catch (Throwable e) {
                NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
            }

        }
    }, 10000L, TimeUnit.MILLISECONDS);
}
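
The difference between the first two tasks and the third is the difference between scheduleWithFixedDelay (periodic) and schedule (one-shot). The sketch below registers three no-op tasks with the same delays and lets you check which of the returned futures are periodic. FailoverSchedules is a hypothetical class, and the 24-hour period is written out as 24 * 60 minutes on the assumption that this is what DAY_PERIOD_MINUTES holds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the three registrations made in FailoverReactor.init.
class FailoverSchedules {
    static List<ScheduledFuture<?>> start(ScheduledThreadPoolExecutor executor, Runnable switchRefresher,
            Runnable diskFileWriter, Runnable startupBackup) {
        List<ScheduledFuture<?>> futures = new ArrayList<>();
        // Periodic: starts immediately, then waits 5 seconds after each run finishes.
        futures.add(executor.scheduleWithFixedDelay(switchRefresher, 0L, 5000L, TimeUnit.MILLISECONDS));
        // Periodic: starts after 30 minutes, then waits 24 hours after each run finishes.
        futures.add(executor.scheduleWithFixedDelay(diskFileWriter, 30, 24 * 60, TimeUnit.MINUTES));
        // One-shot: runs a single time, 10 seconds from now.
        futures.add(executor.schedule(startupBackup, 10000L, TimeUnit.MILLISECONDS));
        return futures;
    }
}
```

Because the executor has a single thread, a slow DiskFileWriter run would delay the next SwitchRefresher run. With scheduleWithFixedDelay the interval is measured from the end of one run to the start of the next.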

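Let's look at DiskFileWriter first. It takes the ServiceInfo objects cached in ServiceInfoHolder, skips a few special keys, and writes the rest to the failover directory. The last two tasks both run DiskFileWriter, but the third one has a precondition: it writes the files to local disk only when the failover directory contains no files.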

class DiskFileWriter extends TimerTask {
        
        @Override
        public void run() {
            Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap();
            for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
                ServiceInfo serviceInfo = entry.getValue();
                if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils
                        .equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils
                        .equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS) || StringUtils
                        .equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE) || StringUtils
                        .equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {
                    continue;
                }
                
                DiskCache.write(serviceInfo, failoverDir);
            }
        }
    }
    

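The core of the scheduled task SwitchRefresher works as follows:

  1. If the failover switch file does not exist, turn failover mode off and return (the file itself is the switch)
  2. Compare the file's modification time; if it has changed, read the content of the switch file
  3. The switch file stores 0 or 1: 0 means off, 1 means on
  4. When the switch is on, run FailoverFileReader (its run method is called directly)
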
class SwitchRefresher implements Runnable {
		long lastModifiedMillis = 0L;

		@Override
		public void run() {
			try {
				File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);            
				// If the switch file does not exist, turn failover off and return
				if (!switchFile.exists()) {
					switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
					NAMING_LOGGER.debug("failover switch is not found, {}", switchFile.getName());
					return;
				}
				long modified = switchFile.lastModified();
				if (lastModifiedMillis < modified) {
					lastModifiedMillis = modified;
					// Read the content of the failover switch file
					String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH, Charset.defaultCharset().toString());
					if (!StringUtils.isEmpty(failover)) {
						String[] lines = failover.split(DiskCache.getLineSeparator());
						for (String line : lines) {
							String line1 = line.trim();
							// 1 means failover mode is on
							if (IS_FAILOVER_MODE.equals(line1)) {
								switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());
								NAMING_LOGGER.info("failover-mode is on");
								new FailoverFileReader().run();                       
							// 0 means failover mode is off
							} else if (NO_FAILOVER_MODE.equals(line1)) {
								switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
								NAMING_LOGGER.info("failover-mode is off");
							}
						}
					} else {
						switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
					}
				}
			} catch (Throwable e) {
				NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
			}
		}
	}
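
Stripped of file handling, the switch logic is a small line parser: 1 turns failover on, 0 turns it off, anything else leaves the previous state alone, and empty content turns it off. A sketch under those assumptions (FailoverSwitchParser is a hypothetical class, and splitting on \R instead of the platform line separator is a choice made here):

```java
// Hypothetical helper that isolates how the switch file content is interpreted.
class FailoverSwitchParser {
    static final String IS_FAILOVER_MODE = "1";
    static final String NO_FAILOVER_MODE = "0";

    // previous is the failover state before this refresh; the return value is the new state.
    static boolean parse(String content, boolean previous) {
        if (content == null || content.isEmpty()) {
            // Empty switch file: failover is off.
            return false;
        }
        boolean failover = previous;
        // \R matches any line break, so the sketch does not depend on the platform separator.
        for (String line : content.split("\\R")) {
            String trimmed = line.trim();
            if (IS_FAILOVER_MODE.equals(trimmed)) {
                failover = true;
            } else if (NO_FAILOVER_MODE.equals(trimmed)) {
                failover = false;
            }
            // Unrecognized lines leave the state unchanged.
        }
        return failover;
    }
}
```

If the file contains several lines, the last recognized one wins, because each line overwrites the state in turn.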

FailoverFileReader

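As the name suggests, this class reads the failover files. It reads the backed-up service information files stored in the failover directory, converts them into ServiceInfo objects, and stores them all in the serviceMap field of FailoverReactor.

The flow is:

  1. List all files in the failover directory and iterate over them
  2. Skip entries that are not regular files
  3. Skip the failover switch file
  4. Read the backup content of each file and convert it into a ServiceInfo object
  5. Put the ServiceInfo into domMap if it has hosts
  6. Finally, if domMap is not empty, assign it to serviceMap
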
	class FailoverFileReader implements Runnable {
		@Override
		public void run() {
			Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);
			BufferedReader reader = null;
			try {
				File cacheDir = new File(failoverDir);
				if (!cacheDir.exists() && !cacheDir.mkdirs()) {
					throw new IllegalStateException("failed to create cache dir: " + failoverDir);
				}
				File[] files = cacheDir.listFiles();
				if (files == null) {
					return;
				}
				for (File file : files) {
					if (!file.isFile()) {
						continue;
					}
					// Skip the failover switch file
					if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
						continue;
					}
					ServiceInfo dom = new ServiceInfo(file.getName());
					try {
						String dataString = ConcurrentDiskUtil.getFileContent(file, Charset.defaultCharset().toString());
						reader = new BufferedReader(new StringReader(dataString));
						String json;
						if ((json = reader.readLine()) != null) {
							try {
								dom = JacksonUtils.toObj(json, ServiceInfo.class);
							} catch (Exception e) {
								NAMING_LOGGER.error("[NA] error while parsing cached dom : {}", json, e);
							}
						}
					} catch (Exception e) {
						NAMING_LOGGER.error("[NA] failed to read cache for dom: {}", file.getName(), e);
					} finally {
						try {
							if (reader != null) {
								reader.close();
							}
						} catch (Exception e) {
							//ignore
						}
					}
					if (!CollectionUtils.isEmpty(dom.getHosts())) {
						domMap.put(dom.getKey(), dom);
					}
				}
			} catch (Exception e) {
				NAMING_LOGGER.error("[NA] failed to read cache file", e);
			}
			// Publish the loaded entries as the failover cache
			if (domMap.size() > 0) {
				serviceMap = domMap;
			}
		}
	}

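One question here is where serviceMap is used. It is used by the getServiceInfo method we saw earlier when reading instances. Once failover is enabled, getServiceInfo calls failoverReactor.getService first, and that method gets the ServiceInfo from serviceMap.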

public ServiceInfo getService(String key) {
		ServiceInfo serviceInfo = serviceMap.get(key);
		if (serviceInfo == null) {
			serviceInfo = new ServiceInfo();
			serviceInfo.setName(key);
		}
		return serviceInfo;
	}

The getServiceInfo method that ends up reading serviceMap is in ServiceInfoHolder:

	//ServiceInfoHolder
	public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
		NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
		String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
		String key = ServiceInfo.getKey(groupedServiceName, clusters);
		if (failoverReactor.isFailoverSwitch()) {
			return failoverReactor.getService(key);
		}
		return serviceInfoMap.get(key);
	}
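
Put together, a lookup consults exactly one of two maps depending on the switch. The sketch below models that choice with string lists instead of ServiceInfo. FailoverAwareLookup is hypothetical, and the key format group@@service with an optional @@clusters suffix is an assumption about what NamingUtils.getGroupedName and ServiceInfo.getKey produce.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of ServiceInfoHolder.getServiceInfo with failover.
class FailoverAwareLookup {
    // Live data, kept up to date by queries and server pushes.
    final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    // Backup data loaded from the failover directory.
    volatile Map<String, List<String>> failoverMap = new HashMap<>();
    volatile boolean failoverSwitch;

    // Assumption: keys look like group@@service, with @@clusters appended when clusters is set.
    static String key(String serviceName, String groupName, String clusters) {
        String grouped = groupName + "@@" + serviceName;
        return clusters == null || clusters.isEmpty() ? grouped : grouped + "@@" + clusters;
    }

    List<String> getServiceInfo(String serviceName, String groupName, String clusters) {
        String key = key(serviceName, groupName, clusters);
        if (failoverSwitch) {
            // Failover on: only the backup is consulted; a miss yields an empty result, not null.
            List<String> backup = failoverMap.get(key);
            return backup != null ? backup : Collections.emptyList();
        }
        // Failover off: the live cache is consulted and a miss yields null.
        return serviceInfoMap.get(key);
    }
}
```

One consequence worth noting: while the switch is on, the live cache is bypassed entirely, so pushes from the server have no effect on what callers see until the switch is turned off again.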

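Subscribe request

The server handles a subscription in two main steps: first it queries the service information, then it saves the subscriber's index information.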

@Component
public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {
  	..........
      
    @Override
    @Secured(action = ActionTypes.READ)
    public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
        String namespaceId = request.getNamespace();
        String serviceName = request.getServiceName();
        String groupName = request.getGroupName();
        String app = request.getHeader("app", "unknown");
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
      
        Service service = Service.newService(namespaceId, groupName, serviceName, true);
      
        Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
                namespaceId, groupedServiceName, 0, request.getClusters());
      
        // Query the service information in serviceStorage
        ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
                metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
                true, subscriber.getIp());
      
        if (request.isSubscribe()) {
            // Subscribe
            clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
            NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),
                    meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
        } else {
            // Unsubscribe
            clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
            NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),
                    meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
        }
        return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
    }
}

The subscriber information is saved in the client index.

EphemeralClientOperationServiceImpl:

public void subscribeService(Service service, Subscriber subscriber, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    client.addServiceSubscriber(singleton, subscriber);
    client.setLastUpdatedTime();
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}
AbstractClient:

protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);

public boolean addServiceSubscriber(Service service, Subscriber subscriber) {
    if (null == subscribers.put(service, subscriber)) {
        MetricsMonitor.incrementSubscribeCount();
    }
    return true;
}

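Server: pushing subscription updates

As described in the service registration article, after service information changes on the server, the server publishes ServiceEvent.ServiceChangedEvent, which triggers the server to actively push the updated information to subscribers.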

NamingSubscriberServiceV2Impl:

public void onEvent(Event event) {
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
        ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
        Service service = serviceChangedEvent.getService();
        // Create a delayed push task
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
        MetricsMonitor.incrementServiceChangeCount(service.getNamespace(), service.getGroup(), service.getName());
    } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
        ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
        Service service = subscribedEvent.getService();
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
                subscribedEvent.getClientId()));
    }
}

After some twists and turns, what finally runs is the logic of PushExecuteTask:

public void run() {
    try {
        // Wrap the service information to push
        PushDataWrapper wrapper = generatePushData();
        ClientManager clientManager = delayTaskEngine.getClientManager();
        // Iterate over the ids of the target clients
        for (String each : getTargetClientIds()) {
            Client client = clientManager.getClient(each);
            if (null == client) {
                // means this client has disconnect
                continue;
            }
            Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
            // Push to the client
            delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                    new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
        }
    } catch (Exception e) {
        Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
        delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
    }
}
PushExecutorRpcImpl:
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
        NamingPushCallback callBack) {
    ServiceInfo actualServiceInfo = getServiceInfo(data, subscriber);
    callBack.setActualServiceInfo(actualServiceInfo);
    pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(actualServiceInfo),
            callBack, GlobalExecutor.getCallbackExecutor());
}
RpcPushService:

public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack,
        Executor executor) {
    Connection connection = connectionManager.getConnection(connectionId);
    if (connection != null) {
        try {
            connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {
                
                @Override
                public Executor getExecutor() {
                    return executor;
                }
                
                @Override
                public void onResponse(Response response) {
                    if (response.isSuccess()) {
                        requestCallBack.onSuccess();
                    } else {
                        requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
                    }
                }
                
                @Override
                public void onException(Throwable e) {
                    requestCallBack.onFail(e);
                }
            });
        } catch (ConnectionAlreadyClosedException e) {
            connectionManager.unregister(connectionId);
            requestCallBack.onSuccess();
        } catch (Exception e) {
            Loggers.REMOTE_DIGEST
                    .error("error to send push response to connectionId ={},push response={}", connectionId,
                            request, e);
            requestCallBack.onFail(e);
        }
    } else {
        requestCallBack.onSuccess();
    }
}

Client: receiving subscription pushes

public class NamingPushRequestHandler implements ServerRequestHandler {
    
    private final ServiceInfoHolder serviceInfoHolder;
    
    public NamingPushRequestHandler(ServiceInfoHolder serviceInfoHolder) {
        this.serviceInfoHolder = serviceInfoHolder;
    }
    
    @Override
    public Response requestReply(Request request) {
        if (request instanceof NotifySubscriberRequest) {
            NotifySubscriberRequest notifyRequest = (NotifySubscriberRequest) request;
            // Update the local cache
            serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo());
            return new NotifySubscriberResponse();
        }
        return null;
    }
}

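Client: subscription listener callbacks

As shown above, ServiceInfoHolder publishes an InstancesChangeEvent, which triggers the listener callbacks. The Nacos event implementation will be analyzed in a later article.

InstancesChangeNotifier:

public void onEvent(InstancesChangeEvent event) {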
    String key = ServiceInfo
            .getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (CollectionUtils.isEmpty(eventListeners)) {
        return;
    }
    for (final EventListener listener : eventListeners) {
        final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
        if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
            ((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
        } else {
            listener.onEvent(namingEvent);
        }
    }
}
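
The dispatch rule in onEvent is: find the listeners registered under the service key, then run each one on its own executor if it provides one, otherwise on the notifying thread. A self-contained sketch of that rule follows. ListenerDispatcher and its nested Listener interface are hypothetical and only mirror the roles of InstancesChangeNotifier, EventListener and AbstractEventListener.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical model of per-key listener dispatch; not the Nacos implementation.
class ListenerDispatcher {
    interface Listener {
        void onEvent(String event);

        // Returning null means "call me on the notifying thread".
        default Executor getExecutor() {
            return null;
        }
    }

    private final Map<String, Set<Listener>> listenerMap = new ConcurrentHashMap<>();

    void register(String serviceKey, Listener listener) {
        listenerMap.computeIfAbsent(serviceKey, k -> ConcurrentHashMap.newKeySet()).add(listener);
    }

    void publish(String serviceKey, String event) {
        Set<Listener> listeners = listenerMap.get(serviceKey);
        if (listeners == null || listeners.isEmpty()) {
            return;
        }
        for (Listener listener : listeners) {
            Executor executor = listener.getExecutor();
            if (executor != null) {
                // The listener asked for its own executor, so the callback is handed off.
                executor.execute(() -> listener.onEvent(event));
            } else {
                listener.onEvent(event);
            }
        }
    }
}
```

A listener without an executor runs inline, so a slow callback delays the remaining listeners for that key. Supplying an executor is how a listener opts out of that.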