去哪儿网消息队列设计与实现
去哪儿网近日在GitHub上开源了其内部广泛使用的消息队列(内部代号QMQ),本文从去哪儿网使用消息队列所碰到的各种问题出发探讨去哪儿网消息队列的设计与实现。
背景
2012年,随着公司业务的快速增长,公司当时的单体应用架构很难满足业务快速增长的要求,和其他很多公司一样,去哪儿网也开始了服务化改造,按照业务等要素将原来庞大的单体应用拆分成不同的服务。那么在进行服务化改造之前首先就是面临是服务化基础设施的技术选型,其中最重要的就是服务之间的通信中间件。一般来讲服务之间的通信可以分为同步方式和异步方式。同步的方式的代表就是RPC,我们选择了当时还在活跃开发的Alibaba Dubbo(在之后Dubbo官方停止了开发,但是最近Dubbo项目又重新启动了)。
异步方式的代表就是消息队列(Message Queue),MQ在当时也有很多开源的选择:RabbitMQ, ActiveMQ, Kafka, MetaQ(RocketMQ的前身)。首先因为技术栈我们排除了erlang开发的RabbitMQ,而Kafka以及Java版Kafka的MetaQ在当时还并不成熟和稳定。而ActiveMQ在去哪儿网已经有很多应用在使用了,但是使用过程中并不一帆风顺:宕机,消息丢失,消息堵塞等问题屡见不鲜,而且ActiveMQ发展多年,代码也非常复杂,想要完全把控也不容易,所以我们决定自己造一个轮子。
问题
如果我们要在服务化拆分中使用消息队列,那么我们需要解决哪些问题呢?首先去哪儿网提供了旅游产品在线预订服务,那么就涉及电商交易,在电商交易中我们认为数据的一致性是非常关键的要素。那么我们的MQ必须提供一致性保证。
MQ提供一致性保证又分为两个方面:发消息时我们如何确保业务操作和发消息是一致的,也就是不能出现业务操作成功消息未发出或者消息发出了但是业务并没有成功的情况。举例来说,支付服务使用消息通知出票服务,那么不能出现支付成功,但是消息没有发出,这会引起用户投诉;但是也不能出现支付未成功,但是消息发出最后出票了,这会导致公司损失。总结一下就是发消息和业务需要有事务保证。一致性的另一端是消费者,比如消费者临时出错或网络故障,我们如何确保消息最终被处理了。那么我们通过消费ACK和重试来达到最终一致性。
而服务端设计,在当时我们考虑的并不多,我们原计划只在交易环节中使用自己开发的MQ,经过对未来数据的预估我们选择数据库作为MQ Server的消息存储,但是随着MQ在各系统中的大量应用,就不仅限于交易场景了,而且大家都期望所有场景中只使用一套API,所以后来消息量迅速增长,迫使我们对存储模型进行了重新设计。再加上旅游产品预定的特征,大部分预定都是未来某个时间点的,这个时间可长可短,短的话可能是几个小时,长的话可能是半年以上,那么我们对延时消息的需求也很强烈。那么这种延时时间不固定的方式也对服务端设计提出了挑战。
接下来本文会从客户端一致性设计和服务端存储模型两方面进行讨论。
客户端一致性
提到一致性,大家肯定就想到事务,而一提到事务,肯定就想到关系型数据库,那么我们是不是可以借助关系型DB里久经考验的事务来实现这个一致性呢。我们以MySQL为例,对于MySQL中同一个实例里面的db,如果共享相同的Connection的话是可以在同一个事务里的。以下图为例,我们有一个MySQL实例监听在3306端口上,然后该实例上有A, B两个DB,那么下面的伪代码是可以跑在同一个事务里的:
begin transaction insert into A.tbl1(name, age) values('admin', 18); insert into B.tbl2(num) values(20); end transaction
有了这层保证,我们就可以透明的实现业务操作和消息发送在同一个事务里了,首先我们在公司所有MySQL实例里初始化出一个message db,这个可以放到自动化流程中,对应用透明。然后我们只要将发消息与业务操作放到同一个DB事务里即可。
我们来看一个实际的场景:在支付场景中,支付成功后我们需要插入一条支付流水,并且发送一条支付完成的消息通知其他系统。那么这里插入支付流水和发送消息就需要是一致的,任何一步没有成功最后都会导致问题。那么就有下面的代码:
@Transactional public void pay(Order order){ PayTransaction t = buildPayTransaction(order); payDao.append(t); producer.sendMessage(buildMessage(t)); }
上面的代码可以用下面的伪代码解释:
@Transactional public void pay(Order order){ PayTransaction t = buildPayTransaction(order); payDao.append(t); //producer.sendMessage(buildMessage(t)); final Message message = buildMessage(t); messageDao.insert(message); //在事务提交后执行 triggerAfterTransactionCommit(()->{ messageClient.send(message); messageDao.delete(message); }); }
实际上在producer.sendMessage执行的时候,消息并没有通过网络发送出去,而仅仅是往业务DB同一个实例上的消息库插入了一条记录,然后注册事务的回调,在这个事务真正提交后消息才从网络发送出去,这个时候如果发送到server成功的话消息会被立即删除掉。而如果消息发送失败则消息就留在消息库里,这个时候我们会有一个补偿任务会将这些消息从消息库里捞出然后重新发送,直到发送成功。整个流程就如下图所示:
1.begin tx开启本地事务
2.do work执行业务操作
3.insert message向同实例消息库插入消息
4.end tx事务提交
5.send message网络向server发送消息
6.reponse server回应消息
7.delete message如果server回复成功则删除消息
8.scan messages补偿任务扫描未发送消息
9.send message补偿任务补偿消息
10.delete messages补偿任务删除补偿成功的消息
服务端存储模型
分析了客户端为了一致性所作的设计后,我们再来看看服务端的存储设计。我会从两个方面来介绍:类似Kafka之类的基于partition存储模型有什么问题,以及我们是如何解决的。
基于partition存储模型的问题
我们都知道Kafka和RocketMQ都是基于partition的存储模型,也就是每个subject分为一个或多个partition,而Server收到消息后将其分发到某个partition上,而Consumer消费的时候是与partition对应的。比如,我们某个subject a分配了3个partition(p1, p2, p3),有3个消费者(c1, c2, c3)消费该消息,则会建立c1 - p1, c2 - p2, c3 - p3这样的消费关系。
那么如果我们的consumer个数比partition个数多呢?则有的consumer会是空闲的。
而如果partition个数比consumer个数多呢?则可能存在有的consumer消费的partition个数会比其他的consumer多的情况。
那么合理的分配策略只有是partition个数与consumer个数成倍数关系。
以上都是基于partition的MQ所带来的负载均衡问题。因为这种静态的绑定的关系,还会导致Consumer扩容缩容麻烦。也就是使用Kafka或者RocketMQ这种基于partition的消息队列时,如果遇到处理速度跟不上时,光简单的增加Consumer并不能马上提高处理能力,需要对应的增加partition个数,而特别在Kafka里partition是一个比较重的资源,增加太多parition还需要考虑整个集群的处理能力;当高峰期过了之后,如果想缩容Consumer也比较麻烦,因为partition只能增加,不能减少。
跟扩容相关的另外一个问题是,已经堆积的消息是不能快速消费的。比如开始的时候我们分配了2个partition,由2个Consumer来消费,但是突然发送方大量发送消息(这个在日常运维中经常遇到),导致消息快速的堆积,这个时候我们如何能快速扩容消费这些消息呢?其实增加partition和Consumer都是没有用的,增加的Consumer爱莫能助,因为堆积的那3个partition只能由2个Consumer来消费,这个时候你只能纵向扩展,而不能横向扩展,而我们都知道纵向扩展很多时候是不现实的,或者执行比较重的再均衡操作。
去哪儿消息队列存储模型
上面已经介绍了基于partition的存储模型存在的问题,那么这些问题对于我们是问题吗?或者我们的场景是否能克服这些问题呢?
现在去哪儿网的系统架构基本上都是消息驱动的,也就是绝大多数业务流程都是靠消息来驱动,那么这样的架构有什么特征呢:
•消息主题特别多现在生产上已有4W+消息主题。这是业务中使用的消息与数据流处理中使用的最大的不同,数据流中一般消息主题少,但是每个消息主题的吞吐量都极大。而业务中的消息是主题极多,但是有很多主题他的量是极小的。
•消息消费的扇出大也就是一个消息主题有几十个甚至上百个不同的应用订阅是非常常见的。以去哪儿酒店订单状态变更的消息为例,目前有将近70多个不同的系统来订阅这个消息。
结合前面对基于partition的存储模型的讨论,我们觉得这种存储模型不太容易符合我们的需求。
虽然我们并不想采用基于partition的存储模型,但是Kafka和RocketMQ里很多设计我们还是可以学习的:
•顺序append文件,提供很好的性能
•顺序消费文件,使用offset表示消费进度,不用给每个消息记录消费状态,成本极低
•将所有subject的消息合并在一起,减少parition数量,单一集群可以支撑更多的subject(RocketMQ)
在设计QMQ的存储模型时,觉得这几点是非常重要的。那如何在不使用基于partition的情况下,又能得到这些特性呢?正所谓有前辈大师说:计算机中所有问题都可以通过添加一个中间层来解决,一个中间层解决不了那就添加两个。
我们通过添加一层拉取的log(pull log)来动态映射consumer与partition的逻辑关系,这样不仅解决了consumer的动态扩容缩容问题,还可以继续使用一个offset表示消费进度。而pull log与consumer一一对应。
下图是QMQ的存储模型。
先解释一下上图中的数字的意义。上图中方框上方的数字,表示该方框在自己log中的偏移,而方框内的数字是该项的内容。比如message log方框上方的数字:3,6,9表示这几条消息在message log中的偏移。而consume log中方框内的数字3,6,9,20正对应着messzge log的偏移,表示这几个位置上的消息都是topic1的消息,consume log方框上方的1,2,3,4表示这几个方框在consume log中的逻辑偏移。下面的pull log方框内的内容对应着consume log的逻辑偏移,而pull log方框外的数字表示pull log的逻辑偏移。
这样存储中有三种重要的log:
•message log所有subject的消息进入该log,消息的主存储
•consume log consume log存储的是message log的索引信息
•pull log每个consumer拉取消息的时候会产生pull log, pull log记录的是拉取的消息在consume log中的sequence
那么消费者就可以使用pull log上的sequence来表示消费进度,这样一来我们就解耦了consumer与partition之间的耦合关系,两者可以任意的扩展。
延迟消息队列存储模型
除了对实时消息的支持,QMQ还支持了任意时间的延时消息,在开源版本的RocektMQ里提供了多种固定延迟level的延时消息支持,也就是你可以发送几个固定的延时时间的延时消息,比如延时10s, 30s…,但是基于我们现有的业务特征,我们觉得这种不同延时level的延时消息并不能满足我们的需要,我们更多的是需要任意时间延时。在OTA场景中,客人经常是预订未来某个时刻的酒店或者机票,这个时间是不固定的,我们无法使用几个固定的延时level来实现这个场景。
我们的延时/定时消息使用的是两层hash wheel timer来实现的。第一层位于磁盘上,每个小时为一个刻度,每个刻度会生成一个日志文件,根据业务特征,我们觉得支持两年内任意时间延时就够了,那么最多会生成2*366*24=17568个文件。第二层在内存中,当消息的投递时间即将到来的时候,会将这个小时的消息索引(偏移量,投递时间等)从磁盘文件加载到内存中的hash wheel timer上。
在延时/定时消息里也存在三种log:
•message log和实时消息里的message log类似,收到消息后append到该log, append成功后就立即返回
•schedule log按照投递时间组织,每个小时一个。该log是回放message log后根据延时时间放置对应的log上,这是上面描述的两层hash wheel timer的第一层,位于磁盘上
•dispatch log延时/定时消息投递后写入,主要用于在应用重启后能够确定哪些消息已经投递
总结
消息队列是构建微服务架构很关键的基础设施,本文根据我们自己的实际使用场景,并且对目前市面上一些活跃的开源产品进行对比,提出去哪儿网的消息队列的设计与实现。本文只是从宏观架构上做出一些探讨,具体的实现细节也有很多有趣的地方,目前去哪儿网的消息队列QMQ也已经在GitHub上开源,欢迎大家试用,欢迎PR,谢谢。