Imports


In [1]:
import os
import sys

In [2]:
# Make PySpark (Spark 2.1.0) importable by extending sys.path with the
# Spark python sources and the bundled py4j bridge.
try:
    spark_home = os.environ["SPARK_HOME"]  # raises KeyError when unset
    py4j_zip = os.path.join(spark_home, "python", "lib",
                            "py4j-0.10.4-src.zip")
    sys.path.append(os.path.join(spark_home, "python"))
    sys.path.append(py4j_zip)
except KeyError as e:
    print("SPARK_HOME is not set", e)
    sys.exit(1)

In [3]:
import pickle
import pandas as pd
import numpy as np
import sklearn as skl
from pyspark import SparkConf
from pyspark.sql import SparkSession
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score

In [4]:
# Record the exact library versions used for this analysis (reproducibility aid)
print("scikit-learn: {}".format(skl.__version__))
print("pandas: {}".format(pd.__version__))
print("numpy: {}".format(np.__version__))


scikit-learn: 0.18.1
pandas: 0.19.1
numpy: 1.11.2


In [5]:
# Load the Titanic training set (891 rows) from a path relative to the notebook
train_data = pd.read_csv("data/titanic_dataset_train.csv")

In [6]:
train_data.groupby("Sex")["Survived"].value_counts()


Out[6]:
Sex     Survived
female  1           233
        0            81
male    0           468
        1           109
Name: Survived, dtype: int64

In [7]:
train_data.head()


Out[7]:
PassengerId Survived Pclass Name Sex Age SibSp Parch Ticket Fare Cabin Embarked
0 1 0 3 Braund, Mr. Owen Harris male 22.0 1 0 A/5 21171 7.2500 NaN S
1 2 1 1 Cumings, Mrs. John Bradley (Florence Briggs Th... female 38.0 1 0 PC 17599 71.2833 C85 C
2 3 1 3 Heikkinen, Miss. Laina female 26.0 0 0 STON/O2. 3101282 7.9250 NaN S
3 4 1 1 Futrelle, Mrs. Jacques Heath (Lily May Peel) female 35.0 1 0 113803 53.1000 C123 S
4 5 0 3 Allen, Mr. William Henry male 35.0 0 0 373450 8.0500 NaN S

In [8]:
train_data.Survived.value_counts(normalize=True)


Out[8]:
0    0.616162
1    0.383838
Name: Survived, dtype: float64

In [9]:
train_data.describe()


Out[9]:
PassengerId Survived Pclass Age SibSp Parch Fare
count 891.000000 891.000000 891.000000 714.000000 891.000000 891.000000 891.000000
mean 446.000000 0.383838 2.308642 29.699118 0.523008 0.381594 32.204208
std 257.353842 0.486592 0.836071 14.526497 1.102743 0.806057 49.693429
min 1.000000 0.000000 1.000000 0.420000 0.000000 0.000000 0.000000
25% 223.500000 0.000000 2.000000 20.125000 0.000000 0.000000 7.910400
50% 446.000000 0.000000 3.000000 28.000000 0.000000 0.000000 14.454200
75% 668.500000 1.000000 3.000000 38.000000 1.000000 0.000000 31.000000
max 891.000000 1.000000 3.000000 80.000000 8.000000 6.000000 512.329200

In [10]:
train_data.Sex.isnull().value_counts()


Out[10]:
False    891
Name: Sex, dtype: int64

In [11]:
train_data.Embarked.isnull().value_counts()


Out[11]:
False    889
True       2
Name: Embarked, dtype: int64

In [12]:
# Numeric predictors; the sentinel -255 marks the missing Age values
continuous_variables = train_data[["Pclass", "Age", "SibSp", "Parch", "Fare"]].copy()
continuous_variables["Age"] = continuous_variables["Age"].fillna(-255)

In [13]:
# Categorical predictors; the sentinel -255 marks the 2 missing Embarked values
categorical_variables = train_data[["Sex", "Embarked"]].copy()
categorical_variables["Embarked"] = categorical_variables["Embarked"].fillna(-255)

In [14]:
pd.get_dummies(categorical_variables).head()


Out[14]:
Sex_female Sex_male Embarked_-255 Embarked_C Embarked_Q Embarked_S
0 0 1 0 0 0 1
1 1 0 0 1 0 0
2 1 0 0 0 0 1
3 1 0 0 0 0 1
4 0 1 0 0 0 1

In [15]:
pd.concat([continuous_variables.head(), continuous_variables.head()], axis=1)


Out[15]:
Pclass Age SibSp Parch Fare Pclass Age SibSp Parch Fare
0 3 22.0 1 0 7.2500 3 22.0 1 0 7.2500
1 1 38.0 1 0 71.2833 1 38.0 1 0 71.2833
2 3 26.0 0 0 7.9250 3 26.0 0 0 7.9250
3 1 35.0 1 0 53.1000 1 35.0 1 0 53.1000
4 3 35.0 0 0 8.0500 3 35.0 0 0 8.0500

In [16]:
continuous_variables = train_data[["Pclass", "Age", "SibSp", "Parch", "Fare"]].copy()
continuous_variables.Age = continuous_variables.Age.fillna(-255)
categorical_variables = train_data[["Sex", "Embarked"]].copy()
categorical_variables.Embarked = categorical_variables.Embarked.fillna(-255)
    
y = train_data.Survived.values
X = pd.concat([continuous_variables, pd.get_dummies(categorical_variables)], axis=1).values
    
lg_model = LogisticRegression(random_state=1)
skf = StratifiedKFold(n_splits=3)
    
score = []
for train, test in skf.split(X, y):
    fitted_model = lg_model.fit(X[train], y[train])
    y_pred = fitted_model.predict(X[test])
    y_true = y[test]
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    score.append((accuracy, precision, recall, pickle.dumps(fitted_model), fitted_model, fitted_model.coef_))

Notes

  • Storing the fitted model object directly is misleading: fit() mutates and returns the same estimator instance every fold, so all stored references alias the last-trained model and only its coefficients survive
  • Serializing each model with pickle at fit time captures a per-fold snapshot, so deserializing later recovers each fold's own coefficients

In [17]:
pickle.loads(score[0][3]).coef_


Out[17]:
array([[ -8.61683232e-01,   1.33502542e-03,  -1.97196423e-01,
         -1.85513431e-01,   3.12556318e-03,   1.89302858e+00,
         -8.24340242e-01,   7.47929332e-02,   8.08315494e-01,
          8.92525479e-02,   9.63273672e-02]])

In [18]:
pickle.loads(score[1][3]).coef_


Out[18]:
array([[ -6.52150734e-01,  -7.02098628e-04,  -2.84662818e-01,
         -8.76863787e-02,   8.35325971e-03,   1.66824332e+00,
         -9.08737509e-01,   1.43698892e-01,   4.48824665e-01,
          1.62178989e-01,   4.80327030e-03]])

In [19]:
pickle.loads(score[2][3]).coef_


Out[19]:
array([[ -7.17141076e-01,   8.90771172e-04,  -2.39548127e-01,
          7.91689887e-02,  -2.11482503e-04,   1.78695222e+00,
         -8.48641674e-01,   1.05869373e-01,   2.27665357e-01,
          6.12079781e-01,  -7.30396392e-03]])

In [20]:
score[0][4].coef_


Out[20]:
array([[ -7.17141076e-01,   8.90771172e-04,  -2.39548127e-01,
          7.91689887e-02,  -2.11482503e-04,   1.78695222e+00,
         -8.48641674e-01,   1.05869373e-01,   2.27665357e-01,
          6.12079781e-01,  -7.30396392e-03]])

In [21]:
score[1][4].coef_


Out[21]:
array([[ -7.17141076e-01,   8.90771172e-04,  -2.39548127e-01,
          7.91689887e-02,  -2.11482503e-04,   1.78695222e+00,
         -8.48641674e-01,   1.05869373e-01,   2.27665357e-01,
          6.12079781e-01,  -7.30396392e-03]])

In [22]:
score[2][4].coef_


Out[22]:
array([[ -7.17141076e-01,   8.90771172e-04,  -2.39548127e-01,
          7.91689887e-02,  -2.11482503e-04,   1.78695222e+00,
         -8.48641674e-01,   1.05869373e-01,   2.27665357e-01,
          6.12079781e-01,  -7.30396392e-03]])

Spark


In [23]:
# Create (or reuse) a SparkSession — the entry point for the DataFrame API
spark_session = SparkSession.builder.getOrCreate()

In [24]:
# Usually read from database / could also define schema here and change the type
# NOTE: without an explicit schema, every column is read as string (see dtypes below)
train_data_df = spark_session.read.csv("data/titanic_dataset_train.csv", header=True)

In [25]:
train_data_df.show()


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|          330877| 8.4583| null|       Q|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|  54|    0|    0|           17463|51.8625|  E46|       S|
|          8|       0|     3|Palsson, Master. ...|  male|   2|    3|    1|          349909| 21.075| null|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female|  27|    0|    2|          347742|11.1333| null|       S|
|         10|       1|     2|Nasser, Mrs. Nich...|female|  14|    1|    0|          237736|30.0708| null|       C|
|         11|       1|     3|Sandstrom, Miss. ...|female|   4|    1|    1|         PP 9549|   16.7|   G6|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|  58|    0|    0|          113783|  26.55| C103|       S|
|         13|       0|     3|Saundercock, Mr. ...|  male|  20|    0|    0|       A/5. 2151|   8.05| null|       S|
|         14|       0|     3|Andersson, Mr. An...|  male|  39|    1|    5|          347082| 31.275| null|       S|
|         15|       0|     3|Vestrom, Miss. Hu...|female|  14|    0|    0|          350406| 7.8542| null|       S|
|         16|       1|     2|Hewlett, Mrs. (Ma...|female|  55|    0|    0|          248706|     16| null|       S|
|         17|       0|     3|Rice, Master. Eugene|  male|   2|    4|    1|          382652| 29.125| null|       Q|
|         18|       1|     2|Williams, Mr. Cha...|  male|null|    0|    0|          244373|     13| null|       S|
|         19|       0|     3|Vander Planke, Mr...|female|  31|    1|    0|          345763|     18| null|       S|
|         20|       1|     3|Masselmani, Mrs. ...|female|null|    0|    0|            2649|  7.225| null|       C|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 20 rows


In [26]:
train_data_df.dtypes


Out[26]:
[('PassengerId', 'string'),
 ('Survived', 'string'),
 ('Pclass', 'string'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'string'),
 ('SibSp', 'string'),
 ('Parch', 'string'),
 ('Ticket', 'string'),
 ('Fare', 'string'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

In [27]:
def train_and_cv_model(data):
    """Run 3-fold stratified CV of a logistic regression on one passenger group.

    Parameters
    ----------
    data : list of tuple
        Rows in the positional order built by the upstream Spark map:
        (Survived, Pclass, Age, Name, SibSp, Parch, Ticket, Fare,
        Cabin, Embarked). Values arrive as strings because the Spark CSV
        reader was used without a schema.

    Returns
    -------
    list of tuple
        One (accuracy, precision, recall, pickled_model) entry per fold.
        Each model is pickled immediately after fitting so every fold keeps
        its own snapshot (fit() reuses and mutates a single estimator).
    """
    # "SibSp" (not "Sibsp") for consistency with the rest of the notebook;
    # the DataFrame is built positionally, so only the label changes.
    columns = ["Survived", "Pclass", "Age", "Name",
               "SibSp", "Parch", "Ticket", "Fare",
               "Cabin", "Embarked"]
    data_df = pd.DataFrame(data, columns=columns)

    # Spark delivered everything as strings; convert the label explicitly.
    y = data_df.Survived.values.astype(np.float32)

    continuous_variables = data_df[["Pclass", "Age", "SibSp", "Parch", "Fare"]].copy()
    continuous_variables["Age"] = continuous_variables["Age"].fillna(-255)
    # Sex is the grouping key, so only Embarked remains to be one-hot encoded.
    categorical_variables = data_df[["Embarked"]].copy()
    categorical_variables["Embarked"] = categorical_variables["Embarked"].fillna(-255)

    # Cast features to float explicitly rather than relying on scikit-learn's
    # implicit string->float coercion inside fit()/predict().
    X = pd.concat([continuous_variables,
                   pd.get_dummies(categorical_variables)],
                  axis=1).values.astype(np.float64)

    lg_model = LogisticRegression(random_state=1)
    skf = StratifiedKFold(n_splits=3)

    score = []
    for train_idx, test_idx in skf.split(X, y):
        fitted_model = lg_model.fit(X[train_idx], y[train_idx])
        y_pred = fitted_model.predict(X[test_idx])
        y_true = y[test_idx]
        score.append((accuracy_score(y_true, y_pred),
                      precision_score(y_true, y_pred),
                      recall_score(y_true, y_pred),
                      pickle.dumps(fitted_model)))
    return score

In [28]:
# Key every row by passenger sex and gather the remaining fields as a tuple,
# so each group (male / female) can be modeled independently on the workers.
def _row_to_pair(row):
    features = (row.Survived,
                row.Pclass,
                row.Age,
                row.Name,
                row.SibSp,
                row.Parch,
                row.Ticket,
                row.Fare,
                row.Cabin,
                row.Embarked)
    return (row.Sex, features)

train_data_grouped = train_data_df.rdd.map(_row_to_pair).groupByKey()

In [29]:
train_data_grouped.map(lambda x: (x[0], train_and_cv_model(list(x[1])))).take(2)


Out[29]:
[('female',
  [(0.73333333333333328,
    0.79761904761904767,
    0.85897435897435892,
    b'\x80\x03csklearn.linear_model.logistic\nLogisticRegression\nq\x00)\x81q\x01}q\x02(X\x11\x00\x00\x00intercept_scalingq\x03K\x01X\x0b\x00\x00\x00multi_classq\x04X\x03\x00\x00\x00ovrq\x05X\x10\x00\x00\x00_sklearn_versionq\x06X\x06\x00\x00\x000.18.1q\x07X\x07\x00\x00\x00verboseq\x08K\x00X\x0c\x00\x00\x00random_stateq\tK\x01X\n\x00\x00\x00warm_startq\n\x89X\x08\x00\x00\x00classes_q\x0bcnumpy.core.multiarray\n_reconstruct\nq\x0ccnumpy\nndarray\nq\rK\x00\x85q\x0eC\x01bq\x0f\x87q\x10Rq\x11(K\x01K\x02\x85q\x12cnumpy\ndtype\nq\x13X\x02\x00\x00\x00f4q\x14K\x00K\x01\x87q\x15Rq\x16(K\x03X\x01\x00\x00\x00<q\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq\x18b\x89C\x08\x00\x00\x00\x00\x00\x00\x80?q\x19tq\x1abX\x03\x00\x00\x00tolq\x1bG?\x1a6\xe2\xeb\x1cC-X\x07\x00\x00\x00n_iter_q\x1ch\x0ch\rK\x00\x85q\x1dh\x0f\x87q\x1eRq\x1f(K\x01K\x01\x85q h\x13X\x02\x00\x00\x00i4q!K\x00K\x01\x87q"Rq#(K\x03h\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq$b\x89C\x04\r\x00\x00\x00q%tq&bX\x05\x00\x00\x00coef_q\'h\x0ch\rK\x00\x85q(h\x0f\x87q)Rq*(K\x01K\x01K\t\x86q+h\x13X\x02\x00\x00\x00f8q,K\x00K\x01\x87q-Rq.(K\x03h\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq/b\x88CH w\xd5\xdf\xa5\xfc\xf0\xbf\xcb\xd64\xfeW:O\xbfj\x87\xe7\xf3\xefN\xd8\xbf\xe44\xd8\x8f\xbe\xde\xd8\xbf\xf4\xf8\x19\xcc\xd5\x96\x87?\x92ko\xea8\x02\xad?\xc2\xd1\r?\x9b*\xfa?` \x86&\xac\xb7\xd5?d\xe3x\x04\xf7!\xe8?q0tq1bX\x0c\x00\x00\x00class_weightq2NX\x06\x00\x00\x00solverq3X\t\x00\x00\x00liblinearq4X\x04\x00\x00\x00dualq5\x89X\x06\x00\x00\x00n_jobsq6K\x01X\x01\x00\x00\x00Cq7G?\xf0\x00\x00\x00\x00\x00\x00X\r\x00\x00\x00fit_interceptq8\x88X\x08\x00\x00\x00max_iterq9KdX\x07\x00\x00\x00penaltyq:X\x02\x00\x00\x00l2q;X\n\x00\x00\x00intercept_q<h\x0ch\rK\x00\x85q=h\x0f\x87q>Rq?(K\x01K\x01\x85q@h.\x89C\x08j\xa3\x1f\xc9\xc9H\x06@qAtqBbub.'),
   (0.79047619047619044,
    0.7978723404255319,
    0.96153846153846156,
    b'\x80\x03csklearn.linear_model.logistic\nLogisticRegression\nq\x00)\x81q\x01}q\x02(X\x11\x00\x00\x00intercept_scalingq\x03K\x01X\x0b\x00\x00\x00multi_classq\x04X\x03\x00\x00\x00ovrq\x05X\x10\x00\x00\x00_sklearn_versionq\x06X\x06\x00\x00\x000.18.1q\x07X\x07\x00\x00\x00verboseq\x08K\x00X\x0c\x00\x00\x00random_stateq\tK\x01X\n\x00\x00\x00warm_startq\n\x89X\x08\x00\x00\x00classes_q\x0bcnumpy.core.multiarray\n_reconstruct\nq\x0ccnumpy\nndarray\nq\rK\x00\x85q\x0eC\x01bq\x0f\x87q\x10Rq\x11(K\x01K\x02\x85q\x12cnumpy\ndtype\nq\x13X\x02\x00\x00\x00f4q\x14K\x00K\x01\x87q\x15Rq\x16(K\x03X\x01\x00\x00\x00<q\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq\x18b\x89C\x08\x00\x00\x00\x00\x00\x00\x80?q\x19tq\x1abX\x03\x00\x00\x00tolq\x1bG?\x1a6\xe2\xeb\x1cC-X\x07\x00\x00\x00n_iter_q\x1ch\x0ch\rK\x00\x85q\x1dh\x0f\x87q\x1eRq\x1f(K\x01K\x01\x85q h\x13X\x02\x00\x00\x00i4q!K\x00K\x01\x87q"Rq#(K\x03h\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq$b\x89C\x04\r\x00\x00\x00q%tq&bX\x05\x00\x00\x00coef_q\'h\x0ch\rK\x00\x85q(h\x0f\x87q)Rq*(K\x01K\x01K\t\x86q+h\x13X\x02\x00\x00\x00f8q,K\x00K\x01\x87q-Rq.(K\x03h\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq/b\x88CH\xba]\x8c\xd4I\xd4\xe2\xbf,\xab5|Q4\xfd>`;\x012\xe4\xeb\xe2\xbf\xb7\xb6Q\x19\x93\xd4\xdd\xbfR\xafE{\x15\xae\xab?\x99\xee\x93\x15E^~?n\xcaS\xde\xa58\xd7?\x86\r\xca\xa1}}\xed?|x\x06\x15\xf1\xce\xd3?q0tq1bX\x0c\x00\x00\x00class_weightq2NX\x06\x00\x00\x00solverq3X\t\x00\x00\x00liblinearq4X\x04\x00\x00\x00dualq5\x89X\x06\x00\x00\x00n_jobsq6K\x01X\x01\x00\x00\x00Cq7G?\xf0\x00\x00\x00\x00\x00\x00X\r\x00\x00\x00fit_interceptq8\x88X\x08\x00\x00\x00max_iterq9KdX\x07\x00\x00\x00penaltyq:X\x02\x00\x00\x00l2q;X\n\x00\x00\x00intercept_q<h\x0ch\rK\x00\x85q=h\x0f\x87q>Rq?(K\x01K\x01\x85q@h.\x89C\x08S+\xd1\xd2\x02\x9f\xf9?qAtqBbub.'),
   (0.80769230769230771,
    0.79381443298969068,
    1.0,
    b'\x80\x03csklearn.linear_model.logistic\nLogisticRegression\nq\x00)\x81q\x01}q\x02(X\x11\x00\x00\x00intercept_scalingq\x03K\x01X\x0b\x00\x00\x00multi_classq\x04X\x03\x00\x00\x00ovrq\x05X\x10\x00\x00\x00_sklearn_versionq\x06X\x06\x00\x00\x000.18.1q\x07X\x07\x00\x00\x00verboseq\x08K\x00X\x0c\x00\x00\x00random_stateq\tK\x01X\n\x00\x00\x00warm_startq\n\x89X\x08\x00\x00\x00classes_q\x0bcnumpy.core.multiarray\n_reconstruct\nq\x0ccnumpy\nndarray\nq\rK\x00\x85q\x0eC\x01bq\x0f\x87q\x10Rq\x11(K\x01K\x02\x85q\x12cnumpy\ndtype\nq\x13X\x02\x00\x00\x00f4q\x14K\x00K\x01\x87q\x15Rq\x16(K\x03X\x01\x00\x00\x00<q\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq\x18b\x89C\x08\x00\x00\x00\x00\x00\x00\x80?q\x19tq\x1abX\x03\x00\x00\x00tolq\x1bG?\x1a6\xe2\xeb\x1cC-X\x07\x00\x00\x00n_iter_q\x1ch\x0ch\rK\x00\x85q\x1dh\x0f\x87q\x1eRq\x1f(K\x01K\x01\x85q h\x13X\x02\x00\x00\x00i4q!K\x00K\x01\x87q"Rq#(K\x03h\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq$b\x89C\x04\x0e\x00\x00\x00q%tq&bX\x05\x00\x00\x00coef_q\'h\x0ch\rK\x00\x85q(h\x0f\x87q)Rq*(K\x01K\x01K\t\x86q+h\x13X\x02\x00\x00\x00f8q,K\x00K\x01\x87q-Rq.(K\x03h\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq/b\x88CH\xb1n\x99\xe4\xe8\x8a\xea\xbf`\xbd\x95\x0e=\xe3A?n\x12\x92x\xfc\xc3\xd5\xbf\xcb@\x10:\x8fr\x93\xbf\x1f\x93\x0c\xed\x84\r\x83?\x9c\x16\x14\xe2\xceu\xae?\x97l\x05\xde\xa5V\xe8?\xd5Cu\n*\xbd\xf7?\x16\x91\x85\xcf\n\xcb\xca?q0tq1bX\x0c\x00\x00\x00class_weightq2NX\x06\x00\x00\x00solverq3X\t\x00\x00\x00liblinearq4X\x04\x00\x00\x00dualq5\x89X\x06\x00\x00\x00n_jobsq6K\x01X\x01\x00\x00\x00Cq7G?\xf0\x00\x00\x00\x00\x00\x00X\r\x00\x00\x00fit_interceptq8\x88X\x08\x00\x00\x00max_iterq9KdX\x07\x00\x00\x00penaltyq:X\x02\x00\x00\x00l2q;X\n\x00\x00\x00intercept_q<h\x0ch\rK\x00\x85q=h\x0f\x87q>Rq?(K\x01K\x01\x85q@h.\x89C\x08v\xa6<e\xc6\x1a\x04@qAtqBbub.')]),
 ('male',
  [(0.79274611398963735,
    0.2857142857142857,
    0.054054054054054057,
    b'\x80\x03csklearn.linear_model.logistic\nLogisticRegression\nq\x00)\x81q\x01}q\x02(X\x11\x00\x00\x00intercept_scalingq\x03K\x01X\x0b\x00\x00\x00multi_classq\x04X\x03\x00\x00\x00ovrq\x05X\x10\x00\x00\x00_sklearn_versionq\x06X\x06\x00\x00\x000.18.1q\x07X\x07\x00\x00\x00verboseq\x08K\x00X\x0c\x00\x00\x00random_stateq\tK\x01X\n\x00\x00\x00warm_startq\n\x89X\x08\x00\x00\x00classes_q\x0bcnumpy.core.multiarray\n_reconstruct\nq\x0ccnumpy\nndarray\nq\rK\x00\x85q\x0eC\x01bq\x0f\x87q\x10Rq\x11(K\x01K\x02\x85q\x12cnumpy\ndtype\nq\x13X\x02\x00\x00\x00f4q\x14K\x00K\x01\x87q\x15Rq\x16(K\x03X\x01\x00\x00\x00<q\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq\x18b\x89C\x08\x00\x00\x00\x00\x00\x00\x80?q\x19tq\x1abX\x03\x00\x00\x00tolq\x1bG?\x1a6\xe2\xeb\x1cC-X\x07\x00\x00\x00n_iter_q\x1ch\x0ch\rK\x00\x85q\x1dh\x0f\x87q\x1eRq\x1f(K\x01K\x01\x85q h\x13X\x02\x00\x00\x00i4q!K\x00K\x01\x87q"Rq#(K\x03h\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq$b\x89C\x04\x0f\x00\x00\x00q%tq&bX\x05\x00\x00\x00coef_q\'h\x0ch\rK\x00\x85q(h\x0f\x87q)Rq*(K\x01K\x01K\x08\x86q+h\x13X\x02\x00\x00\x00f8q,K\x00K\x01\x87q-Rq.(K\x03h\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq/b\x88C@x\x0c\xb0\xc0/i\xe5\xbf\xcb{&[\x1a?]?\x8dY\xc95\xfdz\xbc\xbf\x0bp\x04\x88\xaah\xd6?%\x0f\xaa6]`]?N*\xd28\x1b\x86\xdc?@TS\x01x=\xc8\xbf)\x87\xb1\xb3\x88w\xcc\xbfq0tq1bX\x0c\x00\x00\x00class_weightq2NX\x06\x00\x00\x00solverq3X\t\x00\x00\x00liblinearq4X\x04\x00\x00\x00dualq5\x89X\x06\x00\x00\x00n_jobsq6K\x01X\x01\x00\x00\x00Cq7G?\xf0\x00\x00\x00\x00\x00\x00X\r\x00\x00\x00fit_interceptq8\x88X\x08\x00\x00\x00max_iterq9KdX\x07\x00\x00\x00penaltyq:X\x02\x00\x00\x00l2q;X\n\x00\x00\x00intercept_q<h\x0ch\rK\x00\x85q=h\x0f\x87q>Rq?(K\x01K\x01\x85q@h.\x89C\x08\x0c\xe0}\xf2\xd6\\\xa1?qAtqBbub.'),
   (0.80729166666666663,
    0.42857142857142855,
    0.083333333333333329,
    b'\x80\x03csklearn.linear_model.logistic\nLogisticRegression\nq\x00)\x81q\x01}q\x02(X\x11\x00\x00\x00intercept_scalingq\x03K\x01X\x0b\x00\x00\x00multi_classq\x04X\x03\x00\x00\x00ovrq\x05X\x10\x00\x00\x00_sklearn_versionq\x06X\x06\x00\x00\x000.18.1q\x07X\x07\x00\x00\x00verboseq\x08K\x00X\x0c\x00\x00\x00random_stateq\tK\x01X\n\x00\x00\x00warm_startq\n\x89X\x08\x00\x00\x00classes_q\x0bcnumpy.core.multiarray\n_reconstruct\nq\x0ccnumpy\nndarray\nq\rK\x00\x85q\x0eC\x01bq\x0f\x87q\x10Rq\x11(K\x01K\x02\x85q\x12cnumpy\ndtype\nq\x13X\x02\x00\x00\x00f4q\x14K\x00K\x01\x87q\x15Rq\x16(K\x03X\x01\x00\x00\x00<q\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq\x18b\x89C\x08\x00\x00\x00\x00\x00\x00\x80?q\x19tq\x1abX\x03\x00\x00\x00tolq\x1bG?\x1a6\xe2\xeb\x1cC-X\x07\x00\x00\x00n_iter_q\x1ch\x0ch\rK\x00\x85q\x1dh\x0f\x87q\x1eRq\x1f(K\x01K\x01\x85q h\x13X\x02\x00\x00\x00i4q!K\x00K\x01\x87q"Rq#(K\x03h\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq$b\x89C\x04\x13\x00\x00\x00q%tq&bX\x05\x00\x00\x00coef_q\'h\x0ch\rK\x00\x85q(h\x0f\x87q)Rq*(K\x01K\x01K\x08\x86q+h\x13X\x02\x00\x00\x00f8q,K\x00K\x01\x87q-Rq.(K\x03h\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq/b\x88C@\xa0\xb7Y\'\xfc{\xd9\xbf\xcd\x196\x1ee&L\xbf\x99\xfe\xc4o\xb1;\xc8\xbf\xde\x9d\x03\xc9\xd1\xf5\xd9?71b\xc9\xd5ys?(\xd5\xd3\x97\x7f1\xc8?\x9e\xbc\xce\xcf9\xe2\xe4\xbf\x03\xa7am\x1d-\xc2\xbfq0tq1bX\x0c\x00\x00\x00class_weightq2NX\x06\x00\x00\x00solverq3X\t\x00\x00\x00liblinearq4X\x04\x00\x00\x00dualq5\x89X\x06\x00\x00\x00n_jobsq6K\x01X\x01\x00\x00\x00Cq7G?\xf0\x00\x00\x00\x00\x00\x00X\r\x00\x00\x00fit_interceptq8\x88X\x08\x00\x00\x00max_iterq9KdX\x07\x00\x00\x00penaltyq:X\x02\x00\x00\x00l2q;X\n\x00\x00\x00intercept_q<h\x0ch\rK\x00\x85q=h\x0f\x87q>Rq?(K\x01K\x01\x85q@h.\x89C\x08\x1f22E!a\xe3\xbfqAtqBbub.'),
   (0.8125,
    0.0,
    0.0,
    b'\x80\x03csklearn.linear_model.logistic\nLogisticRegression\nq\x00)\x81q\x01}q\x02(X\x11\x00\x00\x00intercept_scalingq\x03K\x01X\x0b\x00\x00\x00multi_classq\x04X\x03\x00\x00\x00ovrq\x05X\x10\x00\x00\x00_sklearn_versionq\x06X\x06\x00\x00\x000.18.1q\x07X\x07\x00\x00\x00verboseq\x08K\x00X\x0c\x00\x00\x00random_stateq\tK\x01X\n\x00\x00\x00warm_startq\n\x89X\x08\x00\x00\x00classes_q\x0bcnumpy.core.multiarray\n_reconstruct\nq\x0ccnumpy\nndarray\nq\rK\x00\x85q\x0eC\x01bq\x0f\x87q\x10Rq\x11(K\x01K\x02\x85q\x12cnumpy\ndtype\nq\x13X\x02\x00\x00\x00f4q\x14K\x00K\x01\x87q\x15Rq\x16(K\x03X\x01\x00\x00\x00<q\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq\x18b\x89C\x08\x00\x00\x00\x00\x00\x00\x80?q\x19tq\x1abX\x03\x00\x00\x00tolq\x1bG?\x1a6\xe2\xeb\x1cC-X\x07\x00\x00\x00n_iter_q\x1ch\x0ch\rK\x00\x85q\x1dh\x0f\x87q\x1eRq\x1f(K\x01K\x01\x85q h\x13X\x02\x00\x00\x00i4q!K\x00K\x01\x87q"Rq#(K\x03h\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq$b\x89C\x04\x0e\x00\x00\x00q%tq&bX\x05\x00\x00\x00coef_q\'h\x0ch\rK\x00\x85q(h\x0f\x87q)Rq*(K\x01K\x01K\x08\x86q+h\x13X\x02\x00\x00\x00f8q,K\x00K\x01\x87q-Rq.(K\x03h\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq/b\x88C@v\x9c\x94\x93;@\xe2\xbf\xeb\x92D1A\xd6Q?\x89\xeajvt\x1d\xb6\xbf\x12\xf1\x16no\xc7\xd9?\x11\xa4\x13Sc`z\xbf\x08\x084f\xe7O\xd4?\x9a\x86\xc2\x88"\xf8\xd7\xbf\x9aJjE\x891\x81\xbfq0tq1bX\x0c\x00\x00\x00class_weightq2NX\x06\x00\x00\x00solverq3X\t\x00\x00\x00liblinearq4X\x04\x00\x00\x00dualq5\x89X\x06\x00\x00\x00n_jobsq6K\x01X\x01\x00\x00\x00Cq7G?\xf0\x00\x00\x00\x00\x00\x00X\r\x00\x00\x00fit_interceptq8\x88X\x08\x00\x00\x00max_iterq9KdX\x07\x00\x00\x00penaltyq:X\x02\x00\x00\x00l2q;X\n\x00\x00\x00intercept_q<h\x0ch\rK\x00\x85q=h\x0f\x87q>Rq?(K\x01K\x01\x85q@h.\x89C\x08\xd6D\xe7\xb2\x1d\xc7\xb0\xbfqAtqBbub.')])]
An alternative approach that scales better when the DataFrame has many columns

In [30]:
train_data_df.columns


Out[30]:
['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [31]:
# Capture the Spark DataFrame's column names once on the driver so the helper
# below can rebuild a pandas frame with matching headers.
columns = train_data_df.columns

def create_df(row, cols=None):
    """Build a pandas DataFrame from an iterable of row tuples.

    Parameters
    ----------
    row : iterable
        Sequence of row tuples (e.g. the grouped values of an RDD).
    cols : list of str, optional
        Column names to use. Defaults to the Spark DataFrame's columns
        captured above, preserving the original call signature.

    Returns
    -------
    pandas.DataFrame
    """
    return pd.DataFrame(row, columns=columns if cols is None else cols)

In [32]:
# Key each record by Sex; tuple(record) explicitly converts the pyspark Row
# into a plain tuple so all columns survive grouping without listing them.
train_data_grouped = (
    train_data_df.rdd
    .map(lambda record: (record.Sex, tuple(record)))
    .groupByKey()
)

In [33]:
train_data_grouped.mapValues(lambda x: create_df(list(x))).take(1)[0][1].head()


Out[33]:
PassengerId Survived Pclass Name Sex Age SibSp Parch Ticket Fare Cabin Embarked
0 2 1 1 Cumings, Mrs. John Bradley (Florence Briggs Th... female 38 1 0 PC 17599 71.2833 C85 C
1 3 1 3 Heikkinen, Miss. Laina female 26 0 0 STON/O2. 3101282 7.925 None S
2 4 1 1 Futrelle, Mrs. Jacques Heath (Lily May Peel) female 35 1 0 113803 53.1 C123 S
3 9 1 3 Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg) female 27 0 2 347742 11.1333 None S
4 10 1 2 Nasser, Mrs. Nicholas (Adele Achem) female 14 1 0 237736 30.0708 None C

In [ ]: