In [16]:
import datetime
from pytz import timezone
print "Last run @%s" % (datetime.datetime.now(timezone('US/Pacific')))
#
from pyspark.context import SparkContext
print "Running Spark Version %s" % (sc.version)
#
from pyspark.conf import SparkConf
conf = SparkConf()
print conf.toDebugString()
In [17]:
# Read Train & Test Datasets
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('titanic-r/train.csv')
test = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('titanic-r/test.csv')
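With spark-csv, every column loads as a string by default, which is why the casts below are needed. If the spark-csv build in use supports the inferSchema option, types can be inferred at read time instead; a minimal sketch (train_typed is a hypothetical name):

# Optional: infer column types at read time instead of casting by hand
# (assumes a spark-csv build that supports the inferSchema option)
train_typed = sqlContext.read.format('com.databricks.spark.csv')\
    .options(header='true', inferSchema='true')\
    .load('titanic-r/train.csv')
train_typed.dtypes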
In [18]:
train.dtypes
Out[18]:
In [19]:
train.describe().show()
In [20]:
train.show(2)
In [21]:
import pyspark.sql.functions as F
train_1 = train.select(train['PassengerId'],
                       train['Survived'].cast("integer").alias("Survived"),
                       train['Pclass'].cast("integer").alias("Pclass"),
                       F.when(train['Sex'] == 'female', 1).otherwise(0).alias("Gender"),
                       train['Age'].cast("integer").alias("Age"),
                       train['SibSp'].cast("integer").alias("SibSp"),
                       train['Parch'].cast("integer").alias("Parch"),
                       train['Fare'].cast("float").alias("Fare"))
In [22]:
train.count()
Out[22]:
In [23]:
train_1.count()
Out[23]:
In [24]:
train_1.show(2)
In [25]:
train_1.describe().show()
In [26]:
# Before imputing, do we have null ages?
train_1.filter(train_1['Age'].isNull()).show(40)
In [27]:
# Replace null ages with 30
train_1.na.fill(30, 'Age').show(40)
In [28]:
# Replace null ages with 30
train_2 = train_1.na.fill(30, 'Age')
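The hard-coded 30 is a round-number stand-in for the mean age. A sketch of imputing with the computed mean instead (mean_age and train_2b are hypothetical names):

# Sketch: fill null ages with the computed mean rather than a constant
mean_age = train_1.groupBy().avg('Age').first()[0]
train_2b = train_1.na.fill(int(round(mean_age)), 'Age')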
In [29]:
train_2.crosstab("Gender","Survived").show()
In [30]:
print "F = %3.2f%% M = %3.2f%%" % ( (100*233.0/(233+81)), (100*109.0/(109+468)) )
In [31]:
#
# 1 : Simple Model (M=Survived)
#
test.show(2)
In [32]:
out = test.select(test['PassengerId'],
                  F.when(test['Sex'] == 'female', 1).otherwise(0).alias("Survived"))
In [33]:
out.show(2)
In [34]:
out.coalesce(1).write.mode('overwrite').format('com.databricks.spark.csv')\
.options(header='true').save('titanic-r/spark-sub-01.csv')
In [35]:
# Submit
# Rank : 2586 Score : 0.76555
In [36]:
#
# Would age be a better predictor?
#
train_1.na.drop().crosstab("Age","Survived").show()
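With one crosstab row per distinct age, the pattern is hard to see. A sketch that buckets ages first (the band boundaries are arbitrary; AgeBand and train_age are hypothetical names):

# Sketch: crosstab on coarse age bands instead of raw ages
train_age = train_2.select(train_2['Survived'],
                           F.when(train_2['Age'] < 13, 'child')
                            .when(train_2['Age'] < 30, 'young')
                            .when(train_2['Age'] < 55, 'adult')
                            .otherwise('senior').alias('AgeBand'))
train_age.crosstab('AgeBand', 'Survived').show()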
In [37]:
#
# *** Homework: See if Pclass, SibSp, or Parch is a better indicator and change survival accordingly
#
In [38]:
from pyspark.mllib.regression import LabeledPoint
def parse_passenger_list(r):
    return LabeledPoint(r[1], [r[2], r[3], r[4], r[5], r[6], r[7]])
In [39]:
train_rdd = train_2.map(lambda x: parse_passenger_list(x))
In [40]:
train_rdd.count()
Out[40]:
In [41]:
train_rdd.first()
Out[41]:
In [42]:
from pyspark.mllib.tree import DecisionTree
model = DecisionTree.trainClassifier(train_rdd, numClasses=2,categoricalFeaturesInfo={})
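Before submitting, a hold-out split gives a rough sanity check on accuracy; a sketch assuming an 80/20 split (all names here are hypothetical):

# Sketch: hold out 20% of the training data and measure accuracy on it
train_part, val_part = train_rdd.randomSplit([0.8, 0.2], seed=42)
val_model = DecisionTree.trainClassifier(train_part, numClasses=2, categoricalFeaturesInfo={})
val_preds = val_model.predict(val_part.map(lambda p: p.features))
labels_and_preds = val_part.map(lambda p: p.label).zip(val_preds)
accuracy = labels_and_preds.filter(lambda lp: lp[0] == lp[1]).count() / float(val_part.count())
print "Validation accuracy: %3.2f%%" % (100 * accuracy)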
In [43]:
print(model)
# print(model.toDebugString())
In [44]:
# Transform test and predict
import pyspark.sql.functions as F
test_1 = test.select(test['PassengerId'],
                     test['Pclass'].cast("integer").alias("Pclass"),
                     F.when(test['Sex'] == 'female', 1).otherwise(0).alias("Gender"),
                     test['Age'].cast("integer").alias("Age"),
                     test['SibSp'].cast("integer").alias("SibSp"),
                     test['Parch'].cast("integer").alias("Parch"),
                     test['Fare'].cast("float").alias("Fare"))
In [45]:
test_1.show(2)
In [46]:
# Do we have nulls?
test_1.filter(test_1['Age'].isNull()).show(40)
In [47]:
test_1.groupBy().avg('Age').show()
In [48]:
# Replace null ages with 30 (the computed mean is 30.24)
test_2 = test_1.na.fill(30, 'Age')
In [49]:
# parse test data for predictions (no label column, so plain tuples suffice)
def parse_test(r):
    return (r[1], r[2], r[3], r[4], r[5], r[6])
In [50]:
test_rdd = test_2.map(lambda x: parse_test(x))
In [51]:
test_rdd.count()
Out[51]:
In [52]:
predictions = model.predict(test_rdd)
In [53]:
predictions.first()
Out[53]:
In [54]:
out_rdd = test_2.map(lambda x: x[0]).zip(predictions)
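One caveat: RDD.zip requires both sides to have the same partitioning and the same number of elements per partition. That holds here because predictions came from a map over the same data, but when it may not (hence the coalesce(1) calls in the random-forest section below), an index join is the safer pattern; a sketch (out_rdd_safe is a hypothetical name):

# Sketch: join by position instead of zipping, immune to partitioning mismatches
ids_by_idx = test_2.map(lambda x: x[0]).zipWithIndex().map(lambda vi: (vi[1], vi[0]))
preds_by_idx = predictions.zipWithIndex().map(lambda vi: (vi[1], vi[0]))
out_rdd_safe = ids_by_idx.join(preds_by_idx).map(lambda kv: kv[1])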
In [55]:
out_rdd.first()
Out[55]:
In [56]:
out_df = out_rdd.toDF(['PassengerId','Survived'])
In [57]:
out_df.show(2)
In [58]:
out_1 = out_df.select(out_df['PassengerId'],
                      out_df['Survived'].cast('integer').alias('Survived'))
In [59]:
out_1.show(2)
In [60]:
out_1.coalesce(1).write.mode('overwrite').format('com.databricks.spark.csv')\
.options(header='true').save('titanic-r/spark-sub-02.csv')
In [61]:
# Submit
# Rank : 2038 +549 Score : 0.77512
In [62]:
from pyspark.mllib.tree import RandomForest
model_rf = RandomForest.trainClassifier(train_rdd, numClasses=2,categoricalFeaturesInfo={},numTrees=42)
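trainClassifier exposes more knobs than numTrees; a sketch of the same call with depth, bins, and a seed pinned (the values are arbitrary and model_rf_tuned is a hypothetical name):

# Sketch: same model with a few more hyperparameters made explicit
model_rf_tuned = RandomForest.trainClassifier(train_rdd, numClasses=2,
                                              categoricalFeaturesInfo={},
                                              numTrees=42,
                                              featureSubsetStrategy='auto',
                                              impurity='gini',
                                              maxDepth=8, maxBins=32, seed=42)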
In [63]:
print(model_rf)
#print(model_rf.toDebugString())
In [64]:
pred_rf = model_rf.predict(test_rdd).coalesce(1)
In [65]:
pred_rf.first()
Out[65]:
In [66]:
out_rf = test_2.map(lambda x: x[0]).coalesce(1).zip(pred_rf)
In [67]:
out_rf.first()
Out[67]:
In [68]:
out_df_rf = out_rf.toDF(['PassengerId','Survived'])
In [69]:
out_2 = out_df_rf.select(out_df_rf['PassengerId'],
                         out_df_rf['Survived'].cast('integer').alias('Survived'))
In [70]:
out_2.coalesce(1).write.mode('overwrite').format('com.databricks.spark.csv')\
.options(header='true').save('titanic-r/spark-sub-03.csv')
In [71]:
# Submit
# Rank : 1550 +488 Score : 0.78469
In [72]:
# Looks like we are on a roll! Let us try SVM!
In [73]:
from pyspark.mllib.classification import SVMWithSGD
model_svm = SVMWithSGD.train(train_rdd, iterations=100)
In [74]:
pred_svm = model_svm.predict(test_rdd).coalesce(1)
out_svm = test_2.map(lambda x: x[0]).coalesce(1).zip(pred_svm)
out_df_svm = out_svm.toDF(['PassengerId','Survived'])
In [75]:
out_3 = out_df_svm.select(out_df_svm['PassengerId'],
                          out_df_svm['Survived'].cast('integer').alias('Survived'))
In [76]:
out_3.coalesce(1).write.mode('overwrite').format('com.databricks.spark.csv')\
.options(header='true').save('titanic-r/spark-sub-04.csv')
In [77]:
# Not good. Only 0.39713!
# http://www.slideshare.net/ksankar/data-science-folk-knowledge
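One likely culprit: SVMWithSGD is sensitive to feature scale, and the raw Fare values dwarf the 0/1 Gender flag. A sketch of standardizing features before training with pyspark.mllib.feature.StandardScaler (scaled_train and model_svm_scaled are hypothetical names; the test features would need the same transform before predicting):

# Sketch: standardize features, then retrain the SVM
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.regression import LabeledPoint
features = train_rdd.map(lambda p: p.features)
scaler = StandardScaler(withMean=True, withStd=True).fit(features)
scaled_train = train_rdd.map(lambda p: p.label).zip(scaler.transform(features))\
    .map(lambda lp: LabeledPoint(lp[0], lp[1]))
model_svm_scaled = SVMWithSGD.train(scaled_train, iterations=100)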
In [ ]: