In [5]:
# Generate random data: one score per (student, subject) pair
import itertools
import random

students = ['John', 'Mike', 'Matt']
subjects = ['Math', 'Sci', 'Geography', 'History']
random.seed(1)  # fixed seed so the scores below are reproducible

data = []
for student, subject in itertools.product(students, subjects):
    data.append((student, subject, random.randint(0, 100)))

# Create schema object: three non-nullable columns
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("student", StringType(), nullable=False),
    StructField("subject", StringType(), nullable=False),
    StructField("score", IntegerType(), nullable=False)
])
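
As a quick sanity check, the cartesian product should yield 3 students x 4 subjects = 12 rows (optional; with seed(1) the first tuple is the ('John', 'Math', 13) row seen in the output further below):

In [ ]:
len(data)  # expect 12
data[0]    # first (student, subject, score) tuple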

In [6]:
# Create DataFrame from the RDD using the explicit schema
from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)  # `sc` is the SparkContext provided by the PySpark shell/notebook
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)
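
Aside: createDataFrame also accepts a plain Python list, in which case Spark parallelizes it internally, so the explicit parallelize step is optional. A minimal equivalent of the cell above (the name df2 is just for illustration):

In [ ]:
df2 = sqlContext.createDataFrame(data, schema)  # list in, Spark parallelizes internally
df2.printSchema()  # confirm the three declared columns and types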

In [7]:
rdd


Out[7]:
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475
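
The repr above is only a handle: RDDs are evaluated lazily, so nothing has been computed yet. An action such as take materializes a few rows (a quick check, not required for the rest of the notebook):

In [ ]:
rdd.take(3)  # pulls the first three (student, subject, score) tuples to the driver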

In [8]:
# Define a plain Python function mapping a numeric score to a letter grade
# (the actual UDF wrapping happens in the next cell)
from pyspark.sql.functions import udf

def scoreToCategory(score):
    if score >= 80:
        return 'A'
    elif score >= 60:
        return 'B'
    elif score >= 35:
        return 'C'
    else:
        return 'D'
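
Because scoreToCategory is an ordinary Python function, the grade boundaries can be spot-checked on the driver before wrapping it as a UDF:

In [ ]:
assert scoreToCategory(80) == 'A'  # 80 and above
assert scoreToCategory(60) == 'B'  # 60-79
assert scoreToCategory(35) == 'C'  # 35-59
assert scoreToCategory(34) == 'D'  # below 35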

In [11]:
udfScoreToCategory = udf(scoreToCategory, StringType())  # wrap as a Spark UDF returning strings
df.withColumn("category", udfScoreToCategory("score")).show(10)


+-------+---------+-----+--------+
|student|  subject|score|category|
+-------+---------+-----+--------+
|   John|     Math|   13|       D|
|   John|      Sci|   85|       A|
|   John|Geography|   77|       B|
|   John|  History|   25|       D|
|   Mike|     Math|   50|       C|
|   Mike|      Sci|   45|       C|
|   Mike|Geography|   65|       B|
|   Mike|  History|   79|       B|
|   Matt|     Math|    9|       D|
|   Matt|      Sci|    2|       D|
+-------+---------+-----+--------+
only showing top 10 rows
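
The same function can also be exposed to Spark SQL. A minimal sketch using the Spark 1.x API that matches the HiveContext above (registerTempTable/registerFunction; the table name "scores" is just for illustration):

In [ ]:
df.registerTempTable("scores")
sqlContext.registerFunction("scoreToCategory", scoreToCategory, StringType())
sqlContext.sql("SELECT student, subject, score, scoreToCategory(score) AS category FROM scores").show(10)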


In [10]:
udfScoreToCategory


Out[10]:
<pyspark.sql.functions.UserDefinedFunction at 0x7f0688b55350>
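
The repr above shows the wrapped UserDefinedFunction object. Calling it with a column (or column name) returns a Column expression, which is why it can be passed straight to withColumn or select:

In [ ]:
udfScoreToCategory("score")  # a pyspark.sql Column expression, not an evaluated result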
