In [1]:
import pyspark
from pyspark import SparkContext 
from pyspark.sql import SQLContext
from pyspark.sql.types import *         # for defining schema with various datatypes
import pyspark.sql.functions as func    # for ETL, data processing on Dataframes
from pyspark.sql.functions import udf

import pandas as pd
import numpy as np

sc = SparkContext()        # creating sparkcontext
sql = SQLContext(sc)       # creating SQLcontext

data_opath = "../output/csv/"

In [7]:
def getSeasonRatingsDF(ratingType,seasonNum):
    filename = ratingType+str(seasonNum)+".csv"
    resultRDD = sc.textFile(data_opath + filename)
    resultHeader = resultRDD.filter(lambda l : ratingType in l)
    resultRDDNoHeader = resultRDD.subtract(resultHeader)
    if(ratingType == "batsman"):
        resultTempRDD = resultRDDNoHeader.map(lambda k : k.split(',')).map(lambda p: (p[0],p[1], int(p[2]), int(p[3]),\
                                                        int(p[4]), int(p[5]), int(p[6]), int(p[7]), int(p[8])))
    elif(ratingType == "bowler"):
        resultTempRDD = resultRDDNoHeader.map(lambda k : k.split(',')).map(lambda p: (p[0],p[1], int(p[2]), int(p[3]),\
                                                        int(p[4]), int(p[5]), int(p[6]), int(p[7]), int(p[8]), int(p[9]), int(p[10])))
    elif(ratingType == "fielder"):
        resultTempRDD = resultRDDNoHeader.map(lambda k : k.split(',')).map(lambda p: (p[0],p[1], int(p[2]), int(p[3])))
    resultDF = sql.createDataFrame(resultTempRDD, resultRDD.first().split(','))
    resultDF = resultDF.drop('')
    return resultDF.dropDuplicates([ratingType, "overall"]).sort("overall", ascending=0)


def calcStrikeRate(hit0, hit1, hit2, hit3, hit4, hit6):
    totalBalls = hit0 + hit1 + hit2 + hit3 + hit4 + hit6
    totalRuns = hit1 + 2*hit2 + 3*hit3 + 4*hit4 + 6*hit6
    SR = (totalRuns*100.0)/totalBalls
    return float(round(SR,2))


def calcEconomyRate(ball0, ball1, ball2, ball3, ball4, ball5, ball6):
    totalBalls = ball0 + ball1 + ball2 + ball3 + ball4 + ball5 + ball6
    totalOvers = totalBalls/6
    partialOver = totalBalls%6
    totalOvers = totalOvers+(partialOver/10.0)
    totalRuns = ball1 + 2*ball2 + 3*ball3 + 4*ball4 + 5*ball5 + 6*ball6
    if(totalOvers):
        ER = totalRuns/totalOvers
    else:
        ER = 0
    return float(round(ER,2))


def getStrikeRatesDF(srcDF):
    calcStrikeRateUDF = udf(calcStrikeRate, FloatType())
    resultDF = srcDF.withColumn("strikerate",calcStrikeRateUDF(srcDF.run0,\
                srcDF.run1, srcDF.run2, srcDF.run3, srcDF.run4, srcDF.run6))
    return resultDF.sort('overall',ascending=0)


def getEconomyRatesDF(srcDF):
    calcEcoRateUDF = udf(calcEconomyRate, FloatType())
    resultDF = srcDF.withColumn("economyrate", calcEcoRateUDF(srcDF["run0"],\
                srcDF["run1"], srcDF["run2"], srcDF["run3"], srcDF["run4"], srcDF["run5"], srcDF["run6"],))
    return resultDF.sort("overall",ascending=0)
    
    
def calcBatsmanOverall(overall, rate):
    overallScore = long(overall + 2*rate)
    return overallScore


def calcBowlerOverall(overall, rate):
    overallScore = long(overall - 100*rate)
    if(overallScore < 0):
        overallScore = 0
    return overallScore


def updateOverallDF(srcDF, col1, col2, ratingType):
    if( ratingType == "batsman"):
        calcOverallUDF = udf(calcBatsmanOverall, LongType())
    else:
        calcOverallUDF = udf(calcBowlerOverall, LongType())

    resultDF = srcDF.withColumn(col1, calcOverallUDF(srcDF[col1], srcDF[col2]))
    return resultDF.sort([col1,col2],ascending=0)


col1 = "overall"
col2 = "economyrate"
ratingType = "bowler"

for i in range(2008,2016+1):
    resDF1 = getSeasonRatingsDF(ratingType, i)
    if(ratingType == "batsman"):
        resDF2 = getStrikeRatesDF(resDF1)
        resDF3 = updateOverallDF(resDF2, col1, col2, ratingType)
        resPDF3 = resDF3.toPandas()
        for j in range(resPDF3.count()[0]):
            resPDF3[col2][j] = round(resPDF3[col2][j],2)
    elif(ratingType == "bowler"):
        resDF2 = getEconomyRatesDF(resDF1)
        resDF3 = updateOverallDF(resDF2, col1, col2, ratingType)
        resPDF3 = resDF3.toPandas()
        for j in range(resPDF3.count()[0]):
            resPDF3[col2][j] = round(resPDF3[col2][j],2)
    else:
        resDF3 = resDF1
        resPDF3 = resDF3.toPandas()

    filename = ratingType+str(i)+"(2).csv"
    resPDF3.to_csv(data_opath + filename)


/home/vivek/anaconda2/lib/python2.7/site-packages/ipykernel_launcher.py:88: SettingWithCopyWarning: 
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy

In [11]:
def getSeasonRatingsDF2():
    resultRDD = sc.textFile(data_opath+"batsman.csv")
    resultHeader = resultRDD.filter(lambda l : "batsman" in l)
    resultRDDNoHeader = resultRDD.subtract(resultHeader)
    resultTempRDD = resultRDDNoHeader.map(lambda k : k.split(','))\
                    .map(lambda p: (p[0],p[1], int(p[2]), int(p[3]),\
                    int(p[4]), int(p[5]), int(p[6]), int(p[7]), int(p[8])))
    resultDF = sql.createDataFrame(resultTempRDD, resultRDD.first().split(','))
    resultDF = resultDF.drop('')
    return resultDF.sort('overall',ascending=0)

resDF1 = getSeasonRatingsDF2()
resDF2 = getStrikeRatesDF(resDF1)
resDF3 = updateBatsmanOverallDF(resDF2, col1, col2)
resPDF3 = resDF3.toPandas()
for j in range(resPDF3.count()[0]):
    resPDF3['strikerate'][j] = round(resPDF3['strikerate'][j],2)
resPDF3.to_csv(ratingType+str(2008)+"_"+str(2016)+".csv")
resPDF3 = resDF3.toPandas()
for j in range(resPDF3.count()[0]):
    resPDF3['strikerate'][j] = round(resPDF3['strikerate'][j],2)

filename = ratingType+str(2008)+"_"+str(2016)+".csv"
resPDF3.to_csv(data_opath + filename)
# #     resDF3.write.c]sv(ratingType+str(i)+"(2).c


/home/vivek/anaconda2/lib/python2.7/site-packages/ipykernel_launcher.py:16: SettingWithCopyWarning: 
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  app.launch_new_instance()
/home/vivek/anaconda2/lib/python2.7/site-packages/ipykernel_launcher.py:20: SettingWithCopyWarning: 
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy

In [ ]:
def getSeasonRatingsDF2():
    resultRDD = sc.textFile(data_opath+".csv")
    resultHeader = resultRDD.filter(lambda l : "batsman" in l)
    resultRDDNoHeader = resultRDD.subtract(resultHeader)
    resultTempRDD = resultRDDNoHeader.map(lambda k : k.split(','))\
                    .map(lambda p: (p[0],p[1], int(p[2]), int(p[3]),\
                    int(p[4]), int(p[5]), int(p[6]), int(p[7]), int(p[8])))
    resultDF = sql.createDataFrame(resultTempRDD, resultRDD.first().split(','))
    resultDF = resultDF.drop('')
    return resultDF.sort('overall',ascending=0)

resDF1 = getSeasonRatingsDF2()
resDF2 = getStrikeRatesDF(resDF1)
resDF3 = updateBatsmanOverallDF(resDF2, col1, col2)
resPDF3 = resDF3.toPandas()
for j in range(resPDF3.count()[0]):
    resPDF3['strikerate'][j] = round(resPDF3['strikerate'][j],2)
resPDF3.to_csv(ratingType+str(2008)+"_"+str(2016)+".csv")
resPDF3 = resDF3.toPandas()
for j in range(resPDF3.count()[0]):
    resPDF3['strikerate'][j] = round(resPDF3['strikerate'][j],2)

filename = ratingType+str(2008)+"_"+str(2016)+".csv"
resPDF3.to_csv(data_opath + filename)
# #     resDF3.write.c]sv(ratingType+str(i)+"(2).c

In [11]:
def getSeasonRatingsDF3(ratingType):
    filename = ratingType+".csv"
    resultRDD = sc.textFile(data_opath + filename)
    resultHeader = resultRDD.filter(lambda l : ratingType in l)
    resultRDDNoHeader = resultRDD.subtract(resultHeader)
    if(ratingType == "batsman"):
        resultTempRDD = resultRDDNoHeader.map(lambda k : k.split(',')).map(lambda p: (p[0],p[1], int(p[2]), int(p[3]),\
                                                        int(p[4]), int(p[5]), int(p[6]), int(p[7]), int(p[8])))
    elif(ratingType == "bowler"):
        resultTempRDD = resultRDDNoHeader.map(lambda k : k.split(',')).map(lambda p: (p[0],p[1], int(p[2]), int(p[3]),\
                                                        int(p[4]), int(p[5]), int(p[6]), int(p[7]), int(p[8]), int(p[9]), int(p[10])))
    elif(ratingType == "fielder"):
        resultTempRDD = resultRDDNoHeader.map(lambda k : k.split(',')).map(lambda p: (p[0],p[1], int(p[2]), int(p[3])))
    resultDF = sql.createDataFrame(resultTempRDD, resultRDD.first().split(','))
    resultDF = resultDF.drop('')
    return resultDF.dropDuplicates([ratingType, "overall"]).sort("overall", ascending=0)


ratingType="fielder"
resDF1 = getSeasonRatingsDF3(ratingType)
if(ratingType == "batsman"):
    resDF2 = getStrikeRatesDF(resDF1)
    resDF3 = updateOverallDF(resDF2, col1, col2, ratingType)
    resPDF3 = resDF3.toPandas()
    for j in range(resPDF3.count()[0]):
        resPDF3[col2][j] = round(resPDF3[col2][j],2)
elif(ratingType == "bowler"):
    resDF2 = getEconomyRatesDF(resDF1)
    resDF3 = updateOverallDF(resDF2, col1, col2, ratingType)
    resPDF3 = resDF3.toPandas()
    for j in range(resPDF3.count()[0]):
        resPDF3[col2][j] = round(resPDF3[col2][j],2)
else:
    resDF3 = resDF1
    resPDF3 = resDF3.toPandas()

filename = ratingType+str(2008)+"_"+str(2016)+".csv"
resPDF3.to_csv(data_opath + filename)