SQL functions


SparkContext and SparkSession


In [1]:
# Create a local SparkContext, then build (or reuse) the SparkSession —
# the entry point for the DataFrame / SQL API used throughout this notebook.
from pyspark import SparkContext
sc = SparkContext(master = 'local')

from pyspark.sql import SparkSession
spark = SparkSession.builder \
          .appName("Python Spark SQL basic example") \
          .config("spark.some.config.option", "some-value") \
          .getOrCreate()

Import data


In [2]:
# Load the iris CSV with a header row and let Spark infer column types.
iris = spark.read.csv('data/iris.csv', header=True, inferSchema=True)
iris.show(5)


+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows


In [3]:
# Load the prostate CSV the same way (header + inferred schema).
prostate = spark.read.csv('data/prostate.csv', header=True, inferSchema=True)
prostate.show(5)


+------------+-----------+---+------------+---+------------+-------+-----+------------+
|      lcavol|    lweight|age|        lbph|svi|         lcp|gleason|pgg45|        lpsa|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
|-0.579818495|2.769458829| 50|-1.386294361|  0|-1.386294361|      6|    0|-0.430782916|
|-0.994252273|3.319625728| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-0.510825624|2.691243083| 74|-1.386294361|  0|-1.386294361|      7|   20|-0.162518929|
|-1.203972804|3.282789151| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
| 0.751416089|3.432372999| 62|-1.386294361|  0|-1.386294361|      6|    0| 0.371563556|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
only showing top 5 rows

Functions


In [23]:
# NOTE(review): wildcard imports pull every pyspark.sql function/type into the
# namespace and shadow Python builtins such as `abs` and `round`. Explicit
# imports would be safer, but all later cells rely on these bare names.
from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np
import pandas as pd

abs


In [6]:
prostate.select('lpsa', abs(prostate.lpsa).alias('abs(lpsa)')).show(5)


+------------+-----------+
|        lpsa|  abs(lpsa)|
+------------+-----------+
|-0.430782916|0.430782916|
|-0.162518929|0.162518929|
|-0.162518929|0.162518929|
|-0.162518929|0.162518929|
| 0.371563556|0.371563556|
+------------+-----------+
only showing top 5 rows

acos


In [35]:
# Demo frame for acos: ten uniform draws from (0, 1), the first five negated,
# so x spans (-1, 1) — the domain of arc-cosine.
pdf = pd.DataFrame({'x': np.concatenate([-np.random.rand(5),
                                         np.random.rand(5)])})
df = spark.createDataFrame(pdf)
df.show(5)


+--------------------+
|                   x|
+--------------------+
| -0.9338356359616288|
|-0.17825806390480148|
| -0.8287101229670288|
| -0.9203268470931772|
| -0.5717064564704842|
+--------------------+
only showing top 5 rows


In [36]:
df.select('x', acos(df.x)).show(5)


+--------------------+------------------+
|                   x|           ACOS(x)|
+--------------------+------------------+
| -0.9338356359616288| 2.775786316206805|
|-0.17825806390480148|1.7500122036992714|
| -0.8287101229670288|2.5475953864778313|
| -0.9203268470931772|2.7397115973218678|
| -0.5717064564704842|2.1793805606775827|
+--------------------+------------------+
only showing top 5 rows

add_months


In [37]:
import datetime

In [43]:
# Ten consecutive dates starting today, and the `*10` repeats that range so
# the frame has 100 rows — presumably just for a larger demo; TODO confirm.
base = datetime.date.today()
date_list = [base + datetime.timedelta(days=x) for x in list(range(0, 10))*10]
pdf = pd.DataFrame({
    'dates': date_list
})
df = spark.createDataFrame(pdf)
df.show(5)


+----------+
|     dates|
+----------+
|2017-06-25|
|2017-06-26|
|2017-06-27|
|2017-06-28|
|2017-06-29|
+----------+
only showing top 5 rows


In [44]:
df.select('dates', add_months(df.dates, 2).alias('new_dates')).show(5)


+----------+----------+
|     dates| new_dates|
+----------+----------+
|2017-06-25|2017-08-25|
|2017-06-26|2017-08-26|
|2017-06-27|2017-08-27|
|2017-06-28|2017-08-28|
|2017-06-29|2017-08-29|
+----------+----------+
only showing top 5 rows

approx_count_distinct


In [45]:
prostate.select(approx_count_distinct(prostate.gleason)).show(5)


+------------------------------+
|approx_count_distinct(gleason)|
+------------------------------+
|                             4|
+------------------------------+


In [48]:
iris.select(approx_count_distinct(iris.species)).show(5)


+------------------------------+
|approx_count_distinct(species)|
+------------------------------+
|                             3|
+------------------------------+

array


In [49]:
iris.show(5)


+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows


In [54]:
# array() packs the four measurement columns into a single array column.
df_arr = iris.select('species', array(['sepal_length', 'sepal_width', 'petal_length', 'petal_width']).alias('features'))
df_arr.show(5)


+-------+--------------------+
|species|            features|
+-------+--------------------+
| setosa|[5.1, 3.5, 1.4, 0.2]|
| setosa|[4.9, 3.0, 1.4, 0.2]|
| setosa|[4.7, 3.2, 1.3, 0.2]|
| setosa|[4.6, 3.1, 1.5, 0.2]|
| setosa|[5.0, 3.6, 1.4, 0.2]|
+-------+--------------------+
only showing top 5 rows

array_contains


In [57]:
# array_contains yields a boolean: does the features array contain 1.4?
df = df_arr.select('species', 'features', array_contains(df_arr.features, 1.4).alias('new_features'))
df.show(5)


+-------+--------------------+------------+
|species|            features|new_features|
+-------+--------------------+------------+
| setosa|[5.1, 3.5, 1.4, 0.2]|        true|
| setosa|[4.9, 3.0, 1.4, 0.2]|        true|
| setosa|[4.7, 3.2, 1.3, 0.2]|       false|
| setosa|[4.6, 3.1, 1.5, 0.2]|       false|
| setosa|[5.0, 3.6, 1.4, 0.2]|        true|
+-------+--------------------+------------+
only showing top 5 rows


In [58]:
df.filter(df.new_features).show(5)


+-------+--------------------+------------+
|species|            features|new_features|
+-------+--------------------+------------+
| setosa|[5.1, 3.5, 1.4, 0.2]|        true|
| setosa|[4.9, 3.0, 1.4, 0.2]|        true|
| setosa|[5.0, 3.6, 1.4, 0.2]|        true|
| setosa|[4.6, 3.4, 1.4, 0.3]|        true|
| setosa|[4.4, 2.9, 1.4, 0.2]|        true|
+-------+--------------------+------------+
only showing top 5 rows

asc

asc returns a sort expression, which can be used as argument of sort functions such as pyspark.sql.DataFrame.sort and pyspark.sql.DataFrame.orderBy


In [82]:
prostate.sort(prostate.lpsa.asc()).show(5)


+------------+-----------+---+------------+---+------------+-------+-----+------------+
|      lcavol|    lweight|age|        lbph|svi|         lcp|gleason|pgg45|        lpsa|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
|-0.579818495|2.769458829| 50|-1.386294361|  0|-1.386294361|      6|    0|-0.430782916|
|-0.994252273|3.319625728| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-1.203972804|3.282789151| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-0.510825624|2.691243083| 74|-1.386294361|  0|-1.386294361|      7|   20|-0.162518929|
| 0.751416089|3.432372999| 62|-1.386294361|  0|-1.386294361|      6|    0| 0.371563556|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
only showing top 5 rows


In [83]:
prostate.orderBy(prostate.lpsa.asc()).show(5)


+------------+-----------+---+------------+---+------------+-------+-----+------------+
|      lcavol|    lweight|age|        lbph|svi|         lcp|gleason|pgg45|        lpsa|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
|-0.579818495|2.769458829| 50|-1.386294361|  0|-1.386294361|      6|    0|-0.430782916|
|-0.994252273|3.319625728| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-1.203972804|3.282789151| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-0.510825624|2.691243083| 74|-1.386294361|  0|-1.386294361|      7|   20|-0.162518929|
| 0.751416089|3.432372999| 62|-1.386294361|  0|-1.386294361|      6|    0| 0.371563556|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
only showing top 5 rows

  • ascii
  • asin
  • atan
  • atan2

avg


In [91]:
prostate.select(avg(prostate.lpsa)).show()


+------------------+
|         avg(lpsa)|
+------------------+
|2.4783868787422683|
+------------------+

  • base64
  • bin
  • bitwiseNOT
  • broadcast
  • bround

cbrt


In [93]:
prostate.select('lpsa', cbrt(prostate.lpsa)).show(5)


+------------+-------------------+
|        lpsa|         CBRT(lpsa)|
+------------+-------------------+
|-0.430782916|-0.7552420410177275|
|-0.162518929|-0.5457176294010901|
|-0.162518929|-0.5457176294010901|
|-0.162518929|-0.5457176294010901|
| 0.371563556| 0.7189152621521183|
+------------+-------------------+
only showing top 5 rows

ceil


In [94]:
prostate.select('lpsa', ceil(prostate.lpsa)).show(5)


+------------+----------+
|        lpsa|CEIL(lpsa)|
+------------+----------+
|-0.430782916|         0|
|-0.162518929|         0|
|-0.162518929|         0|
|-0.162518929|         0|
| 0.371563556|         1|
+------------+----------+
only showing top 5 rows

coalesce

Return the first column that is not null.


In [95]:
# Two nullable columns with complementary nulls, to demonstrate coalesce.
df = spark.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
df.show()


+----+----+
|   a|   b|
+----+----+
|null|null|
|   1|null|
|null|   2|
+----+----+


In [96]:
df.select(coalesce(df.a, df.b)).show()


+--------------+
|coalesce(a, b)|
+--------------+
|          null|
|             1|
|             2|
+--------------+

col

Returns a Column based on the given column name. It can save you some typing when the dataframe name is very long.


In [97]:
prostate.show(5)


+------------+-----------+---+------------+---+------------+-------+-----+------------+
|      lcavol|    lweight|age|        lbph|svi|         lcp|gleason|pgg45|        lpsa|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
|-0.579818495|2.769458829| 50|-1.386294361|  0|-1.386294361|      6|    0|-0.430782916|
|-0.994252273|3.319625728| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-0.510825624|2.691243083| 74|-1.386294361|  0|-1.386294361|      7|   20|-0.162518929|
|-1.203972804|3.282789151| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
| 0.751416089|3.432372999| 62|-1.386294361|  0|-1.386294361|      6|    0| 0.371563556|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
only showing top 5 rows


In [98]:
prostate.select(col('lcavol'), col('age')).show(5)


+------------+---+
|      lcavol|age|
+------------+---+
|-0.579818495| 50|
|-0.994252273| 58|
|-0.510825624| 74|
|-1.203972804| 58|
| 0.751416089| 62|
+------------+---+
only showing top 5 rows

collect_list


In [122]:
pdf = pd.DataFrame({
    'x':[1, 2, 2, 3, 4,4,4,4]
})
df = spark.createDataFrame(pdf)
df.show()


+---+
|  x|
+---+
|  1|
|  2|
|  2|
|  3|
|  4|
|  4|
|  4|
|  4|
+---+


In [123]:
df.select(collect_list(df.x)).show()


+--------------------+
|     collect_list(x)|
+--------------------+
|[1, 2, 2, 3, 4, 4...|
+--------------------+

collect_set


In [124]:
df.select(collect_set(df.x)).show()


+--------------+
|collect_set(x)|
+--------------+
|  [1, 2, 3, 4]|
+--------------+

concat


In [144]:
# Two string columns for the concat / concat_ws examples.
df = spark.createDataFrame([['a', '1'], ['b', '2']], ['x', 'v'])
df.show()


+---+---+
|  x|  v|
+---+---+
|  a|  1|
|  b|  2|
+---+---+


In [145]:
df.select('x', 'v', concat(df.x, df.v).alias('concate(x,v)')).show()


+---+---+------------+
|  x|  v|concate(x,v)|
+---+---+------------+
|  a|  1|          a1|
|  b|  2|          b2|
+---+---+------------+

concat_ws


In [147]:
df.select('x', 'v', concat_ws('_', df.x, df.v).alias('concate(x,v)')).show()


+---+---+------------+
|  x|  v|concate(x,v)|
+---+---+------------+
|  a|  1|         a_1|
|  b|  2|         b_2|
+---+---+------------+

conv

corr


In [148]:
prostate.show(5)


+------------+-----------+---+------------+---+------------+-------+-----+------------+
|      lcavol|    lweight|age|        lbph|svi|         lcp|gleason|pgg45|        lpsa|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
|-0.579818495|2.769458829| 50|-1.386294361|  0|-1.386294361|      6|    0|-0.430782916|
|-0.994252273|3.319625728| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-0.510825624|2.691243083| 74|-1.386294361|  0|-1.386294361|      7|   20|-0.162518929|
|-1.203972804|3.282789151| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
| 0.751416089|3.432372999| 62|-1.386294361|  0|-1.386294361|      6|    0| 0.371563556|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
only showing top 5 rows


In [150]:
prostate.select(corr(prostate.age, prostate.lpsa)).show(5)


+-------------------+
|    corr(age, lpsa)|
+-------------------+
|0.16959284228582772|
+-------------------+

cos

cosh

count


In [151]:
prostate.select(count(prostate.lpsa)).show()


+-----------+
|count(lpsa)|
+-----------+
|         97|
+-----------+

countDistinct


In [152]:
iris.select(count(iris.species)).show()


+--------------+
|count(species)|
+--------------+
|           150|
+--------------+

covar_pop

population covariance: $\frac{1}{n}\sum_{i=1}^n(x_{i} - \bar{x})(y_{i} - \bar{y})$


In [157]:
prostate.select(covar_pop(prostate.age, prostate.lpsa)).show()


+--------------------+
|covar_pop(age, lpsa)|
+--------------------+
|  1.4424746293984458|
+--------------------+

covar_samp

sample covariance: $\frac{1}{n-1}\sum_{i=1}^n(x_{i} - \bar{x})(y_{i} - \bar{y})$


In [158]:
prostate.select(covar_samp(prostate.age, prostate.lpsa)).show()


+---------------------+
|covar_samp(age, lpsa)|
+---------------------+
|   1.4575004067880128|
+---------------------+

create_map


In [159]:
iris.show(5)


+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows


In [163]:
# create_map builds a per-row map: species (key) -> sepal_length (value).
df = iris.select(create_map('species', 'sepal_length'))
df.show(5)


+--------------------------+
|map(species, sepal_length)|
+--------------------------+
|        Map(setosa -> 5.1)|
|        Map(setosa -> 4.9)|
|        Map(setosa -> 4.7)|
|        Map(setosa -> 4.6)|
|        Map(setosa -> 5.0)|
+--------------------------+
only showing top 5 rows


In [166]:
df.dtypes


Out[166]:
[('map(species, sepal_length)', 'map<string,double>')]

cume_dist

current_date


In [22]:
# Simple four-row frame for the date-function examples below.
df = spark.createDataFrame([[1],[2],[3],[4]], ['x'])
df.show()


+---+
|  x|
+---+
|  1|
|  2|
|  3|
|  4|
+---+


In [25]:
df.select('x', current_date()).show()


+---+--------------+
|  x|current_date()|
+---+--------------+
|  1|    2017-06-27|
|  2|    2017-06-27|
|  3|    2017-06-27|
|  4|    2017-06-27|
+---+--------------+

current_timestamp


In [27]:
df.select('x', current_timestamp()).show(truncate=False)


+---+-----------------------+
|x  |current_timestamp()    |
+---+-----------------------+
|1  |2017-06-27 01:18:48.383|
|2  |2017-06-27 01:18:48.383|
|3  |2017-06-27 01:18:48.383|
|4  |2017-06-27 01:18:48.383|
+---+-----------------------+

date_add


In [29]:
# Materialize current_date as a named column for the date_add example.
df2 = df.select('x', current_date().alias('current_date'))
df2.show(5)


+---+------------+
|  x|current_date|
+---+------------+
|  1|  2017-06-27|
|  2|  2017-06-27|
|  3|  2017-06-27|
|  4|  2017-06-27|
+---+------------+


In [32]:
df2.select('x', 'current_date', date_add(df2.current_date, 10)).show()


+---+------------+--------------------------+
|  x|current_date|date_add(current_date, 10)|
+---+------------+--------------------------+
|  1|  2017-06-27|                2017-07-07|
|  2|  2017-06-27|                2017-07-07|
|  3|  2017-06-27|                2017-07-07|
|  4|  2017-06-27|                2017-07-07|
+---+------------+--------------------------+

date_format


In [33]:
df2.select('x', 'current_date', date_format('current_date', 'MM/dd/yyyy').alias('new_date')).show()


+---+------------+----------+
|  x|current_date|  new_date|
+---+------------+----------+
|  1|  2017-06-27|06/27/2017|
|  2|  2017-06-27|06/27/2017|
|  3|  2017-06-27|06/27/2017|
|  4|  2017-06-27|06/27/2017|
+---+------------+----------+


In [ ]: