nacos健康检查机制

fxz大约 33 分钟

nacos健康检查机制

概述

想象一下这么一个场景,你所在的地区突然发生地质灾害,你被掩盖在废墟下面,搜救队必须要知道你在废墟里面那么才能对你进行施救。那么有什么方法可以让救援队知道你在废墟下面?

第一种,你在废墟里面大喊help! help! I am here! ,让搜救队知道你的位置和健康状态。

第二种,搜救队使用了他们的专业检查设备,探测到你正埋在废墟下面。 这两种检查方式其实也可以类比到我们对于服务的探测,我们需要知道一个服务是否还健康。那么第一种方式是客户端主动上报,告诉服务端自己健康状态,如果在一段时间没有上报,那么我们就认为服务已经不健康。第二种,则是服务端主动向客户端进行探测,检查客户端是否还被能探测到。 image.png 再回到前面的场景中,如果是你在废墟中大声呼叫救援队并且提供你的位置和健康信息,那么相比于搜救队用探测设备挨着废墟探测会使探测队的工作量减轻很多,那么他可以专注于尽快将你救出。这也好比于注册中心对于服务健康状态的检测,如果所有服务都需要注册中心去主动探测,由于服务的数量远大于注册中心的数量,那么注册中心的任务量将会比较巨大。所以我们自然而然会想到,那就都采用服务主动上报的方式进行健康检查。那如果在废墟之下的我们因为身体状况无法呼救,那么搜救队就会放弃搜救了吗?当然不是,搜救队肯定也会对废墟进行全面探测将你救出。类比到服务健康检查,如果服务本身就没法主动进行健康上报,那么这个时候注册中心主动检查健康状态就有用武之地了。image.png 在当前主流的注册中心,对于健康检查机制主要都采用了 TTL(Time To Live)机制,即客户端在一定时间没有向注册中心发送心跳,那么注册中心会认为此服务不健康,进而触发后续的剔除逻辑。对于主动探测的方式那么根据不同的场景,需要采用的方式可能会有不同。

既然以上两种健康检查机制都有应用的场景,且适用场景不一致,那么 Nacos 对于健康检查的机制又是如何抉择的呢? 在介绍 Nacos 的健康检查机制之前,我们先回顾一下 Nacos 服务有什么特点。Nacos 提供了两种服务类型供用户注册实例时选择,分为非持久化实例(又称临时实例)和持久化实例(又称永久实例)。 临时实例只是临时存在于注册中心中,会在服务下线或不可用时被注册中心剔除,临时实例会与注册中心保持心跳,注册中心会在一段时间没有收到来自客户端的心跳后会将实例设置为不健康,然后在一段时间后进行剔除。 永久实例在被删除之前会永久的存在于注册中心,且有可能并不知道注册中心存在,不会主动向注册中心上报心跳,那么这个时候就需要注册中心主动进行探活。 从上面的介绍我们可以看出,Nacos 中两种健康探测方式均有被使用,Nacos 中健康检查的整体交互如下如所示。下面我们会详细介绍 Nacos 中对于两种实例的健康检查机制。 Nacos两种实例健康检查机制

临时实例健康检查机制

在 Nacos 中,用户可以通过两种方式进行临时实例的注册,通过 Nacos 的 OpenAPI 进行服务注册或通过 Nacos 提供的 SDK 进行服务注册。 OpenAPI 的注册方式实际是用户根据自身需求调用 Http 接口对服务进行注册,然后通过 Http 接口发送心跳到注册中心。在注册服务的同时会注册一个全局的客户端心跳检测的任务。在服务一段时间没有收到来自客户端的心跳后,该任务会将其标记为不健康,如果在间隔的时间内还未收到心跳,那么该任务会将其剔除。 SDK 的注册方式实际是通过 RPC 与注册中心保持连接(Nacos 2.x 版本中,旧版的还是仍然通过 OpenAPI 的方式),客户端会定时的通过 RPC 连接向 Nacos 注册中心发送心跳,保持连接的存活如果客户端和注册中心的连接断开,那么注册中心会主动剔除该 client 所注册的服务,达到下线的效果。同时 Nacos 注册中心还会在注册中心启动时,注册一个过期客户端清除的定时任务,用于删除那些健康状态超过一段时间的客户端。 从上面的特点我们可以发现,对于不同类型的使用方式,Nacos 对于健康检查的特点实际都是相同的,都是由客户端向注册中心发送心跳,注册中心会在连接断开或是心跳过期后将不健康的实例移除。 image.png

永久实例健康检查机制

Nacos 中使用 SDK 对于永久实例的注册实际也是使用 OpenAPI 的方式进行注册,这样可以保证即使是客户端下线后也不会影响永久实例的健康检查。 对于永久实例的的健康检查,Nacos 采用的是注册中心探测机制,注册中心会在持久化服务初始化时根据客户端选择的协议类型注册探活的定时任务。Nacos 现在内置提供了三种探测的协议,即 Http、TCP 以及 MySQL 。一般而言 Http 和 TCP 已经可以涵盖绝大多数的健康检查场景。MySQL 主要用于特殊的业务场景,例如数据库的主备需要通过服务名对外提供访问,需要确定当前访问数据库是否为主库时,那么我们此时的健康检查接口,是一个检查数据库是否为主库的 MySQL 命令。 image.png 由于持久化服务的实例的在被主动删除前一直存在的特性,探活的定时任务会不断探测服务的健康状态,并且将无法探测成功的实例标记为不健康。但是有些时候会有这样的场景,有些服务不希望去校验其健康状态,Nacos也是提供了对应的白名单配置,用户可以将服务配置到该白名单,那么 Nacos 会放弃对其进行健康检查,实例的健康状态也始终为用户传入的健康状态。

集群模式下的健康检查机制

一个完整的注册中心,是应该具备高可用的特性,即我们的注册中心是可以集群部署作为一个整体对外提供服务,当然 Nacos 也支持这样的特性。不同于单机部署,集群部署中我们的客户端只和其中一个注册中心服务保持链接和请求,但是我们的服务信息需要注册到所有的服务节点上,在其他客户端从任意一个注册中心服务获取服务列表时始终是所有的服务列表。在这种情况下,那么 Nacos 在集群模式下又是如何对不是和自己保持心跳连接的服务进行健康检查的呢? image.png 对于集群下的服务,Nacos 一个服务只会被 Nacos 集群中的一个注册中心所负责,其余节点的服务信息只是集群副本,用于订阅者在查询服务列表时,始终可以获取到全部的服务列表。临时实例只会对其被负责的注册中心节点发送心跳信息,注册中心服务节点会对其负责的永久实例进行健康探测,在获取到健康状态后由当前负责的注册中心节点将健康信息同步到集群中的其他的注册中心。 在Nacos中,服务的注册我们从注册方式维度实际可以分为两大类。第一类通过 SDK RPC 连接进行注册,客户端会和注册中心保持链接。第二类,通过 OpenAPI 进行 IP 和端口注册。 对于第一类,如何寻找到对其负责的注册中心节点呢?聪明的你肯定想到了,只需要和注册中心集群中的任意一台节点建立联系,那么由这个节点负责这个客户端就可以了。注册中心会在启动时注册一个全局的同步任务,用于将其当前负责的所有节点信息同步到集群中的其他节点,其他非负责的节点也会创建该客户端的信息,在非负责的节点上,连接类型的客户端,会有一个续约时间的概念,在收到其他节点的同步信息时,更新续约时间为当前时间,如果在集群中的其他节点在一段时间内没有收到不是自己的负责的节点的同步信息,那么认为此节点已经不健康,从而达到对不是自己负责的节点健康状态检查。image.png 对于第二类,方式其实也基本和第一类一致,OpenAPI 注册的临时实例也是通过同步自身负责的节点到其他节点来更新其他节点的对应的临时实例的心跳时间,保证其他节点不会删除或者修改此实例的健康状态。前面我们特别特别指明了是临时实例而没有说所有实例,你应该也可能会想到这种方式对于持久化节点的会显得多余,永久实例会在被主动删除前一直存在于注册中心,那么我们健康检查并不会去删除实例,所以我们只需要在负责的节点永久实例健康状态变更的时候通知到其余的节点即可。 image.png

临时实例健康检测

  • 1.x:临时实例客户端每隔5s定时向服务端发送心跳,确保Instance存活。超过15s会标记为不健康,30s剔除。
  • 2.x:临时实例客户端与服务端建立长连接,通过双向健康检查确保Client存活。服务端检测20s空闲连接,向客户端发起探测请求,如果客户端1s内响应,认为健康检查通过;客户端检测5s空闲连接,向服务端发起健康检查请求,如果服务端3s内响应,认为健康检查通过。
  • 2.x服务端如果发现Client不健康,会从ClientManager移除Client,进而触发各种事件(集群/客户端数据同步)。

add out date connection active detection,expel it when client no response. #10509open in new window

客户端在NamingGrpcClientProxy创建时,就会进行探活以及补偿处理

RpcClient#start
  
        clientEventExecutor.submit(() -> {
            while (true) {
                try {
                    if (isShutdown()) {
                        break;
                    }
                    ReconnectContext reconnectContext = reconnectionSignal
                            .poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
                    if (reconnectContext == null) {
                        // 检查活动时间。
                        if (System.currentTimeMillis() - lastActiveTimeStamp >= rpcClientConfig.connectionKeepAlive()) {
                            boolean isHealthy = healthCheck();
                            if (!isHealthy) {
                                if (currentConnection == null) {
                                    continue;
                                }
                                LoggerUtils.printIfInfoEnabled(LOGGER,
                                        "[{}] Server healthy check fail, currentConnection = {}",
                                        rpcClientConfig.name(), currentConnection.getConnectionId());
                                
                                RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                                if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                                    break;
                                }
                                
                                boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
                                        .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
                                if (statusFLowSuccess) {
                                    reconnectContext = new ReconnectContext(null, false);
                                } else {
                                    continue;
                                }
                                
                            } else {
                                lastActiveTimeStamp = System.currentTimeMillis();
                                continue;
                            }
                        } else {
                            continue;
                        }
                        
                    }
                    
                    if (reconnectContext.serverInfo != null) {
                        // clear recommend server if server is not in server list.
                        boolean serverExist = false;
                        for (String server : getServerListFactory().getServerList()) {
                            ServerInfo serverInfo = resolveServerInfo(server);
                            if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                                serverExist = true;
                                reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                                break;
                            }
                        }
                        if (!serverExist) {
                            LoggerUtils.printIfInfoEnabled(LOGGER,
                                    "[{}] Recommend server is not in server list, ignore recommend server {}",
                                    rpcClientConfig.name(), reconnectContext.serverInfo.getAddress());
                            
                            reconnectContext.serverInfo = null;
                            
                        }
                    }
                    reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
                } catch (Throwable throwable) {
                    // Do nothing
                }
            }
        });
        
        
        
        
        

如果客户端持续与服务端通讯,服务端是不需要主动探活的。见GrpcRequestAcceptor会调用ConnectionManager的refreshActiveTime刷新连接的lastActivetime。

// GrpcRequestAcceptor
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
    String type = grpcRequest.getMetadata().getType();
    // 请求处理器
    RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
    // 解析请求参数
    Object parseObj = GrpcUtils.parse(grpcRequest);
      // ..
    Request request = (Request) parseObj;
    try {
        Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
        RequestMeta requestMeta = new RequestMeta();
        requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
        requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
        requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
        requestMeta.setLabels(connection.getMetaInfo().getLabels());
        // 刷新长连接active time
        connectionManager.refreshActiveTime(requestMeta.getConnectionId());
        Response response = requestHandler.handleRequest(request, requestMeta);
        Payload payloadResponse = GrpcUtils.convert(response);
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    } catch (Throwable e) {
       //...
    }
}

服务端定时任务每隔3s检测20s没有通信的链接进行探活 不成功则剔除。

NacosRuntimeConnectionEjector#ejectOutdatedConnection


  private void ejectOutdatedConnection() {
        try {

            Loggers.CONNECTION.info("Connection check task start");

            Map<String, Connection> connections = connectionManager.connections;
            int totalCount = connections.size();
            MetricsMonitor.getLongConnectionMonitor().set(totalCount);
            int currentSdkClientCount = connectionManager.currentSdkClientCount();

            Loggers.CONNECTION.info("Long connection metrics detail ,Total count ={}, sdkCount={},clusterCount={}",
                    totalCount, currentSdkClientCount, (totalCount - currentSdkClientCount));

            Set<String> outDatedConnections = new HashSet<>();
            long now = System.currentTimeMillis();
            //outdated connections collect.
            for (Map.Entry<String, Connection> entry : connections.entrySet()) {
                Connection client = entry.getValue();
                if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
                    outDatedConnections.add(client.getMetaInfo().getConnectionId());
                }
            }

            // 检查日期连接
            Loggers.CONNECTION.info("Out dated connection ,size={}", outDatedConnections.size());
            if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                Set<String> successConnections = new HashSet<>();
                final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                for (String outDateConnectionId : outDatedConnections) {
                    try {
                        Connection connection = connectionManager.getConnection(outDateConnectionId);
                        if (connection != null) {
                            ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                            connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                @Override
                                public Executor getExecutor() {
                                    return null;
                                }

                                @Override
                                public long getTimeout() {
                                    return 5000L;
                                }

                                @Override
                                public void onResponse(Response response) {
                                    latch.countDown();
                                    if (response != null && response.isSuccess()) {
                                        connection.freshActiveTime();
                                        successConnections.add(outDateConnectionId);
                                    }
                                }

                                @Override
                                public void onException(Throwable e) {
                                    latch.countDown();
                                }
                            });

                            Loggers.CONNECTION.info("[{}]send connection active request ", outDateConnectionId);
                        } else {
                            latch.countDown();
                        }

                    } catch (ConnectionAlreadyClosedException e) {
                        latch.countDown();
                    } catch (Exception e) {
                        Loggers.CONNECTION.error("[{}]Error occurs when check client active detection ,error={}",
                                outDateConnectionId, e);
                        latch.countDown();
                    }
                }

                latch.await(5000L, TimeUnit.MILLISECONDS);
                Loggers.CONNECTION.info("Out dated connection check successCount={}", successConnections.size());

                for (String outDateConnectionId : outDatedConnections) {
                    if (!successConnections.contains(outDateConnectionId)) {
                        Loggers.CONNECTION.info("[{}]Unregister Out dated connection....", outDateConnectionId);
                        connectionManager.unregister(outDateConnectionId);
                    }
                }
            }

            Loggers.CONNECTION.info("Connection check task end");

        } catch (Throwable e) {
            Loggers.CONNECTION.error("Error occurs during connection check... ", e);
        }
    }

注销链接。

ConnectionManager#unregister:

// 链接注销
public synchronized void unregister(String connectionId) {
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
      
        LOGGER.info("[{}]Connection unregistered successfully. ", connectionId);
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}

通知监听器,回调注销链接逻辑。

ClientConnectionEventListenerRegistry#notifyClientDisConnected
public void notifyClientDisConnected(final Connection connection) {
    
    for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
        try {
            clientConnectionEventListener.clientDisConnected(connection);
        } catch (Throwable throwable) {
            Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
                    clientConnectionEventListener.getName(), throwable);
        }
    }
    
}

删除客户端信息,发布事件,触发删除索引、集群同步。

ConnectionBasedClientManager#clientDisConnected
@Override
public void clientDisConnected(Connection connect) {
    clientDisconnected(connect.getMetaInfo().getConnectionId());
}

@Override
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
 		// 先删客户端  
 	 	ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    boolean isResponsible = isResponsibleClient(client);
  	// 后删索引
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientReleaseEvent(client, isResponsible));
  	// 集群同步
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client, isResponsible));
    return true;
}
ClientServiceIndexesManager:


@Override
public void onEvent(Event event) {
    if (event instanceof ClientOperationEvent.ClientReleaseEvent) {
        handleClientDisconnect((ClientOperationEvent.ClientReleaseEvent) event);
    } else if (event instanceof ClientOperationEvent) {
        handleClientOperation((ClientOperationEvent) event);
    }
}

// 删除客户端相关的索引
private void handleClientDisconnect(ClientOperationEvent.ClientReleaseEvent event) {
    Client client = event.getClient();
    for (Service each : client.getAllSubscribeService()) {
        removeSubscriberIndexes(each, client.getClientId());
    }
    DeregisterInstanceReason reason = event.isNative()
            ? DeregisterInstanceReason.NATIVE_DISCONNECTED : DeregisterInstanceReason.SYNCED_DISCONNECTED;
    long currentTimeMillis = System.currentTimeMillis();
    for (Service each : client.getAllPublishedService()) {
        removePublisherIndexes(each, client.getClientId());
        InstancePublishInfo instance = client.getInstancePublishInfo(each);
        NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(currentTimeMillis,
                "", false, reason, each.getNamespace(), each.getGroup(), each.getName(),
                instance.getIp(), instance.getPort()));
    }
}
DistroClientDataProcessor:

// 集群同步
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    if (isInvalidClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

ConnectionBasedClientManager的ExpiredClientCleaner会清理非本节点负责的客户端信息,本节点负责的由上述NacosRuntimeConnectionEjector#ejectOutdatedConnection剔除

ConnectionBasedClientManager// nacos集群清理非本节点负责的客户端信息
private static class ExpiredClientCleaner implements Runnable {
    
    private final ConnectionBasedClientManager clientManager;
    
    public ExpiredClientCleaner(ConnectionBasedClientManager clientManager) {
        this.clientManager = clientManager;
    }
    
    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        for (String each : clientManager.allClientId()) {
            ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
            if (null != client && client.isExpire(currentTime)) {
                clientManager.clientDisconnected(each);
            }
        }
    }
}

持久实例健康检测

客户端管理器 ClientManagerDelegate 实现 ClientManager 接口,持有 EphemeralIpPortClientManager、 PersistentIpPortClientManage 客户端管理器。PersistentIpPortClientManage对应的是基于Ip、端口的永久节点客户端管理器。

在注册实例,根据客户端Id获取连接时,会设置心跳检测的定时任务:

0
0

最终执行到 PersistentIpPortClientManager 的 clientConnected 方法:

 1 public boolean clientConnected(final Client client) {
 2     clients.computeIfAbsent(client.getClientId(), s -> {
 3         Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
 4         IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
 5         // 设置心跳检查定时任务
 6         ipPortBasedClient.init();
 7         return ipPortBasedClient;
 8     });
 9     return true;
10 }

IpPortBasedClient#init() 心跳检查定时任务如下:

 1 /**
 2  * 初始化client,设置心跳检测定时任务
 3  * Init client.
 4  */
 5 public void init() {
 6     // 临时节点
 7     if (ephemeral) {
 8         // ClientBeatCheckTaskV2 作为心跳检测的任务
 9         beatCheckTask = new ClientBeatCheckTaskV2(this);
10         HealthCheckReactor.scheduleCheck(beatCheckTask);
11     // 永久节点
12     } else {
13         // HealthCheckTaskV2 作为心跳检测的任务
14         healthCheckTaskV2 = new HealthCheckTaskV2(this);
15         HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
16     }
17 }

前提是不是grpc的客户端:

​ 临时实例:使用 ClientBeatCheckTaskV2 处理健康检查。

永久实例:使用 HealthCheckTaskV2 处理健康检查。

永久节点的健康检查是由服务端定时与客户端建立tcp连接做健康检查,是服务端主动发起的探测,服定时请求客户端判断是否健康。

永久实例使用 HealthCheckTaskV2 处理健康检查,HealthCheckTaskV2类图如下:

0
0

通过实现的run()方法得知,执行 HealthCheckTaskV2#doHealthCheck() 完成发起永久实例的健康检测。

最终执行 HealthCheckProcessorV2Delegate#process(),HealthCheckProcessorV2Delegate是一个代理类,具体的健康检查由内部维护的healthCheckProcessorMap中的具体实现类完成。

 1 /**
 2  * 健康检查处理代理类
 3  */
 4 public class HealthCheckProcessorV2Delegate implements HealthCheckProcessorV2 {
 5 
 6     // 健康检查实现类 容器
 7     private final Map<String, HealthCheckProcessorV2> healthCheckProcessorMap = new HashMap<>();
 8 
 9     @Override
10     public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
11         // 从元数据中获取当前客户端的健康检查类型,默认TCP (HTTP、TCP、MYSQL、NONE) => HealthCheckType 枚举类
12         String type = metadata.getHealthyCheckType();
13         // 根据类型从缓存中获取 健康检查具体处理类
14         HealthCheckProcessorV2 processor = healthCheckProcessorMap.get(type);
15         // 找不到处理类 使用 NoneHealthCheckProcessor 做健康检查,及什么都不做
16         if (processor == null) {
17             processor = healthCheckProcessorMap.get(NoneHealthCheckProcessor.TYPE);
18         }
19         processor.process(task, service, metadata);
20     }
21 }

HealthCheckProcessorV2类图如下:

0
0

子类实现与 HealthCheckType 枚举中的健康检查类型一一对应,下面来看下具体实现:

HttpHealthCheckProcessor

HttpHealthCheckProcessor的process是通过 RestTemplate 完成健康检测请求。

0
0

MysqlHealthCheckProcessor

MysqlHealthCheckProcessor的process是执行配置中的sql,完成健康检测。

0
0

健康检测任务 MysqlCheckTask#run(),通过执行SQL来完成健康检测,执行过程不报异常,证明实例健康。

0
0

TcpHealthCheckProcessor

TcpHealthCheckProcessor是通过构建Socket,然后对连接或读入事件进行监听,完成健康检测。

0
0

需要进行健康检测的实例添加到队列后,是在哪里执行的呢?

TcpHealthCheckProcessor实现了Runables接口,在TcpHealthCheckProcessor构造函数中启动了该任务。

0
0

在run方法中,执行了 processTask处理任务:

0
0

TcpHealthCheckProcessor#processTask() 详情如下:

 1 /**
 2  * 处理任务
 3  * @throws Exception
 4  */
 5 private void processTask() throws Exception {
 6     Collection<Callable<Void>> tasks = new LinkedList<>();
 7     do {
 8         // 获取队列中需要进行心跳检测的实例
 9         Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
10         if (beat == null) {
11             return;
12         }
13 
14         // 添加到任务队列
15         tasks.add(new TaskProcessor(beat));
16     } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
17 
18     // 调用队列中任务,并获取执行结果
19     for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {
20         f.get();
21     }
22 }

NoneHealthCheckProcessor

NoneHealthCheckProcessor什么都不做,不对示例进行健康检测。

img
img