3.2.1 集群组件的创建和启动
如图3-4所示,DefaultDispatcherResourceManagerComponentFactory作为DispatcherResourceManagerComponentFactory接口的默认实现类,其内部包含了ResourceManagerFactory、DispatcherRunnerFactory和RestEndpointFactory三个主要的成员变量,分别提供了ResourceManager、DispatcherRunner以及WebMonitorEndpoint组件的创建方法。
所有的ClusterEntrypoint子类都会实现createDispatcherResourceManagerComponentFac-tory()抽象方法,以提供创建相应集群组件服务的能力。例如在StandaloneSessionClusterEntrypoint中,通过实现createDispatcherResourceManagerComponentFactory()抽象方法创建StandaloneSession集群中的组件和服务。
图3-4 DispatcherResourceManagerComponentFactory UML类图
如代码清单3-4所示,在StandaloneSessionClusterEntrypoint中会将StandaloneResourceManagerFactory实例作为参数传递给DefaultDispatcherResourceManagerComponentFactory。从这里也可以看出,不同类型的集群最主要的区别就是ResourceManager的实现类不同。
代码清单3-4 StandaloneSessionClusterEntrypoint.createDispatcherResourceManager-ComponentFactory()方法
protected DefaultDispatcherResourceManagerComponentFactory createDispatcher ResourceManagerComponentFactory(Configuration configuration) { return DefaultDispatcherResourceManagerComponentFactory.createSession ComponentFactory(StandaloneResourceManagerFactory.INSTANCE); }
在DefaultDispatcherResourceManagerComponentFactory中会提供SessionComponentFactory和JobComponentFactory两种静态创建方法,用来创建不同类型集群组件。
如代码清单3-5所示,在createSessionComponentFactory()方法中,含有用于创建Session类型集群Dispatcher组件的SessionDispatcherFactory、ResourceManager组件对应的resourceManagerFactory以及创建restEndpointFactory对应的SessionRestEndpointFactory。最终将这些工厂实现类封装在DefaultDispatcherResourceManagerComponentFactory对象中,用于创建集群中对应的组件。
代码清单3-5 DefaultDispatcherResourceManagerComponentFactory.createSession-ComponentFactory()方法
public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory( ResourceManagerFactory<?> resourceManagerFactory) { return new DefaultDispatcherResourceManagerComponentFactory( DefaultDispatcherRunnerFactory.createSessionRunner(SessionDispatcherFact ory.INSTANCE), resourceManagerFactory, SessionRestEndpointFactory.INSTANCE); }
对于JobCluster来讲也是如此,如代码清单3-6所示,在DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory()方法中,包含了创建Per-Job类型集群的服务创建工厂,包括DefaultDispatcherRunnerFactory、ResourceManagerFactory和JobRestEndpointFactory实例等。
代码清单3-6 DefaultDispatcherResourceManagerComponentFactory.createJob-ComponentFactory()方法定义
public static DefaultDispatcherResourceManagerComponentFactory createJobComponentFactory( ResourceManagerFactory<?> resourceManagerFactory, JobGraphRetriever jobGraphRetriever) { return new DefaultDispatcherResourceManagerComponentFactory( DefaultDispatcherRunnerFactory.createJobRunner(jobGraphRetriever), resourceManagerFactory, JobRestEndpointFactory.INSTANCE); }
创建DefaultDispatcherResourceManagerComponentFactory完毕后,接下来调用DefaultDispatcherResourceManagerComponentFactory.create()方法创建运行时中的各个组件。create()方法的逻辑比较多,为了方便介绍,我们对方法进行拆解,对每个核心步骤对应的代码逻辑进行说明。
1.获取集群HA Leader恢复服务及创建组件Gateway
在create()方法中,先从highAvailabilityServices中获取dispatcherLeaderRetrievalService和resourceManagerRetrievalService,分别用于创建dispatcherGatewayRetriever和resourceManagerGatewayRetriever。这里的GatewayRetriever组件用于获取指定集群组件Gateway当前活跃的Leader地址,避免因为集群RPC组件服务宕机,Gateway发生切换而导致其他组件服务无法正常通信的问题,如代码清单3-7所示。
代码清单3-7 DefaultDispatcherResourceManagerComponentFactory.create()方法部分逻辑
// 获取dispatcherLeaderRetrievalService dispatcherLeaderRetrievalService = highAvailabilityServices. getDispatcherLeaderRetriever(); // 获取resourceManagerRetrievalService resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerL eaderRetriever(); // 创建dispatcherGatewayRetriever final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>( rpcService, DispatcherGateway.class, DispatcherId::fromUuid, 10, Time.milliseconds(50L)); // 创建resourceManagerGatewayRetriever final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>( rpcService, ResourceManagerGateway.class, ResourceManagerId::fromUuid, 10, Time.milliseconds(50L));
2.创建和启动WebMonitorEndpoint
下一步是创建WebMonitorEndpoint组件。WebMonitorEndpoint是前端页面访问Web后端服务接口的提供者,通过WebMonitorEndpoint提供的Restful接口可以查看集群整体监控信息、获取任务执行状态。
如代码清单3-8所示,WebMonitorEndpoint的创建过程主要包含以下步骤。
·调用WebMonitorEndpoint.createExecutorService()方法创建ExecutorService,用于处理Web请求服务时的多线程服务。
·获取metrics指标拉取的updateInterval配置,也就是Web页面获取一次监控指标的间隔时间。
·根据updateInterval参数创建MetricFetcher,用于获取Metric指标。
·调用RestEndpointFactory.createRestEndpoint()方法创建webMonitorEndpoint。
·启动创建好的webMonitorEndpoint组件。
代码清单3-8 DefaultDispatcherResourceManagerComponentFactory.create()方法部分逻辑
// 调用WebMonitorEndpoint.createExecutorService()方法创建ExecutorService final ExecutorService executor = WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS), configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"); // 获取updateInterval参数 final long updateInterval = configuration.getLong(MetricOptions.METRIC_ FETCHER_UPDATE_INTERVAL); // 创建MetricFetcher final MetricFetcher metricFetcher = updateInterval == 0 ? VoidMetricFetcher.INSTANCE : MetricFetcherImpl.fromConfiguration( configuration, metricQueryServiceRetriever, dispatcherGatewayRetriever, executor); // 调用RestEndpointFactory.createRestEndpoint()方法创建webMonitorEndpoint webMonitorEndpoint = restEndpointFactory.createRestEndpoint( configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, blobServer, executor, metricFetcher, highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), fatalErrorHandler); // 启动创建好的webMonitorEndpoint log.debug("Starting Dispatcher REST endpoint."); webMonitorEndpoint.start();
3.创建和启动DispatcherRunner
DispatcherRunner是专门用于启动Dispatcher集群核心组件的驱动类。换句话讲,Dispatcher组件是借助DispatcherRunner启动和运行的。
如代码清单3-9所示,创建DispatcherRunner主要包括如下步骤。
·创建HistoryServerArchivist,用于在History Server上对指定的AccessExecutionGraph进行历史归档。
·创建PartialDispatcherServices,用于提供Dispatcher组件使用的一部分服务,包括高可用、blobServer等。之所以叫作PartialDispatcherServices,是因为Dispatcher还有其他服务会在后续执行过程中启动。
·调用dispatcherRunnerFactory.createDispatcherRunner()方法创建DispatcherRunner对象。创建参数,包括前面创建的所有参数信息,而DispatcherRunner会在后面被leaderElectionService服务启动和执行。
代码清单3-9 DefaultDispatcherResourceManagerComponentFactory.create()方法部分逻辑
// 获取HistoryServerArchivist final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist. createHistoryServerArchivist(configuration, webMonitorEndpoint); // 创建PartialDispatcherServices final PartialDispatcherServices partialDispatcherServices = new PartialDispatcherServices( configuration, highAvailabilityServices, resourceManagerGatewayRetriever, blobServer, heartbeatServices, () -> MetricUtils.instantiateJobManagerMetricGroup(metricRegis try, hostname), archivedExecutionGraphStore, fatalErrorHandler, historyServerArchivist, metricRegistry.getMetricQueryServiceGatewayRpcAddress()); // 创建并启动Dispatcher log.debug("Starting Dispatcher."); dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner( highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, new HaServicesJobGraphStoreFactory(highAvailabilityServices), ioExecutor, rpcService, partialDispatcherServices);
4.创建和启动ResourceManager
下面介绍如何创建ResourceManager集群核心组件。如代码清单3-10所示,创建Resource-Manager组件的主要步骤如下。
·创建ResourceManagerMetricGroup,用于采集ResourceManager相关的监控指标。
·调用resourceManagerFactory.createResourceManager()方法创建resourceManager组件。
·启动所创建的resourceManager组件。
代码清单3-10 DefaultDispatcherResourceManagerComponentFactory.create()方法部分逻辑
// 创建ResourceManagerMetricGroup,用于采集ResourceManager相关的监控指标 resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry, hostname); // 调用resourceManagerFactory.createResourceManager()方法创建resourceManager组件 resourceManager = resourceManagerFactory.createResourceManager( configuration, ResourceID.generate(), rpcService, highAvailabilityServices, heartbeatServices, fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()), webMonitorEndpoint.getRestBaseUrl(), resourceManagerMetricGroup); // 此处省略了DispatcherRunner的创建逻辑,请读者参考源码 log.debug("Starting ResourceManager."); //启动创建的resourceManager resourceManager.start();
5.使用高可用服务启动组件Gateway
接下来通过高可用服务启动ResourceManager和Dispatcher对应的GatewayRetriever服务。我们知道,GatewayRetriever主要用于获取指定服务组件Leader对应的RpcGateway地址,保证系统的高可用,如代码清单3-11所示。
代码清单3-11 DefaultDispatcherResourceManagerComponentFactory.create()方法部分逻辑
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever); dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
6.返回DispatcherResourceManagerComponent对象
当所有的组件都创建完毕并启动后,就会通过DispatcherResourceManagerComponent对象对服务和组件进行封装,然后返回到ClusterEntrypoint中。至此,集群管理节点的所有主要组件就完成了创建和初始化步骤,用户可以通过浏览器访问并启动Flink集群UI页面,但因为集群还没有启动TaskManager,所以暂时还不能向启动的集群提交作业,如代码清单3-12所示。
代码清单3-12 返回封装好服务和组件的DispatcherResourceManagerComponent
return new DispatcherResourceManagerComponent( dispatcherRunner, resourceManager, dispatcherLeaderRetrievalService, resourceManagerRetrievalService, webMonitorEndpoint);
以上整体介绍了集群运行时中核心组件的创建过程,接下来我们详细介绍每个核心组件的创建过程。