随着大数据技术和业务需求的发展变化,数据应用对实时性要求越来越高,以流计算和离线处理为代表的Lambda架构成为大数据平台的事实标准。如何借助大数据基础组件快速开发流计算框架与查询应用成为这其中的核心问题。本文将从以下三个方面展开论述:Lambda架构构建大数据平台的实践经验;AWS部署实践;OLAP查询演进。
一、Lambda架构构建大数据平台的实践经验
1.Lambda架构概述
我们的数据来源于广告的投放和展示,我们的服务部署在全球6个数据中心,每天有近10亿的展示日志,累计产生超过3TB的视频广告数据,并且有18个月的数据需要持久化。
广告数据在视频过程中通过投放和展示产出,这种类型的数据是一个Request跟随多个ACK记录。在数据生成和传输过程中,我们使用Google Protocol Buffers来作为数据存储格式,嵌套数据结构,经过我们的处理框架,以Parquet文件形式存储到HDFS。
根据我们的数据内容,我们提供4种类型的数据应用。一是报表统计类,负责为客户产生各种形式的统计报表;二是追踪业务流量的变化,为客户提供定制化实时日志;三是分析查询服务,也就是OLAP;四是预测类应用,为我们线上广告服务器提供反馈。
图1 以Hadoop/Kafka/HBase为核心的Lambda框架
基于我们的数据特点和应用需求,我们在内部采用一套以Hadoop、Kafka、HBase为核心的Lambda处理架构(如图1所示)。各个数据中心的广告服务器实时地传输数据到Kafka Cluster,然后MirrorMaker会把多个数据中心的数据聚集到一个全局的Kafka Cluster。我们的流式处理框架会从全局的Kafka Cluster消费数据,处理之后写入HBase Table,并由Hadoop离线作业将HBase Table转换成Parquet文件。OLAP(Presto)服务会同时查询HBase和HDFS,对外提供数据的实时查询。针对Kafka Topic, HBase Table和HDFS上Parquet文件,还支持Spark和MapReduce作业进行处理。
2.分布式资源管理平台
基于Lambda架构的需求,我们将分布式平台的服务分为两类:一类是计算服务,一类是核心服务(如图2所示)。计算服务追求吞吐和速度,核心服务关注服务的延迟和稳定性。计算服务包括各种类型的批处理、流式计算、内存计算以及OLAP查询相关的计算。为了统一管理,所有的计算服务都要部署在YARN上。核心服务包括分布式存储HDFS、HBase、监控组件和系统内核服务。
图2 分布式平台服务划分
为了共享资源、提高资源利用率,这些服务在每台机器上都会部署。在计算服务这一层,我们统一使用YARN来管理。那么,在这样的一个背景下,如何解决计算服务与核心服务的资源竞争,在保证服务稳定的前提下,更有效地使用资源,成为了Lambda架构面临的重要的挑战。
(1)CPU资源的竞争
不受限制的计算密集型服务负载变化存在不确定性,我们可以看到机器存在一些时刻,基本全部的CPU被计算服务占用,直接会影响底下核心服务的质量。如图3所示,这台机器在持续高CPU情况下,直接导致了机器宕机。计算服务导致服务器异常高负载以及宕机,影响核心服务的稳定性。
图3 计算服务高负载的影响
(2)网络IO资源竞争
Presto是我们分布式平台的数据分析服务,它负责对超过18个月的数据进行SQL查询,图4是ATOP工具记录的各项系统指标,可以看出Presto-server占用了超过83%的网络资源。对于网络IO资源的竞争,会直接影响到HDFS DataNode传输Block的效率以及HBase的延迟。图5可以看到HBase RPC的服务延迟受到了带宽的影响,RPC延迟会随着节点网卡带宽的负载的升高而变大。
图4 计算服务网络高负载的ATOP截图
图5 网络带宽和HBase RPC延迟的关系图
因此,基于CPU和网络IO资源的隔离,是计算服务和以存储为目的的核心服务共存的必要条件。为了解决资源竞争的问题,我们采用了Linux CGroup技术,分别从CPU资源和网络资源进行隔离。
(1)CPU资源隔离
计算服务和核心服务使用cpuset cgroup实现隔离。例如,我们服务器有32 CPU核心,我们将前8个CPU核分配给核心服务使用,后面24个核分配给计算服务。
在每一层服务内部,我们使用cpu.shares实现资源竞争共享资源。在这一点上,YARN本身支持基于任务的vcore设置cpu.shares,灵活实现了共享式竞争。
如图6所示,对比测试显示了基于CPU隔离能够有效控制机器的资源利用率。机器B是设置了cpuset隔离,机器A是没有设置cpuset,可以看出在同样计算任务负载的情况下,机器B可以很好地控制资源利用率,保证了核心服务的稳定性。
图6 系统CPU利用率对比图
通过粗粒度cpuset隔离和分层内部细粒度cpu.shares的竞争,使得我们在保证核心服务的CPU资源的同时,允许我们的计算任务在允许的范围内竞争资源。
(2)网络IO资源隔离
在网络资源的隔离上,我们是通过net_cl cgroup对于进程按照网络资源进行分组,然后tc(htb模式)限制网络IO资源。例如,在我们的分布式平台,我们分别对于HDFS、HBase、YARN计算服务进行网络带宽限制。节点网卡总带宽1000mbit,我们对于YARN、HBase、HDFS按照480mbit、260mbit、260mbit进行分配。设置代码示例如下。
#设置tc htb类别和流量控制规则 $ tc class add dev eth0 parent 1: classid 1:1 htb rate 1000mbit $ tc class add dev eth0 parent 1:1 classid 1:10 htb rate 480mbit ceil 1000mbit $ tc class add dev eth0 parent 1:1 classid 1:11 htb rate 260mbit ceil 1000mbit $ tc class add dev eth0 parent 1:1 classid 1:12 htb rate 260mbit ceil 1000mbit #创建cgroup net_cls分组(yarn-service, hbase-service, hdfs- service) $ cgcreate -g net_cls:/yarn-service $ cgset -r net_cls.classid=0x10010 yarn-service $ cgcreate -g net_cls:/hbase-service $ cgset -r net_cls.classid=0x10011 hbase-service $ cgcreate -g net_cls:/hdfs-service $ cgset -r net_cls.classid=0x10012 hdfs-service #设置tc filter按照cgroup进行过滤 $ tc filter add dev eth1 parent 1: protocol ip handle 1: cgroup #启动服务进程并加入对应的cgroup $ cgexec -g net_cls:{$group_name} start-service-command
经过调整之后,YARN计算服务在整个环境下的网络带宽使用被得到更合理的控制,这样在核心服务存在较高网络IO需求时,服务能够获得稳定的吞吐和延迟。例如,我们通过实验测试了在YARN启动数据密集作业的同时,提交大规模的HDFS/HBase读写的场景下,对比了设置网络资源隔离带来的效果,如图7所示。在不影响整体网络IO资源使用率的前提下,有效保护了HDFS、HBase核心服务的带宽。结合CPU资源隔离,有效地实现了核心服务、计算服务共置机器,资源高效共享。
图7 网络资源隔离效果图
3.应用编排和CD
通过Linux CGroup技术解决了Lambda架构的服务如何在同一个环境中更好地共享资源的问题,我们的计算服务由YARN统一调度和部署。那么如何让服务类型的作业部署到YARN,成为了我们需要解决的一个问题。
服务类型的分布式应用,它需要解决如下几个关键问题:
第一是维护应用每个组件的状态。我们认为部署在YARN环境的应用,需要满足Share-nothing的分布式架构,从而可以保证在机器宕机等不可抗拒的因素的干扰下,服务的正常执行。
第二是维护内部逻辑拓扑。流式处理作业,一般包含多个层次的逻辑关系,需要一种语言来描述不同层次之间的依赖,保证服务的正常执行。
第三是支持弹性伸缩。在逻辑和依赖允许的情况下,支持按需增加和减少实例个数。
基于此,我们采用了Apache Slider。Slider是一种分布式应用框架,它帮助一个分布式应用可以灵活地部署到YARN上。分布式应用可以使用Slider描述出资源需求和内部的逻辑层次,Slider可以监控服务的状态,并结合YARN的特性,实现自动恢复。
本质上,Slider应用就是运行在YARN上的Application,它也是有一个AppMaster作为中心任务,Slider Agent被部署到YARN的工作节点,用来启动子任务。我们Slider Client可以通过API向Slider AppMaster提交请求,通过Slider Agent执行相关管理工作。
Slider支持Docker化的应用,每个工作节点会通过Docker Pull从Docker Registry请求指定版本的Image,并按照Agent的指令启动Docker Container。Slider部署Application的架构图如图8。
图8 容器类应用Slider部署
在我们的流数据处理框架里,通过Slider来管理作业的状态和逻辑,Docker Image负责业务逻辑和环境的组装发布。在实现按需动态伸缩的过程中,需要在指定节点关闭或者增加实例。这是在Slider的系统不支持的,因此,我们扩展了Slider Flex功能,实现指定节点启动/关闭某个组件的实例。在Docker部署实践上,我们解决了Slider Agent记录的Docker容器状态和Docker Container本身状态不一致的问题。为了实现灰度发布,增加支持App不退出更新Docker Image的特性,动态重启部分实例,实现部分升级的功能。
在完成了对于Slider的改造和优化之后,我们将Slider用于我们的持续集成(CD)流程中。现在Slider支持整体发布和灰度发布两种,可以基于Application、Component、Instance进行升级,极大地提升了我们CD的流程的效率。
经过Slider+Docker对于服务进行组装和改造之后,持续集成(CD)在大数据处理系统里面的流程包括如下几个步骤(如图9):
图9 Docker容器化的持续集成
1)通过Git提交代码。2)Jenkins执行Regression Test。3)完成Regression Test的代码,打包成Docker Image,提交到Docker Registry上。4)部署到Testing环境。5)Testing环境验证无误之后,提交到Production环境。
在Testing和Production环境中,根据升级的需求选择合适的发布策略,例如,如果Application升级之后的输出数据和输入数据有变化,会做整体升级,对于模块代码Bug Fix或者内部优化,在不改变处理逻辑的情况下,对Component会进行灰度发布。