Apache Spark 2: Data Processing and Real-Time Analytics

Data preparation

The challenge data comes in three ZIP packages, but we will use only two of them: one contains categorical data and one contains continuous data. The third contains timestamps of measurements, which we will ignore for now.

If you extract the data, you'll get three large CSV files. So the first thing that we want to do is re-encode them into parquet in order to be more space-efficient:

val basePath = "yourBasePath"

def convert(filePrefix: String) = {
  // Read the CSV file with a header row and let Spark infer the schema
  var df = spark
    .read
    .option("header", true)
    .option("inferSchema", "true")
    .csv(basePath + filePrefix + ".csv")
  // Consolidate into a single partition and rewrite as Parquet
  df = df.repartition(1)
  df.write.parquet(basePath + filePrefix + ".parquet")
}

convert("train_numeric")
convert("train_date")
convert("train_categorical")

First, we define a function convert that simply reads the .csv file and rewrites it as a .parquet file. As you can see, this saves a lot of disk space:
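If you want to check the savings yourself, a small helper can sum the sizes of the files Spark writes into the .parquet directory and compare them with the original CSV. This helper is not part of the book's code; the paths are assumptions:

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

// Sum the sizes (in bytes) of all regular files under a path.
// Spark writes Parquet output as a directory of part files, so we
// have to walk the directory tree rather than stat a single file.
def sizeOnDisk(path: Path): Long =
  Files.walk(path).iterator().asScala
    .filter(Files.isRegularFile(_))
    .map(Files.size)
    .sum
```

You could then call, for example, `sizeOnDisk(Paths.get(basePath + "train_numeric.parquet"))` and compare it against the size of the corresponding .csv file.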

Now we read the files in again as DataFrames from the parquet files:

var df_numeric = spark.read.parquet(basePath+"train_numeric.parquet")

var df_categorical = spark.read.parquet(basePath+"train_categorical.parquet")

Here is the output:

This is very high-dimensional data; therefore, we will take only a subset of the columns for this illustration:

df_categorical.createOrReplaceTempView("dfcat")
var dfcat = spark.sql("select Id, L0_S22_F545 from dfcat")

In the following picture, you can see the unique categorical values of that column:

Now let's do the same with the numerical dataset:

df_numeric.createOrReplaceTempView("dfnum")
var dfnum = spark.sql("select Id, L0_S0_F0, L0_S0_F2, L0_S0_F4, Response from dfnum")

Here is the output:

Finally, we join these two relations on the Id column:

var df = dfcat.join(dfnum,"Id")
df.createOrReplaceTempView("df")
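Joining on "Id" performs an inner join by default, so only rows whose Id appears in both relations survive. A minimal plain-Scala sketch of that semantics, with made-up Ids and values:

```scala
// Toy stand-ins for the two relations, keyed by Id (values are invented):
val cat = Map(1 -> "T1", 2 -> "T2")   // Id -> L0_S22_F545
val num = Map(1 -> 0.03, 3 -> -0.16)  // Id -> L0_S0_F0

// Inner-join semantics: keep an (Id, cat, num) row only if the Id
// is present in both maps.
val joined = for ((id, c) <- cat; n <- num.get(id)) yield (id, c, n)
// Only Id 1 appears in both relations, so only Id 1 survives.
```

Ids 2 and 3 are dropped because each appears in only one relation, which is exactly what happens to unmatched rows in the DataFrame join above.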

Then we have to do some NA treatment:

var df_notnull = spark.sql("""
  select
    Response as label,
    case
      when L0_S22_F545 is null then 'NA'
      else L0_S22_F545
    end as L0_S22_F545,
    case
      when L0_S0_F0 is null then 0.0
      else L0_S0_F0
    end as L0_S0_F0,
    case
      when L0_S0_F2 is null then 0.0
      else L0_S0_F2
    end as L0_S0_F2,
    case
      when L0_S0_F4 is null then 0.0
      else L0_S0_F4
    end as L0_S0_F4
  from df
""")