In [12]:
# Generate Random Data
import itertools
import random
# Fixed rosters: every student is paired with every subject below.
students = ['John', 'Mike', 'Matt']
subjects = ['Math', 'Sci', 'Geography', 'History']

# Seed the module-level generator so the scores are identical on every run.
random.seed(1)

# One (student, subject, score) tuple per pairing, in itertools.product
# order; each score is an integer in the inclusive range 0..100.
data = [
    (student, subject, random.randint(0, 100))
    for student, subject in itertools.product(students, subjects)
]
# Create Schema Object
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Explicit schema for the score rows: two string columns followed by an
# integer column, every one of them declared non-nullable.
schema = StructType([
    StructField(column_name, column_type, nullable=False)
    for column_name, column_type in (
        ("student", StringType()),
        ("subject", StringType()),
        ("score", IntegerType()),
    )
])
# Create DataFrame
from pyspark.sql import HiveContext
# `sc` is not defined in the visible cells; it is expected to already exist
# as the SparkContext of the running session.
sqlContext = HiveContext(sc)

# Distribute the local list of tuples as an RDD, then attach the explicit
# schema to obtain a DataFrame.
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema=schema)
# Define udf
from pyspark.sql.functions import udf
def scoreToCategory(score):
    """Map a numeric score to a letter category.

    Thresholds are inclusive lower bounds: 80 and above is 'A', 60 and
    above is 'B', 35 and above is 'C', anything lower is 'D'.

    Args:
        score: Numeric score, or None for a missing value.

    Returns:
        One of 'A', 'B', 'C', 'D', or None when ``score`` is None.
    """
    # Comparing None with an int raises TypeError on Python 3 (and silently
    # yields 'D' on Python 2). Pass a missing score through as missing so the
    # function stays safe if it is ever applied to a nullable column.
    if score is None:
        return None
    if score >= 80:
        return 'A'
    if score >= 60:
        return 'B'
    if score >= 35:
        return 'C'
    return 'D'
# Wrap the plain Python function as a Spark UDF whose result is a string.
udfScoreToCategory = udf(scoreToCategory, returnType=StringType())

# Add the derived "category" column and print at most 10 rows.
(
    df
    .withColumn("category", udfScoreToCategory("score"))
    .show(10)
)
In [5]:
# Generate Random Data
import itertools
import random
# Fixed rosters: every student is paired with every subject below.
students = ['John', 'Mike', 'Matt']
subjects = ['Math', 'Sci', 'Geography', 'History']

# Seed the module-level generator so the scores are identical on every run.
random.seed(1)

# One (student, subject, score) tuple per pairing, in itertools.product
# order; each score is an integer in the inclusive range 0..100.
data = [
    (student, subject, random.randint(0, 100))
    for student, subject in itertools.product(students, subjects)
]
# Create Schema Object
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Explicit schema for the score rows: two string columns followed by an
# integer column, every one of them declared non-nullable.
schema = StructType([
    StructField(column_name, column_type, nullable=False)
    for column_name, column_type in (
        ("student", StringType()),
        ("subject", StringType()),
        ("score", IntegerType()),
    )
])
In [6]:
# Create DataFrame
from pyspark.sql import HiveContext
# `sc` is not defined in the visible cells; it is expected to already exist
# as the SparkContext of the running session.
sqlContext = HiveContext(sc)

# Distribute the local list of tuples as an RDD, then attach the explicit
# schema to obtain a DataFrame.
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema=schema)
In [7]:
# Bare expression as the cell's last line: the notebook shows the repr of
# `rdd` as this cell's output instead of computing anything new.
rdd
Out[7]:
In [8]:
# Define udf
from pyspark.sql.functions import udf
def scoreToCategory(score):
    """Map a numeric score to a letter category.

    Thresholds are inclusive lower bounds: 80 and above is 'A', 60 and
    above is 'B', 35 and above is 'C', anything lower is 'D'.

    Args:
        score: Numeric score, or None for a missing value.

    Returns:
        One of 'A', 'B', 'C', 'D', or None when ``score`` is None.
    """
    # Comparing None with an int raises TypeError on Python 3 (and silently
    # yields 'D' on Python 2). Pass a missing score through as missing so the
    # function stays safe if it is ever applied to a nullable column.
    if score is None:
        return None
    if score >= 80:
        return 'A'
    if score >= 60:
        return 'B'
    if score >= 35:
        return 'C'
    return 'D'
In [11]:
# Wrap the plain Python function as a Spark UDF whose result is a string.
udfScoreToCategory = udf(scoreToCategory, returnType=StringType())

# Add the derived "category" column and print at most 10 rows.
(
    df
    .withColumn("category", udfScoreToCategory("score"))
    .show(10)
)
In [10]:
# Bare expression as the cell's last line: the notebook shows the repr of
# the UDF wrapper object as this cell's output.
udfScoreToCategory
Out[10]:
In [ ]: