In [1]:
import datetime
from pytz import timezone
print "Last run @%s" % (datetime.datetime.now(timezone('US/Pacific')))
In [2]:
from pyspark.context import SparkContext
print "Running Spark Version %s" % (sc.version)
In [3]:
from pyspark.conf import SparkConf
conf = SparkConf()
print(conf.toDebugString())
In [4]:
import pyspark.sql
sqlCxt = pyspark.sql.SQLContext(sc)
In [5]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('spark-csv/cars.csv')
df.coalesce(1).select('year', 'model').write.format('com.databricks.spark.csv').save('newcars.csv')
In [6]:
df.show()
In [7]:
df_cars = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('car-data/car-milage.csv')
In [8]:
df_cars_x = sqlContext.read.load('cars_1.parquet')
df_cars_x.dtypes
Out[8]:
In [9]:
df_cars.show(40)
In [10]:
df_cars.describe().show()
In [11]:
df_cars.describe(["mpg",'hp']).show()
In [12]:
df_cars.groupby("automatic").avg("mpg")
In [13]:
df_cars.na.drop('any').count()
Out[13]:
In [14]:
df_cars.count()
Out[14]:
In [15]:
df_cars.dtypes
Out[15]:
In [16]:
df_2 = df_cars.select(df_cars.mpg.cast("double").alias('mpg'),df_cars.torque.cast("double").alias('torque'),
df_cars.automatic.cast("integer").alias('automatic'))
In [17]:
df_2.show(40)
In [18]:
df_2.dtypes
Out[18]:
In [19]:
df_2.describe().show()
In [20]:
df_2.groupby("automatic").avg("mpg","torque").show()
In [21]:
df_2.groupBy().avg("mpg","torque").show()
In [22]:
df_2.agg({"*":"count"}).show()
In [23]:
import pyspark.sql.functions as F
df_2.agg(F.min(df_2.mpg)).show()
In [24]:
import pyspark.sql.functions as F
df_2.agg(F.mean(df_2.mpg)).show()
In [25]:
gdf_2 = df_2.groupBy("automatic")
gdf_2.agg({'mpg':'min'}).collect()
gdf_2.agg({'mpg':'min'}).show()
In [26]:
df_cars_1 = df_cars.select(df_cars.mpg.cast("double").alias('mpg'),
df_cars.displacement.cast("double").alias('displacement'),
df_cars.hp.cast("integer").alias('hp'),
df_cars.torque.cast("integer").alias('torque'),
df_cars.CRatio.cast("float").alias('CRatio'),
df_cars.RARatio.cast("float").alias('RARatio'),
df_cars.CarbBarrells.cast("integer").alias('CarbBarrells'),
df_cars.NoOfSpeed.cast("integer").alias('NoOfSpeed'),
df_cars.length.cast("float").alias('length'),
df_cars.width.cast("float").alias('width'),
df_cars.weight.cast("integer").alias('weight'),
df_cars.automatic.cast("integer").alias('automatic'))
In [27]:
gdf_3 = df_cars_1.groupBy("automatic")
gdf_3.agg({'mpg':'mean'}).show()
In [28]:
df_cars_1.avg("mpg","torque").show()
In [29]:
df_cars_1.groupBy().avg("mpg","torque").show()
In [30]:
df_cars_1.groupby("automatic").avg("mpg","torque").show()
In [31]:
df_cars_1.groupby("automatic").avg("mpg","torque","hp","weight").show()
In [32]:
df_cars_1.printSchema()
In [33]:
df_cars_1.show(5)
In [34]:
df_cars_1.describe().show()
In [35]:
df_cars_1.groupBy().agg({"mpg":"mean"}).show()
In [36]:
df_cars_1.show(40)
In [37]:
df_cars_1.corr('hp','weight')
Out[37]:
In [38]:
df_cars_1.corr('RARatio','width')
Out[38]:
In [39]:
df_cars_1.crosstab('automatic','NoOfSpeed').show()
In [40]:
df_cars_1.crosstab('NoOfSpeed','CarbBarrells').show()
In [41]:
df_cars_1.crosstab('automatic','CarbBarrells').show()
In [42]:
# We can see if a column has null values
df_cars_1.select(df_cars_1.torque.isNull()).show()
In [43]:
# We can filter null and non null rows
df_cars_1.filter(df_cars_1.torque.isNull()).show(40) # You can also use isNotNull
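# And the non-null rows, using isNotNull:
df_cars_1.filter(df_cars_1.torque.isNotNull()).count()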
In [44]:
df_cars_1.na.drop().count()
Out[44]:
In [45]:
df_cars_1.fillna(9999).show(50)
# This is not what we would normally do; it is just to show the effect of fillna
# You can also use df_cars_1.na.fill(9999)
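# na.fill also accepts a per-column dict, e.g. to fill only the torque column:
df_cars_1.na.fill({'torque': 9999}).show(50)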
In [46]:
# Let us try the when/otherwise syntax on the hp column
# 0-100 -> 1, 101-200 -> 2, 201-300 -> 3, others -> 4
df_cars_1.select(df_cars_1.hp, F.when(df_cars_1.hp <= 100, 1).when(df_cars_1.hp <= 200, 2)
.when(df_cars_1.hp <= 300, 3).otherwise(4).alias("hpCode")).show(40)
In [47]:
df_cars_1.dtypes
Out[47]:
In [48]:
df_cars_1.groupBy('CarbBarrells').count().show()
In [49]:
# If the path already exists, save will give an error:
# java.lang.RuntimeException: path file:.. /cars_1.parquet already exists.
#
df_cars_1.repartition(1).write.save("cars_1.parquet", format="parquet")
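# Available save modes: "error" (the default), "overwrite", "append" and "ignore"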
In [50]:
# No error even if the file exists
df_cars_1.repartition(1).write.mode("overwrite").format("parquet").save("cars_1.parquet")
# Use repartition(1) if you want all the data in a single file (or repartition(n) for n files)
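# coalesce(1) also writes out a single file and avoids a full shuffle:
df_cars_1.coalesce(1).write.mode("overwrite").format("parquet").save("cars_1.parquet")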
In [51]:
# Appends to the existing data
df_cars_1.repartition(1).write.mode("append").format("parquet").save("cars_1_a.parquet")
# Even with repartition, you will see more files over time, as each append adds new files
In [52]:
df_append = sqlContext.load("cars_1_a.parquet")
# sqlContext.load is deprecated; use sqlContext.read.load (or read.parquet) instead, as below
In [53]:
df_append.count()
Out[53]:
In [54]:
# Even though parquet is the default format, an explicit format("parquet") is clearer
df_append = sqlContext.read.format("parquet").load("cars_1_a.parquet")
df_append.count()
Out[54]:
In [55]:
# For reading parquet files, read.parquet is more elegant
df_append = sqlContext.read.parquet("cars_1_a.parquet")
df_append.count()
Out[55]:
In [56]:
# Let us read another file
df_orders = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('NW/NW-Orders.csv')
In [57]:
df_orders.head()
Out[57]:
In [58]:
df_orders.dtypes
Out[58]:
In [59]:
from pyspark.sql.types import StringType, IntegerType,DateType
getYear = F.udf(lambda s: s[-2:], StringType())  # or IntegerType(), with an int() conversion
from datetime import datetime
convertToDate = F.udf(lambda s: datetime.strptime(s, '%m/%d/%y'),DateType())
In [60]:
# You could register the function for use in SQL as follows; we won't use it here
sqlContext.registerFunction("getYear", lambda s: s[-2:])
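# A minimal sketch of calling the registered function from SQL (assuming df_orders
# is registered as a temp table; not run here):
# df_orders.registerTempTable("orders")
# sqlContext.sql("select OrderID, getYear(OrderDate) as Year from orders").show()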
In [61]:
# Let us add a year column
df_orders.select(df_orders['OrderID'],
df_orders['CustomerID'],
df_orders['EmpliyeeID'],
df_orders['OrderDate'],
df_orders['ShipCuntry'].alias('ShipCountry'),
getYear(df_orders['OrderDate'])).show()
In [62]:
# Let us add a year column
# Note: the converted and renamed columns need aliases
df_orders_1 = df_orders.select(df_orders['OrderID'],
df_orders['CustomerID'],
df_orders['EmpliyeeID'],
convertToDate(df_orders['OrderDate']).alias('OrderDate'),
df_orders['ShipCuntry'].alias('ShipCountry'),
getYear(df_orders['OrderDate']).alias('Year'))
# df_orders_1 = df_orders_x.withColumn('Year',getYear(df_orders_x['OrderDate'])) # doesn't work. Gives error
In [63]:
df_orders_1.show(1)
In [64]:
df_orders_1.dtypes
Out[64]:
In [65]:
df_orders_1.show()
In [66]:
df_orders_1.where(df_orders_1['ShipCountry'] == 'France').show()
In [67]:
df_orders_1.groupBy("CustomerID","Year").count().orderBy('count',ascending=False).show()
In [69]:
# Save the data partitioned by Year
df_orders_1.write.mode("overwrite").partitionBy("Year").format("parquet").save("orders_1.parquet")
# (read/load defaults to the parquet format)
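# Since the data is partitioned by Year, a single partition can also be read
# directly from its directory (a sketch; partition directories are named Year=<value>):
# sqlContext.read.parquet("orders_1.parquet/Year=96").count()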
In [70]:
df_orders_2 = sqlContext.read.parquet("orders_1.parquet")
df_orders_2.explain(True)
df_orders_3 = df_orders_2.filter(df_orders_2.Year=='96')
df_orders_3.explain(True)
In [71]:
df_orders_3.count()
Out[71]:
In [72]:
df_orders_3.explain(True)
In [73]:
df_orders_2.count()
Out[73]:
In [74]:
df_orders_1.printSchema()
In [75]:
# import pyspark.sql.Row
df = sc.parallelize([10,100,1000]).map(lambda x: {"num":x}).toDF()
In [76]:
df.show()
In [77]:
import pyspark.sql.functions as F
df.select(F.log(df.num)).show()
In [78]:
df.select(F.log10(df.num)).show()
In [79]:
df = sc.parallelize([0,10,100,1000]).map(lambda x: {"num":x}).toDF()
In [80]:
df.show()
In [81]:
df.select(F.log(df.num)).show()
In [82]:
df.select(F.log1p(df.num)).show()
In [83]:
df_cars_1.select(df_cars_1['CarbBarrells'], F.sqrt(df_cars_1['mpg'])).show()
In [84]:
df = sc.parallelize([(3,4),(5,12),(7,24),(9,40),(11,60),(13,84)]).map(lambda x: {"a":x[0],"b":x[1]}).toDF()
In [85]:
df.show()
In [86]:
df.select(df['a'],df['b'],F.hypot(df['a'],df['b']).alias('hypot')).show()
In [87]:
df_a = sc.parallelize( [{"X1":"A","X2":1},{"X1":"B","X2":2},{"X1":"C","X2":3}] ).toDF()
df_b = sc.parallelize( [{"X1":"A","X3":True},{"X1":"B","X3":False},{"X1":"D","X3":True}] ).toDF()
In [88]:
df_a.show()
In [89]:
df_b.show()
In [90]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'inner')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()
In [91]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'outer')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show() # same as 'full' or 'fullouter'
# Spark doesn't merge the key columns, so we need to alias the column names to distinguish between them
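# One way to get a single merged key column is to coalesce the two X1 columns:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'outer')\
.select(F.coalesce(df_a['X1'], df_b['X1']).alias('X1'),'X2','X3').show()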
In [92]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'left_outer')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show() # same as 'left'
In [93]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'right_outer')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show() # same as 'right'
In [94]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'right')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()
In [95]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'full')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()# same as 'fullouter'
In [96]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'leftsemi').show() # same as semijoin
#.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()
In [97]:
# Anti-join: subtract the result of the leftsemi join from df_a
df_a.subtract(df_a.join(df_b, df_a['X1'] == df_b['X1'], 'leftsemi')).show()
#.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()
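# In newer Spark versions (2.0+) an anti-join is available directly as a join type:
# df_a.join(df_b, df_a['X1'] == df_b['X1'], 'left_anti').show()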
In [98]:
c = [{"X1":"A","X2":1},{"X1":"B","X2":2},{"X1":"C","X2":3}]
d = [{"X1":"A","X2":1},{"X1":"B","X2":2},{"X1":"D","X2":4}]
df_c = sc.parallelize(c).toDF()
df_d = sc.parallelize(d).toDF()
In [99]:
df_c.show()
In [100]:
df_d.show()
In [101]:
df_c.intersect(df_d).show()
In [102]:
df_c.subtract(df_d).show()
In [103]:
df_d.subtract(df_c).show()
In [104]:
e = [{"X1":"A","X2":1},{"X1":"B","X2":2},{"X1":"C","X2":3}]
f = [{"X1":"D","X2":4},{"X1":"E","X2":5},{"X1":"F","X2":6}]
df_e = sc.parallelize(e).toDF()
df_f = sc.parallelize(f).toDF()
In [105]:
df_e.unionAll(df_f).show()
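# Note: in Spark 2.0+ unionAll is deprecated in favor of union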
In [106]:
# df_a.join(df_b, df_a['X1'] == df_b['X1'], 'semijoin')\
# .select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()
# Gives error Unsupported join type 'semijoin'.
# Supported join types include: 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', 'rightouter',
# 'right', 'leftsemi'
In [107]:
# SQL on tables
In [108]:
df_a.registerTempTable("tableA")
sqlContext.sql("select * from tableA").show()
In [109]:
df_b.registerTempTable("tableB")
sqlContext.sql("select * from tableB").show()
In [110]:
sqlContext.sql("select * from tableA JOIN tableB on tableA.X1 = tableB.X1").show()
In [111]:
sqlContext.sql("select * from tableA LEFT JOIN tableB on tableA.X1 = tableB.X1").show()
In [112]:
sqlContext.sql("select * from tableA FULL JOIN tableB on tableA.X1 = tableB.X1").show()
In [ ]: