In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import * # for defining schema with various datatypes
import pyspark.sql.functions as func # for ETL, data processing on Dataframes
from pyspark.sql.functions import udf
import pandas as pd
import numpy as np
# Reuse the running SparkContext when this cell is executed again; constructing
# a second SparkContext in the same kernel raises ValueError.
sc = SparkContext.getOrCreate() # creating sparkcontext
sql = SQLContext(sc) # creating SQLcontext
# Directory the rating CSVs are read from and written to by the cells below.
data_opath = "../output/csv/"
In [7]:
def getSeasonRatingsDF(ratingType, seasonNum):
    """Load ``<ratingType><seasonNum>.csv`` from ``data_opath`` as a DataFrame.

    Every line that contains ``ratingType`` is treated as a header line and
    skipped; the column names are taken from the first line of the file.
    Each remaining line is split on ',' into two leading string fields
    followed by a fixed number of integer fields (7 for "batsman", 9 for
    "bowler", 2 for "fielder").

    Parameters:
        ratingType: "batsman", "bowler" or "fielder".
        seasonNum: value appended (via ``str``) to ``ratingType`` to build
            the file name.

    Returns:
        A Spark DataFrame without the column named '' (presumably the index
        column written by pandas ``to_csv`` -- TODO confirm), de-duplicated
        on (``ratingType``, "overall") and sorted by "overall" descending.

    Raises:
        ValueError: if ``ratingType`` is not one of the supported types.
    """
    # Number of integer fields that follow the two leading string fields.
    intFieldCounts = {"batsman": 7, "bowler": 9, "fielder": 2}
    if ratingType not in intFieldCounts:
        # Fail early with a clear message instead of hitting an unbound
        # local variable further down.
        raise ValueError("unsupported ratingType: %r" % (ratingType,))
    numFields = 2 + intFieldCounts[ratingType]

    def parseLine(line):
        fields = line.split(',')
        # Explicit indexing keeps the IndexError on lines that are too short.
        return (fields[0], fields[1]) + tuple(
            int(fields[idx]) for idx in range(2, numFields))

    filename = ratingType+str(seasonNum)+".csv"
    resultRDD = sc.textFile(data_opath + filename)
    columnNames = resultRDD.first().split(',')
    # Keeping the lines that do not mention ratingType selects the same rows
    # as subtracting the header lines, without the shuffle.
    dataRDD = resultRDD.filter(lambda l: ratingType not in l)
    resultDF = sql.createDataFrame(dataRDD.map(parseLine), columnNames)
    resultDF = resultDF.drop('')
    return resultDF.dropDuplicates([ratingType, "overall"]).sort("overall", ascending=0)
def calcStrikeRate(hit0, hit1, hit2, hit3, hit4, hit6):
    """Return runs scored per 100 balls, rounded to two decimals.

    ``hitN`` is the number of balls on which N runs were scored, so the
    total balls are the sum of all counts and the total runs are the counts
    weighted by N.

    Returns:
        The strike rate as a float, or 0.0 when no balls were counted
        (instead of raising ZeroDivisionError).
    """
    totalBalls = hit0 + hit1 + hit2 + hit3 + hit4 + hit6
    if not totalBalls:
        return 0.0
    totalRuns = hit1 + 2*hit2 + 3*hit3 + 4*hit4 + 6*hit6
    return float(round((totalRuns*100.0)/totalBalls, 2))
def calcEconomyRate(ball0, ball1, ball2, ball3, ball4, ball5, ball6):
    """Return runs conceded per over (six balls), rounded to two decimals.

    ``ballN`` is the number of balls on which N runs were conceded, so the
    total balls are the sum of all counts and the total runs are the counts
    weighted by N.

    The rate is computed as ``runs * 6 / balls``. Converting the ball count
    to "overs.balls" notation first (for example 10 balls -> 1.4) and
    dividing by that number overstates the rate, because a partial over is a
    fraction of six balls, not of ten.

    Returns:
        The economy rate as a float, or 0.0 when no balls were counted.
    """
    totalBalls = ball0 + ball1 + ball2 + ball3 + ball4 + ball5 + ball6
    if not totalBalls:
        return 0.0
    totalRuns = ball1 + 2*ball2 + 3*ball3 + 4*ball4 + 5*ball5 + 6*ball6
    # 6.0 forces float arithmetic on both Python 2 and Python 3.
    return float(round(totalRuns*6.0/totalBalls, 2))
def getStrikeRatesDF(srcDF):
    """Return ``srcDF`` with an added float "strikerate" column.

    The column is produced by applying ``calcStrikeRate`` to the run0, run1,
    run2, run3, run4 and run6 columns; the result is sorted by "overall" in
    descending order.
    """
    strikeRateOf = udf(calcStrikeRate, FloatType())
    hitCounts = (srcDF.run0, srcDF.run1, srcDF.run2,
                 srcDF.run3, srcDF.run4, srcDF.run6)
    withRate = srcDF.withColumn("strikerate", strikeRateOf(*hitCounts))
    return withRate.sort('overall', ascending=0)
def getEconomyRatesDF(srcDF):
    """Return ``srcDF`` with an added float "economyrate" column.

    The column is produced by applying ``calcEconomyRate`` to the run0 ..
    run6 columns; the result is sorted by "overall" in descending order.
    """
    runColumnNames = ("run0", "run1", "run2", "run3", "run4", "run5", "run6")
    runCounts = [srcDF[name] for name in runColumnNames]
    economyRateOf = udf(calcEconomyRate, FloatType())
    withRate = srcDF.withColumn("economyrate", economyRateOf(*runCounts))
    return withRate.sort("overall", ascending=0)
def calcBatsmanOverall(overall, rate):
    """Return ``overall + 2*rate`` truncated to an integer.

    ``int`` is used rather than ``long`` so the function also runs on
    Python 3, where ``long`` no longer exists; on Python 2 ``int`` promotes
    to ``long`` automatically when the value is large.
    """
    return int(overall + 2*rate)
def calcBowlerOverall(overall, rate):
    """Return ``overall - 100*rate`` truncated to an integer, floored at 0.

    ``int`` is used rather than ``long`` so the function also runs on
    Python 3, where ``long`` no longer exists.
    """
    overallScore = int(overall - 100*rate)
    # Negative scores are clamped to zero.
    return max(overallScore, 0)
def updateOverallDF(srcDF, col1, col2, ratingType):
    """Overwrite column ``col1`` with a score recomputed from ``col1`` and ``col2``.

    ``calcBatsmanOverall`` is used when ``ratingType`` is "batsman"; every
    other value uses ``calcBowlerOverall``. The result is sorted by
    (``col1``, ``col2``) in descending order.
    """
    scoreFunc = calcBatsmanOverall if ratingType == "batsman" else calcBowlerOverall
    calcOverallUDF = udf(scoreFunc, LongType())
    newScore = calcOverallUDF(srcDF[col1], srcDF[col2])
    updatedDF = srcDF.withColumn(col1, newScore)
    return updatedDF.sort([col1, col2], ascending=0)
# Per-season export: for every season, load the ratings, add the rate column
# that belongs to the rating type, recompute "overall" and write
# "<ratingType><season>(2).csv" into data_opath.
col1 = "overall"
ratingType = "bowler"
# The rate column must match the rating type: batsman frames get "strikerate",
# bowler frames get "economyrate".
col2 = "strikerate" if ratingType == "batsman" else "economyrate"
# Function that adds the rate column; None for types without one (fielder).
addRateColumn = {"batsman": getStrikeRatesDF, "bowler": getEconomyRatesDF}.get(ratingType)
for i in range(2008,2016+1):
    resDF1 = getSeasonRatingsDF(ratingType, i)
    if addRateColumn is None:
        resDF3 = resDF1
        resPDF3 = resDF3.toPandas()
    else:
        resDF2 = addRateColumn(resDF1)
        resDF3 = updateOverallDF(resDF2, col1, col2, ratingType)
        resPDF3 = resDF3.toPandas()
        # Round the whole column at once; assigning through resPDF3[col2][j]
        # is chained indexing and may not write back to the frame.
        resPDF3[col2] = resPDF3[col2].round(2)
    filename = ratingType+str(i)+"(2).csv"
    resPDF3.to_csv(data_opath + filename)
In [11]:
def getSeasonRatingsDF2():
    """Load ``batsman.csv`` from ``data_opath`` as a Spark DataFrame.

    Lines containing "batsman" are treated as header lines and removed; the
    column names come from the first line of the file. Each remaining line
    is split on ',' into two string fields followed by seven integers. The
    column named '' is dropped and the frame is sorted by "overall" in
    descending order.
    """
    rawLines = sc.textFile(data_opath+"batsman.csv")
    headerLines = rawLines.filter(lambda line: "batsman" in line)
    dataLines = rawLines.subtract(headerLines)

    def toRow(line):
        p = line.split(',')
        return (p[0], p[1], int(p[2]), int(p[3]), int(p[4]),
                int(p[5]), int(p[6]), int(p[7]), int(p[8]))

    rows = dataLines.map(toRow)
    columnNames = rawLines.first().split(',')
    ratingsDF = sql.createDataFrame(rows, columnNames).drop('')
    return ratingsDF.sort('overall', ascending=0)
# Combined 2008-2016 batsman export: load batsman.csv, add the strike rate,
# recompute "overall" and write the result as CSV.
resDF1 = getSeasonRatingsDF2()
resDF2 = getStrikeRatesDF(resDF1)
# NOTE(review): updateBatsmanOverallDF is not defined in any visible cell;
# updateOverallDF with ratingType "batsman" applies calcBatsmanOverall.
# The column names are spelled out here so the cell does not depend on
# col1/col2/ratingType values left behind by other cells.
resDF3 = updateOverallDF(resDF2, "overall", "strikerate", "batsman")
resPDF3 = resDF3.toPandas()
# Round the whole column at once instead of assigning element by element
# through chained indexing.
resPDF3['strikerate'] = resPDF3['strikerate'].round(2)
filename = "batsman"+str(2008)+"_"+str(2016)+".csv"
# Written both to the working directory and to data_opath.
resPDF3.to_csv(filename)
resPDF3.to_csv(data_opath + filename)
In [ ]:
def getSeasonRatingsDF2():
    """Load ``batsman.csv`` from ``data_opath`` as a Spark DataFrame.

    Lines containing "batsman" are treated as header lines and removed; the
    column names come from the first line of the file. Each remaining line
    is split on ',' into two string fields followed by seven integers. The
    column named '' is dropped and the frame is sorted by "overall" in
    descending order.
    """
    # The file stem must be present: data_opath + ".csv" points at a file
    # literally named ".csv".
    resultRDD = sc.textFile(data_opath+"batsman.csv")
    resultHeader = resultRDD.filter(lambda l : "batsman" in l)
    resultRDDNoHeader = resultRDD.subtract(resultHeader)
    resultTempRDD = resultRDDNoHeader.map(lambda k : k.split(','))\
        .map(lambda p: (p[0],p[1], int(p[2]), int(p[3]),\
            int(p[4]), int(p[5]), int(p[6]), int(p[7]), int(p[8])))
    resultDF = sql.createDataFrame(resultTempRDD, resultRDD.first().split(','))
    resultDF = resultDF.drop('')
    return resultDF.sort('overall',ascending=0)
# Combined 2008-2016 batsman export: load batsman.csv, add the strike rate,
# recompute "overall" and write the result as CSV.
resDF1 = getSeasonRatingsDF2()
resDF2 = getStrikeRatesDF(resDF1)
# NOTE(review): updateBatsmanOverallDF is not defined in any visible cell;
# updateOverallDF with ratingType "batsman" applies calcBatsmanOverall.
# The column names are spelled out here so the cell does not depend on
# col1/col2/ratingType values left behind by other cells.
resDF3 = updateOverallDF(resDF2, "overall", "strikerate", "batsman")
resPDF3 = resDF3.toPandas()
# Round the whole column at once instead of assigning element by element
# through chained indexing.
resPDF3['strikerate'] = resPDF3['strikerate'].round(2)
filename = "batsman"+str(2008)+"_"+str(2016)+".csv"
# Written both to the working directory and to data_opath.
resPDF3.to_csv(filename)
resPDF3.to_csv(data_opath + filename)
In [11]:
def getSeasonRatingsDF3(ratingType):
    """Load the combined ``<ratingType>.csv`` from ``data_opath``.

    This is the same load as ``getSeasonRatingsDF`` with an empty season
    suffix, so it delegates to that function instead of repeating the
    parsing code.

    Parameters:
        ratingType: "batsman", "bowler" or "fielder".

    Returns:
        The DataFrame produced by ``getSeasonRatingsDF(ratingType, "")``:
        de-duplicated on (``ratingType``, "overall") and sorted by "overall"
        descending.

    Raises:
        ValueError: if ``ratingType`` is not one of the supported types.
    """
    if ratingType not in ("batsman", "bowler", "fielder"):
        raise ValueError("unsupported ratingType: %r" % (ratingType,))
    # str("") is empty, so the file name becomes ratingType + ".csv".
    return getSeasonRatingsDF(ratingType, "")
# Combined 2008-2016 export for one rating type: load "<ratingType>.csv", add
# the rate column where the type has one, recompute "overall" and write
# "<ratingType>2008_2016.csv" into data_opath.
ratingType="fielder"
resDF1 = getSeasonRatingsDF3(ratingType)
# Function that adds the rate column; None for types without one (fielder).
addRateColumn = {"batsman": getStrikeRatesDF, "bowler": getEconomyRatesDF}.get(ratingType)
if addRateColumn is None:
    resDF3 = resDF1
    resPDF3 = resDF3.toPandas()
else:
    # The rate column must match the rating type, so it is derived here
    # rather than read from a global set in another cell.
    rateColumn = "strikerate" if ratingType == "batsman" else "economyrate"
    resDF2 = addRateColumn(resDF1)
    resDF3 = updateOverallDF(resDF2, "overall", rateColumn, ratingType)
    resPDF3 = resDF3.toPandas()
    # Round the whole column at once instead of assigning element by element
    # through chained indexing.
    resPDF3[rateColumn] = resPDF3[rateColumn].round(2)
filename = ratingType+str(2008)+"_"+str(2016)+".csv"
resPDF3.to_csv(data_opath + filename)