In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *       # for defining schemas with various datatypes
import pyspark.sql.functions as func  # for ETL / data processing on DataFrames
from pyspark.sql.functions import udf
import pandas as pd
import numpy as np
sc = SparkContext()   # create the SparkContext
sql = SQLContext(sc)  # create the SQLContext
data_opath = "../output/csv/"
In [27]:
def getDF(ratingType, filename):
    """Load a ratings CSV into a DataFrame, using the file's header row for column names."""
    resultRDD = sc.textFile(data_opath + filename)
    # Separate the header line (the line containing the rating-type column name)
    # from the data rows before parsing.
    resultHeader = resultRDD.filter(lambda l: ratingType in l)
    resultRDDNoHeader = resultRDD.subtract(resultHeader)
    if ratingType == "batsman":
        resultTempRDD = resultRDDNoHeader.map(lambda k: k.split(',')) \
            .map(lambda p: (p[0], p[1], int(p[2]), int(p[3]), int(p[4]), int(p[5]),
                            int(p[6]), int(p[7]), int(p[8]), round(float(p[9]), 2)))
    elif ratingType == "bowler":
        resultTempRDD = resultRDDNoHeader.map(lambda k: k.split(',')) \
            .map(lambda p: (p[0], p[1], int(p[2]), int(p[3]), int(p[4]), int(p[5]),
                            int(p[6]), int(p[7]), int(p[8]), int(p[9]), int(p[10]),
                            round(float(p[11]), 2)))
    elif ratingType == "fielder":
        resultTempRDD = resultRDDNoHeader.map(lambda k: k.split(',')) \
            .map(lambda p: (p[0], p[1], int(p[2]), int(p[3])))
    # Name the columns from the header, drop the unnamed pandas index column,
    # and keep a single row per player.
    resultDF = sql.createDataFrame(resultTempRDD, resultRDD.first().split(','))
    resultDF = resultDF.drop('')
    resultDF = resultDF.dropDuplicates([ratingType])
    return resultDF

def getInitDF(ratingType, seasonLBound, seasonUBound):
    """Return the sorted list of players seen across the whole season range."""
    filename = ratingType + str(seasonLBound) + "_" + str(seasonUBound) + ".csv"
    resultDF = getDF(ratingType, filename)
    resultDF = resultDF.select(ratingType)
    return resultDF.sort(ratingType)

def getSeasonRangeRatingsDF(ratingType, seasonNum):
    """Return (player, overall) ratings for a single season."""
    filename = ratingType + str(seasonNum) + ".csv"
    resultDF = getDF(ratingType, filename)
    resultDF = resultDF.select(ratingType, "overall")
    return resultDF.sort(ratingType)

def fixNoneValues(score):
    # Players who did not feature in a season come through the left join as null.
    return 0 if score is None else score

def getPreSum(currOverall, prevPreSum):
    # Add the current season's overall rating to the running total so far.
    return currOverall + prevPreSum

def joinOverallDF(aDF, bDF, colName):
    # Left join on the global ratingType keeps every player from the initial list,
    # even those without a rating in this season.
    return aDF.join(bDF, ratingType, "left").withColumnRenamed("overall", colName)

def fixJoinOverallDF(srcDF, colName):
    fixNoneValuesUDF = udf(fixNoneValues, LongType())
    return srcDF.withColumn(colName, fixNoneValuesUDF(srcDF[colName]))

def calcPreSumOverall(srcDF, seasonLBound, seasonNum, ratingType):
    getPreSumUDF = udf(getPreSum, LongType())
    idx = seasonNum - seasonLBound + 1
    if idx == 1:
        # First season: the prefix sum is just that season's overall rating.
        return srcDF
    # Otherwise, fold the previous season's cumulative total into the current column.
    currColName = ratingType + "presumoverall" + str(seasonNum)
    prevColName = ratingType + "presumoverall" + str(seasonNum - 1)
    return srcDF.withColumn(currColName, getPreSumUDF(srcDF[currColName], srcDF[prevColName]))
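The next cell drives these helpers. As a quick illustration of what calcPreSumOverall does season by season, the standalone sketch below applies the same accumulation to a toy DataFrame: the player names and values are made up, and plain column arithmetic stands in for the LongType UDF.

In [ ]:
# Illustrative only: toy data showing the accumulation performed by calcPreSumOverall.
# Each "presumoverall" column starts as that season's overall rating; after the loop
# it holds the running total up to and including that season.
toyDF = sql.createDataFrame(
    [("PlayerA", 10, 20, 5), ("PlayerB", 0, 7, 3)],
    ["fielder", "fielderpresumoverall2008", "fielderpresumoverall2009", "fielderpresumoverall2010"])
for yr in (2009, 2010):
    curr = "fielderpresumoverall" + str(yr)
    prev = "fielderpresumoverall" + str(yr - 1)
    toyDF = toyDF.withColumn(curr, toyDF[curr] + toyDF[prev])  # same step the UDF performs
toyDF.show()
# PlayerA: 10, 30, 35   PlayerB: 0, 7, 10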
In [28]:
ratingType = "fielder"
seasonLBound = 2008
seasonUBound = 2016
finalDF = getInitDF(ratingType, seasonLBound, seasonUBound)
for i in range(seasonLBound, seasonUBound+1):
srcDF = getSeasonRangeRatingsDF(ratingType, i)
colName = ratingType+"presumoverall"+str(i)
finalDF = joinOverallDF(finalDF, srcDF, colName)
finalDF = fixJoinOverallDF(finalDF, colName)
finalDF = calcPreSumOverall(finalDF, seasonLBound, i, ratingType)
finalDF = finalDF.sort(ratingType+"presumoverall"+str(seasonUBound), ascending=0)
filename = ratingType+"preSumOverall"+str(seasonLBound)+"_"+str(seasonUBound)+".csv"
finalDF.toPandas().to_csv(data_opath + filename)
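To spot-check the export, the CSV written above can be read back with pandas. A minimal sketch, reusing data_opath and filename from the cell above:

In [ ]:
# Sanity check (illustrative): reload the exported file and inspect the top rows,
# which should be the players with the highest cumulative rating through 2016.
check = pd.read_csv(data_opath + filename, index_col=0)
check.head()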