Nacos service discovery and subscription mechanism
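Summary
Since Nacos 2.0, client and server communicate over gRPC. The server answers service discovery queries from the mappings and indexes it saved when services were registered.
After subscribing to a service, the client keeps the service information in a local cache. When the service changes, the server pushes the update to the client, and the client refreshes its cache.
When a subscribed service changes, the listeners registered on the client are called back.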
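The details
As covered in the article on Spring Cloud service registration and discovery, client-side service discovery in Spring Cloud Alibaba ultimately goes through NamingService.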
@SneakyThrows
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(PigAdminApplication.class, args);
DiscoveryClient discoveryClient = context.getBean(DiscoveryClient.class);
NamingService namingService = context.getBean(NacosServiceManager.class).getNamingService();
// Get all services
List<String> services = discoveryClient.getServices();
// Get the instances of each service
services.forEach(s -> discoveryClient.getInstances(s).forEach(System.out::println));
// Subscribe to each service
services.forEach(s -> {
try {
namingService.subscribe(s, e -> {
System.out.println("Service changed: " + ((NamingEvent) e).getServiceName() + " -> " + ((NamingEvent) e).getInstances());
});
} catch (NacosException e) {
throw new RuntimeException(e);
}
});
}
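The logic in NacosDiscoveryClient is simple. It builds on serviceDiscovery and caches the information returned by the server locally. When the call fails and failure tolerance is enabled, it falls back to that cache.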
public class NacosDiscoveryClient implements DiscoveryClient {
.........
// Get all instances of a service
@Override
public List<ServiceInstance> getInstances(String serviceId) {
try {
return Optional.of(serviceDiscovery.getInstances(serviceId))
.map(instances -> {
// Cache the service instances
ServiceCache.setInstances(serviceId, instances);
return instances;
}).get();
}
catch (Exception e) {
if (failureToleranceEnabled) {
return ServiceCache.getInstances(serviceId);
}
throw new RuntimeException(
"Can not get hosts from nacos server. serviceId: " + serviceId, e);
}
}
// Get all service ids
@Override
public List<String> getServices() {
try {
return Optional.of(serviceDiscovery.getServices()).map(services -> {
// Cache the service ids that were fetched
ServiceCache.setServiceIds(services);
return services;
}).get();
}
catch (Exception e) {
log.error("get service name from nacos server failed.", e);
return failureToleranceEnabled ? ServiceCache.getServiceIds()
: Collections.emptyList();
}
}
}
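The pattern above caches every successful answer and falls back to the cache only when failure tolerance is on. It can be sketched in plain Java. `FallbackCache` and the `Supplier` that stands in for the remote call are hypothetical names, not Spring Cloud Alibaba or Nacos APIs. Returning an empty list when nothing has been cached yet is a choice made for this sketch.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch of "cache on success, fall back on failure".
class FallbackCache {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final boolean failureToleranceEnabled;

    FallbackCache(boolean failureToleranceEnabled) {
        this.failureToleranceEnabled = failureToleranceEnabled;
    }

    // remoteCall stands in for serviceDiscovery.getInstances(serviceId)
    List<String> getInstances(String serviceId, Supplier<List<String>> remoteCall) {
        try {
            List<String> instances = remoteCall.get();
            cache.put(serviceId, instances); // every successful answer refreshes the cache
            return instances;
        } catch (RuntimeException e) {
            if (failureToleranceEnabled) {
                // serve the last known answer (empty if there never was one)
                return cache.getOrDefault(serviceId, Collections.emptyList());
            }
            throw new IllegalStateException("Can not get hosts, serviceId: " + serviceId, e);
        }
    }
}
```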
NacosServiceDiscovery ultimately delegates to namingService.
public class NacosServiceDiscovery {
.........
// Get all instances of a service
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
String group = discoveryProperties.getGroup();
List<Instance> instances = namingService().selectInstances(serviceId, group,
true);
return hostToServiceInstanceList(instances, serviceId);
}
// Get all service ids
public List<String> getServices() throws NacosException {
String group = discoveryProperties.getGroup();
ListView<String> services = namingService().getServicesOfServer(1,
Integer.MAX_VALUE, group);
return services.getData();
}
}
NacosNamingService:
// Get the service instances that match the criteria
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {
// When subscribing, look in the local cache first
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo) {
// Not in the local cache: subscribe through the server
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
// Without subscription, query the server directly
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
// Filter the result returned by the server (by health)
return selectInstances(serviceInfo, healthy);
}
// Get the service ids
public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector)
throws NacosException {
// Ask the server for all service ids
return clientProxy.getServiceList(pageNo, pageSize, groupName, selector);
}
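The subscribe and non-subscribe branches in selectInstances can be modeled as below. This is a simplified sketch. A `Function` stands in for clientProxy and a map stands in for serviceInfoHolder. `SimpleInstance` and `InstanceSelector` are hypothetical, and only the health flag is filtered. Nothing in the sketch refreshes the cache. In Nacos that job belongs to the server push and UpdateTask, both described later in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative instance type (not the Nacos Instance class).
class SimpleInstance {
    final String address;
    final boolean healthy;

    SimpleInstance(String address, boolean healthy) {
        this.address = address;
        this.healthy = healthy;
    }
}

// Hypothetical model of the subscribe / non-subscribe branch.
class InstanceSelector {
    // stands in for serviceInfoHolder (the client-side cache)
    private final Map<String, List<SimpleInstance>> localCache = new ConcurrentHashMap<>();
    // stands in for clientProxy (a round trip to the server)
    private final Function<String, List<SimpleInstance>> server;
    int serverCalls = 0;

    InstanceSelector(Function<String, List<SimpleInstance>> server) {
        this.server = server;
    }

    List<SimpleInstance> select(String service, boolean healthy, boolean subscribe) {
        List<SimpleInstance> all;
        if (subscribe) {
            // subscribed lookups are answered from the local cache when possible
            all = localCache.get(service);
            if (all == null) {
                all = callServer(service);
                localCache.put(service, all);
            }
        } else {
            // non-subscribed lookups always hit the server and are not cached
            all = callServer(service);
        }
        // simplified filter: only the health flag is compared
        List<SimpleInstance> result = new ArrayList<>();
        for (SimpleInstance instance : all) {
            if (instance.healthy == healthy) {
                result.add(instance);
            }
        }
        return result;
    }

    private List<SimpleInstance> callServer(String service) {
        serverCalls++;
        return server.apply(service);
    }
}
```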
Client: fetching the service list
NacosNamingService calls NamingClientProxyDelegate, which ultimately calls NamingGrpcClientProxy.
NamingClientProxyDelegate:
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)
throws NacosException {
return grpcClientProxy.getServiceList(pageNo, pageSize, groupName, selector);
}
NamingGrpcClientProxy:
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)
throws NacosException {
ServiceListRequest request = new ServiceListRequest(namespaceId, groupName, pageNo, pageSize);
if (selector != null) {
if (SelectorType.valueOf(selector.getType()) == SelectorType.label) {
request.setSelector(JacksonUtils.toJson(selector));
}
}
// Request the service information from the server over gRPC
ServiceListResponse response = requestToServer(request, ServiceListResponse.class);
ListView<String> result = new ListView<>();
result.setCount(response.getCount());
result.setData(response.getServiceNames());
return result;
}
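Server: returning the service list
On the server, every request enters through GrpcRequestAcceptor. A ServiceListRequest is handled by ServiceListRequestHandler, and its logic is simple. It takes the services saved for the namespace at registration time, keeps those in the requested group, and returns one page of their names.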
@Component
public class ServiceListRequestHandler extends RequestHandler<ServiceListRequest, ServiceListResponse> {
@Override
@Secured(action = ActionTypes.READ)
public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException {
// Get the services of the namespace
Collection<Service> serviceSet = ServiceManager.getInstance().getSingletons(request.getNamespace());
// Create the response object
ServiceListResponse result = ServiceListResponse.buildSuccessResponse(0, new LinkedList<>());
if (!serviceSet.isEmpty()) {
// Filter services by the requested group
Collection<String> serviceNameSet = selectServiceWithGroupName(serviceSet, request.getGroupName());
List<String> serviceNameList = ServiceUtil
.pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet);
result.setCount(serviceNameSet.size());
result.setServiceNames(serviceNameList);
}
return result;
}
// Filter services by group
private Collection<String> selectServiceWithGroupName(Collection<Service> serviceSet, String groupName) {
Collection<String> result = new HashSet<>(serviceSet.size());
for (Service each : serviceSet) {
if (Objects.equals(groupName, each.getGroup())) {
result.add(each.getGroupedServiceName());
}
}
return result;
}
}
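The group filter followed by paging can be sketched like this. `ServicePager.page` is a hypothetical stand-in for `ServiceUtil.pageServiceName`, whose code is not shown here, and the sketch assumes 1-based pages. It uses `long` arithmetic because the client passes `Integer.MAX_VALUE` as the page size. It also sorts names with a `TreeSet` so that pages are stable, whereas the handler uses a `HashSet`. The `@@` separator between group and service name follows Nacos's grouped-name format.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;

// Illustrative service identity (group + name).
class ServiceKey {
    final String group;
    final String name;

    ServiceKey(String group, String name) {
        this.group = group;
        this.name = name;
    }

    String groupedName() {
        return group + "@@" + name;
    }
}

// Hypothetical sketch: keep one group's services, then cut out one page.
class ServicePager {
    static Collection<String> selectByGroup(Collection<ServiceKey> services, String group) {
        Collection<String> result = new TreeSet<>(); // sorted, so paging is deterministic
        for (ServiceKey each : services) {
            if (Objects.equals(group, each.group)) {
                result.add(each.groupedName());
            }
        }
        return result;
    }

    // pageNo is 1-based; an out-of-range page yields an empty list
    static List<String> page(int pageNo, int pageSize, Collection<String> names) {
        List<String> all = new ArrayList<>(names);
        long start = (long) (pageNo - 1) * pageSize; // long avoids overflow with MAX_VALUE
        if (pageNo < 1 || pageSize < 1 || start >= all.size()) {
            return Collections.emptyList();
        }
        int end = (int) Math.min(start + pageSize, all.size());
        return new ArrayList<>(all.subList((int) start, end));
    }
}
```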
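Client: subscribing to a service
The subscribe method first looks up the cached service information, that is, the information held in serviceInfoHolder. If that information is absent, or the service is not yet subscribed, it sends a subscribe request to the server. It also schedules an UpdateTask that keeps refreshing the cached information.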
NamingClientProxyDelegate:
// Subscribe to a service
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
// Schedule a task (first run after one second) that queries the server to refresh the locally subscribed service info
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
// No cached subscription: ask the server
if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
}
// Update the local cache
serviceInfoHolder.processServiceInfo(result);
return result;
}
ServiceInfoUpdateService:
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
..........
// Run the task after a one-second delay
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
}
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
UpdateTask:
public void run() {
long delayTime = DEFAULT_DELAY;
try {
if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
serviceKey)) {
NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
isCancel = true;
return;
}
ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (serviceObj == null) {
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
serviceInfoHolder.processServiceInfo(serviceObj);
lastRefTime = serviceObj.getLastRefTime();
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
serviceInfoHolder.processServiceInfo(serviceObj);
}
lastRefTime = serviceObj.getLastRefTime();
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
// TODO multiple time can be configured.
delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
resetFailCount();
} catch (NacosException e) {
handleNacosException(e);
} catch (Throwable e) {
handleUnknownException(e);
} finally {
if (!isCancel) {
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),
TimeUnit.MILLISECONDS);
}
}
}
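The rescheduling arithmetic in the finally block is a capped exponential backoff. Each consecutive failure doubles the delay until it reaches DEFAULT_DELAY * 60. Below is a minimal sketch that assumes DEFAULT_DELAY is 1000 ms, as the "one-second delay" above suggests. `UpdateBackoff` is a hypothetical helper, and it assumes failCount stays small, since a large shift would overflow the `long`.

```java
// Hypothetical helper reproducing Math.min(delayTime << failCount, DEFAULT_DELAY * 60).
class UpdateBackoff {
    static long nextDelay(long delayTime, int failCount, long defaultDelay) {
        return Math.min(delayTime << failCount, defaultDelay * 60);
    }

    public static void main(String[] args) {
        long defaultDelay = 1000L; // assumed value of DEFAULT_DELAY
        for (int failCount = 0; failCount <= 7; failCount++) {
            System.out.println(failCount + " failures -> " + nextDelay(defaultDelay, failCount, defaultDelay) + " ms");
        }
    }
}
```

With a 1 s base, the delays for 0 to 7 consecutive failures are 1 s, 2 s, 4 s, 8 s, 16 s and 32 s, then 60 s (64 s capped) and 60 s. After a successful refresh, failCount is reset and the base becomes cacheMillis * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE, which is still subject to the same cap.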
Service query request
NamingGrpcClientProxy:
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
boolean healthyOnly) throws NacosException {
ServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);
request.setCluster(clusters);
request.setHealthyOnly(healthyOnly);
request.setUdpPort(udpPort);
QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);
return response.getServiceInfo();
}
On the server, the handler is ServiceQueryRequestHandler:
@Component
public class ServiceQueryRequestHandler extends RequestHandler<ServiceQueryRequest, QueryServiceResponse> {
......
@Override
@Secured(action = ActionTypes.READ)
public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {
String namespaceId = request.getNamespace();
String groupName = request.getGroupName();
String serviceName = request.getServiceName();
Service service = Service.newService(namespaceId, groupName, serviceName);
String cluster = null == request.getCluster() ? "" : request.getCluster();
boolean healthyOnly = request.isHealthyOnly();
// Get the service information
ServiceInfo result = serviceStorage.getData(service);
ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true,
meta.getClientIp());
return QueryServiceResponse.buildSuccessResponse(result);
}
}
ServiceStorage caches service information. On a cache miss, it builds the information from the indexes and writes it back to the cache.
ServiceStorage:
private final ClientServiceIndexesManager serviceIndexesManager;
private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;
// Get the service information
public ServiceInfo getData(Service service) {
// If serviceDataIndexes has no entry, build it from the indexes
return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
}
public ServiceInfo getPushData(Service service) {
// Create an empty ServiceInfo
ServiceInfo result = emptyServiceInfo(service);
// The service was never registered: return it as is
if (!ServiceManager.getInstance().containSingleton(service)) {
return result;
}
Service singleton = ServiceManager.getInstance().getSingleton(service);
// Collect all instances through the index
result.setHosts(getAllInstancesFromIndex(singleton));
// Cache the result so the next lookup skips the index
serviceDataIndexes.put(singleton, result);
return result;
}
private List<Instance> getAllInstancesFromIndex(Service service) {
Set<Instance> result = new HashSet<>();
Set<String> clusters = new HashSet<>();
// Find every client that published this service and iterate over them
for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
// Get the instance info this client registered
Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
if (instancePublishInfo.isPresent()) {
InstancePublishInfo publishInfo = instancePublishInfo.get();
//If it is a BatchInstancePublishInfo type, it will be processed manually and added to the instance list
if (publishInfo instanceof BatchInstancePublishInfo) {
BatchInstancePublishInfo batchInstancePublishInfo = (BatchInstancePublishInfo) publishInfo;
List<Instance> batchInstance = parseBatchInstance(service, batchInstancePublishInfo, clusters);
result.addAll(batchInstance);
} else {
Instance instance = parseInstance(service, instancePublishInfo.get());
result.add(instance);
clusters.add(instance.getClusterName());
}
}
}
// cache clusters of this service
serviceClusterIndex.put(service, clusters);
return new LinkedList<>(result);
}
private ServiceInfo emptyServiceInfo(Service service) {
ServiceInfo result = new ServiceInfo();
result.setName(service.getName());
result.setGroupName(service.getGroup());
result.setLastRefTime(System.currentTimeMillis());
result.setCacheMillis(switchDomain.getDefaultPushCacheMillis());
return result;
}
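The cache-aside lookup in ServiceStorage, along with the index walk behind it, can be modeled with plain maps. `MiniServiceStorage` is hypothetical and simplified: services and instances are strings, `publisherIndex` plays the role of ClientServiceIndexesManager, and `clientPublished` plays the role of each client's publish info. The sketch also shows how getData and getPushData differ. getData can return a stale cached list, while getPushData always rebuilds the list and overwrites the cache.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of ServiceStorage.
class MiniServiceStorage {
    // service -> ids of clients that registered it
    private final Map<String, Set<String>> publisherIndex = new ConcurrentHashMap<>();
    // client id -> (service -> instance address) published by that client
    private final Map<String, Map<String, String>> clientPublished = new ConcurrentHashMap<>();
    // service -> cached instance list (serviceDataIndexes in the original)
    private final Map<String, List<String>> serviceData = new ConcurrentHashMap<>();

    void register(String clientId, String service, String address) {
        clientPublished.computeIfAbsent(clientId, k -> new ConcurrentHashMap<>()).put(service, address);
        publisherIndex.computeIfAbsent(service, k -> ConcurrentHashMap.newKeySet()).add(clientId);
    }

    // Cached read; builds from the index on a miss.
    List<String> getData(String service) {
        List<String> cached = serviceData.get(service);
        return cached != null ? cached : getPushData(service);
    }

    // Always rebuilds from the index and overwrites the cache.
    List<String> getPushData(String service) {
        if (!publisherIndex.containsKey(service)) {
            return Collections.emptyList(); // never registered: not cached
        }
        List<String> result = new ArrayList<>();
        for (String clientId : publisherIndex.get(service)) {
            String address = clientPublished.getOrDefault(clientId, Collections.emptyMap()).get(service);
            if (address != null) {
                result.add(address);
            }
        }
        Collections.sort(result); // deterministic order for the example
        serviceData.put(service, result);
        return result;
    }
}
```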
Handling the local cache
The client updates the service information held in ServiceInfoHolder:
ServiceInfoHolder:
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
// Update the service info
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// Check whether the service info changed
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
// If the service changed, publish an InstancesChangeEvent, which triggers the listeners the client subscribed with. Nacos's event implementation is analyzed in a later article.
NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
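The core of processServiceInfo is to ignore bad pushes, store the new data, and notify listeners only on a real change. In the hypothetical sketch below, hosts are `ip:port` strings and a `BiConsumer` stands in for publishing InstancesChangeEvent. The sketch treats any empty host list as an ignorable push and compares only the host sets. The real isEmptyOrErrorPush and isChangedServiceInfo checks are more detailed than this.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical model of ServiceInfoHolder.processServiceInfo.
class MiniServiceInfoHolder {
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    private final BiConsumer<String, List<String>> onChange; // stands in for NotifyCenter.publishEvent

    MiniServiceInfoHolder(BiConsumer<String, List<String>> onChange) {
        this.onChange = onChange;
    }

    List<String> process(String serviceKey, List<String> hosts) {
        List<String> old = serviceInfoMap.get(serviceKey);
        // simplified "empty or error push": keep the old data and do not notify
        if (hosts == null || hosts.isEmpty()) {
            return old;
        }
        serviceInfoMap.put(serviceKey, hosts);
        // simplified change check: compare host sets, ignoring order
        if (old == null || !new HashSet<>(old).equals(new HashSet<>(hosts))) {
            onChange.accept(serviceKey, hosts);
        }
        return hosts;
    }
}
```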
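The local cache has two parts. First, instance information obtained from the registry is cached in memory, held in a Map. Second, it is periodically saved to disk files so that it is available when needed (processServiceInfo also writes a file through DiskCache.write whenever a service changes).
Failover also has two parts. First, the failover switch is controlled by a file. Second, once failover is enabled, service instance information can be read from the failover backup files when a failure occurs.
The ServiceInfoHolder constructor also creates a FailoverReactor, stored as another member of ServiceInfoHolder. FailoverReactor handles failover.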
public ServiceInfoHolder(String namespace, Properties properties) {
    ....
    // `this` is the current ServiceInfoHolder; the two objects hold references to each other
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    .....
}
The FailoverReactor constructor:
- Holds a reference to ServiceInfoHolder
- Builds the failover directory ${user.home}/nacos/naming/public/failover, where public may instead be a custom namespace
- Initializes executorService
- init(): starts several scheduled tasks on executorService
public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
    // Hold a reference to ServiceInfoHolder
    this.serviceInfoHolder = serviceInfoHolder;
    // Build the failover directory: ${user.home}/nacos/naming/public/failover
    this.failoverDir = cacheDir + FAILOVER_DIR;
    // Initialize executorService
    this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            // Run as a daemon thread
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.failover");
            return thread;
        }
    });
    // Other initialization: start several scheduled tasks on executorService
    this.init();
}
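The init method
This method schedules three tasks. The work itself is done by FailoverReactor's inner classes:
- Runs immediately at startup, then every 5 seconds: SwitchRefresher
- First run after 30 minutes, then every 24 hours: DiskFileWriter
- Runs once, 10 seconds after startup: if the failover directory has no files, runs DiskFileWriter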
public void init() {
    // Run immediately, then every 5 seconds: SwitchRefresher
    executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
    // First run after 30 minutes, then every 24 hours: DiskFileWriter
    executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
    // backup file on startup if failover directory is empty.
    // Run once, 10 seconds after startup: if the failover directory is empty, back up the files with DiskFileWriter
    executorService.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                File cacheDir = new File(failoverDir);
                if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                    throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                }
                File[] files = cacheDir.listFiles();
                if (files == null || files.length <= 0) {
                    new DiskFileWriter().run();
                }
            } catch (Throwable e) {
                NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
            }
        }
    }, 10000L, TimeUnit.MILLISECONDS);
}
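Let's look at DiskFileWriter first. It takes the ServiceInfo objects cached in ServiceInfoHolder and checks whether each one should be written to disk. If so, it writes the object to the failover directory built above. The last two scheduled tasks both run DiskFileWriter, but the third has a precondition: it writes the files to local disk only if the failover directory contains no files yet.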
class DiskFileWriter extends TimerTask {
    @Override
    public void run() {
        Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap();
        for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
            ServiceInfo serviceInfo = entry.getValue();
            if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS)
                    || StringUtils.equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY)
                    || StringUtils.equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS)
                    || StringUtils.equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE)
                    || StringUtils.equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {
                continue;
            }
            DiskCache.write(serviceInfo, failoverDir);
        }
    }
}
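DiskFileWriter, together with the one-shot startup task, can be sketched with java.nio.file. The sketch makes three assumptions. Cache values are already-serialized strings, whereas Nacos writes ServiceInfo through DiskCache.write. The skipped keys are passed in rather than taken from UtilAndComs. File names are URL-encoded keys. `BackupWriter` is hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of DiskFileWriter plus the "back up on startup if empty" task.
class BackupWriter {
    private final Path failoverDir;
    private final Set<String> skippedKeys; // stands in for the UtilAndComs keys skipped above

    BackupWriter(Path failoverDir, Set<String> skippedKeys) {
        this.failoverDir = failoverDir;
        this.skippedKeys = skippedKeys;
    }

    // Writes one file per cache entry and returns how many files were written.
    int writeAll(Map<String, String> serviceInfoMap) {
        int written = 0;
        try {
            Files.createDirectories(failoverDir);
            for (Map.Entry<String, String> entry : serviceInfoMap.entrySet()) {
                if (skippedKeys.contains(entry.getKey())) {
                    continue;
                }
                // URL-encode the key to keep the file name filesystem-safe
                String fileName = URLEncoder.encode(entry.getKey(), "UTF-8");
                Files.write(failoverDir.resolve(fileName), entry.getValue().getBytes(StandardCharsets.UTF_8));
                written++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return written;
    }

    // Mirrors the one-shot startup task: back up only when the directory has no files yet.
    int backupOnStartup(Map<String, String> serviceInfoMap) {
        File[] files = failoverDir.toFile().listFiles();
        return (files == null || files.length == 0) ? writeAll(serviceInfoMap) : 0;
    }
}
```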
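The core of the scheduled task SwitchRefresher works as follows:
- If the failover switch file does not exist, return immediately (the file acts as the switch)
- Compare the file's modification time; if it has changed, read the content of the switch file
- The switch file stores 0 or 1: 0 means off, 1 means on
- When the switch is on, run FailoverFileReader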
class SwitchRefresher implements Runnable {
    long lastModifiedMillis = 0L;

    @Override
    public void run() {
        try {
            File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
            // Exit if the switch file does not exist
            if (!switchFile.exists()) {
                switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
                NAMING_LOGGER.debug("failover switch is not found, {}", switchFile.getName());
                return;
            }
            long modified = switchFile.lastModified();
            if (lastModifiedMillis < modified) {
                lastModifiedMillis = modified;
                // Read the content of the switch file
                String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
                        Charset.defaultCharset().toString());
                if (!StringUtils.isEmpty(failover)) {
                    String[] lines = failover.split(DiskCache.getLineSeparator());
                    for (String line : lines) {
                        String line1 = line.trim();
                        if (IS_FAILOVER_MODE.equals(line1)) {
                            // "1" turns failover mode on
                            switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());
                            NAMING_LOGGER.info("failover-mode is on");
                            new FailoverFileReader().run();
                        } else if (NO_FAILOVER_MODE.equals(line1)) {
                            // "0" turns failover mode off
                            switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
                            NAMING_LOGGER.info("failover-mode is off");
                        }
                    }
                } else {
                    switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
                }
            }
        } catch (Throwable e) {
            NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
        }
    }
}
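The switch logic comes down to two steps: re-read the file only when its modification time moves forward, then apply 1 or 0. The sketch below uses a hypothetical `FailoverSwitch` and assumes UTF-8 content. Unlike the original, a non-empty file containing neither value is treated as off here. The sketch also only records the flag instead of starting FailoverFileReader.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical sketch of SwitchRefresher's decision logic.
class FailoverSwitch {
    private final File switchFile;
    private long lastModifiedMillis = 0L;
    private volatile boolean failoverOn = false;

    FailoverSwitch(File switchFile) {
        this.switchFile = switchFile;
    }

    boolean isFailoverOn() {
        return failoverOn;
    }

    // Meant to be called periodically (every 5 s in the reactor above).
    void refresh() {
        if (!switchFile.exists()) { // a missing file means the switch is off
            failoverOn = false;
            return;
        }
        long modified = switchFile.lastModified();
        if (modified <= lastModifiedMillis) {
            return; // unchanged since the last read
        }
        lastModifiedMillis = modified;
        try {
            String content = new String(Files.readAllBytes(switchFile.toPath()), StandardCharsets.UTF_8);
            boolean on = false;
            for (String line : content.split("\\R")) { // later lines override earlier ones
                String trimmed = line.trim();
                if ("1".equals(trimmed)) {
                    on = true;
                } else if ("0".equals(trimmed)) {
                    on = false;
                }
            }
            failoverOn = on;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```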
FailoverFileReader
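As the name suggests, this class reads the failover files. It reads the backed-up service information files in the failover directory, converts them into ServiceInfo objects, and stores all of them in FailoverReactor's serviceMap field.
The flow:
- Read all files under the failover directory and iterate over them
- Skip entries that are not regular files
- Skip the failover switch file
- Read the backup content of each file and convert it into a ServiceInfo object
- Put the ServiceInfo object into domMap if it has hosts
- Finally, if domMap is not empty, assign it to serviceMap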
class FailoverFileReader implements Runnable {
    @Override
    public void run() {
        Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);
        BufferedReader reader = null;
        try {
            File cacheDir = new File(failoverDir);
            if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                throw new IllegalStateException("failed to create cache dir: " + failoverDir);
            }
            File[] files = cacheDir.listFiles();
            if (files == null) {
                return;
            }
            for (File file : files) {
                if (!file.isFile()) {
                    continue;
                }
                // Skip the failover switch file
                if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
                    continue;
                }
                ServiceInfo dom = new ServiceInfo(file.getName());
                try {
                    String dataString = ConcurrentDiskUtil.getFileContent(file, Charset.defaultCharset().toString());
                    reader = new BufferedReader(new StringReader(dataString));
                    String json;
                    if ((json = reader.readLine()) != null) {
                        try {
                            dom = JacksonUtils.toObj(json, ServiceInfo.class);
                        } catch (Exception e) {
                            NAMING_LOGGER.error("[NA] error while parsing cached dom : {}", json, e);
                        }
                    }
                } catch (Exception e) {
                    NAMING_LOGGER.error("[NA] failed to read cache for dom: {}", file.getName(), e);
                } finally {
                    try {
                        if (reader != null) {
                            reader.close();
                        }
                    } catch (Exception e) {
                        //ignore
                    }
                }
                if (!CollectionUtils.isEmpty(dom.getHosts())) {
                    domMap.put(dom.getKey(), dom);
                }
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to read cache file", e);
        }
        // Load into the in-memory failover cache
        if (domMap.size() > 0) {
            serviceMap = domMap;
        }
    }
}
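Below is a simplified FailoverFileReader. It assumes each backup file holds one host per line (Nacos stores ServiceInfo JSON) and that file names are URL-encoded service keys. `FailoverLoader` and the switch file name passed to it are hypothetical. As in the original, files without hosts are skipped, and the snapshot is replaced only when at least one service was loaded.

```java
import java.io.File;
import java.io.IOException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of loading failover backups into an in-memory snapshot.
class FailoverLoader {
    private final File failoverDir;
    private final String switchFileName;
    private volatile Map<String, List<String>> serviceMap = new HashMap<>();

    FailoverLoader(File failoverDir, String switchFileName) {
        this.failoverDir = failoverDir;
        this.switchFileName = switchFileName;
    }

    Map<String, List<String>> getServiceMap() {
        return serviceMap;
    }

    void load() {
        Map<String, List<String>> domMap = new HashMap<>(16);
        File[] files = failoverDir.listFiles();
        if (files == null) {
            return;
        }
        for (File file : files) {
            // skip directories and the switch file itself
            if (!file.isFile() || file.getName().equals(switchFileName)) {
                continue;
            }
            try {
                List<String> hosts = new ArrayList<>();
                for (String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) {
                    if (!line.trim().isEmpty()) {
                        hosts.add(line.trim());
                    }
                }
                if (!hosts.isEmpty()) { // only services that still have hosts are kept
                    domMap.put(URLDecoder.decode(file.getName(), "UTF-8"), hosts);
                }
            } catch (IOException e) {
                // an unreadable backup file is skipped; the original logs and continues
            }
        }
        if (!domMap.isEmpty()) {
            serviceMap = domMap; // swap in the new snapshot in one assignment
        }
    }
}
```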
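A natural question is where serviceMap is used. It is used by the getServiceInfo method we saw earlier when reading instances. Once failover is enabled, getServiceInfo calls failoverReactor.getService, which reads the ServiceInfo from serviceMap.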
public ServiceInfo getService(String key) {
    ServiceInfo serviceInfo = serviceMap.get(key);
    if (serviceInfo == null) {
        serviceInfo = new ServiceInfo();
        serviceInfo.setName(key);
    }
    return serviceInfo;
}
The getServiceInfo method, which reaches serviceMap through getService, lives in ServiceInfoHolder:
// ServiceInfoHolder
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    String key = ServiceInfo.getKey(groupedServiceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    return serviceInfoMap.get(key);
}
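The lookup order can be summarized in a few lines. The sketch uses a hypothetical `FailoverAwareLookup` and assumes that the cache key is `group@@service`, with `@@clusters` appended when clusters are given. That is an assumed reading of ServiceInfo.getKey, so treat it as an assumption. While the switch is on, answers come only from the failover snapshot, and a missing key yields an empty placeholder, like getService. Otherwise the normal cache is used.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of getServiceInfo's failover-aware lookup.
class FailoverAwareLookup {
    final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>(); // normal cache
    final Map<String, List<String>> failoverMap = new ConcurrentHashMap<>();    // failover snapshot
    private final BooleanSupplier failoverSwitch;

    FailoverAwareLookup(BooleanSupplier failoverSwitch) {
        this.failoverSwitch = failoverSwitch;
    }

    // Assumed key format: "group@@service", plus "@@clusters" when clusters are given.
    static String key(String serviceName, String groupName, String clusters) {
        String grouped = groupName + "@@" + serviceName;
        return (clusters == null || clusters.isEmpty()) ? grouped : grouped + "@@" + clusters;
    }

    List<String> getServiceInfo(String serviceName, String groupName, String clusters) {
        String key = key(serviceName, groupName, clusters);
        if (failoverSwitch.getAsBoolean()) {
            // like getService: never null, an empty placeholder when the snapshot lacks the key
            return failoverMap.getOrDefault(key, Collections.emptyList());
        }
        return serviceInfoMap.get(key);
    }
}
```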
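Subscribe request
On the server, handling a subscription takes two main steps. The first queries the service information, and the second saves the subscriber's index information.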
@Component
public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {
..........
@Override
@Secured(action = ActionTypes.READ)
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
String namespaceId = request.getNamespace();
String serviceName = request.getServiceName();
String groupName = request.getGroupName();
String app = request.getHeader("app", "unknown");
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
Service service = Service.newService(namespaceId, groupName, serviceName, true);
Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
namespaceId, groupedServiceName, 0, request.getClusters());
// Query the service information from serviceStorage
ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
true, subscriber.getIp());
if (request.isSubscribe()) {
// Subscribe
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
} else {
// Unsubscribe
clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
}
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
}
The subscriber information is saved into the client index.
EphemeralClientOperationServiceImpl:
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
client.addServiceSubscriber(singleton, subscriber);
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}
AbstractClient:
protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
public boolean addServiceSubscriber(Service service, Subscriber subscriber) {
if (null == subscribers.put(service, subscriber)) {
MetricsMonitor.incrementSubscribeCount();
}
return true;
}
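The bookkeeping can be modeled with two maps in a hypothetical `SubscriberRegistry`. The per-client map mirrors AbstractClient.subscribers and, like addServiceSubscriber, counts only first-time subscriptions. The reverse index from service to client ids is an assumption about how the server later finds which clients to push to. It is not shown in the code above, which only publishes ClientSubscribeServiceEvent. Decrementing the count on unsubscribe is a choice made for this sketch.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of subscriber bookkeeping on the server.
class SubscriberRegistry {
    // client id -> (service -> subscriber ip), like AbstractClient.subscribers per client
    private final Map<String, Map<String, String>> clientSubscribers = new ConcurrentHashMap<>();
    // service -> subscribing client ids (assumed reverse index used when pushing)
    private final Map<String, Set<String>> subscriberIndex = new ConcurrentHashMap<>();
    final AtomicInteger subscribeCount = new AtomicInteger();

    void subscribe(String clientId, String service, String subscriberIp) {
        Map<String, String> subs = clientSubscribers.computeIfAbsent(clientId, k -> new ConcurrentHashMap<>());
        if (subs.put(service, subscriberIp) == null) {
            subscribeCount.incrementAndGet(); // only the first subscription is counted
        }
        subscriberIndex.computeIfAbsent(service, k -> ConcurrentHashMap.newKeySet()).add(clientId);
    }

    void unsubscribe(String clientId, String service) {
        Map<String, String> subs = clientSubscribers.get(clientId);
        if (subs != null && subs.remove(service) != null) {
            subscribeCount.decrementAndGet();
        }
        Set<String> ids = subscriberIndex.get(service);
        if (ids != null) {
            ids.remove(clientId);
        }
    }

    Set<String> subscribersOf(String service) {
        return subscriberIndex.getOrDefault(service, Collections.emptySet());
    }
}
```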
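Server: pushing subscription updates
As covered in the service registration article, a change to service information on the server publishes a ServiceEvent.ServiceChangedEvent. That event makes the server push the updated information to subscribers.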
NamingSubscriberServiceV2Impl:
public void onEvent(Event event) {
if (event instanceof ServiceEvent.ServiceChangedEvent) {
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
// Create a delayed push task
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
MetricsMonitor.incrementServiceChangeCount(service.getNamespace(), service.getGroup(), service.getName());
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
subscribedEvent.getClientId()));
}
}
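Both branches add a PushDelayTask keyed by the service, so tasks for the same service can be combined while they wait. The sketch below assumes merge rules under which a push-to-all task absorbs per-client tasks and per-client targets are unioned. `PendingPush` and `PushTaskQueue` are hypothetical, so check the real engine's merge rules against the Nacos source.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical pending push for one service.
class PendingPush {
    final boolean pushToAll;
    final Set<String> targetClients;

    PendingPush(boolean pushToAll, Set<String> targetClients) {
        this.pushToAll = pushToAll;
        this.targetClients = targetClients;
    }

    static PendingPush toAll() {
        return new PendingPush(true, Collections.emptySet());
    }

    static PendingPush toClient(String clientId) {
        return new PendingPush(false, Collections.singleton(clientId));
    }

    // Assumed rule: push-to-all wins; otherwise the target clients are unioned.
    PendingPush merge(PendingPush older) {
        if (pushToAll || older.pushToAll) {
            return toAll();
        }
        Set<String> union = new HashSet<>(targetClients);
        union.addAll(older.targetClients);
        return new PendingPush(false, union);
    }
}

// Hypothetical queue keeping at most one pending task per service.
class PushTaskQueue {
    private final Map<String, PendingPush> tasks = new ConcurrentHashMap<>();

    void addTask(String service, PendingPush task) {
        tasks.merge(service, task, (older, newer) -> newer.merge(older));
    }

    PendingPush take(String service) {
        return tasks.remove(service);
    }
}
```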
After some indirection, the work is finally done by PushExecuteTask:
public void run() {
try {
// Wrap the service information to push
PushDataWrapper wrapper = generatePushData();
ClientManager clientManager = delayTaskEngine.getClientManager();
// Iterate over the ids of the clients to push to
for (String each : getTargetClientIds()) {
Client client = clientManager.getClient(each);
if (null == client) {
// means this client has disconnect
continue;
}
Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
// Push
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
}
} catch (Exception e) {
Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
}
}
PushExecutorRpcImpl:
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
NamingPushCallback callBack) {
ServiceInfo actualServiceInfo = getServiceInfo(data, subscriber);
callBack.setActualServiceInfo(actualServiceInfo);
pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(actualServiceInfo),
callBack, GlobalExecutor.getCallbackExecutor());
}
RpcPushService:
public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack,
Executor executor) {
Connection connection = connectionManager.getConnection(connectionId);
if (connection != null) {
try {
connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {
@Override
public Executor getExecutor() {
return executor;
}
@Override
public void onResponse(Response response) {
if (response.isSuccess()) {
requestCallBack.onSuccess();
} else {
requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
}
}
@Override
public void onException(Throwable e) {
requestCallBack.onFail(e);
}
});
} catch (ConnectionAlreadyClosedException e) {
connectionManager.unregister(connectionId);
requestCallBack.onSuccess();
} catch (Exception e) {
Loggers.REMOTE_DIGEST
.error("error to send push response to connectionId ={},push response={}", connectionId,
request, e);
requestCallBack.onFail(e);
}
} else {
requestCallBack.onSuccess();
}
}
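The callback rules in pushWithCallback can be reproduced with CompletableFuture. The sketch uses a hypothetical `MiniPushService` and assumes a connection is just a function that returns a future of "accepted or not". A missing connection counts as success because no client is left to push to. A rejected response or an exception goes to onFail. The closed-connection branch is not modeled.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Illustrative callback, shaped like the onSuccess/onFail pair used above.
interface PushCallback {
    void onSuccess();

    void onFail(Throwable e);
}

// Hypothetical sketch of the callback rules in RpcPushService.pushWithCallback.
class MiniPushService {
    // connection id -> async "send" returning whether the client accepted the push
    private final Map<String, Function<String, CompletableFuture<Boolean>>> connections = new ConcurrentHashMap<>();

    void register(String connectionId, Function<String, CompletableFuture<Boolean>> sender) {
        connections.put(connectionId, sender);
    }

    void pushWithCallback(String connectionId, String request, PushCallback callback, Executor executor) {
        Function<String, CompletableFuture<Boolean>> sender = connections.get(connectionId);
        if (sender == null) {
            // no connection: the client is gone, so there is nothing to retry
            callback.onSuccess();
            return;
        }
        CompletableFuture<Boolean> response;
        try {
            response = sender.apply(request);
        } catch (RuntimeException e) {
            callback.onFail(e);
            return;
        }
        // the callback runs on the supplied executor, like getExecutor() above
        response.whenCompleteAsync((ok, error) -> {
            if (error != null) {
                callback.onFail(error);
            } else if (Boolean.TRUE.equals(ok)) {
                callback.onSuccess();
            } else {
                callback.onFail(new IllegalStateException("push rejected by client"));
            }
        }, executor);
    }
}
```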
Client: receiving the subscription push
public class NamingPushRequestHandler implements ServerRequestHandler {
private final ServiceInfoHolder serviceInfoHolder;
public NamingPushRequestHandler(ServiceInfoHolder serviceInfoHolder) {
this.serviceInfoHolder = serviceInfoHolder;
}
@Override
public Response requestReply(Request request) {
if (request instanceof NotifySubscriberRequest) {
NotifySubscriberRequest notifyRequest = (NotifySubscriberRequest) request;
// Update the local cache
serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo());
return new NotifySubscriberResponse();
}
return null;
}
}
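Client: calling back the subscription listeners
As shown above, ServiceInfoHolder publishes an InstancesChangeEvent, which triggers the listener callbacks. Nacos's event implementation is analyzed in a later article.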
InstancesChangeNotifier:
public void onEvent(InstancesChangeEvent event) {
String key = ServiceInfo
.getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (CollectionUtils.isEmpty(eventListeners)) {
return;
}
for (final EventListener listener : eventListeners) {
final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
} else {
listener.onEvent(namingEvent);
}
}
}
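The dispatch rule runs a listener on its own executor if it has one, and otherwise on the notifying thread. It can be sketched as follows. `InstanceListener` and `MiniChangeNotifier` are hypothetical, and the listener map is keyed by the same service key the notifier computes. Giving a listener an executor keeps slow callbacks from blocking the event thread.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical listener: an optional executor decides sync vs async delivery.
class InstanceListener {
    final Consumer<List<String>> callback;
    final Executor executor; // null means "call on the notifying thread"

    InstanceListener(Consumer<List<String>> callback, Executor executor) {
        this.callback = callback;
        this.executor = executor;
    }
}

// Hypothetical model of InstancesChangeNotifier.onEvent.
class MiniChangeNotifier {
    private final Map<String, Set<InstanceListener>> listenerMap = new ConcurrentHashMap<>();

    void register(String serviceKey, InstanceListener listener) {
        listenerMap.computeIfAbsent(serviceKey, k -> new CopyOnWriteArraySet<>()).add(listener);
    }

    void onInstancesChanged(String serviceKey, List<String> hosts) {
        Set<InstanceListener> listeners = listenerMap.get(serviceKey);
        if (listeners == null || listeners.isEmpty()) {
            return; // nobody subscribed to this key
        }
        for (InstanceListener l : listeners) {
            if (l.executor != null) {
                l.executor.execute(() -> l.callback.accept(hosts)); // asynchronous delivery
            } else {
                l.callback.accept(hosts); // synchronous delivery
            }
        }
    }
}
```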