In [13]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *

In [14]:
# SQL/DataFrame entry point wrapping the SparkContext `sc`.
# NOTE(review): `sc` is not defined in the visible cells -- presumably it is
# provided by the PySpark kernel/shell; confirm.
sqlContext = SQLContext(sc)

# Load the input as an RDD with one element per text line, then display the
# number of lines (header line included).
INPUT_PATH = "/user/root/analytic_out/part-r-00000"
dataFile = sc.textFile(INPUT_PATH)
dataFile.count()


Out[14]:
605

In [15]:
# Take the first line of the file; it holds the column names. Most names are
# '|'-separated, but the first two ('id' and 'YEAR') are joined by a tab, so a
# split on '|' yields 'id\tYEAR' as a single name.
header = dataFile.first()
# Display the raw header line.
header


Out[15]:
u'id\tYEAR|NETWORK|LABOREXP|STAFF|ELECEXP|KWH|TOTCOST|LABOR|ELEC|CAPITAL'

In [16]:
# Build the DataFrame schema from the header line: one nullable column per
# '|'-separated name. The first column stays a string; every other column is a
# float. Types are assigned by position at construction time, so the schema
# follows the header's width instead of relying on hard-coded indices.
# NOTE(review): the header's first name is 'id\tYEAR' (tab-joined). It is kept
# verbatim so existing column references keep working, but it is awkward to
# reference from SQL -- consider splitting or renaming it.
column_names = header.split('|')
fields = [
    StructField(name, StringType() if position == 0 else FloatType(), True)
    for position, name in enumerate(column_names)
]

schema = StructType(fields)
# Display the resulting schema.
schema


Out[16]:
StructType(List(StructField(id	YEAR,StringType,true),StructField(NETWORK,FloatType,true),StructField(LABOREXP,FloatType,true),StructField(STAFF,FloatType,true),StructField(ELECEXP,FloatType,true),StructField(KWH,FloatType,true),StructField(TOTCOST,FloatType,true),StructField(LABOR,FloatType,true),StructField(ELEC,FloatType,true),StructField(CAPITAL,FloatType,true)))

In [17]:
# Isolate the header row by exact comparison with the line captured in
# `header`. A substring test such as `"id" in line` would also match any data
# row that merely contains the text "id".
dataHeader = dataFile.filter(lambda line: line == header)
# Display the matched line(s); only the header is expected.
dataHeader.collect()


Out[17]:
[u'id\tYEAR|NETWORK|LABOREXP|STAFF|ELECEXP|KWH|TOTCOST|LABOR|ELEC|CAPITAL']

In [18]:
# Drop the header line with a plain filter on exact equality with `header`.
# This is a narrow, per-partition transformation: unlike RDD.subtract it needs
# no shuffle, keeps the original line order, and removes only lines identical
# to the header.
dataNoHeader = dataFile.filter(lambda line: line != header)
# Display the number of remaining data rows.
dataNoHeader.count()


Out[18]:
604

In [19]:
def _parse_record(line):
    """Turn one '|'-delimited line into a 10-tuple.

    The first piece is kept as text; pieces 1 through 9 are converted with
    float(). Raises IndexError if the line has fewer than 10 pieces and
    ValueError if one of pieces 1-9 is not a valid float literal.
    """
    pieces = line.split("|")
    return (pieces[0],) + tuple(float(pieces[i]) for i in range(1, 10))


# Parse every data row into a typed tuple matching the 10-column schema.
data_temp = dataNoHeader.map(_parse_record)
#data_temp.take(1)

In [20]:
# Combine the parsed row tuples with the explicit schema to get a DataFrame.
data_df = sqlContext.createDataFrame(data_temp, schema=schema)
#data_df.head(2)

In [21]:
#data_df.groupBy("ELECEXP").count().show()

In [22]:
#data_df.dtypes

In [23]:
# Print the DataFrame's schema as a tree to check column names, types and
# nullability.
data_df.printSchema()


root
 |-- id	YEAR: string (nullable = true)
 |-- NETWORK: float (nullable = true)
 |-- LABOREXP: float (nullable = true)
 |-- STAFF: float (nullable = true)
 |-- ELECEXP: float (nullable = true)
 |-- KWH: float (nullable = true)
 |-- TOTCOST: float (nullable = true)
 |-- LABOR: float (nullable = true)
 |-- ELEC: float (nullable = true)
 |-- CAPITAL: float (nullable = true)


In [24]:
# Expose the DataFrame to SQL under the table name "data".
data_df.registerTempTable("data")

# Count the rows whose STAFF value exceeds 40 and print the one-row result.
staff_over_40 = sqlContext.sql("SELECT COUNT(*) FROM data WHERE STAFF > 40")
staff_over_40.show()


c0 
397