2.1 数据管道
即使是做最基本的分析,我们也需要一些数据。事实上,找到正确的数据可能是数据科学中最难解决的问题之一。第1章已经介绍过,我们获得数据的方式可以是简单的,也可以是复杂的,一切都取决于需求。在实践中,我们将这个决策分成两种不同的种类:临时性的和预定的。
- 临时性的数据获取。对于原型设计和小规模分析,这是最常采用的方法,因为它在实施时通常不需要任何额外的软件。用户想获取一些数据,在需要时从数据源下载即可。这种方法通常就是单击Web链接,将数据存储在方便读取的地方,尽管数据可能仍然需要版本控制。
- 预定的数据获取。在可控环境下,进行大规模生产分析;还有一种很好的场景:将数据集采集到数据湖以备将来使用。随着物联网(IoT)的大规模发展,许多场景产生了大量数据,如果数据没有被及时采集,它将永远消失。这些数据中的大部分在当前还没有明显的用途,但是将来可能会有,所以我们得有这样的心态:只要有需要,就收集所有的数据;只有在我们确信它一定没用时,才删除它。
显然,我们需要灵活的方法来支持各种各样的数据获取选项。
2.1.1 通用采集框架
有许多方法来获取数据,包括系统自带的bash脚本以及高端的商业工具。本节的目的是介绍一个高度灵活的框架,我们用它来进行小规模的数据采集,然后当我们的需求变化多端时,它可以扩展为一个完整的、可协同管理的工作流。该框架采用Apache NiFi进行构建。通过NiFi我们能够建立大规模的、集成的数据管道,可以在全球范围内移动数据。此外,它还具备很好的灵活性,易于构建的流水线,甚至比使用bash或一些传统的脚本方法更快。
如果基于许多因素而采用临时性的数据获取方法来从数据源中获取相同的数据集,应该认真考虑是否换用预定的数据获取方法,或者至少选用更健壮的存储方式,并引入版本控制。
我们选择使用Apache NiFi,因为它提供了这样一种解决方案:可以创建许多不同复杂程度的管道,可以扩展到真正的大数据和物联网水平。它还提供了一个好用的拖放界面(使用所谓基于流程的编程)。在工作流生成的模式、模板和模块的帮助下,它能自动处理许多在传统上一直困扰开发者的复杂特性问题,如多线程、连接管理和可扩展性的处理等。而对我们来说,它将帮助我们快速建立简单的管道原型,并在需要的时候扩展到全功能版本。
它的文档组织得很好,参照NiFi官方网站上的信息,你可以轻松地让它运行起来,它在浏览器里的运行界面如图2-1所示。
图2-1 NiFi运行界面
这里我们鼓励读者将NiFi的安装作为练习,后续章节中我们会用到它。
2.1.2 GDELT数据集简介
现在NiFi已经在运行,我们可以开始采集数据了。让我们从GDELT的全球新闻媒体数据开始,在GDELT网站中能找到以下简要说明。
“在15分钟内,GDELT监控世界各地突发的新闻报道,对其进行翻译,处理识别出所有的事件、计数、引文、人物、组织、地点、主题、情感、相关图像、视频和嵌入的社交媒体帖子,将它放到全局上下文中,并通过一个实时开放的元数据管道,使这些数据可用于全球的开放式研究。
作为全球最大的情感分析应用,我们希望跨越众多语言和学科的边界、汇集众多情感和主题维度,并将其应用于全球的实时突发新闻中,这将帮助我们在如何理解情感方面进入全新时代,它可以帮助我们更好地了解如何语境化、解释、响应以及理解全球事件。”
我想你会认同,这是很有挑战性的事!因此,不要拖延。暂停一下,这里不再进行详细说明了,我们会采用比说明更直接的方式。在接下来的章节中使用它们时,我们将详细介绍GDELT的方方面面。
要开始处理这些开放数据,我们需要深入元数据管道,将新闻流采集到平台里。我们该怎么做呢?先从寻找可用数据开始吧。
1.实时探索GDELT
GDELT网站上会发布最新文件的列表,这个列表每15分钟更新一次。在NiFi里,我们可以建立一个数据流,它以这个列表为来源对GDELT网站进行轮询,获取文件并保存到HDFS,以便以后使用。
在NiFi数据流设计器里,通过将一个处理器拖曳到画布里来创建一个HTTP连接器,然后选择GetHTTP
功能,如图2-2所示。
图2-2 NiFi数据流设计器
为了配置这个处理器,你要输入文件列表的URL:
http://data.gdeltproject.org/gdeltv2/lastupdate.txt
此外,还要为下载的文件列表提供一个临时文件名。在本例中,我们使用NiFi的表达式语言UUID()
来生成一个通用的唯一键值,以确保文件不会被覆盖,如图2-3所示。
图2-3 NiFi处理器配置
值得注意的是,对于这种类型的处理器(GetHTTP
方法),NiFi支持多种用于轮询和检索的调度和定时选项。现在,我们先使用默认选项,让NiFi为我们管理轮询间隔。
图2-4展示了GDELT中的一个最新文件列表示例。
图2-4 GDELT最新文件列表示例
接下来,我们将解析全球知识图(GKG)新闻流的统一资源定位符(URL),以便稍后能获取它。将处理器拖曳到画布上,创建一个正则表达式解析器,然后选择ExtractText
。现在,在现有处理器下面放置一个新处理器,并在上下两个处理器之间直接拖曳出一条连线。最后在弹出的连接对话框里选择success
关系。
操作示例如图2-5所示。
图2-5 解析GKG文件URL
接下来,配置ExtractText
处理器,使用正则表达式对文件列表中的相关文本进行匹配,例如:
([^ ]*gkg.csv.*)
基于这个正则表达式,NiFi将创建一个与流设计相关联的新属性(本例中为url
),它将在每个特定实例通过流的时候,获取一个新值。它甚至可以被配置为支持多线程,示例如图2-6所示。
值得注意的是,虽然这是一个相当具体的例子,但该技术是为通用目标设计的,可以在许多情况下使用。
图2-6 配置处理器属性
2.首个GDELT流
现在我们已经有了GKG流的URL,就可以通过配置一个InvokeHTTP
处理器来获取它。使用之前创建的url
属性作为远程端点,像之前的示例中那样,通过拖放连线来进行操作,如图2-7所示。
图2-7 配置InvokeHTTP处理器
剩下的就是用一个UnpackContent
处理器来解压压缩的内容(使用基本的.zip
格式),并使用PutHDFS
处理器保存内容到HDFS,如图2-8所示。
图2-8 配置压缩和保存处理器
3.通过发布和订阅进行改进
到目前为止,这个流程看起来是点到点的模式,也就是说,如果我们引入一个新的数据消费者,例如Spark-streaming作业,那么这个流就必须改变。流设计可能如图2-9所示。
如果再加一个数据消费者,流就必须再次改变。事实上,每添加一个新的消费者,流就会变得更复杂,特别是要加入所有的错误处理时。显然这并不令人满意,因为引入或移除数据的消费者(或生产者)可能是我们的日常操作,甚至是高频操作。另外,保持流程尽可能简单和可重复使用也是一种更好的策略。
因此,我们不直接将其写入HDFS,而是采用更灵活的模式,将其发布到Apache Kafka。这样,我们可以随时新增或删除消费者,而不用改变数据采集管道。在需要时,我们也可以从Kafka写入HDFS,甚至可以设计一个独立的NiFi流,或者直接用Spark-streaming连接到Kafka。
图2-9 引入新的数据消费者的流设计
为了演示,我们可以将一个处理器拖曳到画布里,选择PutKafka
,以此创建一个Kafka写入器,如图2-10所示。
图2-10 创建一个Kafka写入器
现在,我们已经得到一个简单的流,它可以连续地轮询可用文件列表。在Web可用时,定期检索新流的最新副本、解压缩内容,并将记录逐条流式传输到Kafka中,形成一个持久的、可容错的、分布式的消息队列,供Spark-streaming处理,或存储在HDFS中。更重要的是,我们连一行bash脚本代码都不用写!