nacos集群同步

fxz大约 24 分钟

nacos集群同步

一致性协议

为什么 Nacos 需要一致性协议

Nacos 在开源支持就定下了一个目标,尽可能的减少用户部署以及运维成本,做到用户只需要一个程序包,就可以快速以单机模式启动 Nacos 或者以集群模式启动 Nacos。

而 Nacos 是一个需要存储数据的一个组件,因此,为了实现这个目标,就需要在 Nacos 内部实现数据存储。单机下其实问题不大,简单的内嵌关系型数据库即可;但是集群模式下,就需要考虑如何保障各个节点之间 的数据一致性以及数据同步,而要解决这个问题,就不得不引入共识算法,通过算法来保障各个节点之间的数据的一致性。

为什么 Nacos 选择了 Raft 以及 Distro

为什么 Nacos 会在单个集群中同时运行 CP 协议以及 AP 协议呢?这其实要从 Nacos 的场景出发的:Nacos 是一个集服务注册发现以及配置管理于一体的组件,因此对于集群下,各个节点之间 的数据一致性保障问题,需要拆分成两个方面:

  • 从服务注册发现来看

    • 服务发现注册中心,在当前微服务体系下,是十分重要的组件,服务之间感知对方服务的当前可正常提供服务的实例信息,必须从服务发现注册中心进行获取,因此对于服务注册发现中心组件的可用性,提出了很高的要求,需要在任何场景下,尽最大可能保证服务注册发现能力可以对外提供服务;同时 Nacos 的服务注册发现设计,采取了心跳可自动完成服务数据补偿的机制。如果数据丢失的话,是可以通过该机制快速弥补数据丢失。

      因此,为了满足服务发现注册中心的可用性,强一致性的共识算法这里就不太合适了,因为强一致性共识算法能否对外提供服务是有要求的,如果当前集群可用的节点数没有过半的话,整个算法直接“罢工”,而最终一致共识算法的话,更多保障服务的可用性,并且能够保证在一定的时间内各个节点之间的数据能够达成一致。

      上述的都是针对于 Nacos 服务发现注册中的非持久化服务而言(即需要客户端上报心跳进行服务实 例续约)。

      而对于 Nacos 服务发现注册中的持久化服务,因为所有的数据都是直接使用调用 Nacos 服务端直接创建,因此需要由 Nacos 保障数据在各个节点之间的强一致性,故而针对此类型的服务数据,选择了强一致性共识算法来保障数据的一致性。

  • 从配置管理来看

    • 配置数据,是直接在 Nacos 服务端进行创建并进行管理的,必须保证大部分的节点都保存了此配置数据才能认为配置被成功保存了,否则就会丢失配置的变更,如果出现这种情况,问题是很严重的,如果是发布重要配置变更出现了丢失变更动作的情况,那多半就要引起严重的现网故障了,因此对于配置数据的管理,是必须要求集群中大部分的节点是强一致的,而这里的话只能使用强一致性共识算法。

Distro设计思想

  • Nacos 每个节点是平等的都可以处理写请求,同时把新数据同步到其他节点。
  • 每个节点只负责部分数据,定时发送自己负责数据的校验值到其他节点来保持数据一致性。
  • 每个节点独立处理读请求,及时从本地发出响应。

数据初始化

新加入的 Distro 节点会进行全量数据拉取。具体操作是轮询所有的 Distro 节点,通过向其他的机器发送请求拉取全量数据。

在全量拉取操作完成之后,Nacos 的每台机器上都维护了当前的所有注册上来的非持久化实例数据。

数据校验

在 Distro 集群启动之后,各台机器之间会定期的发送心跳。心跳信息主要为各个机器上的所有数据的元信息(之所以使用元信息,是因为需要保证网络中数据传输的量级维持在一个较低水平)。这 种数据校验会以心跳的形式进行,即每台机器在固定时间间隔会向其他机器发起一次数据校验请求。

一旦在数据校验过程中,某台机器发现其他机器上的数据与本地数据不一致,则会发起一次全量拉 取请求,将数据补齐。

写操作

对于一个已经启动完成的 Distro 集群,在一次客户端发起写操作的流程中,当注册非持久化的实例 的写请求打到某台 Nacos 服务器时,Distro 集群处理的流程图如下。

整个步骤包括几个部分(图中从上到下顺序):

  • 前置的 Filter 拦截请求,并根据请求中包含的 IP 和 port 信息计算其所属的 Distro 责任节点, 并将该请求转发到所属的 Distro 责任节点上。
  • 责任节点上的 Controller 将写请求进行解析。
  • Distro 协议定期执行 Sync 任务,将本机所负责的所有的实例信息同步到其他节点上。

读操作

由于每台机器上都存放了全量数据,因此在每一次读操作中,Distro 机器会直接从本地拉取数据。 快速响应。

这种机制保证了 Distro 协议可以作为一种 AP 协议,对于读操作都进行及时的响应。在网络分区 的情况下,对于所有的读操作也能够正常返回;当网络恢复时,各个 Distro 节点会把各数据分片的 数据进行合并恢复。

小结

Distro 协议是 Nacos 对于临时实例数据开发的一致性协议。其数据存储在缓存中,并且会在启动时进行全量数据同步,并定期进行数据校验。

在 Distro 协议的设计思想下,每个 Distro 节点都可以接收到读写请求。所有的 Distro 协议的请 求场景主要分为三种情况:

  1. 当该节点接收到属于该节点负责的实例的写请求时,直接写入。

  2. 当该节点接收到不属于该节点负责的实例的写请求时,将在集群内部路由,转发给对应的节点,从而完成读写。

  3. 当该节点接收到任何读请求时,都直接在本机查询并返回(因为所有实例都被同步到了每台机器上)。

Distro 协议作为 Nacos 的内嵌临时实例一致性协议,保证了在分布式环境下每个节点上面的服务信息的状态都能够及时地通知其他节点,可以维持数十万量级服务实例的存储和一致性。

Distro集群同步

  • 2.x因为使用了长连接,只要ConnectionBasedClient.isNative=true,代表Client与这个节点直连,Client所在的节点就是责任节点,减少了写请求重定向其他节点的损耗。
  • 责任节点的Client数据发生变更后,会同步这个Client的全量数据给其他非责任节点。非责任节点更新ClientManager中的Client信息。
  • 为了避免非责任节点的isNative=false的Client数据不一致:
    • 责任节点每5s向非责任节点发送VERIFY数据,续租这些Client,包含了Client全量数据;
    • 非责任节点定时扫描isNative=false的Client数据,如果超过30s没有续租,移除这些非native的client。

当一个客户端发布或注销服务,会在Client模型里存储发布Service对应的Instance信息。

public abstract class AbstractClient implements Client {
    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            MetricsMonitor.incrementInstanceCount();
        }
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        return true;
    }

    @Override
    public InstancePublishInfo removeServiceInstance(Service service) {
        InstancePublishInfo result = publishers.remove(service);
        if (null != result) {
            NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        }
        return result;
    }
}

责任节点

完成本地写入注册信息后,触发ClientChangedEvent事件,DistroClientDataProcessor只会处理当前节点负责的client。

// DistroClientDataProcessor.java
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        // 客户端断开连接
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        // 客户端新增/修改
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

DistroProtocol循环所有其他nacos节点,提交一个异步任务,这个异步任务会延迟1s(nacos.core.protocol.distro.data.sync_delay_ms)执行。

// DistroProtocol.java
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
            targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
}

对于DELETE操作,由DistroSyncDeleteTask处理;

对于CHANGE操作,由DistroSyncChangeTask处理。

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    private static final DataOperation OPERATION = DataOperation.CHANGE;
    // 无callback
    @Override
    protected boolean doExecute() {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            return true;
        }
        return getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer());
    }
    // 有callback
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer(), callback);
    }
    // 从DistroClientDataProcessor获取DistroData
    private DistroData getDistroData(String type) {
        DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
        if (null != result) {
            result.setType(OPERATION);
        }
        return result;
    }
}

从DistroClientDataProcessor获取DistroData,是从ClientManager实时获取Client。

// DistroClientDataProcessor
public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}

AbstractClient给DistroClientDataProcessor提供Client的注册的所有信息,包括客户端注册了哪些namespace,哪些group,哪些service,哪些instance。

// AbstractClient
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
    List<InstancePublishInfo> instances = new LinkedList<>();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        namespaces.add(entry.getKey().getNamespace());
        groupNames.add(entry.getKey().getGroup());
        serviceNames.add(entry.getKey().getName());
        instances.add(entry.getValue());
    }
    return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}

最终DistroClientTransportAgent封装为DistroDataRequest调用其他Nacos节点。

// DistroClientTransportAgent
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
    if (isNoExistTarget(targetServer)) {
        callback.onSuccess();
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    Member member = memberManager.find(targetServer);
    try {
        clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback));
    } catch (NacosException nacosException) {
        callback.onFailed(nacosException);
    }
}

非责任节点

非责任节点处理责任节点同步过来的Client数据。

DistroClientDataProcessor处理责任节点同步过来的数据。

// DistroClientDataProcessor
public boolean processData(DistroData distroData) {
  switch (distroData.getType()) {
    case ADD:
    case CHANGE:
      ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
        .deserialize(distroData.getContent(), ClientSyncData.class);
      handlerClientSyncData(clientSyncData);
      return true;
    case DELETE:
      String deleteClientId = distroData.getDistroKey().getResourceKey();
      clientManager.clientDisconnected(deleteClientId);
      return true;
    default:
      return false;
  }
}

private void handlerClientSyncData(ClientSyncData clientSyncData) {
  // 1. 保存ConnectionBasedClient,这类ConnectionBasedClient的isNative=false
  clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
  Client client = clientManager.getClient(clientSyncData.getClientId());
  // 2. 更新Client信息
  upgradeClient(client, clientSyncData);
}

注意这里Client的实现类仍然是ConnectionBasedClient,只不过它的isNative属性为false,这是非责任节点与责任节点的主要区别。

DistroClientDataProcessor的upgradeClient方法,更新Client里的注册表信息,发布对应事件。

va// DistroClientDataProcessor
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    Set<Service> syncedService = new HashSet<>();
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}

DistroFilter?

1.x版本,所有客户端的写请求都会经过DistroFilter。

Nacos源码(九)2.0注册中心DistroFilter.png

如果hash(服务名)%nacos节点列表大小==当前节点所处下标,则当前节点是责任节点,处理客户端写请求。

// DistroMapper
public boolean responsible(String responsibleTag) {
    final List<String> servers = healthyList;
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        return true;
    }
    if (CollectionUtils.isEmpty(servers)) {
        return false;
    }
    int index = servers.indexOf(EnvUtil.getLocalAddress());
    int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
    if (lastIndex < 0 || index < 0) {
        return true;
    }

    int target = distroHash(responsibleTag) % servers.size();
    return target >= index && target <= lastIndex;
}

否则,1.x中需要将客户端请求交由责任节点处理,责任节点处理后,由当前节点返回客户端。

而在2.x中,DistroFilter对于客户端就没用了,因为客户端与服务端会建立长连接,当前nacos节点是否是责任节点,取决于Client身上的isNative属性。如果是客户端直接注册在这个nacos节点上的ConnectionBasedClient,它的isNative属性为true;如果是由Distro协议,同步到这个nacos节点上的ConnectionBasedClient,它的isNative属性为false。

public class ConnectionBasedClient extends AbstractClient {
    /**
     * {@code true} means this client is directly connect to current server. {@code false} means this client is synced
     * from other server.
     */
    private final boolean isNative;
}

综上,2.x减少了1.x当中写请求转发的步骤,通过长连接建立在哪个节点上,哪个节点就是责任节点,客户端也只会向这个责任节点发送请求。

Verify

Distro为了确保集群间数据一致,不仅仅依赖于数据发生改变时的实时同步,后台有定时任务做数据同步。

在1.x版本中,责任节点每5s同步所有Service的Instance列表的摘要(md5)给非责任节点。

非责任节点用对端传来的服务md5比对本地服务的md5,如果发生改变,需要反查责任节点。

在2.x版本中,对这个流程做了改造,责任节点会发送Client全量数据,非责任节点定时检测同步过来的Client是否过期,减少1.x版本中的反查。

  • 责任节点每5s向其他节点发送DataOperation=VERIFY类型的DistroData,来维持非责任节点的Client数据不过期。
public class DistroVerifyTimedTask implements Runnable {
    @Override
    public void run() {
        try {
            // 1. 所有其他节点
            List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
            for (String each : distroComponentHolder.getDataStorageTypes()) {
                // 2. 向这些节点发送Client.isNative=true的DistroData,type = VERIFY
                verifyForDataStorage(each, targetServer);
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
        }
    }
}
  • 非责任节点每5s扫描isNative=false的client,如果client30s内没有被VERIFY的DistroData更新过续租时间,会删除这个同步过来的Client数据。
private static class ExpiredClientCleaner implements Runnable {
    private final ConnectionBasedClientManager clientManager;
    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        for (String each : clientManager.allClientId()) {
            ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
            if (null != client && isExpireClient(currentTime, client)) {
                clientManager.clientDisconnected(each);
            }
        }
    }
    private boolean isExpireClient(long currentTime, ConnectionBasedClient client) {
        // 同步client 且 30s内没有续租 认为过期
        return !client.isNative() && currentTime - client.getLastRenewTime() > Constants.DEFAULT_IP_DELETE_TIMEOUT;
    }
}