Apache Spark 2:Data Processing and Real-Time Analytics
上QQ阅读APP看书,第一时间看更新

TCP stream

There is a possibility of using the Spark Streaming Context method called socketTextStream to stream data via TCP/IP, by specifying a hostname and port number. The Scala-based code example in this section will receive data on port 10777 that was supplied using the netcat Linux command.

The netcat command is a Linux/Unix command which allows you to send and receive data to or from local or remote IP destinations using TCP or UDP. This way every shell script can play the role of a full network client or server. The following is a good tutorial on how to use netcat: http://www.binarytides.com/netcat-tutorial-for-beginners/.

The code sample starts by importing Spark, the context, and the streaming classes. The object class named stream2 is defined as it is the main method with arguments:

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object stream2 {
def main(args: Array[String]) {

The number of arguments passed to the class is checked to ensure that it is the hostname and port number. A Spark configuration object is created with an application name defined. The Spark and streaming contexts are then created. Then, a streaming batch time of 10 seconds is set:

if ( args.length < 2 ) {
System.err.println("Usage: stream2 <host> <port>")
System.exit(1)
}

val hostname = args(0).trim
val portnum = args(1).toInt
val appName = "Stream example 2"
val conf = new SparkConf()
conf.setAppName(appName)
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10) )

A DStream called rawDstream is created by calling the socketTextStream method of the streaming context using the hostname and port name parameters:

val rawDstream = ssc.socketTextStream( hostname, portnum )

A top-ten word count is created from the raw stream data by splitting words with spacing. Then, a (key, value) pair is created as (word,1), which is reduced by the key value, this being the word. So now, there is a list of words and their associated counts. The key and value are swapped so the list becomes (count and word). Then, a sort is done on the key, which is now the count. Finally, the top 10 items in the RDD within the DStream are taken and printed out:

val wordCount = rawDstream
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey(_+_)
.map(item => item.swap)
.transform(rdd => rdd.sortByKey(false))
.foreachRDD( rdd =>
{ rdd.take(10).foreach(x=>println("List : " + x)) }
)

The code closes with the Spark Streaming start and awaitTermination methods being called to start the stream processing and await process termination:

    ssc.start()
ssc.awaitTermination()
} // end main
} // end stream2

The data for this application is provided, as we stated previously, by the Linux Netcat (nc) command. The Linux cat command dumps the contents of a log file, which is piped to nc. The lk options force Netcat to listen for connections and keep on listening if the connection is lost. This example shows that the port being used is 10777:

 [root@hc2nn log]# pwd
/var/log
[root@hc2nn log]# cat ./anaconda.storage.log | nc -lk 10777

The output from this TCP-based stream processing is shown here. The actual output is not as important as the method demonstrated. However, the data shows, as expected, a list of 10 log file words in descending count order. Note that the top word is empty because the stream was not filtered for empty words:

 List : (17104,)
List : (2333,=)
List : (1656,:)
List : (1603,;)
List : (1557,DEBUG)
List : (564,True)
List : (495,False)
List : (411,None)
List : (356,at)
List : (335,object)

This is interesting if you want to stream data using Apache Spark Streaming based on TCP/IP from a host and port. However, what about more exotic methods? What if you wish to stream data from a messaging system or via memory-based channels? What if you want to use some of the big data tools available today such as Flume and Kafka? The next sections will examine these options, but, first, we will demonstrate how streams can be based on files.