In [1]:
# Import findspark
import findspark
# Initialize and provide path
findspark.init("/home/henrique/Downloads/spark")
# Or use this alternative
#findspark.init()
In [2]:
# Import SparkSession
from pyspark.sql import SparkSession
# Build the SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("Logistic Regression Model") \
    .config("spark.executor.memory", "1gb") \
    .getOrCreate()
sc = spark.sparkContext
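A quick sanity check that the session came up as configured; the exact version string depends on the Spark build the path above points to:
In [ ]:
# Confirm the session is live and which master it is bound to
print(spark.version)
print(sc.master)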
In [3]:
df = (spark.read
      .format("csv")                  # Use the built-in CSV data source
      .option("header", "true")       # Use the first line as the header
      .option("inferSchema", "true")  # Infer column types; otherwise every column is read as string
      .load("games.csv"))             # File name
In [4]:
df
Out[4]:
In [5]:
excludes = [
't1_ban1',
't1_ban2',
't1_ban3',
't1_ban4',
't1_ban5',
't1_champ1_sum1',
't1_champ1_sum2',
't1_champ1id',
't1_champ2_sum1',
't1_champ2_sum2',
't1_champ2id',
't1_champ3_sum1',
't1_champ3_sum2',
't1_champ3id',
't1_champ4_sum1',
't1_champ4_sum2',
't1_champ4id',
't1_champ5_sum1',
't1_champ5_sum2',
't1_champ5id',
't2_ban1',
't2_ban2',
't2_ban3',
't2_ban4',
't2_ban5',
't2_champ1_sum1',
't2_champ1_sum2',
't2_champ1id',
't2_champ2_sum1',
't2_champ2_sum2',
't2_champ2id',
't2_champ3_sum1',
't2_champ3_sum2',
't2_champ3id',
't2_champ4_sum1',
't2_champ4_sum2',
't2_champ4id',
't2_champ5_sum1',
't2_champ5_sum2',
't2_champ5id']
In [6]:
# Drop all the pick/ban and champion-select columns in one call
df = df.drop(*excludes)
print(df.columns)
In [7]:
df.printSchema()
In [8]:
df.dtypes
Out[8]:
In [9]:
df.select('gameId','t1_inhibitorKills','t2_towerKills','winner').show(15)
In [10]:
# Only the DataFrame-based spark.ml API is used in this notebook
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
In [11]:
# Rename winner to label and shift it from {1, 2} to {0, 1},
# since binomial logistic regression requires labels in {0, 1}
df = df.withColumn("label", df["winner"] - 1).drop("winner")
df.printSchema()
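Before training, it is worth confirming that the two classes are roughly balanced, since a heavy skew would make raw accuracy misleading. A quick check:
In [ ]:
# Class balance of the label (0 = team 1 won, 1 = team 2 won)
df.groupBy("label").count().show()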
In [12]:
feat_fields = ['gameDuration',
'seasonId',
'firstBlood',
'firstTower',
'firstInhibitor',
'firstBaron',
'firstDragon',
'firstRiftHerald',
't1_towerKills',
't1_inhibitorKills',
't1_baronKills',
't1_dragonKills',
't1_riftHeraldKills',
't2_towerKills',
't2_inhibitorKills',
't2_baronKills',
't2_dragonKills',
't2_riftHeraldKills']
In [13]:
assembler = VectorAssembler(inputCols=feat_fields, outputCol="features")
output = assembler.transform(df)
In [14]:
# The output DataFrame keeps all the old columns and adds a new
# 'features' column holding the assembled feature vector
output.select('gameDuration','seasonId', 'features').show(20)
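The assembled features mix very different scales (gameDuration in seconds next to binary firstBlood flags). If that turned out to slow convergence, a StandardScaler step could be slotted in before the classifier; a minimal sketch, with the 'scaledFeatures' column name chosen arbitrarily:
In [ ]:
from pyspark.ml.feature import StandardScaler
# Standardize the assembled feature vector to zero mean / unit variance
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withMean=True, withStd=True)
scaled = scaler.fit(output).transform(output)
scaled.select("features", "scaledFeatures").show(5, truncate=40)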
In [26]:
(trainingData, testData) = output.randomSplit([0.7, 0.3], seed = 1234)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))
In [27]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0.8, family = "binomial")
lrModel = lr.fit(trainingData)
In [28]:
import matplotlib.pyplot as plt
import numpy as np
beta = np.sort(lrModel.coefficients)
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()
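Sorting the coefficients makes the plot readable but discards which weight belongs to which feature. Since VectorAssembler preserves the order of feat_fields, the two can be zipped back together; a sketch, listing the largest magnitudes first:
In [ ]:
# Pair each coefficient with its feature name, largest magnitude first
for name, coef in sorted(zip(feat_fields, lrModel.coefficients.toArray()),
                         key=lambda pair: abs(pair[1]), reverse=True):
    print("%-22s %+.4f" % (name, coef))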
In [29]:
trainingSummary = lrModel.summary
# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
plt.plot(objectiveHistory)
plt.ylabel('Objective Function')
plt.xlabel('Iteration')
plt.show()
In [30]:
# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))
#trainingSummary.roc.show(n=10, truncate=15)
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.title('ROC Curve')
plt.show()
In [31]:
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
plt.show()
In [32]:
predictions = lrModel.transform(testData)
In [33]:
predictions.select("label","prediction","probability")\
.show(n=10, truncate=40)
In [34]:
print("Number of correct prediction: " + str(predictions.filter(predictions['prediction'] == predictions['label']).count()))
print("Total of elements: " + str(testData.count()))
print(str(predictions.filter(predictions['prediction'] == predictions['label']).count()/testData.count()*100) + '%')
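The same number can be obtained with Spark's built-in evaluator instead of counting by hand; MulticlassClassificationEvaluator covers the binary case as well:
In [ ]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                                  predictionCol="prediction",
                                                  metricName="accuracy")
print("Test accuracy: " + str(acc_evaluator.evaluate(predictions)))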
In [35]:
predictions.filter(predictions['prediction'] == predictions['label'])\
.select("gameId","probability","label","prediction").show(20)
In [36]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
print("Training: Area Under ROC: " + str(trainingSummary.areaUnderROC))
# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Test: Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))