Click on a picture to view pyspark docs


In [1]:
# versions
import IPython
print("pyspark version:" + str(sc.version))
print("Ipython version:" + str(IPython.__version__))


pyspark version:1.6.1
Ipython version:4.2.0

In [2]:
# agg
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.agg({"amt":"avg"})
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-------------------+
|           avg(amt)|
+-------------------+
|0.20000000000000004|
+-------------------+


In [3]:
# alias
from pyspark.sql.functions import col
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.alias('transactions')
x.show()
y.show()
y.select(col("transactions.to")).show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+
|   to|
+-----+
|  Bob|
|Carol|
| Dave|
+-----+


In [4]:
# cache
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.cache()
print(x.count()) # first action materializes x in memory
print(x.count()) # later actions avoid IO overhead


3
3

In [5]:
# coalesce
x_rdd = sc.parallelize([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)],2)
x = sqlContext.createDataFrame(x_rdd, ['from','to','amt'])
y = x.coalesce(numPartitions=1)
print(x.rdd.getNumPartitions())
print(y.rdd.getNumPartitions())


2
1

In [6]:
# collect
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.collect() # creates list of rows on driver
x.show()
print(y)


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]

In [7]:
# columns
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.columns #creates list of column names on driver
x.show()
print(y)


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

['from', 'to', 'amt']

In [8]:
# corr
x = sqlContext.createDataFrame([("Alice","Bob",0.1,0.001),("Bob","Carol",0.2,0.02),("Carol","Dave",0.3,0.02)], ['from','to','amt','fee'])
y = x.corr(col1="amt",col2="fee")
x.show()
print(y)


+-----+-----+---+-----+
| from|   to|amt|  fee|
+-----+-----+---+-----+
|Alice|  Bob|0.1|0.001|
|  Bob|Carol|0.2| 0.02|
|Carol| Dave|0.3| 0.02|
+-----+-----+---+-----+

0.866025403784

In [9]:
# count
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
print(x.count())


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

3

In [10]:
# cov
x = sqlContext.createDataFrame([("Alice","Bob",0.1,0.001),("Bob","Carol",0.2,0.02),("Carol","Dave",0.3,0.02)], ['from','to','amt','fee'])
y = x.cov(col1="amt",col2="fee")
x.show()
print(y)


+-----+-----+---+-----+
| from|   to|amt|  fee|
+-----+-----+---+-----+
|Alice|  Bob|0.1|0.001|
|  Bob|Carol|0.2| 0.02|
|Carol| Dave|0.3| 0.02|
+-----+-----+---+-----+

0.00095

In [11]:
# crosstab
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.crosstab(col1='from',col2='to')
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-------+----+-----+---+
|from_to|Dave|Carol|Bob|
+-------+----+-----+---+
|    Bob|   0|    1|  0|
|  Alice|   0|    0|  1|
|  Carol|   1|    0|  0|
+-------+----+-----+---+


In [12]:
# cube
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Alice","Carol",0.2)], ['from','to','amt'])
y = x.cube('from','to')
x.show()
print(y) # y is a grouped data object, aggregations will be applied to all numerical columns
y.sum().show() 
y.max().show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
+-----+-----+---+

<pyspark.sql.group.GroupedData object at 0x7fc5383bcc50>
+-----+-----+-------------------+
| from|   to|           sum(amt)|
+-----+-----+-------------------+
|Alice|Carol|                0.2|
|Alice|  Bob|                0.1|
|Alice| null|0.30000000000000004|
| null|Carol|                0.2|
| null|  Bob|                0.1|
| null| null|0.30000000000000004|
+-----+-----+-------------------+

+-----+-----+--------+
| from|   to|max(amt)|
+-----+-----+--------+
|Alice|Carol|     0.2|
|Alice|  Bob|     0.1|
|Alice| null|     0.2|
| null|Carol|     0.2|
| null|  Bob|     0.1|
| null| null|     0.2|
+-----+-----+--------+


In [13]:
# describe
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
x.describe().show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-------+-------------------+
|summary|                amt|
+-------+-------------------+
|  count|                  3|
|   mean|0.20000000000000004|
| stddev|0.09999999999999998|
|    min|                0.1|
|    max|                0.3|
+-------+-------------------+


In [14]:
# distinct
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.distinct()
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
|  Bob|Carol|0.2|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
|Alice|  Bob|0.1|
+-----+-----+---+


In [15]:
# drop
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.drop('amt')
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+
| from|   to|
+-----+-----+
|Alice|  Bob|
|  Bob|Carol|
|Carol| Dave|
+-----+-----+


In [16]:
# dropDuplicates / drop_duplicates
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Bob","Carol",0.3),("Bob","Carol",0.2)], ['from','to','amt'])
# only 'from' and 'to' are compared, so the three ("Bob","Carol") rows collapse into one
# even though their amt values differ
# NOTE(review): the surviving row had amt=0.2 in the run shown; presumably which
# duplicate is kept is not guaranteed -- confirm before relying on it
y = x.dropDuplicates(subset=['from','to'])
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|  Bob|Carol|0.3|
|  Bob|Carol|0.2|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Alice|  Bob|0.1|
+-----+-----+---+


In [17]:
# dropna
x = sqlContext.createDataFrame([(None,"Bob",0.1),("Bob","Carol",None),("Carol",None,0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.dropna(how='any',subset=['from','to'])
x.show()
y.show()


+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
| null|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| null| 0.3|
|  Bob|Carol| 0.2|
+-----+-----+----+

+----+-----+----+
|from|   to| amt|
+----+-----+----+
| Bob|Carol|null|
| Bob|Carol| 0.2|
+----+-----+----+


In [18]:
# dtypes
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.dtypes
x.show()
print(y)


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[('from', 'string'), ('to', 'string'), ('amt', 'double')]

In [19]:
# explain
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
x.agg({"amt":"avg"}).explain(extended = True)


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

== Parsed Logical Plan ==
'Aggregate ['avg(amt#296) AS avg(amt)#297]
+- LogicalRDD [from#294,to#295,amt#296], MapPartitionsRDD[194] at applySchemaToPythonRDD at null:-1

== Analyzed Logical Plan ==
avg(amt): double
Aggregate [(avg(amt#296),mode=Complete,isDistinct=false) AS avg(amt)#297]
+- LogicalRDD [from#294,to#295,amt#296], MapPartitionsRDD[194] at applySchemaToPythonRDD at null:-1

== Optimized Logical Plan ==
Aggregate [(avg(amt#296),mode=Complete,isDistinct=false) AS avg(amt)#297]
+- Project [amt#296]
   +- LogicalRDD [from#294,to#295,amt#296], MapPartitionsRDD[194] at applySchemaToPythonRDD at null:-1

== Physical Plan ==
TungstenAggregate(key=[], functions=[(avg(amt#296),mode=Final,isDistinct=false)], output=[avg(amt)#297])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(avg(amt#296),mode=Partial,isDistinct=false)], output=[sum#301,count#302L])
      +- Project [amt#296]
         +- Scan ExistingRDD[from#294,to#295,amt#296]

In [20]:
# fillna
x = sqlContext.createDataFrame([(None,"Bob",0.1),("Bob","Carol",None),("Carol",None,0.3)], ['from','to','amt'])
y = x.fillna(value='unknown',subset=['from','to'])
x.show()
y.show()


+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
| null|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| null| 0.3|
+-----+-----+----+

+-------+-------+----+
|   from|     to| amt|
+-------+-------+----+
|unknown|    Bob| 0.1|
|    Bob|  Carol|null|
|  Carol|unknown| 0.3|
+-------+-------+----+


In [21]:
# filter
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.filter("amt > 0.1")
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+


In [22]:
# first
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.first()
x.show()
print(y)


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

Row(from=u'Alice', to=u'Bob', amt=0.1)

In [23]:
# flatMap
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
# each row contributes its first and third field; flatMap concatenates those
# per-row tuples into one flat sequence instead of a sequence of tuples
y = x.flatMap(lambda x: (x[0],x[2])) 
print(y) # implicit conversion to RDD
y.collect()


PythonRDD[227] at RDD at PythonRDD.scala:43
Out[23]:
[u'Alice', 0.1, u'Bob', 0.2, u'Carol', 0.3]

In [24]:
# foreach
from __future__ import print_function

# setup
fn = './foreachExampleDataFrames.txt' 
open(fn, 'w').close()  # clear the file
def fappend(el,f):
    '''Append the printed form of el, followed by a newline, to file f.

    el -- any printable object (here: one row of the DataFrame)
    f  -- path of the file to append to; it is created if it does not exist

    The file is opened and closed on every call, so no file handle is left
    open when this function is invoked once per row by foreach.
    '''
    # the context manager guarantees the handle is flushed and closed even if
    # print raises; the previous version never closed the file it opened
    with open(f, 'a+') as out:
        print(el, file=out)

# example
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.foreach(lambda x: fappend(x,fn)) # writes into foreachExampleDataFrames.txt
x.show() # original dataframe
print(y) # foreach returns 'None'
# print the contents of the file
# rows appear in the order they were processed (Carol, Bob, Alice in the run
# shown), which is not the order in which the DataFrame displays them
with open(fn, "r") as foreachExample:
    print (foreachExample.read())


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

None
Row(from=u'Carol', to=u'Dave', amt=0.3)
Row(from=u'Bob', to=u'Carol', amt=0.2)
Row(from=u'Alice', to=u'Bob', amt=0.1)


In [25]:
# foreachPartition
from __future__ import print_function

#setup
fn = './foreachPartitionExampleDataFrames.txt'
open(fn, 'w').close()  # clear the file
def fappend(partition,f):
    '''Append all elements in partition to file f as one printed list.

    partition -- iterable over the elements of a single partition
    f         -- path of the file to append to; it is created if it does not exist

    One line is written per call; an empty partition is written as [].
    The file is opened and closed on every call, so no file handle is left
    open when this function is invoked once per partition by foreachPartition.
    '''
    # drain the iterator before touching the file, as the original did
    rows = list(partition)
    # the context manager guarantees the handle is flushed and closed; the
    # previous version never closed the file it opened
    with open(f, 'a+') as out:
        print(rows, file=out)

# example
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x = x.repartition(2) # force 2 partitions
# fappend is called once per partition, so the file receives one list per partition
y = x.foreachPartition(lambda x: fappend(x,fn)) # writes into foreachPartitionExampleDataFrames.txt

x.show() # original dataframe
print(y) # foreachPartition returns 'None'
# print the contents of the file
# in the run shown one partition was empty ([]) and the other held all three rows
with open(fn, "r") as foreachExample:
    print (foreachExample.read())


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

None
[]
[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]


In [26]:
# freqItems
x = sqlContext.createDataFrame([("Bob","Carol",0.1), \
                                ("Alice","Dave",0.1), \
                                ("Alice","Bob",0.1), \
                                ("Alice","Bob",0.5), \
                                ("Carol","Bob",0.1)], \
                               ['from','to','amt'])
y = x.freqItems(cols=['from','amt'],support=0.8)
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.1|
|Alice| Dave|0.1|
|Alice|  Bob|0.1|
|Alice|  Bob|0.5|
|Carol|  Bob|0.1|
+-----+-----+---+

+--------------+-------------+
|from_freqItems|amt_freqItems|
+--------------+-------------+
|       [Alice]|        [0.1]|
+--------------+-------------+


In [27]:
# groupBy
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Alice","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.groupBy('from')
x.show()
print(y)


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

<pyspark.sql.group.GroupedData object at 0x7fc53831f5d0>

In [28]:
# groupBy(col1).avg(col2)
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Alice","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.groupBy('from').avg('amt')
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-------------------+
| from|           avg(amt)|
+-----+-------------------+
|Carol|                0.3|
|Alice|0.15000000000000002|
+-----+-------------------+


In [29]:
# head
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.head(2)
x.show()
print(y)


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2)]

In [30]:
# intersect
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Alice",0.2),("Carol","Dave",0.1)], ['from','to','amt'])
z = x.intersect(y)
x.show()
y.show()
z.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Alice|0.2|
|Carol| Dave|0.1|
+-----+-----+---+

+-----+---+---+
| from| to|amt|
+-----+---+---+
|Alice|Bob|0.1|
+-----+---+---+


In [31]:
# isLocal
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.isLocal()
x.show()
print(y)


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

False

In [32]:
# join
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Alice',20),("Bob",40),("Dave",80)], ['name','age'])
z = x.join(y,x.to == y.name,'inner').select('from','to','amt','age')
x.show()
y.show()
z.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+---+
| name|age|
+-----+---+
|Alice| 20|
|  Bob| 40|
| Dave| 80|
+-----+---+

+-----+----+---+---+
| from|  to|amt|age|
+-----+----+---+---+
|Carol|Dave|0.3| 80|
|Alice| Bob|0.1| 40|
+-----+----+---+---+


In [33]:
# limit
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.limit(2)
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
+-----+-----+---+


In [34]:
# map
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.map(lambda x: x.amt+1)
x.show()
print(y.collect())  # output is RDD


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[1.1, 1.2, 1.3]

In [35]:
# mapPartitions
def amt_sum(partition):
    '''Yield one value: the total of the amt field over every element of partition.

    partition -- iterable of objects exposing an amt attribute
    An empty partition yields 0.
    '''
    total = sum(row.amt for row in partition)
    yield total
    
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x = x.repartition(2) # force 2 partitions
# amt_sum runs once per partition and yields a single total for it
y = x.mapPartitions(lambda p: amt_sum(p))
x.show()
print(x.rdd.glom().collect()) # glom() groups the elements of each partition into one list
print(y.collect()) # one total per partition; the empty partition contributes 0 in the run shown
print(y.glom().collect())


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)], []]
[0.6000000000000001, 0]
[[0.6000000000000001], [0]]

In [36]:
# na
x = sqlContext.createDataFrame([(None,"Bob",0.1),("Bob","Carol",None),("Carol",None,0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.na  # returns an object for handling missing values, supports drop, fill, and replace methods
x.show()
print(y)
# drop() with no arguments: only the row without any null survives
y.drop().show()
# fill(dict): per-column replacement values, keyed by column name
y.fill({'from':'unknown','to':'unknown','amt':0}).show()
# fill(0): the numeric column amt is filled, the nulls in the string columns stay null
y.fill(0).show()


+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
| null|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| null| 0.3|
|  Bob|Carol| 0.2|
+-----+-----+----+

<pyspark.sql.dataframe.DataFrameNaFunctions object at 0x7fc538392b90>
+----+-----+---+
|from|   to|amt|
+----+-----+---+
| Bob|Carol|0.2|
+----+-----+---+

+-------+-------+---+
|   from|     to|amt|
+-------+-------+---+
|unknown|    Bob|0.1|
|    Bob|  Carol|0.0|
|  Carol|unknown|0.3|
|    Bob|  Carol|0.2|
+-------+-------+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
| null|  Bob|0.1|
|  Bob|Carol|0.0|
|Carol| null|0.3|
|  Bob|Carol|0.2|
+-----+-----+---+


In [37]:
# orderBy
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.orderBy(['from'],ascending=[False])
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Carol| Dave|0.3|
|  Bob|Carol|0.2|
|Alice|  Bob|0.1|
+-----+-----+---+


In [38]:
# persist
# StorageLevel is not imported anywhere else in this notebook; import it here so
# the cell does not depend on the launcher having injected the name
from pyspark import StorageLevel
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.persist(storageLevel=StorageLevel(True,True,False,True,1)) # StorageLevel(useDisk,useMemory,useOffHeap,deserialized,replication=1)
x.show()
# last expression of the cell: displayed as Out[..]
x.is_cached


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

Out[38]:
True

In [39]:
# printSchema
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
x.printSchema()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

root
 |-- from: string (nullable = true)
 |-- to: string (nullable = true)
 |-- amt: double (nullable = true)


In [40]:
# randomSplit
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
# no seed is passed, so the split changes from run to run; pass seed=<int> to make it repeatable
# rows are assigned at random, so the weights are not exact proportions:
# in the run shown the first part is empty and the second holds all three rows
y = x.randomSplit([0.5,0.5])
x.show()
y[0].show()
y[1].show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+----+---+---+
|from| to|amt|
+----+---+---+
+----+---+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+


In [41]:
# rdd
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.rdd
x.show()
print(y.collect())


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]

In [42]:
# registerTempTable
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.registerTempTable(name="TRANSACTIONS")
y = sqlContext.sql('SELECT * FROM TRANSACTIONS WHERE amt > 0.1')
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+


In [43]:
# repartition
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.repartition(3)
# x's partition count is not set in this cell (4 in the run shown)
# NOTE(review): presumably the environment's default parallelism -- confirm
print(x.rdd.getNumPartitions())
print(y.rdd.getNumPartitions()) # the count requested from repartition


4
3

In [44]:
# replace
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.replace('Dave','David',['from','to'])
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol|David|0.3|
+-----+-----+---+


In [45]:
# rollup
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.rollup(['from','to'])
x.show()
print(y) # y is a grouped data object, aggregations will be applied to all numerical columns
y.sum().show()
y.max().show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

<pyspark.sql.group.GroupedData object at 0x7fc5382f7850>
+-----+-----+------------------+
| from|   to|          sum(amt)|
+-----+-----+------------------+
|Alice|  Bob|               0.1|
|  Bob|Carol|               0.2|
|Alice| null|               0.1|
|Carol| Dave|               0.3|
|  Bob| null|               0.2|
|Carol| null|               0.3|
| null| null|0.6000000000000001|
+-----+-----+------------------+

+-----+-----+--------+
| from|   to|max(amt)|
+-----+-----+--------+
|Alice|  Bob|     0.1|
|  Bob|Carol|     0.2|
|Alice| null|     0.1|
|Carol| Dave|     0.3|
|  Bob| null|     0.2|
|Carol| null|     0.3|
| null| null|     0.3|
+-----+-----+--------+


In [46]:
# sample
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
# positional arguments are (withReplacement, fraction)
# no seed is passed, so the rows kept change from run to run; the fraction is
# not an exact count (1 of 3 rows was kept in the run shown)
y = x.sample(False,0.5)
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+----+-----+---+
|from|   to|amt|
+----+-----+---+
| Bob|Carol|0.2|
+----+-----+---+


In [47]:
# sampleBy
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Alice","Carol",0.2),("Alice","Alice",0.3), \
                               ('Alice',"Dave",0.4),("Bob","Bob",0.5),("Bob","Carol",0.6)], \
                                ['from','to','amt'])
# stratified sample on 'from': a sampling fraction of 0.1 for 'Alice' rows and 0.9 for 'Bob' rows
# no seed is passed, so the result changes from run to run
# (in the run shown no 'Alice' row and both 'Bob' rows were kept)
y = x.sampleBy(col='from',fractions={'Alice':0.1,'Bob':0.9})
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
|Alice|Alice|0.3|
|Alice| Dave|0.4|
|  Bob|  Bob|0.5|
|  Bob|Carol|0.6|
+-----+-----+---+

+----+-----+---+
|from|   to|amt|
+----+-----+---+
| Bob|  Bob|0.5|
| Bob|Carol|0.6|
+----+-----+---+


In [48]:
# schema
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.schema
x.show()
print(y)


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

StructType(List(StructField(from,StringType,true),StructField(to,StringType,true),StructField(amt,DoubleType,true)))

In [49]:
# select
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.select(['from','amt'])
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+---+
| from|amt|
+-----+---+
|Alice|0.1|
|  Bob|0.2|
|Carol|0.3|
+-----+---+


In [50]:
# selectExpr
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.selectExpr(['substr(from,1,1)','amt+10'])
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+----------------+----------+
|substr(from,1,1)|(amt + 10)|
+----------------+----------+
|               A|      10.1|
|               B|      10.2|
|               C|      10.3|
+----------------+----------+


In [51]:
# show
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+


In [52]:
# sort
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Alice",0.3)], ['from','to','amt'])
y = x.sort(['to'])
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol|Alice|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Carol|Alice|0.3|
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
+-----+-----+---+


In [53]:
# sortWithinPartitions
x = sqlContext.createDataFrame([('Alice',"Bob",0.1,1),("Bob","Carol",0.2,2),("Carol","Alice",0.3,2)], \
                               ['from','to','amt','p_id']).repartition(2,'p_id')
# rows are ordered by 'to' inside each partition only; the DataFrame as a whole
# is not sorted (the output reads Bob, Alice, Carol)
y = x.sortWithinPartitions(['to'])
x.show()
y.show()
print(x.rdd.glom().collect()) # glom() groups the elements of each partition into one list
print(y.rdd.glom().collect()) # same partitions, each one now ordered by 'to'


+-----+-----+---+----+
| from|   to|amt|p_id|
+-----+-----+---+----+
|Alice|  Bob|0.1|   1|
|  Bob|Carol|0.2|   2|
|Carol|Alice|0.3|   2|
+-----+-----+---+----+

+-----+-----+---+----+
| from|   to|amt|p_id|
+-----+-----+---+----+
|Alice|  Bob|0.1|   1|
|Carol|Alice|0.3|   2|
|  Bob|Carol|0.2|   2|
+-----+-----+---+----+

[[Row(from=u'Alice', to=u'Bob', amt=0.1, p_id=1)], [Row(from=u'Bob', to=u'Carol', amt=0.2, p_id=2), Row(from=u'Carol', to=u'Alice', amt=0.3, p_id=2)]]
[[Row(from=u'Alice', to=u'Bob', amt=0.1, p_id=1)], [Row(from=u'Carol', to=u'Alice', amt=0.3, p_id=2), Row(from=u'Bob', to=u'Carol', amt=0.2, p_id=2)]]

In [54]:
# stat
x = sqlContext.createDataFrame([("Alice","Bob",0.1,0.001),("Bob","Carol",0.2,0.02),("Carol","Dave",0.3,0.02)], ['from','to','amt','fee'])
y = x.stat
x.show()
print(y)
print(y.corr(col1="amt",col2="fee"))


+-----+-----+---+-----+
| from|   to|amt|  fee|
+-----+-----+---+-----+
|Alice|  Bob|0.1|0.001|
|  Bob|Carol|0.2| 0.02|
|Carol| Dave|0.3| 0.02|
+-----+-----+---+-----+

<pyspark.sql.dataframe.DataFrameStatFunctions object at 0x7fc5382f7a50>
0.866025403784

In [55]:
# subtract
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.1)], ['from','to','amt'])
z = x.subtract(y)
x.show()
y.show()
z.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+

+-----+----+---+
| from|  to|amt|
+-----+----+---+
|Carol|Dave|0.3|
+-----+----+---+


In [56]:
# take
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.take(num=2)
x.show()
print(y)


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2)]

In [57]:
# toDF
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.toDF("seller","buyer","amt")
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+------+-----+---+
|seller|buyer|amt|
+------+-----+---+
| Alice|  Bob|0.1|
|   Bob|Carol|0.2|
| Carol| Dave|0.3|
+------+-----+---+


In [58]:
# toJSON
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Alice",0.3)], ['from','to','amt'])
y = x.toJSON()
x.show()
print(y.collect())


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol|Alice|0.3|
+-----+-----+---+

[u'{"from":"Alice","to":"Bob","amt":0.1}', u'{"from":"Bob","to":"Carol","amt":0.2}', u'{"from":"Carol","to":"Alice","amt":0.3}']

In [59]:
# toPandas
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.toPandas()
x.show()
print(type(y))
y


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

<class 'pandas.core.frame.DataFrame'>
Out[59]:
from to amt
0 Alice Bob 0.1
1 Bob Carol 0.2
2 Carol Dave 0.3

In [60]:
# unionAll
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2)], ['from','to','amt'])
y = sqlContext.createDataFrame([("Bob","Carol",0.2),("Carol","Dave",0.1)], ['from','to','amt'])
z = x.unionAll(y)
x.show()
y.show()
z.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+


In [61]:
# unpersist
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.cache()
x.count()
x.show()
print(x.is_cached)
x.unpersist()
print(x.is_cached)


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

True
False

In [62]:
# where (filter)
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.where("amt > 0.1")
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+


In [63]:
# withColumn
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",None),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.withColumn('conf',x.amt.isNotNull())
x.show()
y.show()


+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
|Alice|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| Dave| 0.3|
+-----+-----+----+

+-----+-----+----+-----+
| from|   to| amt| conf|
+-----+-----+----+-----+
|Alice|  Bob| 0.1| true|
|  Bob|Carol|null|false|
|Carol| Dave| 0.3| true|
+-----+-----+----+-----+


In [64]:
# withColumnRenamed
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.withColumnRenamed('amt','amount')
x.show()
y.show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+------+
| from|   to|amount|
+-----+-----+------+
|Alice|  Bob|   0.1|
|  Bob|Carol|   0.2|
|Carol| Dave|   0.3|
+-----+-----+------+


In [65]:
# write
# NOTE(review): the json module is imported but not used in this cell
import json
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
# mode('overwrite') lets the cell be re-run when the output path already exists
# y is not used afterwards
y = x.write.mode('overwrite').json('./dataframeWriteExample.json')
x.show()
# read the dataframe back in from file
# the columns come back as amt, from, to rather than from, to, amt
# NOTE(review): presumably because the schema is inferred from the JSON keys -- confirm
sqlContext.read.json('./dataframeWriteExample.json').show()


+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+---+-----+-----+
|amt| from|   to|
+---+-----+-----+
|0.1|Alice|  Bob|
|0.2|  Bob|Carol|
|0.3|Carol| Dave|
+---+-----+-----+