In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Build (or reuse) a local Spark session. Going through getOrCreate() keeps
# this cell re-runnable: constructing SparkContext(...) directly raises an
# error when a context is already active in the kernel.
# NOTE(review): "spark.some.config.option" looks like a placeholder setting;
# confirm whether it can be dropped.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Expose the session's underlying SparkContext under the original name `sc`.
sc = spark.sparkContext
In [2]:
# Read the iris CSV, treating the first row as a header and letting Spark
# infer the column types, then preview five rows.
iris = spark.read.csv(
    path='data/iris.csv',
    header=True,
    inferSchema=True,
)
iris.show(n=5)
In [3]:
# Read the prostate CSV, treating the first row as a header and letting Spark
# infer the column types, then preview five rows.
prostate = spark.read.csv(
    path='data/prostate.csv',
    header=True,
    inferSchema=True,
)
prostate.show(n=5)
In [23]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np
import pandas as pd
In [6]:
# Show lpsa beside its absolute value.
# NOTE(review): `abs` is expected to be the pyspark.sql.functions version
# pulled in by the star import rather than the Python builtin - confirm.
prostate.select(
    'lpsa',
    abs(prostate['lpsa']).alias('abs(lpsa)'),
).show(n=5)
In [35]:
# Build ten values in (-1, 1): five negated uniform draws followed by five
# positive ones. A fixed seed makes the generated table identical on every
# kernel restart instead of changing from run to run.
random_state = np.random.RandomState(0)
pdf = pd.DataFrame({
    'x': list(-random_state.rand(5)) + list(random_state.rand(5))
})

# Convert the pandas frame to a Spark DataFrame and preview it.
df = spark.createDataFrame(pdf)
df.show(5)
In [36]:
# Arc cosine of each x, displayed beside the input column.
df.select(
    'x',
    acos(df['x']),
).show(n=5)
In [37]:
import datetime
In [43]:
# One hundred dates: the ten consecutive days starting today, repeated ten
# times in the same order.
base = datetime.date.today()
date_list = [
    base + datetime.timedelta(days=offset)
    for _ in range(10)
    for offset in range(10)
]

# Wrap the dates in a single-column pandas frame, hand it to Spark and preview.
pdf = pd.DataFrame({'dates': date_list})
df = spark.createDataFrame(pdf)
df.show(n=5)
In [44]:
# Shift every date forward by two months and show it next to the original.
df.select(
    'dates',
    add_months(df['dates'], 2).alias('new_dates'),
).show(n=5)
In [45]:
# Approximate number of distinct gleason values in the prostate data.
prostate.select(
    approx_count_distinct(prostate['gleason'])
).show(n=5)
In [48]:
# Approximate number of distinct species values in the iris data.
iris.select(
    approx_count_distinct(iris['species'])
).show(n=5)
In [49]:
# Preview the first five iris rows.
iris.show(n=5)
In [54]:
# Pack the four measurement columns into one array column named 'features',
# keeping 'species' alongside it.
df_arr = iris.select(
    'species',
    array(
        'sepal_length', 'sepal_width', 'petal_length', 'petal_width'
    ).alias('features'),
)
df_arr.show(n=5)
In [57]:
# Add a boolean column flagging rows whose 'features' array contains 1.4.
df = df_arr.select(
    'species',
    'features',
    array_contains(df_arr['features'], 1.4).alias('new_features'),
)
df.show(n=5)
In [58]:
# Keep only the rows where the boolean 'new_features' column is true.
df.filter(df['new_features']).show(n=5)
In [82]:
# Sort by lpsa in ascending order using DataFrame.sort.
prostate.sort(
    prostate['lpsa'].asc()
).show(n=5)
In [83]:
# Sort by lpsa in ascending order using DataFrame.orderBy.
prostate.orderBy(
    prostate['lpsa'].asc()
).show(n=5)
ascii, asin, atan, atan2
In [91]:
# Average of the lpsa column.
prostate.select(
    avg(prostate['lpsa'])
).show()
base64, bin, bitwiseNOT, broadcast, bround
In [93]:
# Cube root of lpsa, shown beside the original value.
prostate.select(
    'lpsa',
    cbrt(prostate['lpsa']),
).show(n=5)
In [94]:
# Ceiling of lpsa, shown beside the original value.
prostate.select(
    'lpsa',
    ceil(prostate['lpsa']),
).show(n=5)
In [95]:
# Three-row frame with columns a and b in which each row is missing at
# least one value.
df = spark.createDataFrame(
    data=[(None, None), (1, None), (None, 2)],
    schema=("a", "b"),
)
df.show()
In [96]:
# Per row, take a when it is not null and otherwise fall back to b.
df.select(
    coalesce(df['a'], df['b'])
).show()
In [97]:
# Preview the first five prostate rows.
prostate.show(n=5)
In [98]:
# Select two columns through col() references instead of plain strings.
prostate.select(
    [col('lcavol'), col('age')]
).show(n=5)
In [122]:
# Single-column frame whose values contain repeats (2 twice, 4 four times).
pdf = pd.DataFrame({'x': [1, 2, 2, 3, 4, 4, 4, 4]})

# Hand the pandas frame to Spark and display it.
df = spark.createDataFrame(pdf)
df.show()
In [123]:
# Gather every x value, duplicates included, into a single list.
df.select(
    collect_list(df['x'])
).show()
In [124]:
# Gather the x values into a single set, dropping duplicates.
df.select(
    collect_set(df['x'])
).show()
In [144]:
# Two-row frame of string columns x and v.
df = spark.createDataFrame(
    data=[['a', '1'], ['b', '2']],
    schema=['x', 'v'],
)
df.show()
In [145]:
# Concatenate x and v with no separator. The output column label is spelled
# 'concat(x,v)' so that it matches the function being demonstrated (the
# previous label was misspelled 'concate').
df.select(
    'x',
    'v',
    concat(df.x, df.v).alias('concat(x,v)'),
).show()
In [147]:
# Concatenate x and v with '_' as the separator. The output column is labelled
# after concat_ws; the previous label 'concate(x,v)' was carried over from the
# plain concat example and did not describe this result.
df.select(
    'x',
    'v',
    concat_ws('_', df.x, df.v).alias('concat_ws(_,x,v)'),
).show()
In [148]:
# Preview the first five prostate rows.
prostate.show(n=5)
In [150]:
# Correlation between the age and lpsa columns.
prostate.select(
    corr(prostate['age'], prostate['lpsa'])
).show(n=5)
In [151]:
# Count of the lpsa column in the prostate data.
prostate.select(
    count(prostate['lpsa'])
).show()
In [152]:
# Count of the species column in the iris data.
iris.select(
    count(iris['species'])
).show()
In [157]:
# Population covariance between age and lpsa.
prostate.select(
    covar_pop(prostate['age'], prostate['lpsa'])
).show()
In [158]:
# Sample covariance between age and lpsa.
prostate.select(
    covar_samp(prostate['age'], prostate['lpsa'])
).show()
In [159]:
# Preview the first five iris rows.
iris.show(n=5)
In [163]:
# Build a map column whose key comes from 'species' and whose value comes
# from 'sepal_length', then preview the result.
df = iris.select(
    create_map(['species', 'sepal_length'])
)
df.show(n=5)
In [166]:
# Display the column names and data types of df as the cell's output.
df.dtypes
Out[166]:
In [22]:
# Four-row frame with a single column x holding 1 through 4.
df = spark.createDataFrame(
    data=[[1], [2], [3], [4]],
    schema=['x'],
)
df.show()
In [25]:
# Attach the current date to every row.
df.select(
    ['x', current_date()]
).show()
In [27]:
# Attach the current timestamp to every row; truncate=False shows the value
# in full instead of cutting long cells.
df.select(
    ['x', current_timestamp()]
).show(truncate=False)
In [29]:
# Keep x and add the current date under the explicit name 'current_date'.
df2 = df.select(
    'x',
    current_date().alias('current_date'),
)
df2.show(n=5)
In [32]:
# Add ten days to the 'current_date' column and show it beside the original.
df2.select(
    'x',
    'current_date',
    date_add(df2['current_date'], 10),
).show()
In [33]:
# Render 'current_date' as a string using the MM/dd/yyyy pattern.
df2.select(
    'x',
    'current_date',
    date_format(date='current_date', format='MM/dd/yyyy').alias('new_date'),
).show()
In [ ]: