Flink设计与实现:核心原理与源码解析
上QQ阅读APP看书,第一时间看更新

3.2.1 集群组件的创建和启动

如图3-4所示,DefaultDispatcherResourceManagerComponentFactory作为DispatcherResourceManagerComponentFactory接口的默认实现类,其内部包含了ResourceManagerFactory、DispatcherRunnerFactory和RestEndpointFactory三个主要的成员变量,分别提供了ResourceManager、DispatcherRunner以及WebMonitorEndpoint组件的创建方法。

所有的ClusterEntrypoint子类都会实现createDispatcherResourceManagerComponentFac-tory()抽象方法,以提供创建相应集群组件服务的能力。例如在StandaloneSessionClusterEntrypoint中,通过实现createDispatcherResourceManagerComponentFactory()抽象方法创建StandaloneSession集群中的组件和服务。

图3-4 DispatcherResourceManagerComponentFactory UML类图

如代码清单3-4所示,在StandaloneSessionClusterEntrypoint中会将StandaloneResourceManagerFactory实例作为参数传递给DefaultDispatcherResourceManagerComponentFactory。从这里也可以看出,不同类型的集群最主要的区别就是ResourceManager的实现类不同。

代码清单3-4 StandaloneSessionClusterEntrypoint.createDispatcherResourceManager-ComponentFactory()方法


protected DefaultDispatcherResourceManagerComponentFactory createDispatcher
   ResourceManagerComponentFactory(Configuration configuration) {
   return DefaultDispatcherResourceManagerComponentFactory.createSession
      ComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
}

在DefaultDispatcherResourceManagerComponentFactory中会提供SessionComponentFactory和JobComponentFactory两种静态创建方法,用来创建不同类型集群组件。

如代码清单3-5所示,在createSessionComponentFactory()方法中,含有用于创建Session类型集群Dispatcher组件的SessionDispatcherFactory、ResourceManager组件对应的resourceManagerFactory以及创建restEndpointFactory对应的SessionRestEndpointFactory。最终将这些工厂实现类封装在DefaultDispatcherResourceManagerComponentFactory对象中,用于创建集群中对应的组件。

代码清单3-5 DefaultDispatcherResourceManagerComponentFactory.createSession-ComponentFactory()方法


public static DefaultDispatcherResourceManagerComponentFactory 
   createSessionComponentFactory(
      ResourceManagerFactory<?> resourceManagerFactory) {
   return new DefaultDispatcherResourceManagerComponentFactory(
      DefaultDispatcherRunnerFactory.createSessionRunner(SessionDispatcherFact
         ory.INSTANCE),
      resourceManagerFactory,
      SessionRestEndpointFactory.INSTANCE);
}

对于JobCluster来讲也是如此,如代码清单3-6所示,在DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory()方法中,包含了创建Per-Job类型集群的服务创建工厂,包括DefaultDispatcherRunnerFactory、ResourceManagerFactory和JobRestEndpointFactory实例等。

代码清单3-6 DefaultDispatcherResourceManagerComponentFactory.createJob-ComponentFactory()方法定义


public static DefaultDispatcherResourceManagerComponentFactory 
   createJobComponentFactory(
      ResourceManagerFactory<?> resourceManagerFactory,
      JobGraphRetriever jobGraphRetriever) {
   return new DefaultDispatcherResourceManagerComponentFactory(
      DefaultDispatcherRunnerFactory.createJobRunner(jobGraphRetriever),
      resourceManagerFactory,
      JobRestEndpointFactory.INSTANCE);
}

创建DefaultDispatcherResourceManagerComponentFactory完毕后,接下来调用DefaultDispatcherResourceManagerComponentFactory.create()方法创建运行时中的各个组件。create()方法的逻辑比较多,为了方便介绍,我们对方法进行拆解,对每个核心步骤对应的代码逻辑进行说明。

1.获取集群HA Leader恢复服务及创建组件Gateway

在create()方法中,先从highAvailabilityServices中获取dispatcherLeaderRetrievalService和resourceManagerRetrievalService,分别用于创建dispatcherGatewayRetriever和resourceManagerGatewayRetriever。这里的GatewayRetriever组件用于获取指定集群组件Gateway当前活跃的Leader地址,避免因为集群RPC组件服务宕机,Gateway发生切换而导致其他组件服务无法正常通信的问题,如代码清单3-7所示。

代码清单3-7 DefaultDispatcherResourceManagerComponentFactory.create()方法部分逻辑


// 获取dispatcherLeaderRetrievalService
dispatcherLeaderRetrievalService = highAvailabilityServices.
   getDispatcherLeaderRetriever();
// 获取resourceManagerRetrievalService
resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerL
   eaderRetriever();
// 创建dispatcherGatewayRetriever
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = 
   new RpcGatewayRetriever<>(
   rpcService,
   DispatcherGateway.class,
   DispatcherId::fromUuid,
   10,
   Time.milliseconds(50L));
// 创建resourceManagerGatewayRetriever
final LeaderGatewayRetriever<ResourceManagerGateway> 
   resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
   rpcService,
   ResourceManagerGateway.class,
   ResourceManagerId::fromUuid,
   10,
   Time.milliseconds(50L));

2.创建和启动WebMonitorEndpoint

下一步是创建WebMonitorEndpoint组件。WebMonitorEndpoint是前端页面访问Web后端服务接口的提供者,通过WebMonitorEndpoint提供的Restful接口可以查看集群整体监控信息、获取任务执行状态。

如代码清单3-8所示,WebMonitorEndpoint的创建过程主要包含以下步骤。

·调用WebMonitorEndpoint.createExecutorService()方法创建ExecutorService,用于处理Web请求服务时的多线程服务。

·获取metrics指标拉取的updateInterval配置,也就是Web页面获取一次监控指标的间隔时间。

·根据updateInterval参数创建MetricFetcher,用于获取Metric指标。

·调用RestEndpointFactory.createRestEndpoint()方法创建webMonitorEndpoint。

·启动创建好的webMonitorEndpoint组件。

代码清单3-8 DefaultDispatcherResourceManagerComponentFactory.create()方法部分逻辑


// 调用WebMonitorEndpoint.createExecutorService()方法创建ExecutorService
final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
   configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
   configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
   "DispatcherRestEndpoint");
// 获取updateInterval参数
final long updateInterval = configuration.getLong(MetricOptions.METRIC_
   FETCHER_UPDATE_INTERVAL);
// 创建MetricFetcher
final MetricFetcher metricFetcher = updateInterval == 0
   ? VoidMetricFetcher.INSTANCE
   : MetricFetcherImpl.fromConfiguration(
      configuration,
      metricQueryServiceRetriever,
      dispatcherGatewayRetriever,
      executor);
// 调用RestEndpointFactory.createRestEndpoint()方法创建webMonitorEndpoint
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
   configuration,
   dispatcherGatewayRetriever,
   resourceManagerGatewayRetriever,
   blobServer,
   executor,
   metricFetcher,
   highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
   fatalErrorHandler);
 // 启动创建好的webMonitorEndpoint
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();

3.创建和启动DispatcherRunner

DispatcherRunner是专门用于启动Dispatcher集群核心组件的驱动类。换句话讲,Dispatcher组件是借助DispatcherRunner启动和运行的。

如代码清单3-9所示,创建DispatcherRunner主要包括如下步骤。

·创建HistoryServerArchivist,用于在History Server上对指定的AccessExecutionGraph进行历史归档。

·创建PartialDispatcherServices,用于提供Dispatcher组件使用的一部分服务,包括高可用、blobServer等。之所以叫作PartialDispatcherServices,是因为Dispatcher还有其他服务会在后续执行过程中启动。

·调用dispatcherRunnerFactory.createDispatcherRunner()方法创建DispatcherRunner对象。创建参数,包括前面创建的所有参数信息,而DispatcherRunner会在后面被leaderElectionService服务启动和执行。

代码清单3-9 DefaultDispatcherResourceManagerComponentFactory.create()方法部分逻辑


// 获取HistoryServerArchivist
final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.
   createHistoryServerArchivist(configuration, webMonitorEndpoint);
// 创建PartialDispatcherServices
final PartialDispatcherServices partialDispatcherServices = new 
   PartialDispatcherServices(
      configuration,
      highAvailabilityServices,
      resourceManagerGatewayRetriever,
      blobServer,
      heartbeatServices,
         () -> MetricUtils.instantiateJobManagerMetricGroup(metricRegis try, 
         hostname),
      archivedExecutionGraphStore,
      fatalErrorHandler,
      historyServerArchivist,
      metricRegistry.getMetricQueryServiceGatewayRpcAddress());
// 创建并启动Dispatcher
log.debug("Starting Dispatcher.");
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
   highAvailabilityServices.getDispatcherLeaderElectionService(),
   fatalErrorHandler,
   new HaServicesJobGraphStoreFactory(highAvailabilityServices),
   ioExecutor,
   rpcService,
   partialDispatcherServices);

4.创建和启动ResourceManager

下面介绍如何创建ResourceManager集群核心组件。如代码清单3-10所示,创建Resource-Manager组件的主要步骤如下。

·创建ResourceManagerMetricGroup,用于采集ResourceManager相关的监控指标。

·调用resourceManagerFactory.createResourceManager()方法创建resourceManager组件。

·启动所创建的resourceManager组件。

代码清单3-10 DefaultDispatcherResourceManagerComponentFactory.create()方法部分逻辑


// 创建ResourceManagerMetricGroup,用于采集ResourceManager相关的监控指标
resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry, 
   hostname);
// 调用resourceManagerFactory.createResourceManager()方法创建resourceManager组件
resourceManager = resourceManagerFactory.createResourceManager(
   configuration,
   ResourceID.generate(),
   rpcService,
   highAvailabilityServices,
   heartbeatServices,
   fatalErrorHandler,
   new ClusterInformation(hostname, blobServer.getPort()),
   webMonitorEndpoint.getRestBaseUrl(),
   resourceManagerMetricGroup);
   // 此处省略了DispatcherRunner的创建逻辑,请读者参考源码
   log.debug("Starting ResourceManager.");
   //启动创建的resourceManager
      resourceManager.start();

5.使用高可用服务启动组件Gateway

接下来通过高可用服务启动ResourceManager和Dispatcher对应的GatewayRetriever服务。我们知道,GatewayRetriever主要用于获取指定服务组件Leader对应的RpcGateway地址,保证系统的高可用,如代码清单3-11所示。

代码清单3-11 DefaultDispatcherResourceManagerComponentFactory.create()方法部分逻辑


resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

6.返回DispatcherResourceManagerComponent对象

当所有的组件都创建完毕并启动后,就会通过DispatcherResourceManagerComponent对象对服务和组件进行封装,然后返回到ClusterEntrypoint中。至此,集群管理节点的所有主要组件就完成了创建和初始化步骤,用户可以通过浏览器访问并启动Flink集群UI页面,但因为集群还没有启动TaskManager,所以暂时还不能向启动的集群提交作业,如代码清单3-12所示。

代码清单3-12 返回封装好服务和组件的DispatcherResourceManagerComponent


return new DispatcherResourceManagerComponent(
   dispatcherRunner,
   resourceManager,
   dispatcherLeaderRetrievalService,
   resourceManagerRetrievalService,
   webMonitorEndpoint);

以上整体介绍了集群运行时中核心组件的创建过程,接下来我们详细介绍每个核心组件的创建过程。