In [1]:
import pyspark
from pyspark import SparkContext 
from pyspark.sql import SQLContext
from pyspark.sql.types import *         # for defining schema with various datatypes
import pyspark.sql.functions as func    # for ETL, data processing on Dataframes
from pyspark.sql.functions import udf

import pandas as pd
import numpy as np

sc = SparkContext()        # creating sparkcontext
sql = SQLContext(sc)       # creating SQLcontext

data_opath = "../output/csv/"

In [27]:
def getDF(ratingType, filename):
    resultRDD = sc.textFile(data_opath + filename)
    resultHeader = resultRDD.filter(lambda l : ratingType in l)
    resultRDDNoHeader = resultRDD.subtract(resultHeader)
    if(ratingType == "batsman"):
        resultTempRDD = resultRDDNoHeader.map(lambda k : k.split(',')).map(lambda p: (p[0],p[1], int(p[2]), int(p[3]),int(p[4]), int(p[5]), int(p[6]),int(p[7]), int(p[8]), round(float(p[9]),2)))
    elif(ratingType == "bowler"):
        resultTempRDD = resultRDDNoHeader.map(lambda k : k.split(',')).map(lambda p: (p[0],p[1], int(p[2]), int(p[3]),int(p[4]), int(p[5]), int(p[6]),int(p[7]), int(p[8]), int(p[9]), int(p[10]), round(float(p[11]),2)))
    elif(ratingType == "fielder"):
        resultTempRDD = resultRDDNoHeader.map(lambda k : k.split(',')).map(lambda p: (p[0],p[1], int(p[2]), int(p[3])))
   
    resultDF = sql.createDataFrame(resultTempRDD, resultRDD.first().split(','))
    resultDF = resultDF.drop('')
    resultDF = resultDF.dropDuplicates([ratingType])
    return resultDF


def getInitDF(ratingType, seasonLBound, seasonUBound):
    filename = ratingType+str(seasonLBound)+"_"+str(seasonUBound)+".csv"
    resultDF = getDF(ratingType, filename)
    resultDF = resultDF.select(ratingType)
    return resultDF.sort(ratingType)


def getSeasonRangeRatingsDF(ratingType,seasonNum):
    filename = ratingType+str(seasonNum)+".csv"
    resultDF = getDF(ratingType, filename)
    resultDF = resultDF.select(ratingType, "overall")
    return resultDF.sort(ratingType)


def fixNoneValues(score):
    if(score == None):
        return 0
    else:
        return score


def getPreSum(currOverall, prevPreSum):
    return currOverall+prevPreSum


def joinOverallDF(aDF, bDF, colName):
    return aDF.join(bDF, ratingType, "left").withColumnRenamed("overall", colName)


def fixJoinOverallDF(srcDF, colName):
    fixNoneValuesUDF = udf(fixNoneValues, LongType())
    return srcDF.withColumn(colName,fixNoneValuesUDF(srcDF[colName]))


def calcPreSumOverall(srcDF, seasonLBound, seasonNum, ratingType):
    getPreSumUDF = udf(getPreSum, LongType())
    idx = seasonNum - seasonLBound + 1
    if(1 == idx):
        return srcDF
    else:
        currColName = ratingType+"presumoverall"+str(seasonNum)
        prevColName = ratingType+"presumoverall"+str(seasonNum-1)
        srcDF = srcDF.withColumn(currColName, getPreSumUDF(srcDF[currColName], srcDF[prevColName]))
        return srcDF

In [28]:
ratingType = "fielder"
seasonLBound = 2008
seasonUBound = 2016
finalDF = getInitDF(ratingType, seasonLBound, seasonUBound)

for i in range(seasonLBound, seasonUBound+1):
    srcDF = getSeasonRangeRatingsDF(ratingType, i)
    colName = ratingType+"presumoverall"+str(i)
    finalDF = joinOverallDF(finalDF, srcDF, colName)
    finalDF = fixJoinOverallDF(finalDF, colName)
    finalDF = calcPreSumOverall(finalDF, seasonLBound, i, ratingType)

finalDF = finalDF.sort(ratingType+"presumoverall"+str(seasonUBound), ascending=0)
filename = ratingType+"preSumOverall"+str(seasonLBound)+"_"+str(seasonUBound)+".csv"
finalDF.toPandas().to_csv(data_opath + filename)