浅析 Nacos2.x 注册客户端启动流程
注意:本文所有内容均是基于Nacos2.0.2版本。
客户端服务注册流程
Nacos 的服务注册的统一入口在 NamingService 这个接口处定义,通过调用 registerInstance()方法来注册。在 NacosNamingService 处实现了 NamingService 服务注册接口的基础注册方法:registerInstance()。而其具体实现是委托给了 NacosNamingServiceDelegate ,这个类比较特殊,在 1.x 版本是没有的,是在 2.x 版本中新增了 Grpc 之后为了适配原先 1.x 版本的 Nacos-Client 而存在的,在旧版本的客户端中,NacosNamingService 的服务注册请求是交给其代理类直接去发送 Http 请求 Server 来实现服务注册的。具体的暂时不做深入。在委托类中,进行 Grpc 和 Http 分流的操作我们可以初窥一下:
1 2 3
| private NamingClientProxy getExecuteClientProxy(Instance instance) { return instance.isEphemeral() ? grpcClientProxy : httpClientProxy; }
|
应该还是比较易懂的,直接根据服务实例的节点类型来判断,是临时节点的则直接走 GRPC ,否则 Http。
在处理 Grpc 请求的代理类 NamingGrpcClientProxy 中 继续调用其 registerService()方法,然后通过 requestToServer()封装 Grpc 请求,最后由 GrpcClient 去请求 Nacos-Server 注册服务。
源码解析
我们将会基于我们的流程,来分析 NamingService 实现类以及其代理类初始化流程。
一般的,我们会通过一个工厂方法 NamingFactory,来创建我们的 NamingService 的实例:
1 2 3 4 5
| Properties properties = new Properties();
properties.setProperty("serverAddr", "127.0.0.1:8848");
NamingService naming = NamingFactory.createNamingService(properties);
|
在创建实例的时候我们会传入一个 Properties 对象到我们的 NamingService 的构造方法中来初始化我们的 NamingService 对象。我们看一下其实现类 NacosNamingService 的构造函数是怎么处理的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public NacosNamingService(Properties properties) throws NacosException { init(properties); } private void init(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); // 1 检查 ContextPath 的合法性 this.namespace = InitUtils.initNamespaceForNaming(properties); // 2 初始化 namespace InitUtils.initSerialization(); // 3 主要是为了序列化安全 InitUtils.initWebRootContext(properties); // 4 初始化 contextPath initLogName(properties); // 5 初始化日志前缀名称 this.changeNotifier = new InstancesChangeNotifier(); // 6 NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); // 7 NotifyCenter.registerSubscriber(changeNotifier); // 8 this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties); // 9 this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier); // 10 }
|
我们依次来看看,NacosNamingService 构造函数调用了一个 init()方法,其中主要做了 11 件事情。其中,我们主要看后面五个。
首先看:
1
| this.changeNotifier = new InstancesChangeNotifier(); // 6
|
这里会新 new 一个 InstancesChangeNotifier 对象,见名知意,大概猜出它是用于实例变更的通知的,这一步我们在初始化流程中先不做详细分析,在后面会单独出来一个章节去解释这个 NotifyCenter 和 一系列的Notifier,Subscriber的妙用。抛开业务流程,这部分个人觉得还是可以品一下的。
好,那这样我们就直接跳到 init() 方法的最后两行:
1 2 3 4
| // 初始化 ServiceInfoHolder this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties); // 初始化 NamingClientProxyDelegate this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
|
看一下 ServiceInfoHolder ,这里就体现了命名规范的重要性了,我们还是可以使用那个词 – 见名知意,这个是一个服务信息的持有者,在我们将我们的服务注册到 Nacos 上之后,那么这个类将是我们交互访问最多的一个类。它的主要工作内容就是负责缓存,本地化服务信息以及预防故障的冗余处理,来看一下它比较重要的两个属性:
1 2 3 4
| // 用于缓存 private final ConcurrentMap<String, ServiceInfo> serviceInfoMap; // 用于故障冗余 private final FailoverReactor failoverReactor;
|
不难看出,ServiceInfoHolder 主要做的事情就是对服务信息的缓存以及故障冗余。
下面再来看一下他的构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13
| public ServiceInfoHolder(String namespace, Properties properties) { initCacheDir(namespace, properties); // 判断是否需要在启动的时候读取缓存的配置 if (isLoadCacheAtStart(properties)) { this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir)); } else { this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16); } // 初始化一个 FailoverReactor this.failoverReactor = new FailoverReactor(this, cacheDir); // 判断是否配置了空消息推送保护,开启了的时候在服务端推送错误或者空的服务信息的时候// 会忽略. this.pushEmptyProtection = isPushEmptyProtect(properties); }
|
很简单,初始化了 Map 和故障切换,还有就是根据配置判断是否需要忽略服务端无效的推送信息。
回到 init()方法,这是一个决定我们能否执行注册订阅等请求的一个类 – NamingClientProxy ,这个类的作用其实就是一个我们客户端中 1.x 和 2.x 的请求的适配和分发。我们知道,在 1.x 和 2.x 两个版本迭代的最大区别之一便是客户端服务端之间请求方式的改变,从原本的 http 优化到了 grpc 的请求形式。由于我们分析的主要是 2.x 的版本,所以客户端服务端均是默认 grpc 。当然,对于低版本客户端连接高版本服务端是可以兼容的,但反过来就不行了。
来,直接看一下 NamingClientProxyDelegate 的构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException { // 初始化一个ServiceInfo更新的服务 this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this, changeNotifier); // 初始化 服务端地址管理的服务 this.serverListManager = new ServerListManager(properties, namespace); // 将 NacosNamingService 传递过来的 ServiceInfoHolder 指向自己内部的 ServiceInfoHolder. this.serviceInfoHolder = serviceInfoHolder; // 安全管理 this.securityProxy = new SecurityProxy(properties, NamingHttpClientManager.getInstance().getNacosRestTemplate()); initSecurityProxy(); // 初始化Http方式的请求服务 this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder); // 初始化Grpc方式的请求服务 this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder); }
|
这里我们直接看第一行和最后一行,中间的部分操作对于我们分析 2.x 的业务流程可以省略,有兴趣的读者可以自行去翻阅源码。首先看的是这个叫做 ServiceInfoUpdateService 的服务的作用。直接去到他的构造函数中:
1 2 3 4 5 6 7 8
| public ServiceInfoUpdateService(Properties properties, ServiceInfoHolder serviceInfoHolder, NamingClientProxy namingClientProxy, InstancesChangeNotifier changeNotifier) { this.executor = new ScheduledThreadPoolExecutor(initPollingThreadCount(properties), new NameThreadFactory("com.alibaba.nacos.client.naming.updater")); this.serviceInfoHolder = serviceInfoHolder; this.namingClientProxy = namingClientProxy; this.changeNotifier = changeNotifier; }
|
在构造器中初始化了一个用于任务调度的线程池 – ScheduledThreadPoolExecutor ,在向 ServiceInfoUpdateService 的 scheduleUpdateIfAbsent() 方法的时候便会使用这个内部的线程池去做一些更新注册服务相关的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) { String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); if (futureMap.get(serviceKey) != null) { return; } synchronized (futureMap) { if (futureMap.get(serviceKey) != null) { return; } ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters)); futureMap.put(serviceKey, future); } } private synchronized ScheduledFuture<?> addTask(UpdateTask task) { return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS); }
|
在更新任务 UpdateTask 的run() 方法中,更新的对象便是我们之前分析的 ServiceInfoHolder 。
再来看 NamingGrpcClientProxy :
1 2
| this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
|
这段代码主要便是初始化 NamingGrpcClientProxy 这个 Grpc 请求的执行类,期间涉及到 Rpc 初始化和启动的流程,其内容篇幅比较长,我们会在后面单独起一个章节去分析,在这我们先简单看一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| // 构造器 public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException { super(securityProxy, properties); this.namespaceId = namespaceId; // 用于标识一个 RpcClient this.uuid = UUID.randomUUID().toString(); // rpc请求超时时间 this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1")); Map<String, String> labels = new HashMap<String, String>(); labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK); labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING); // 创建一个 RpcClient 用于 Rpc 请求 this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels); // 在 RpcClient 中会有链接结果的回调,这里是将自己注册到对应的 Linstener 中,用于相关事件发生时的回调 this.namingGrpcConnectionEventListener = new NamingGrpcConnectionEventListener(this); // 启动 start(serverListFactory, serviceInfoHolder); } // rpc请求客户端的启动,我们放在后面分析 private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException { rpcClient.serverListFactory(serverListFactory); rpcClient.start(); // 向 RpcClient 注册一个用于处理服务端推送的 Handler rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder)); // 向 RpcClient 注册一个用于连接建立与断开的监听器 rpcClient.registerConnectionListener(namingGrpcConnectionEventListener); // 向消息中心注册自己 NotifyCenter.registerSubscriber(this); }
|
整个过程的目的是比较清晰的,初始化一个带有特殊标记(uuid)的 rpcClient ,这个 rpcClient 将负责后续所有的 Grpc 请求。其中,事件注册占这中间代码量的近一半,确实,Nacos 自己定义的这一套内部的事件处理流程很意思,实现了基于 EDA 与 观察者模式的事件架构,个人比较喜欢这一块的思想,我们也会在后续起一个章节去分析他是如何通过自己内部的这个 NotifyCenter 来解耦繁多的应用组件的。
到这,Nacos 注册客户端启动流程就基本结束了,后续的 API 操作便是依托在这个流程之上来运行的。期间断断续续的再写,有很多讲的不到位的地方。这是我们整个 Nacos 旅程的开胃菜,我们会在后面的章节中慢慢的一点点的去啃食,RpcClient怎么启动的?NotifyCenter是如何工作的?等等一系列的疑问我们会慢慢分析。
总结,答题的流程是比较简单的,内容大体如下图:

本人关于图片作品版权的声明:
本人在此刊载的原创作品,其版权归属本人所有。
任何传统媒体、商业公司或其他网站未经本人的授权许可,不得擅自从本人转载、转贴或者以任何其他方式复制、使用上述作品。
传统媒体、商业公司或其他网站对上述作品的任何使用,均须事先与本人联系。
对于侵犯本人的合法权益的公司、媒体、网站和人员,本人聘请的律师受本人的委托,将采取必要的措施,通过包括法律诉讼在内的途径来维护本人的合法权益。
特此声明,敬请合作。