Main 4


In [1]:
#!/usr/bin/python
# -*- coding: utf-8 -*-

# Author: Gusseppe Bravo <gbravor@uni.pe>
# License: BSD 3 clause

"""
This module provides the logic of the whole project.

"""
import define
#import analyze
import prepare
import feature_selection
import evaluate

import time
import os
import findspark
findspark.init()

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
#from pyspark import SparkContext, SparkConf

# Stop any previously created Spark session before building a new one.
try:
#     spark.stop()
    pass
except Exception:
    pass
    
name = "datasets/buses_10000_filtered.csv"
#     name = "hdfs://King:9000/user/bdata/mta_data/MTA-Bus-Time_.2014-08-01.txt"
response = "tiempoRecorrido"

spark_session = SparkSession.builder \
    .master('local') \
    .appName("Sparkmach") \
    .config("spark.driver.allowMultipleContexts", "true") \
    .getOrCreate()

# conf = SparkConf()\
# .setMaster("local")\
# .setAppName("sparkmach")\
# .set("spark.driver.allowMultipleContexts", "true")

#sparkContext = SparkContext(conf=conf)

currentDir = os.getcwd()
spark_session.sparkContext.addPyFile(currentDir + "/define.py")
#spark_session.sparkContext.addPyFile("/home/vagrant/tesis/sparkmach/sparkmach/sparkmach/analyze.py")
spark_session.sparkContext.addPyFile(currentDir + "/prepare.py")
spark_session.sparkContext.addPyFile(currentDir + "/feature_selection.py")
spark_session.sparkContext.addPyFile(currentDir + "/evaluate.py")

# STEP 0: Define workflow parameters
definer = define.Define(spark_session, data_path=name, response=response).pipeline()

# STEP 1: Analyze data by plotting it
#analyze.Analyze(definer).pipeline()

# STEP 2: Prepare data by scaling, normalizing, etc. 
preparer = prepare.Prepare(definer).pipeline()

# STEP 3: Feature selection
featurer = feature_selection.FeatureSelection(definer).pipeline()

# STEP 4: Evaluate the algorithms by using the pipelines
# evaluator = evaluate.Evaluate(definer, preparer, featurer).pipeline()


# start = time.time()
# result = main()
# end = time.time()

# print()
# print("Execution time for all the steps: ", end-start)

In [2]:
%%time
evaluator = evaluate.Evaluate(definer, preparer, featurer).pipeline()


+---------------------------+--------------------+------------------+
|Model                      |Score               |Time              |
+---------------------------+--------------------+------------------+
|DecisionTreeRegressor      |0.08964561627029533 |6.327453374862671 |
|GeneralizedLinearRegression|0.013625352415245873|5.869198322296143 |
|Total time                 |0.0                 |12.196651697158813|
+---------------------------+--------------------+------------------+

CPU times: user 444 ms, sys: 66.1 ms, total: 510 ms
Wall time: 13.1 s
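
For later inspection the score report can be pulled to the driver as a pandas frame; a minimal sketch, assuming the report is available as the Spark DataFrame df that the df.sort(col("Score").desc()) cell further down operates on:

# Hypothetical convenience step: bring the small score table to the driver.
report_pd = df.toPandas()
report_pd.sort_values('Score', ascending=False)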

In [5]:
import pandas as pd

In [6]:
bus_data = pd.read_csv('datasets/buses_1458098_filtered.csv')
bus_data.head()


Out[6]:
   busID  ProximaParada   Ruta  Orientacion  rangoHora  tiempoRecorrido
0   3433            249   9735          1.0        2.0            856.0
1   3433            249   9735          1.0        5.0            636.0
2   3984            160  12237          1.0        5.0             96.0
3   3984            160  12237          1.0        7.5             32.0
4   5517            255  12521          0.0       11.5             31.0

In [7]:
bus_data.shape


Out[7]:
(1449200, 6)
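
A quick optional check of dtypes and missing values on the pandas side (a small sketch, not part of the original run):

print(bus_data.dtypes)
print(bus_data.isnull().sum())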

In [7]:
spark_session.sparkContext


Out[7]:

SparkContext (Spark UI)

Version: v2.2.1
Master:  local
AppName: Sparkmach

In [10]:
from pyspark.sql import functions as F

df = spark_session.createDataFrame([
    (1, "a"),
    (2, "b"),
    (3, "c"),
], ["ID", "Text"])

categories = df.select("Text").distinct().rdd.flatMap(lambda x: x).collect()

exprs = [F.when(F.col("Text") == category, 1).otherwise(0).alias(category)
         for category in categories]

df.select("ID", *exprs).show()


+---+---+---+---+
| ID|  c|  b|  a|
+---+---+---+---+
|  1|  0|  0|  1|
|  2|  0|  1|  0|
|  3|  1|  0|  0|
+---+---+---+---+
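
The same 0/1 layout can also be produced with a pivot, which avoids collecting the distinct categories to the driver first; a minimal sketch, assuming the same toy df as above:

# Pivot on Text: one column per distinct value, nulls filled with 0.
onehot = df.groupBy("ID").pivot("Text").count().na.fill(0).orderBy("ID")
onehot.show()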


In [17]:
print(df.select("Text").distinct().rdd.flatMap(lambda x: x).collect())


['c', 'b', 'a']

In [12]:
from pyspark.sql.functions import col
df.sort(col("Score").desc()).show(truncate=False)


+---------------------------+--------------------+
|Model                      |Score               |
+---------------------------+--------------------+
|GBTRegressor               |0.18272008210363277 |
|RandomForestRegressor      |0.12066847476050002 |
|DecisionTreeRegressor      |0.08964561627029533 |
|LinearRegression           |0.013625352415245873|
|GeneralizedLinearRegression|0.013625352415245873|
+---------------------------+--------------------+


In [4]:
df.show(truncate=False)


+---------------------------+--------------------+
|Model                      |Score               |
+---------------------------+--------------------+
|LinearRegression           |0.013625352415245873|
|GeneralizedLinearRegression|0.013625352415245873|
|DecisionTreeRegressor      |0.08964561627029533 |
|RandomForestRegressor      |0.12066847476050002 |
|GBTRegressor               |0.18272008210363277 |
+---------------------------+--------------------+

Main Piero


In [ ]:
%%time
import bus_times
import os
import define
#import analyze
import prepare
import feature_selection
import evaluate


from pyspark.ml.feature import StringIndexer
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row, SparkSession
from pyspark.sql.types import *

name = "datasets/buses_10000_filtered.csv"
#     name = "hdfs://King:9000/user/bdata/mta_data/MTA-Bus-Time_.2014-08-01.txt"
response = "class"


spark_session = SparkSession.builder \
    .master('local[*]') \
    .appName("Sparkmach") \
    .config("spark.driver.allowMultipleContexts", "true") \
    .getOrCreate()

currentDir = os.getcwd()
#Piero
spark_session.sparkContext.addPyFile(currentDir + "/bus_times.py") 

#Gusseppe
spark_session.sparkContext.addPyFile(currentDir + "/define.py")
spark_session.sparkContext.addPyFile(currentDir + "/prepare.py")
spark_session.sparkContext.addPyFile(currentDir + "/feature_selection.py")
spark_session.sparkContext.addPyFile(currentDir + "/evaluate.py")

rdd = spark_session.sparkContext.textFile(currentDir + '/datasets/MTA-Bus-Time_.2014-08-01.txt')
# rdd = spark_session.sparkContext.textFile(currentDir + '/datasets/test.txt')
# rdd = sc.textFile('hdfs://King:9000/user/bdata/mta_data/MTA-Bus-Time_.2014-10-31.txt')


classTuple = bus_times.mainFilter(rdd)
halfHourBucket = classTuple.map(lambda x: bus_times.toHalfHourBucket(list(x)))


bucket_schema = StructType([
    StructField("bus_id", StringType(), True),
    StructField("route_id", StringType(), True),
    StructField("next_stop_id", StringType(), True),
    StructField("direction", StringType(), True),
    StructField("half_hour_bucket", StringType(), True),
    StructField("class", StringType(), True),
])
# bucket_schema= StructType([StructField("bus_id",IntegerType(), True),StructField("route_id",StringType(), True),StructField("next_stop_id",StringType(), True),StructField("direction",IntegerType(), True),StructField("half_hour_bucket",FloatType(), True),StructField("class",FloatType(), True) ])

df = spark_session.createDataFrame(halfHourBucket, bucket_schema)
stringIndexer = StringIndexer(inputCol='route_id', outputCol='route_id'+"_Index")
df = stringIndexer.fit(df).transform(df)    

stringIndexer = StringIndexer(inputCol='next_stop_id', outputCol='next_stop_id'+"_Index")
df = stringIndexer.fit(df).transform(df)    
drop_list = ['route_id', 'next_stop_id']

df = df.select([column for column in df.columns if column not in drop_list])


# print('hellllooo')

# STEP 0: Define workflow parameters
definer = define.Define(spark_session, data_path=name, response=response, df=df).pipeline()

# STEP 1: Analyze data by plotting it
#analyze.Analyze(definer).pipeline()

# STEP 2: Prepare data by scaling, normalizing, etc.
preparer = prepare.Prepare(definer).pipeline()

# STEP 3: Feature selection
featurer = feature_selection.FeatureSelection(definer).pipeline()
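
The two StringIndexer fit/transform passes above can also be chained in a single Pipeline; a minimal sketch, assuming the same df built in this cell:

from pyspark.ml import Pipeline

# One estimator per categorical column, fitted together in a single Pipeline.
indexers = [StringIndexer(inputCol=c, outputCol=c + "_Index")
            for c in ["route_id", "next_stop_id"]]
df_indexed = Pipeline(stages=indexers).fit(df).transform(df)
df_indexed = df_indexed.drop("route_id", "next_stop_id")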

In [4]:
%%time
evaluator = evaluate.Evaluate(definer, preparer, featurer).pipeline()


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
~/anaconda3/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

~/anaconda3/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:

Py4JJavaError: An error occurred while calling o282.fit.
: java.lang.IllegalArgumentException: requirement failed: Nothing has been added to this summarizer.
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.variance(MultivariateOnlineSummarizer.scala:204)
	at org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:61)
	at org.apache.spark.ml.feature.StandardScaler.fit(StandardScaler.scala:117)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

IllegalArgumentException                  Traceback (most recent call last)
<timed exec> in <module>()

~/Desktop/core/sparkmach/sparkmach/evaluate.py in pipeline(self)
     64         #evaluators = []
     65         self.build_pipelines(self.set_models())
---> 66         self.evaluate_pipelines()
     67         #self.setBestPipelines()
     68 

~/Desktop/core/sparkmach/sparkmach/evaluate.py in evaluate_pipelines(self, ax)
    134 
    135 
--> 136             cvModel = crossval.fit(Evaluate.train)
    137             end = time.time()
    138             prediction = cvModel.transform(Evaluate.test)

~/anaconda3/lib/python3.6/site-packages/pyspark/ml/base.py in fit(self, dataset, params)
     62                 return self.copy(params)._fit(dataset)
     63             else:
---> 64                 return self._fit(dataset)
     65         else:
     66             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

~/anaconda3/lib/python3.6/site-packages/pyspark/ml/tuning.py in _fit(self, dataset)
    230             validation = df.filter(condition)
    231             train = df.filter(~condition)
--> 232             models = est.fit(train, epm)
    233             for j in range(numModels):
    234                 model = models[j]

~/anaconda3/lib/python3.6/site-packages/pyspark/ml/base.py in fit(self, dataset, params)
     57             params = dict()
     58         if isinstance(params, (list, tuple)):
---> 59             return [self.fit(dataset, paramMap) for paramMap in params]
     60         elif isinstance(params, dict):
     61             if params:

~/anaconda3/lib/python3.6/site-packages/pyspark/ml/base.py in <listcomp>(.0)
     57             params = dict()
     58         if isinstance(params, (list, tuple)):
---> 59             return [self.fit(dataset, paramMap) for paramMap in params]
     60         elif isinstance(params, dict):
     61             if params:

~/anaconda3/lib/python3.6/site-packages/pyspark/ml/base.py in fit(self, dataset, params)
     62                 return self.copy(params)._fit(dataset)
     63             else:
---> 64                 return self._fit(dataset)
     65         else:
     66             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

~/anaconda3/lib/python3.6/site-packages/pyspark/ml/pipeline.py in _fit(self, dataset)
    106                     dataset = stage.transform(dataset)
    107                 else:  # must be an Estimator
--> 108                     model = stage.fit(dataset)
    109                     transformers.append(model)
    110                     if i < indexOfLastEstimator:

~/anaconda3/lib/python3.6/site-packages/pyspark/ml/base.py in fit(self, dataset, params)
     62                 return self.copy(params)._fit(dataset)
     63             else:
---> 64                 return self._fit(dataset)
     65         else:
     66             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

~/anaconda3/lib/python3.6/site-packages/pyspark/ml/wrapper.py in _fit(self, dataset)
    263 
    264     def _fit(self, dataset):
--> 265         java_model = self._fit_java(dataset)
    266         return self._create_model(java_model)
    267 

~/anaconda3/lib/python3.6/site-packages/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    260         """
    261         self._transfer_params_to_java()
--> 262         return self._java_obj.fit(dataset._jdf)
    263 
    264     def _fit(self, dataset):

~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     77                 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
     78             if s.startswith('java.lang.IllegalArgumentException: '):
---> 79                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
     80             raise
     81     return deco

IllegalArgumentException: 'requirement failed: Nothing has been added to this summarizer.'

In [5]:
df.show(20)


+------+---------+----------------+-----+--------------+------------------+
|bus_id|direction|half_hour_bucket|class|route_id_Index|next_stop_id_Index|
+------+---------+----------------+-----+--------------+------------------+
+------+---------+----------------+-----+--------------+------------------+
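
The empty frame above is what triggered the IllegalArgumentException in the previous cell: StandardScaler cannot be fit on zero rows ("Nothing has been added to this summarizer"). A minimal guard before launching the pipelines, assuming df is the DataFrame built in the %%time cell:

if df.rdd.isEmpty():
    raise ValueError("Input DataFrame is empty; check the bus_times.mainFilter / toHalfHourBucket output")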


In [1]:
spark_session


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-1-120f76ddb124> in <module>()
----> 1 spark_session

NameError: name 'spark_session' is not defined

In [27]:
# stringIndexer = StringIndexer(inputCol='route_id', outputCol='route_id'+"_Index")
# df = stringIndexer.fit(times_df).transform(times_df)    

# stringIndexer = StringIndexer(inputCol='next_stop_id', outputCol='next_stop_id'+"_Index")
# result = stringIndexer.fit(df).transform(df)    
# drop_list = ['route_id', 'next_stop_id']

# df = df.select([column for column in df.columns if column not in drop_list])


# df.show()

In [ ]:
from pyspark.ml.classification import RandomForestClassificationModel

model1 = RandomForestClassificationModel.load("./models/name.ml")

model1

In [3]:
evaluator.Evaluate.test


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-3-81601c7341cc> in <module>()
----> 1 evaluator.Evaluate.test

NameError: name 'evaluator' is not defined

In [3]:
definer.data.schema.fields


Out[3]:
[StructField(bus_id,FloatType,true),
 StructField(direction,FloatType,true),
 StructField(half_hour_bucket,FloatType,true),
 StructField(class,FloatType,true),
 StructField(route_id_Index,DoubleType,true),
 StructField(next_stop_id_Index,DoubleType,true)]

In [4]:
definer.data.dropna().count()


Out[4]:
1449761

In [6]:
tt = definer.data.dropna()
tt.count()


Out[6]:
1449761

In [1]:
definer.data.count()


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-1-eb73c5e8922b> in <module>()
----> 1 definer.data.count()

NameError: name 'definer' is not defined

In [35]:
from pyspark.sql.functions import col
df = df.withColumn("class", col("class").cast('float'))
df = df.withColumn("bus_id", col("bus_id").cast('float'))
df = df.withColumn("direction", col("direction").cast('float'))
df.schema


Out[35]:
StructType(List(StructField(bus_id,StringType,true),StructField(direction,StringType,true),StructField(half_hour_bucket,StringType,true),StructField(class,FloatType,true),StructField(route_id_Index,DoubleType,true)))

In [36]:
df.show(4)


+------+---------+----------------+-----+--------------+
|bus_id|direction|half_hour_bucket|class|route_id_Index|
+------+---------+----------------+-----+--------------+
|  4367|        1|             9.0|  0.0|         157.0|
|  4367|        1|            10.0|157.0|         157.0|
|  4991|        1|             9.0| 63.0|         119.0|
|   230|        0|             9.0| 32.0|           9.0|
+------+---------+----------------+-----+--------------+
only showing top 4 rows
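
The repeated withColumn casts above can also be written as a single select; a minimal sketch, assuming the same df and that the remaining columns pass through unchanged:

from pyspark.sql.functions import col

cast_cols = ['bus_id', 'direction', 'class']
other_cols = [c for c in df.columns if c not in cast_cols]
df = df.select([col(c).cast('float').alias(c) for c in cast_cols] + other_cols)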


In [ ]:
# Generic casting example ('rawdata', 'house name' and 'price' come from a
# reference snippet, not from this bus dataset).
df = rawdata.select(col('house name'), rawdata.price.cast('float').alias('price'))

In [25]:
df.count()


Out[25]:
1458098

In [7]:
times_df.select('next_stop_id').distinct().count()


Out[7]:
15245

In [6]:
times_df.select('route_id').distinct().count()


Out[6]:
314

In [70]:
# times_df.write.csv('datasets/bus_2014-08-01.csv', header=True)

In [50]:
df = spark_session.read \
    .option("header", "true") \
    .option("mode", "DROPMALFORMED") \
    .option("delimiter", "\t") \
    .option("inferSchema", "true") \
    .csv('datasets/MTA-Bus-Time_.2014-08-01.txt')
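
A quick way to confirm the tab-delimited parse and the inferred column types (a small optional check):

df.printSchema()
df.show(3, truncate=False)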

In [8]:
times_df.count()


Out[8]:
1458098

In [9]:
times_df_t = times_df.limit(40)
times_df_t.show(8)


+------+-------------+------------+---------+----------------+-----+
|bus_id|     route_id|next_stop_id|direction|half_hour_bucket|class|
+------+-------------+------------+---------+----------------+-----+
|  4367|MTA NYCT_BX13|  MTA_100897|        1|             9.0|  0.0|
|  4367|MTA NYCT_BX13|  MTA_100897|        1|            10.0|157.0|
|  4991| MTA NYCT_B83|  MTA_308528|        1|             9.0| 63.0|
|   230| MTA NYCT_B35|  MTA_302697|        0|             9.0| 32.0|
|  3834| MTA NYCT_M10|  MTA_401257|        0|             9.0| 31.0|
|  3834| MTA NYCT_M10|  MTA_401257|        0|            10.5|    0|
|  3742|    MTABC_Q10|  MTA_550316|        1|             9.0| 32.0|
|  6296| MTA NYCT_S44|  MTA_201256|        1|             9.0| 31.0|
+------+-------------+------------+---------+----------------+-----+
only showing top 8 rows


In [20]:
from pyspark.ml.feature import StringIndexer

stringIndexer = StringIndexer(inputCol='route_id', outputCol='route_id'+"_Index")
result = stringIndexer.fit(times_df_t).transform(times_df_t)    

stringIndexer = StringIndexer(inputCol='next_stop_id', outputCol='next_stop_id'+"_Index")
result = stringIndexer.fit(result).transform(result)    

result.show()


+------+-------------+------------+---------+----------------+-----+--------------+------------------+
|bus_id|     route_id|next_stop_id|direction|half_hour_bucket|class|route_id_Index|next_stop_id_Index|
+------+-------------+------------+---------+----------------+-----+--------------+------------------+
|  4367|MTA NYCT_BX13|  MTA_100897|        1|             9.0|  0.0|           7.0|               5.0|
|  4367|MTA NYCT_BX13|  MTA_100897|        1|            10.0|157.0|           7.0|               5.0|
|  4991| MTA NYCT_B83|  MTA_308528|        1|             9.0| 63.0|          14.0|              10.0|
|   230| MTA NYCT_B35|  MTA_302697|        0|             9.0| 32.0|          10.0|              17.0|
|  3834| MTA NYCT_M10|  MTA_401257|        0|             9.0| 31.0|           3.0|               8.0|
|  3834| MTA NYCT_M10|  MTA_401257|        0|            10.5|    0|           3.0|               8.0|
|  3742|    MTABC_Q10|  MTA_550316|        1|             9.0| 32.0|           9.0|              12.0|
|  6296| MTA NYCT_S44|  MTA_201256|        1|             9.0| 31.0|           4.0|               3.0|
|  6296| MTA NYCT_S44|  MTA_201256|        1|             1.0|728.0|           4.0|               3.0|
|  6296| MTA NYCT_S44|  MTA_201256|        1|             3.0|250.0|           4.0|               3.0|
|   393| MTA NYCT_B68|  MTA_305700|        0|             9.0| 31.0|          13.0|              14.0|
|   496|   MTABC_B103|  MTA_304072|        0|             9.0|  0.0|           2.0|               2.0|
|   496|   MTABC_B103|  MTA_304072|        0|            16.5| 95.0|           2.0|               2.0|
|   496|   MTABC_B103|  MTA_304072|        0|             3.0| 93.0|           2.0|               2.0|
|   496|   MTABC_B103|  MTA_304072|        0|             6.5| 31.0|           2.0|               2.0|
|  3706|    MTABC_Q52|  MTA_550441|        0|             9.0| 62.0|          11.0|              15.0|
|  4762|MTA NYCT_BX40|  MTA_103525|        1|             9.0| 62.0|          12.0|              18.0|
|  3400|   MTABC_BXM9|  MTA_450082|        0|             9.0|655.0|           6.0|               7.0|
|  3400|   MTABC_BXM9|  MTA_450082|        0|             6.0|988.0|           6.0|               7.0|
|  6365| MTA NYCT_M10|  MTA_401335|        1|             9.0| 32.0|           3.0|              13.0|
+------+-------------+------------+---------+----------------+-----+--------------+------------------+
only showing top 20 rows


In [18]:
result.show()


+------+-------------+------------+---------+----------------+-----+--------------+------------------+
|bus_id|     route_id|next_stop_id|direction|half_hour_bucket|class|route_id_Index|next_stop_id_Index|
+------+-------------+------------+---------+----------------+-----+--------------+------------------+
|  4367|MTA NYCT_BX13|  MTA_100897|        1|             9.0|  0.0|           7.0|               5.0|
|  4367|MTA NYCT_BX13|  MTA_100897|        1|            10.0|157.0|           7.0|               5.0|
|  4991| MTA NYCT_B83|  MTA_308528|        1|             9.0| 63.0|          14.0|              10.0|
|   230| MTA NYCT_B35|  MTA_302697|        0|             9.0| 32.0|          10.0|              17.0|
|  3834| MTA NYCT_M10|  MTA_401257|        0|             9.0| 31.0|           3.0|               8.0|
|  3834| MTA NYCT_M10|  MTA_401257|        0|            10.5|    0|           3.0|               8.0|
|  3742|    MTABC_Q10|  MTA_550316|        1|             9.0| 32.0|           9.0|              12.0|
|  6296| MTA NYCT_S44|  MTA_201256|        1|             9.0| 31.0|           4.0|               3.0|
|  6296| MTA NYCT_S44|  MTA_201256|        1|             1.0|728.0|           4.0|               3.0|
|  6296| MTA NYCT_S44|  MTA_201256|        1|             3.0|250.0|           4.0|               3.0|
|   393| MTA NYCT_B68|  MTA_305700|        0|             9.0| 31.0|          13.0|              14.0|
|   496|   MTABC_B103|  MTA_304072|        0|             9.0|  0.0|           2.0|               2.0|
|   496|   MTABC_B103|  MTA_304072|        0|            16.5| 95.0|           2.0|               2.0|
|   496|   MTABC_B103|  MTA_304072|        0|             3.0| 93.0|           2.0|               2.0|
|   496|   MTABC_B103|  MTA_304072|        0|             6.5| 31.0|           2.0|               2.0|
|  3706|    MTABC_Q52|  MTA_550441|        0|             9.0| 62.0|          11.0|              15.0|
|  4762|MTA NYCT_BX40|  MTA_103525|        1|             9.0| 62.0|          12.0|              18.0|
|  3400|   MTABC_BXM9|  MTA_450082|        0|             9.0|655.0|           6.0|               7.0|
|  3400|   MTABC_BXM9|  MTA_450082|        0|             6.0|988.0|           6.0|               7.0|
|  6365| MTA NYCT_M10|  MTA_401335|        1|             9.0| 32.0|           3.0|              13.0|
+------+-------------+------------+---------+----------------+-----+--------------+------------------+
only showing top 20 rows


In [23]:
drop_list = ['route_id', 'next_stop_id']

result = result.select([column for column in result.columns if column not in drop_list])

result.show()


+------+---------+----------------+-----+--------------+------------------+
|bus_id|direction|half_hour_bucket|class|route_id_Index|next_stop_id_Index|
+------+---------+----------------+-----+--------------+------------------+
|  4367|        1|             9.0|  0.0|           7.0|               5.0|
|  4367|        1|            10.0|157.0|           7.0|               5.0|
|  4991|        1|             9.0| 63.0|          14.0|              10.0|
|   230|        0|             9.0| 32.0|          10.0|              17.0|
|  3834|        0|             9.0| 31.0|           3.0|               8.0|
|  3834|        0|            10.5|    0|           3.0|               8.0|
|  3742|        1|             9.0| 32.0|           9.0|              12.0|
|  6296|        1|             9.0| 31.0|           4.0|               3.0|
|  6296|        1|             1.0|728.0|           4.0|               3.0|
|  6296|        1|             3.0|250.0|           4.0|               3.0|
|   393|        0|             9.0| 31.0|          13.0|              14.0|
|   496|        0|             9.0|  0.0|           2.0|               2.0|
|   496|        0|            16.5| 95.0|           2.0|               2.0|
|   496|        0|             3.0| 93.0|           2.0|               2.0|
|   496|        0|             6.5| 31.0|           2.0|               2.0|
|  3706|        0|             9.0| 62.0|          11.0|              15.0|
|  4762|        1|             9.0| 62.0|          12.0|              18.0|
|  3400|        0|             9.0|655.0|           6.0|               7.0|
|  3400|        0|             6.0|988.0|           6.0|               7.0|
|  6365|        1|             9.0| 32.0|           3.0|              13.0|
+------+---------+----------------+-----+--------------+------------------+
only showing top 20 rows


In [19]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel

# Prepare training documents, which are labeled.
training = spark_session.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Prepare test documents, which are unlabeled.
test = spark_session.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)


Row(id=4, text='spark i j k', probability=DenseVector([0.2581, 0.7419]), prediction=1.0)
Row(id=5, text='l m n', probability=DenseVector([0.9186, 0.0814]), prediction=0.0)
Row(id=6, text='mapreduce spark', probability=DenseVector([0.432, 0.568]), prediction=1.0)
Row(id=7, text='apache hadoop', probability=DenseVector([0.6766, 0.3234]), prediction=0.0)
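
To see which grid point performed best, the average cross-validation metrics can be lined up with the parameter grid; a minimal sketch, assuming cvModel and paramGrid from the cell above:

# avgMetrics is ordered like paramGrid, one metric per parameter combination.
for params, metric in zip(paramGrid, cvModel.avgMetrics):
    print({p.name: v for p, v in params.items()}, '->', metric)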

In [9]:
cvModel.bestModel.save('./models/bm.ml')

In [ ]:
# Incomplete cell: only cvModel.bestModel (a PipelineModel) was saved above, so the
# persisted pipeline is reloaded with PipelineModel.load in the next cell.
# bm = CrossValidator.

In [20]:
pp = PipelineModel.load('./models/bm.ml')
pp


Out[20]:
PipelineModel_4ef7b1a9cd490897dff4

In [22]:
pp.transform(test).show(10)


+---+---------------+------------------+--------------------+--------------------+--------------------+----------+
| id|           text|             words|            features|       rawPrediction|         probability|prediction|
+---+---------------+------------------+--------------------+--------------------+--------------------+----------+
|  4|    spark i j k|  [spark, i, j, k]|(100,[5,29,49,56]...|[-1.0560322733153...|[0.25806842225846...|       1.0|
|  5|          l m n|         [l, m, n]|(100,[6,38,55],[1...|[2.42293721495130...|[0.91855974126539...|       0.0|
|  6|mapreduce spark|[mapreduce, spark]|(100,[5,53],[1.0,...|[-0.2735651887090...|[0.43203205663918...|       1.0|
|  7|  apache hadoop|  [apache, hadoop]|(100,[81,95],[1.0...|[0.73822817597493...|[0.67660828566522...|       0.0|
+---+---------------+------------------+--------------------+--------------------+--------------------+----------+
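
A quick check that the reloaded PipelineModel reproduces the in-memory cvModel predictions (a small sketch, assuming both objects from the cells above):

diff = (cvModel.transform(test).select('id', 'prediction')
        .subtract(pp.transform(test).select('id', 'prediction')))
print(diff.count() == 0)  # True when the two prediction sets match row for row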


In [17]:
prediction.show(10)


+---+---------------+------------------+--------------------+--------------------+--------------------+----------+
| id|           text|             words|            features|       rawPrediction|         probability|prediction|
+---+---------------+------------------+--------------------+--------------------+--------------------+----------+
|  4|    spark i j k|  [spark, i, j, k]|(100,[5,29,49,56]...|[-1.0560322733153...|[0.25806842225846...|       1.0|
|  5|          l m n|         [l, m, n]|(100,[6,38,55],[1...|[2.42293721495130...|[0.91855974126539...|       0.0|
|  6|mapreduce spark|[mapreduce, spark]|(100,[5,53],[1.0,...|[-0.2735651887090...|[0.43203205663918...|       1.0|
|  7|  apache hadoop|  [apache, hadoop]|(100,[81,95],[1.0...|[0.73822817597493...|[0.67660828566522...|       0.0|
+---+---------------+------------------+--------------------+--------------------+--------------------+----------+


In [ ]: