2.2 NameServer启动流程
从源码的角度窥探一下NameServer启动流程,重点关注NameServer相关启动参数。
NameServer启动类:org.apache.rocketmq.namesrv.NamesrvStartup。
Step1:首先来解析配置文件,需要填充NameServerConfig、NettyServerConfig属性值。
代码清单2-1 NameServer加载配置文件
final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876); if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file ! = null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, " + file + "%n"); in.close(); } } if (commandLine.hasOption('p')) { MixAll.printObjectProperties(null, namesrvConfig); MixAll.printObjectProperties(null, nettyServerConfig); System.exit(0); } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
从代码我们可以知道先创建NameServerConfig(NameServer业务参数)、NettyServer-Config(NameServer网络参数),然后在解析启动时把指定的配置文件或启动命令中的选项值,填充到nameServerConfig, nettyServerConfig对象。参数来源有如下两种方式。
1)-c configFile通过-c命令指定配置文件的路径。
2)使用“--属性名 属性值”,例如 --listenPort 9876。
代码清单2-2 NameServerConfig属性
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; private String productEnvName = "center"; private boolean clusterTest = false; private boolean orderMessageEnable = false;
□ rocketmqhome: rocketmq主目录,可以通过-Drocketmq.home.dir=path或通过设置环境变量ROCKETMQ_HOME来配置RocketMQ的主目录。
□ kvConfigPath:NameServer存储KV配置属性的持久化路径。
□ configStorePath:nameServer默认配置文件路径,不生效。nameServer启动时如果要通过配置文件配置NameServer启动属性的话,请使用-c选项。
□ orderMessageEnable:是否支持顺序消息,默认是不支持。
代码清单2-3 NettyServerConfig属性
private int listenPort = 8888; private int serverWorkerThreads = 8; private int serverCallbackExecutorThreads = 0; private int serverSelectorThreads = 3; private int serverOnewaySemaphoreValue = 256; private int serverAsyncSemaphoreValue = 64; private int serverChannelMaxIdleTimeSeconds = 120; private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; private boolean serverPooledByteBufAllocatorEnable = true; private boolean useEpollNativeSelector = false;
□ listenPort:NameServer监听端口,该值默认会被初始化为9876。
□ serverWorkerThreads:Netty业务线程池线程个数。
□ serverCallbackExecutorThreads:Netty public任务线程池线程个数,Netty网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型(RequestCode)未注册线程池,则由public线程池执行。
□ serverSelectorThreads:IO线程池线程个数,主要是NameServer、Broker端解析请求、返回相应的线程个数,这类线程主要是处理网络请求的,解析请求包,然后转发到各个业务线程池完成具体的业务操作,然后将结果再返回调用方。
□ serverOnewaySemaphoreValue:send oneway消息请求并发度(Broker端参数)。
□ serverAsyncSemaphoreValue:异步消息发送最大并发度(Broker端参数)。
□ serverChannelMaxIdleTimeSeconds:网络连接最大空闲时间,默认120s。如果连接空闲时间超过该参数设置的值,连接将被关闭。
□ serverSocketSndBufSize:网络socket发送缓存区大小,默认64k。
□ serverSocketRcvBufSize:网络socket接收缓存区大小,默认64k。
□ serverPooledByteBufAllocatorEnable:ByteBuffer是否开启缓存,建议开启。
□ useEpollNativeSelector:是否启用Epoll IO模型,Linux环境建议开启。
小技巧
在启动NameServer时,可以先使用./mqnameserver -c configFile -p打印当前加载的配置属性。
Step2:根据启动属性创建NamesrvController实例,并初始化该实例,NameServerController实例为NameServer核心控制器。
代码清单2-4 NamesrvController#Initialize代码片段
public boolean initialize() { this.kvConfigManager.load(); this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); this.registerProcessor(); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); return true; }
加载KV配置,创建NettyServer网络处理对象,然后开启两个定时任务,在RocketMQ中此类定时任务统称为心跳检测。
□ 定时任务1:NameServer每隔10s扫描一次Broker,移除处于不激活状态的Broker。
□ 定时任务2:nameServer每隔10分钟打印一次KV配置。
Step3:注册JVM钩子函数并启动服务器,以便监听Broker、消息生产者的网络请求。
代码清单2-5 注册JVM钩子函数代码
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { public Void call() throws Exception { controller.shutdown(); return null; } })); controller.start();
这里主要是向读者展示一种常用的编程技巧,如果代码中使用了线程池,一种优雅停机的方式就是注册一个JVM钩子函数,在JVM进程关闭之前,先将线程池关闭,及时释放资源。