经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » 编程经验 » 查看文章
Nacos源码阅读心得
来源:cnblogs  作者:迷茫的bug  时间:2023/10/25 17:35:26  对本文有异议

Nacos注册中心(1.4.1)源码解读心得

一丶Nacos介绍

  Nacos是阿里巴巴推出的一款新开源项目,是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。它致力于帮助您发现、配置和管理微服务,提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。Nacos是构建以“服务”为中心的现代应用架构(例如微服务范式、云原生范式)的服务基础设施。

Nacos支持多种核心特性,包括:

  1. 服务发现:支持DNS与RPC服务发现,也提供原生SDK、OpenAPI等多种服务注册方式和DNS、HTTP与API等多种服务发现方式。
  2. 服务健康监测:提供对服务的实时的健康检查,阻止向不健康的主机或服务实例发送请求。
  3. 动态配置服务:提供配置统一管理功能,能够帮助我们将配置以中心化、外部化和动态化的方式管理所有环境的应用配置和服务配置。
  4. 动态DNS服务:支持动态DNS服务权重路由,能够很容易地实现中间层负载均衡、更灵活的路由策略、流量控制以及数据中心内网的简单DNS解析服务。
  5. 服务及其元数据管理:支持从微服务平台建设的视角管理数据中心的所有服务及元数据,包括管理服务的描述、生命周期、服务的静态依赖分析、服务的健康状态、服务的流量管理、路由及安全策略、服务的SLA以及最首要的metrics统计数据。

  Nacos可以与Spring、Spring Boot、Spring Cloud集成,并能代替Spring Cloud Eureka和Spring Cloud Config。通过Nacos Server和spring-cloud-starter-alibaba-nacos-config实现配置的动态变更。它提供了一个简洁易用的UI(控制台样例Demo)帮助您管理所有的服务和应用的配置。同时,它也提供了一些简单的DNS APIs TODO帮助您管理服务的关联域名和可用的IP:PORT列表。

二丶客户端注册流程

  • 在SpringCloudAlibaba这一套微服务组件中,Nacos作为注册中心向其他的微服务提供信息,业务服务通过从Nacos中拉取所需要的服务信息,再通过Ribbon在本地做负载均衡之后通过Feign组件发起接口调用。例如在电商系统中的下单服务,库存服务,支付服务等。完成一个下单过程中,下单服务需要调用库存服务减库存,调用支付服务完成支付等,而库存服务,支付服务的信息都会存储在注册中心即Nacos服务中。服务之间的调用只需要通过注册中心获取,不再需要每个服务都去存储需要调用的服务信息了,完成了解耦。
  • 客户端想要注册到注册中心去就要先引入Nacos的客户端依赖spring-cloud-starter-alibaba-nacos-discovery,并在配置文件中配上Nacos的服务地址和命名空间等信息。想要知道Nacos客户端的注册流程就得从引入的依赖入手,从maven依赖库中找到nacos的jar包,下面有一个META-INF文件夹,里面的spring.factory文件就是springboot自动装配过程中会装配的类:

     可以看到,红框标注的类就是跟服务发现自动装配相关性比较大的类文件了,直接在项目中搜索这个类可以看到以下文件:

    1. @Configuration
    2. @EnableConfigurationProperties
    3. @ConditionalOnNacosDiscoveryEnabled
    4. @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    5. @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
    6. AutoServiceRegistrationAutoConfiguration.class })
    7. public class NacosDiscoveryAutoConfiguration {
    8. @Bean
    9. public NacosServiceRegistry nacosServiceRegistry(
    10. NacosDiscoveryProperties nacosDiscoveryProperties) {
    11. return new NacosServiceRegistry(nacosDiscoveryProperties);
    12. }
    13. @Bean
    14. @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    15. public NacosRegistration nacosRegistration(
    16. NacosDiscoveryProperties nacosDiscoveryProperties,
    17. ApplicationContext context) {
    18. return new NacosRegistration(nacosDiscoveryProperties, context);
    19. }
    20. @Bean
    21. @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    22. public NacosAutoServiceRegistration nacosAutoServiceRegistration(
    23. NacosServiceRegistry registry,
    24. AutoServiceRegistrationProperties autoServiceRegistrationProperties,
    25. NacosRegistration registration) {
    26. return new NacosAutoServiceRegistration(registry,
    27. autoServiceRegistrationProperties, registration);
    28. }
    29. }

    在这个类中有三个@Bean注解,仔细观察前两个bean都在第三个bean的参数中,所以第三个bean就是比较重要的Bean了。

  • 直接搜索NacosAutoServiceRegistry这个类可以看到其中有一个方法名为register(),通过类名和方法名大概能猜到这就是注册的主逻辑了。
    1. @Override
    2. protected void register() {
    3. if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
    4. log.debug("Registration disabled.");
    5. return;
    6. }
    7. if (this.registration.getPort() < 0) {
    8. this.registration.setPort(getPort().get());
    9. }
    10. super.register();
    11. }
  • 直接点到注册方法中去是NacosAutoServiceRegistry的抽象父类,抽象父类中有一个对象serviceRegistry,注册方法就是这个对象的register()方法,继续跟到这个register()方法中去:
    1. @Override
    2. public void register(Registration registration) {
    3. if (StringUtils.isEmpty(registration.getServiceId())) {
    4. log.warn("No service to register for nacos client...");
    5. return;
    6. }
    7. String serviceId = registration.getServiceId();
    8. Instance instance = getNacosInstanceFromRegistration(registration);
    9. try {
    10. namingService.registerInstance(serviceId, instance);
    11. log.info("nacos registry, {} {}:{} register finished", serviceId,
    12. instance.getIp(), instance.getPort());
    13. }
    14. catch (Exception e) {
    15. log.error("nacos registry, {} register failed...{},", serviceId,
    16. registration.toString(), e);
    17. }
    18. }

    可以看到通过getNacosInstanceFromRegistration()方法将Registration 转换成为了Instance 对象,后续通过namingService.registerInstance(serviceId, instance);进行了注册的动作,而这个Instance 对象其实就是我们Nacos服务端所存储的微服务相关的一些信息。

  • 继续跟namingService.registerInstance(serviceId, instance);方法:
    1. @Override
    2. public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    3. if (instance.isEphemeral()) {
    4. BeatInfo beatInfo = new BeatInfo();
    5. beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
    6. beatInfo.setIp(instance.getIp());
    7. beatInfo.setPort(instance.getPort());
    8. beatInfo.setCluster(instance.getClusterName());
    9. beatInfo.setWeight(instance.getWeight());
    10. beatInfo.setMetadata(instance.getMetadata());
    11. beatInfo.setScheduled(false);
    12. long instanceInterval = instance.getInstanceHeartBeatInterval();
    13. beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
    14. beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    15. }
    16. serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    17. }

    这是NamingService的实现类NacosNamingService中的实现,可以看到进行了一个if判断,这个其实是判断是否是一个临时的实例,如果是临时实例做一些处理,最后的注册请求是在serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);方法中的:

    1. public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    2. NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
    3. namespaceId, serviceName, instance);
    4. final Map<String, String> params = new HashMap<String, String>(9);
    5. params.put(CommonParams.NAMESPACE_ID, namespaceId);
    6. params.put(CommonParams.SERVICE_NAME, serviceName);
    7. params.put(CommonParams.GROUP_NAME, groupName);
    8. params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    9. params.put("ip", instance.getIp());
    10. params.put("port", String.valueOf(instance.getPort()));
    11. params.put("weight", String.valueOf(instance.getWeight()));
    12. params.put("enable", String.valueOf(instance.isEnabled()));
    13. params.put("healthy", String.valueOf(instance.isHealthy()));
    14. params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    15. params.put("metadata", JSON.toJSONString(instance.getMetadata()));
    16. reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
    17. }

    组装了一些参数最后发起了POST请求.其中UtilAndComs.NACOS_URL_INSTANCE这个常量最后拼接出来是/nacos/v1/ns/instance

  • 得出结论:客户端通过将自己服务的信息包括ip端口,命名空间,服务名等信息组装好,向Nacos服务发起POST请求注册到注册中心去。

三丶服务端存储客户端注册的服务信息

  •  在上面客户端注册流程最后我们得到了一个URI:/nacos/v1/ns/instance  通过这个URI我们可以去服务端的源码中搜索这个接口:
    1. @CanDistro
    2. @PostMapping
    3. @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    4. public String register(HttpServletRequest request) throws Exception {
    5. final String namespaceId = WebUtils
    6. .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    7. final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    8. NamingUtils.checkServiceNameFormat(serviceName);
    9. final Instance instance = parseInstance(request);
    10. serviceManager.registerInstance(namespaceId, serviceName, instance);
    11. return "ok";
    12. }

    以上是服务端的注册实例的接口,主要完成了三个动作:检查服务名信息,将请求中的参数转换为Instance实例对象,注册实例,继续跟 serviceManager.registerInstance(namespaceId, serviceName, instance)注册的方法:

    1. public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    2. createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    3. Service service = getService(namespaceId, serviceName);
    4. if (service == null) {
    5. throw new NacosException(NacosException.INVALID_PARAM,
    6. "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    7. }
    8. addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    9. }

     

    这个方法中主要动作有:创建空的Service对象,获取Service对象,注册实例等动作。

     

    1. createEmptyService(namespaceId, serviceName, instance.isEphemeral())方法:
    1. public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
    2. throws NacosException {
    3. Service service = getService(namespaceId, serviceName);
    4. if (service == null) {
    5. Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
    6. service = new Service();
    7. service.setName(serviceName);
    8. service.setNamespaceId(namespaceId);
    9. service.setGroupName(NamingUtils.getGroupName(serviceName));
    10. // now validate the service. if failed, exception will be thrown
    11. service.setLastModifiedMillis(System.currentTimeMillis());
    12. service.recalculateChecksum();
    13. if (cluster != null) {
    14. cluster.setService(service);
    15. service.getClusterMap().put(cluster.getName(), cluster);
    16. }
    17. service.validate();
    18. putServiceAndInit(service);
    19. if (!local) {
    20. addOrReplaceService(service);
    21. }
    22. }
    1.  

    先调用了getService方法:

    1. public Service getService(String namespaceId, String serviceName) {
    2. if (serviceMap.get(namespaceId) == null) {
    3. return null;
    4. }
    5. return chooseServiceMap(namespaceId).get(serviceName);
    6. }

    其中的serviceMap结构是

    1. private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    这个map其实就是注册中心保存所有实例的map,最外层的key一般为namespace,里层的key一般为定义的group名,根据业务需要定义。

    1. public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    2. ...private Map<String, Cluster> clusterMap = new HashMap<>();
    3. ...
    4. }

    map中的Service对象其中又有一个clusterMap,而Cluster对象的结构如下

    1. public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    2. ...
    3. @JsonIgnore
    4. private Set<Instance> persistentInstances = new HashSet<>();
    5. @JsonIgnore
    6. private Set<Instance> ephemeralInstances = new HashSet<>();
    7. @JsonIgnore
    8. private Service service;
    9. ...
    10. }

    看到这里的两个HashSet中的Instance对象是否有些眼熟?他就是客户端注册到注册中心的服务实例信息

  • 所以整体来看,服务端存储各个微服务的结构如图所示:
  •  那么再回到上述的getService方法中,通过命名空间就可以得到同一组下面的服务,而chooseServiceMap(namespaceId).get(serviceName)又通过服务名来获取Service,其实得到的结构就是一个个的service其中还有一层Cluster;

  • 那么此时我的微服务还并未完成注册获取到的Service肯定是Null,继续往下走就会新构建一个Service,经过前面的赋值校验方法,会走到putServiceAndInit(service)方法中去:
    1. private void putServiceAndInit(Service service) throws NacosException {
    2. putService(service);
    3. service.init();
    4. consistencyService
    5. .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    6. consistencyService
    7. .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    8. Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    9. }

    这个方法中做了两步很重要的操作,其中putService()方法:

    1. public void putService(Service service) {
    2. if (!serviceMap.containsKey(service.getNamespaceId())) {
    3. synchronized (putServiceLock) {
    4. if (!serviceMap.containsKey(service.getNamespaceId())) {
    5. serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
    6. }
    7. }
    8. }
    9. serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
    10. }

    就是将我们的service放到serviceMap中,源码中就用到了双检锁。

  • 而在service.init()方法中:
    1. public void init() {
    2. HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    3. for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
    4. entry.getValue().setService(this);
    5. entry.getValue().init();
    6. }
    7. }

    第一行的 HealthCheckReactor.scheduleCheck(clientBeatCheckTask);看类名是健康检查相关的,那么可以想到注册实例肯定要把自己的健康信息更新到注册中心去,再看参数:clientBeatCheckTask 服务心跳检查任务,点到这个类中:

    1. public class ClientBeatCheckTask implements Runnable {
    2. private Service service;
    3. @Override
    4. public void run() {
    5. try {
    6. if (!getDistroMapper().responsible(service.getName())) {
    7. return;
    8. }
    9. if (!getSwitchDomain().isHealthCheckEnabled()) {
    10. return;
    11. }
    12. List<Instance> instances = service.allIPs(true);
    13. // first set health status of instances:
    14. for (Instance instance : instances) {
    15. if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
    16. if (!instance.isMarked()) {
    17. if (instance.isHealthy()) {
    18. instance.setHealthy(false);
    19. Loggers.EVT_LOG
    20. .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
    21. instance.getIp(), instance.getPort(), instance.getClusterName(),
    22. service.getName(), UtilsAndCommons.LOCALHOST_SITE,
    23. instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
    24. getPushService().serviceChanged(service);
    25. ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
    26. }
    27. }
    28. }
    29. }
    30. if (!getGlobalConfig().isExpireInstance()) {
    31. return;
    32. }
    33. // then remove obsolete instances:
    34. for (Instance instance : instances) {
    35. if (instance.isMarked()) {
    36. continue;
    37. }
    38. if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
    39. // delete instance
    40. Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
    41. JacksonUtils.toJson(instance));
    42. deleteIp(instance);
    43. }
    44. }
    45. } catch (Exception e) {
    46. Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    47. }
    48. }
    49. }

    实现了Runnable接口说明是一个线程,直接看run方法

    1. List<Instance> instances = service.allIPs(true);
    1. public List<Instance> allIPs(boolean ephemeral) {
    2. List<Instance> result = new ArrayList<>();
    3. for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
    4. result.addAll(entry.getValue().allIPs(ephemeral));
    5. }
    6. return result;
    7. }
    1. public List<Instance> allIPs(boolean ephemeral) {
    2. return ephemeral ? new ArrayList<>(ephemeralInstances) : new ArrayList<>(persistentInstances);
    3. }

    获取了service中的所有Instance,然后做一系列的心跳检查,发布事件等。到此就做完了实例初始化的动作。

  • 上述是createEmptyService(namespaceId, serviceName, instance.isEphemeral())方法:
  • 那么接下来
    1. Service service = getService(namespaceId, serviceName);

    肯定可以获取到service,直接看最后的addInstance(namespaceId, serviceName, instance.isEphemeral(), instance)方法:

    1. public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
    2. throws NacosException {
    3. String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    4. Service service = getService(namespaceId, serviceName);
    5. synchronized (service) {
    6. List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
    7. Instances instances = new Instances();
    8. instances.setInstanceList(instanceList);
    9. consistencyService.put(key, instances);
    10. }
    11. }
    1. String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral)这个方法通过nameSpaceId,服务名信息组建了一个key:
    1. public static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
    2. return ephemeral ? buildEphemeralInstanceListKey(namespaceId, serviceName)
    3. : buildPersistentInstanceListKey(namespaceId, serviceName);
    4. }
    1. 可以看到是根据ephemeral来判断的,这个值是控制是否临时实例的,Instance中的默认值是true表示默认新建的就是临时实例,那么构建出来的字符串:"com.alibaba.nacos.naming.iplist.ephemeral."+ namespaceId +"##"+serviceName
    1. 中间是有ephemeral.的;下面又将初始化了的Instance获取到加锁执行注册逻辑:
    1. public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
    2. throws NacosException {
    3. Datum datum = consistencyService
    4. .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
    5. List<Instance> currentIPs = service.allIPs(ephemeral);
    6. Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    7. Set<String> currentInstanceIds = Sets.newHashSet();
    8. for (Instance instance : currentIPs) {
    9. currentInstances.put(instance.toIpAddr(), instance);
    10. currentInstanceIds.add(instance.getInstanceId());
    11. }
    12. Map<String, Instance> instanceMap;
    13. if (datum != null && null != datum.value) {
    14. instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    15. } else {
    16. instanceMap = new HashMap<>(ips.length);
    17. }
    18. for (Instance instance : ips) {
    19. if (!service.getClusterMap().containsKey(instance.getClusterName())) {
    20. Cluster cluster = new Cluster(instance.getClusterName(), service);
    21. cluster.init();
    22. service.getClusterMap().put(instance.getClusterName(), cluster);
    23. Loggers.SRV_LOG
    24. .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
    25. instance.getClusterName(), instance.toJson());
    26. }
    27. if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
    28. instanceMap.remove(instance.getDatumKey());
    29. } else {
    30. Instance oldInstance = instanceMap.get(instance.getDatumKey());
    31. if (oldInstance != null) {
    32. instance.setInstanceId(oldInstance.getInstanceId());
    33. } else {
    34. instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
    35. }
    36. instanceMap.put(instance.getDatumKey(), instance);
    37. }
    38. }
    39. if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
    40. throw new IllegalArgumentException(
    41. "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
    42. .toJson(instanceMap.values()));
    43. }
    44. return new ArrayList<>(instanceMap.values());
    45. }

    这个方法返回值是一个List<Instance>直接看返回值是instanceMap.values(),这个map的来源是通过第一个

    1. Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral))

     

     

    的结果判断的,流程是先判断这个实例是否已经注册到注册中心了,没有的话新增一个instanceMap,有将其中的旧元素都放到新的instanceMap中,再对传过来的instance做一系列的检查注册操作,返回现有的instanceMap中的元素集合。

  • 通过返回的List<Instance>构建一个Instances对象,这个对象里面的结构:
    1. public class Instances implements Record {
    2. private static final long serialVersionUID = 5500823673993740145L;
    3. private List<Instance> instanceList = new ArrayList<>();
      }

    最后一步consistencyService.put(key, instances)方法:

    1. public void put(String key, Record value) throws NacosException {
    2. onPut(key, value);
    3. distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
    4. globalConfig.getTaskDispatchPeriod() / 2);
    5. }

    onPut方法就是注册的核心逻辑了:

    1. public void onPut(String key, Record value) {
    2. if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
    3. Datum<Instances> datum = new Datum<>();
    4. datum.value = (Instances) value;
    5. datum.key = key;
    6. datum.timestamp.incrementAndGet();
    7. dataStore.put(key, datum);
    8. }
    9. if (!listeners.containsKey(key)) {
    10. return;
    11. }
    12. notifier.addTask(key, DataOperation.CHANGE);
    13. }

    其中notifier的结构:

    1. public class Notifier implements Runnable {
    2. private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
    3. private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
    4. }

    说明又是一个线程类,成员变量有一个tasks的阻塞队列。

    1. public void addTask(String datumKey, DataOperation action) {
    2. if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
    3. return;
    4. }
    5. if (action == DataOperation.CHANGE) {
    6. services.put(datumKey, StringUtils.EMPTY);
    7. }
    8. tasks.offer(Pair.with(datumKey, action));
    9. }

    通过观察addTask方法可以看出向上面提到的阻塞队列tasks中添加了一个对象:

    1. public static <A, B> Pair<A, B> with(A value0, B value1) {
    2. return new Pair(value0, value1);
    3. }

    这里的Pair.with()可以不用管,只需要知道返回的一个Pair对象包含了前面生成的key和action,action代表了操作类型是新增或者编辑删除之类的。

          那么这个阻塞队列里面就包含了我们的服务信息,服务名,nameSpaceId,是否是临时实例等。再回去看notifier的run方法:

  1. public void run() {
  2. Loggers.DISTRO.info("distro notifier started");
  3. for (; ; ) {
  4. try {
  5. Pair<String, DataOperation> pair = tasks.take();
  6. handle(pair);
  7. } catch (Throwable e) {
  8. Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
  9. }
  10. }
  11. }

  可以看出是一个线程从阻塞队列中循环拿出实例的信息到后续的handle方法:

  1. private void handle(Pair<String, DataOperation> pair) {
  2. try {
  3. String datumKey = pair.getValue0();
  4. DataOperation action = pair.getValue1();
  5. services.remove(datumKey);
  6. int count = 0;
  7. if (!listeners.containsKey(datumKey)) {
  8. return;
  9. }
  10. for (RecordListener listener : listeners.get(datumKey)) {
  11. count++;
  12. try {
  13. if (action == DataOperation.CHANGE) {
  14. listener.onChange(datumKey, dataStore.get(datumKey).value);
  15. continue;
  16. }
  17. if (action == DataOperation.DELETE) {
  18. listener.onDelete(datumKey);
  19. continue;
  20. }
  21. } catch (Throwable e) {
  22. Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
  23. }
  24. }
  25. if (Loggers.DISTRO.isDebugEnabled()) {
  26. Loggers.DISTRO
  27. .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
  28. datumKey, count, action.name());
  29. }
  30. } catch (Throwable e) {
  31. Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
  32. }
  33. }

 

  1. @Override
  2. public void onChange(String key, Instances value) throws Exception {
  3. Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
  4. for (Instance instance : value.getInstanceList()) {
  5. if (instance == null) {
  6. // Reject this abnormal instance list:
  7. throw new RuntimeException("got null instance " + key);
  8. }
  9. if (instance.getWeight() > 10000.0D) {
  10. instance.setWeight(10000.0D);
  11. }
  12. if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
  13. instance.setWeight(0.01D);
  14. }
  15. }
  16. updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
  17. recalculateChecksum();
  18. }

在这里拿出所有的Instance实例对象进行权重默认值的设置,之后 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key))方法:

  1. public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
  2. Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
  3. for (String clusterName : clusterMap.keySet()) {
  4. ipMap.put(clusterName, new ArrayList<>());
  5. }
  6. for (Instance instance : instances) {
  7. try {
  8. if (instance == null) {
  9. Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
  10. continue;
  11. }
  12. if (StringUtils.isEmpty(instance.getClusterName())) {
  13. instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
  14. }
  15. if (!clusterMap.containsKey(instance.getClusterName())) {
  16. Loggers.SRV_LOG
  17. .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
  18. instance.getClusterName(), instance.toJson());
  19. Cluster cluster = new Cluster(instance.getClusterName(), this);
  20. cluster.init();
  21. getClusterMap().put(instance.getClusterName(), cluster);
  22. }
  23. List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
  24. if (clusterIPs == null) {
  25. clusterIPs = new LinkedList<>();
  26. ipMap.put(instance.getClusterName(), clusterIPs);
  27. }
  28. clusterIPs.add(instance);
  29. } catch (Exception e) {
  30. Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
  31. }
  32. }
  33. for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
  34. //make every ip mine
  35. List<Instance> entryIPs = entry.getValue();
  36. clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
  37. }
  38. setLastModifiedMillis(System.currentTimeMillis());
  39. getPushService().serviceChanged(this);
  40. StringBuilder stringBuilder = new StringBuilder();
  41. for (Instance instance : allIPs()) {
  42. stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
  43. }
  44. Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
  45. stringBuilder.toString());
  46. }

 

其中将Instance实例放入Map中的逻辑:

  1. if (!clusterMap.containsKey(instance.getClusterName())) {
  2. Loggers.SRV_LOG
  3. .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
  4. instance.getClusterName(), instance.toJson());
  5. Cluster cluster = new Cluster(instance.getClusterName(), this);
  6. cluster.init();
  7. getClusterMap().put(instance.getClusterName(), cluster);
  8. }

可以得出结论:Nacos中有一个线程从阻塞队列notifier中拿出新注册的Instance初始化处理之后根据是否是临时实例放到对应的HashSet中,即保存了新注册的实例信息。而每次注册的结果就是将注册信息放入到阻塞队列中去;这种异步处理的方式使得Nacos的TPS可以达到1w3+,因为其保存的服务信息对实时性要求并不高,这种场景下使用异步处理是合适的。而Nacos在更新实例信息时采用了写时复制的思想,保证了服务上下线修改Map时的效率。由于其写时复制复制的其实是Cluster中的内容,不是复制整个Map,所以它的效率也是很高的。

附一张图灵学院诸葛老师总结的流程图:

 

原文链接:https://www.cnblogs.com/ybug/p/17787741.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号