«

Nacos源码学习计划-Day22-Nacos2.x-服务变动如何通知订阅客户端

ZealSinger 发布于 阅读:91 技术文档


服务变更如何通知订阅服务的客户端

我们在Nacos源码学习计划-Day20-Nacos2.x-服务端处理客户端gRPC注册请求 的最后面有看到,服务端处理gRPC的注册请求的最后,即addPublisherIndexes()的最后一行,可以看到是发布了一个ServiceChangedEvent服务变更的事件

private void addPublisherIndexes(Service service, String clientId) {
   // 把 clientId 写入注册表
   publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
   publisherIndexes.get(service).add(clientId);
   // 发布服务改变事件
   NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}

今天就围绕这个来看看,服务变更后是如何通知对应的订阅的客户端的

初看ServiceChangedEvent

老规矩,对于Event事件处理,优先需要找到对应的handler处理器,还是利用IDEA的全局搜索能轻松找到

image-20251224142710962

看到这个地方,大家可以发现,这个if部分的逻辑其实就是将对应的ServiceChangedEvent封装成为PushDelayTask类型,然后调用delayTaskEngine.addTask()方法,将其放入到任务队列中去执行

如果有阅读了上一节内容的UU就会发现,在这里Nacos使用这个的 delayTaskEngine.addTask(),在上一节的最后,也就是ClientSubscribeServiceEvent客户端订阅服务事件的处理中也是利用这个,这个其实是Nacos2.X中很重要的延迟任务处理引擎

所以接下来,我们来好好分析一下这个玩意儿

PushDelayTaskExecuteEngine

延迟任务执行引擎,顾名思义是执行延迟任务,可以往执行引擎中添加任务,然后该任务会被延时执行。

这个delayTaskEngine的数据类型是PushDelayTaskExecuteEngine,我们可以先来查看一下他的相关的继承和实现关系

image-20251224144945653

既然我们知道了最终会将任务放入到一个任务队列中,那么我们尝试找找从队列中取任务的逻辑

但是实际搜索会发现,好像并没有这种一个一个拿的相关逻辑

image-20251224155809825

这个时候我们再回来看看整个PushDelayTaskExecuteEngine的代码,通过方法命名和整个组织的结构来看,可以猜测到,整个逻辑任务的实现可能依靠的是processTasks()这个方法,但是整个链路逻辑还是不是很明了,从代码来看,我们需要网上再去看其父类是如何实现的

image-20251224160527597

我们来看其父类NacosDelayTaskExecuteEngine的逻辑

可以从父类的构造方法中看到,构建的时候会利用内部的线程池成员开启ProcessRunnable 线程任务,而这个线程任务的底层逻辑就是执行processTasks()方法

image-20251224163346630

这个方法在子类中是有重写的,但是我们去看子类中的实现,就是加了一定的判断后再调用父类中的processTasks方法,所以我们直接来看父类中的实现即可

可以看到,首先从tasks中获取到所有的任务key进行遍历,通过 taskKey 获取到具体的任务之后,再通过 taskKey 获取对应的处理器,最终调用处理器(通过getProcessor()方法获取)的process()处理方法

getProcessor()的逻辑可以看到,如果给对应的key即特定的task设置了特定的processor处理器,则会保存在taskProcessors这个Map中,如果没有则使用默认的处理器,在顶级抽象接口中为NacosTaskProcessor,而PushDelayTaskExecuteEngine的构造方法中调用了set方法将这个处理器设置为了PushDelayTaskProcessor


// 任务池
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;

// 这个不是NacosDelayTaskExecuteEngine的,而是来自顶级抽象类AbstractNacosTaskExecuteEngine 存储了特定的task-processor处理器的映射关系
private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<>();

/**
* process tasks in execute engine.
*/
protected void processTasks() {
   // 获取全部的任务,进行遍历
   Collection<Object> keys = getAllTaskKeys();
   for (Object taskKey : keys) {
       
       // 通过任务 key,获取具体的任务,并且从任务池中移除掉
       AbstractDelayTask task = removeTask(taskKey);
       if (null == task) {
           continue;
      }
       // 通过任务 key 获取对应的处理器
       NacosTaskProcessor processor = getProcessor(taskKey);
       if (null == processor) {
           getEngineLog().error("processor not found for task, so discarded. " + task);
           continue;
      }
       try {
           // 调用处理器的处理方法
           if (!processor.process(task)) {
               retryFailedTask(taskKey, task);
          }
      } catch (Throwable e) {
           getEngineLog().error("Nacos task execute error ", e);
           retryFailedTask(taskKey, task);
      }
  }
}


@Override
public NacosTaskProcessor getProcessor(Object key) {
 // 如果处理器中没有对应的 key,那么就返回默认的任务处理器
 return taskProcessors.containsKey(key) ? taskProcessors.get(key) : defaultTaskProcessor;
}

而我们整个任务的存储结构tasks这个map中的元素,就是来自于我们上面类似ServiceChangedEvent的各种事件处理代码中,会通过addTask()方法放入到tasks中从而完成任务的放入

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
   lock.lock();
   try {
       // 往任务池添加任务
       AbstractDelayTask existTask = tasks.get(key);
       if (null != existTask) {
           newTask.merge(existTask);
      }
       tasks.put(key, newTask);
  } finally {
       lock.unlock();
  }
}

再看ServiceChangedEvent

OK,分析完了这个延迟执行引擎,我们继续回到ServiceChangedEvent的处理逻辑中来

@Override
public void onEvent(Event event) {
   if (!upgradeJudgement.isUseGrpcFeatures()) {
       return;
  }
   if (event instanceof ServiceEvent.ServiceChangedEvent) {
       // 如果服务变动,会通知该 service 所有的订阅者,更新本地缓存
       ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
       Service service = serviceChangedEvent.getService();
       delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
  } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
       // 如果服务被一个客户端订阅,则只推送该客户端
       ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
       Service service = subscribedEvent.getService();
       delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
               subscribedEvent.getClientId()));
  }
}

我们知道了,这个入参event会被封装为一个Service对象,并且作为PushDelayTask最终被delayTaskEngine中的defaultTaskProcessorprocess()方法处理

因为没有触发addProcessor()方法,所以不会针对这个event设置特定的processor,所以这里肯定是直接使用defaultTaskProcessor

因为delayTaskEngine中的defaultTaskProcessor的类型是PushDelayTaskProcessor,所以我们直接来看这个处理器的process()方法

@Override
public boolean process(NacosTask task) {
   // 任务类型转换
   PushDelayTask pushDelayTask = (PushDelayTask) task;
   Service service = pushDelayTask.getService();
   // 提交 PushExecuteTask 线程任务
   NamingExecuteTaskDispatcher.getInstance()
          .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
   return true;
}

可以看到,这个最初的入参event转换成了PushDelayTask后又被转化为了PushExecuteTask,这个就是我们熟知的任务类型了,那么接下来我们可以直接去看他对应的run()方法

@Override
public void run() {
   try {
       // 从注册表获取当前 service 最新的实例列表数据
       PushDataWrapper wrapper = generatePushData();
       ClientManager clientManager = delayTaskEngine.getClientManager();
       // 获取客户端id,也就是 rpc 客户端连接 id
       for (String each : getTargetClientIds()) {
           // 获取客户端,通知服务变动
           Client client = clientManager.getClient(each);
           if (null == client) {
               // means this client has disconnect
               continue;
          }
           // 获取客户端的订阅者
           Subscriber subscriber = client.getSubscriber(service);
           // 回调客户端
           delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                   new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
      }
  } catch (Exception e) {
       Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
       delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
  }
}

// 通过Service的信息(命名空间,group,cluster集群信息等等)和元数据(是否临时实例,保护阈值,路由选择器等等)得到PushDataWrapper
private PushDataWrapper generatePushData() {
   ServiceInfo serviceInfo = delayTaskEngine.getServiceStorage().getPushData(service);
   ServiceMetadata serviceMetadata = delayTaskEngine.getMetadataManager().getServiceMetadata(service).orElse(null);
   return new PushDataWrapper(serviceMetadata, serviceInfo);
}

private Collection<String> getTargetClientIds() {
   // 通过 pushToAll 这个参数来控制是否推送给全部的 client
   // 如果为 true 推送 service 全部的 client,如果为 false,需要指定 targetClients 参数,只推送该参数里面的 client
   return delayTask.isPushToAll() ? delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service)
          : delayTask.getTargetClients();
}

public Collection<String> getAllClientsSubscribeService(Service service) {
   // 从订阅表获取 clientId
   return subscriberIndexes.containsKey(service) ? subscriberIndexes.get(service) : new ConcurrentHashSet<>();
}

这里补充一下,对于isPushToAll()底层就是判断成员pushToAll为true还是false,这个成员的数值取决于任务类型,我们知道在最外层的时候,会通过switch-case区分event的类型从而包装成不同类型的task,虽然都是包装成为PushDelayTask,但是用到的构造方法不同,所以在初始化的时候这个pushToAll的值就不一样


private Set<String> targetClients;

// 服务变动事件,所使用的构造方法,可以看到pushToAll设置为true,就是要通知给所有的订阅了该服务的客户端
public PushDelayTask(Service service, long delay) {
   this.service = service;
   pushToAll = true;
   targetClients = null;
   setTaskInterval(delay);
   setLastProcessTime(System.currentTimeMillis());
}

// 服务订阅事件使用的构造方法
public PushDelayTask(Service service, long delay, String targetClient) {
   this.service = service;
   this.pushToAll = false;
   this.targetClients = new HashSet<>(1);
   // 把 clientId 添加到 targetClients 当中,这个 clientId 就是发起服务订阅的客户端ID
   this.targetClients.add(targetClient);
   setTaskInterval(delay);
   setLastProcessTime(System.currentTimeMillis());
}

我们回到run中的逻辑,在最后通过doPushWithCallback这个方式发送消息给对应的client,这个方法是来自于顶层接口,有三个实现类,因为我们之前聊到过Nacos2.X都是用的Rpc的形式了,所以直接去看Rpc版的实现

image-20251226163801378

@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, PushCallBack callBack) {
   // 这里构建的是 NotifySubscriberRequest 类型请求参数
   pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(getServiceInfo(data, subscriber)),
           callBack, GlobalExecutor.getCallbackExecutor());
}

可以看到,这里发送的请求是NotifySubscriberRequest,那么接下来我们可以去客户端查找这个请求的处理器

public class NamingPushRequestHandler implements ServerRequestHandler {
   
   private final ServiceInfoHolder serviceInfoHolder;
   
   public NamingPushRequestHandler(ServiceInfoHolder serviceInfoHolder) {
       this.serviceInfoHolder = serviceInfoHolder;
  }
   
   @Override
   public Response requestReply(Request request) {
       if (request instanceof NotifySubscriberRequest) {
           // 类型转换
           NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;
           // 更新本地缓存数据
           serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());
           return new NotifySubscriberResponse();
      }
       return null;
  }
}

serviceInfoHolder.processServiceInfo 更新本地缓存的方法,我们在上一篇文章也见过了,也就是更新 serviceInfoMap 中的数据即可。

总结

  1. 事件触发:服务端处理完服务注册等操作后,会发布ServiceChangedEvent(服务变更事件)或ServiceSubscribedEvent(服务订阅事件)。

  2. 任务封装:事件处理器将事件封装为PushDelayTask,通过PushDelayTaskExecuteEngine(延迟任务执行引擎)加入任务队列。其中,服务变更事件对应的任务会设置pushToAll=true(通知所有订阅客户端),服务订阅事件则pushToAll=false且后续操作指定目标客户端 ID。

  3. 延迟任务执行PushDelayTaskExecuteEngine基于父类NacosDelayTaskExecuteEngine的线程池,通过processTasks()方法轮询处理任务,调用默认处理器PushDelayTaskProcessorprocess()方法。

  4. 推送任务执行PushDelayTask转换为PushExecuteTask,该任务会获取服务最新实例数据(ServiceInfo)和订阅客户端列表,通过 gRPC 向目标客户端发送NotifySubscriberRequest请求。

  5. 客户端处理:客户端通过NamingPushRequestHandler接收请求,调用serviceInfoHolder.processServiceInfo()更新本地缓存,完成服务变更的同步。

整个流程通过事件驱动、延迟任务调度和 gRPC 通信,实现了服务端与客户端的高效数据同步。

编程 Java 项目