In [13]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
In [14]:
sqlContext = SQLContext(sc)
dataFile = sc.textFile("/user/root/analytic_out/part-r-00000")
dataFile.count()
Out[14]:
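dataFile is scanned several times below (count, first, the header filter, and subtract), so caching it avoids re-reading the HDFS file on each action; a minimal sketch:

dataFile.cache()  # keep the RDD in memory across the following actions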
In [15]:
header = dataFile.first()
header
Out[15]:
In [16]:
fields = [StructField(field_name, StringType(), True) for field_name in header.split('|')]
len(fields)
# The first column (id) stays a string; the remaining nine columns hold numeric values
for f in fields[1:]:
    f.dataType = FloatType()
schema = StructType(fields)
schema
Out[16]:
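For reference, the same schema can be built in one pass instead of mutating the StructFields after construction; a sketch, assuming the first header column is the string id and the remaining nine are numeric:

names = header.split('|')
schema = StructType([StructField(names[0], StringType(), True)] +
                    [StructField(n, FloatType(), True) for n in names[1:]])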
In [17]:
dataHeader = dataFile.filter(lambda l: l == header)  # match the header line exactly, so data rows containing "id" are not dropped
dataHeader.collect()
Out[17]:
In [18]:
dataNoHeader = dataFile.subtract(dataHeader)
dataNoHeader.count()
Out[18]:
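subtract triggers a full shuffle just to drop one line; since the header is already captured above, a plain filter does the same job more cheaply. A sketch:

dataNoHeader = dataFile.filter(lambda l: l != header)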
In [19]:
data_temp = (dataNoHeader
             .map(lambda k: k.split("|"))
             .map(lambda p: (p[0],
                             float(p[1]), float(p[2]), float(p[3]),
                             float(p[4]), float(p[5]), float(p[6]),
                             float(p[7]), float(p[8]), float(p[9]))))
#data_temp.take(1)
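If the column count ever changes, the row builder can convert all trailing fields generically instead of indexing each one; a sketch under the same first-column-is-id assumption:

data_temp = dataNoHeader.map(lambda k: k.split("|")) \
                        .map(lambda p: tuple([p[0]] + [float(x) for x in p[1:]]))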
In [20]:
data_df = sqlContext.createDataFrame(data_temp, schema)
#data_df.head(2)
In [21]:
#data_df.groupBy("ELECEXP").count().show()
In [22]:
#data_df.dtypes
In [23]:
data_df.printSchema()
In [24]:
data_df.registerTempTable("data")
sqlContext.sql("SELECT COUNT(*) FROM data WHERE STAFF > 40").show()
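registerTempTable is the Spark 1.x API; in Spark 2.x+ the equivalent calls go through a SparkSession (assumed here to be named spark):

data_df.createOrReplaceTempView("data")
spark.sql("SELECT COUNT(*) FROM data WHERE STAFF > 40").show()

The same count is also available directly through the DataFrame API, without SQL:

data_df.filter(data_df.STAFF > 40).count()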