In [1]:
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Author: Gusseppe Bravo <gbravor@uni.pe>
# License: BSD 3 clause
"""
This module drives the end-to-end workflow of the project: define, prepare, feature selection, and evaluation.
"""
import define
#import analyze
import prepare
import feature_selection
import evaluate
import time
import os
import findspark
findspark.init()
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
#from pyspark import SparkContext, SparkConf
# Stop any Spark session left over from a previous run of this notebook.
try:
    spark_session.stop()
except NameError:
    pass
name = "datasets/buses_10000_filtered.csv"
# name = "hdfs://King:9000/user/bdata/mta_data/MTA-Bus-Time_.2014-08-01.txt"
response = "tiempoRecorrido"
spark_session = SparkSession.builder \
.master('local')\
.appName("Sparkmach") \
.config("spark.driver.allowMultipleContexts", "true")\
.getOrCreate()
# conf = SparkConf()\
# .setMaster("local")\
# .setAppName("sparkmach")\
# .set("spark.driver.allowMultipleContexts", "true")
#sparkContext = SparkContext(conf=conf)
currentDir = os.getcwd()
spark_session.sparkContext.addPyFile(currentDir + "/define.py")
#spark_session.sparkContext.addPyFile("/home/vagrant/tesis/sparkmach/sparkmach/sparkmach/analyze.py")
spark_session.sparkContext.addPyFile(currentDir + "/prepare.py")
spark_session.sparkContext.addPyFile(currentDir + "/feature_selection.py")
spark_session.sparkContext.addPyFile(currentDir + "/evaluate.py")
# STEP 0: Define workflow parameters
definer = define.Define(spark_session, data_path=name, response=response).pipeline()
# STEP 1: Analyze data by plotting it
#analyze.Analyze(definer).pipeline()
# STEP 2: Prepare data by scaling, normalizing, etc.
preparer = prepare.Prepare(definer).pipeline()
# STEP 3: Feature selection
featurer = feature_selection.FeatureSelection(definer).pipeline()
# STEP 4: Evaluate the algorithms using the pipelines
# evaluator = evaluate.Evaluate(definer, preparer, featurer).pipeline()
# start = time.time()
# result = main()
# end = time.time()
# print()
# print("Execution time for all the steps: ", end-start)
In [2]:
%%time
evaluator = evaluate.Evaluate(definer, preparer, featurer).pipeline()
In [5]:
import pandas as pd
In [6]:
bus_data = pd.read_csv('datasets/buses_1458098_filtered.csv')
bus_data.head()
Out[6]:
In [7]:
bus_data.shape
Out[7]:
In [ ]:
In [7]:
spark_session.sparkContext
Out[7]:
In [10]:
from pyspark.sql import functions as F
df = spark_session.createDataFrame([
(1, "a"),
(2, "b"),
(3, "c"),
], ["ID", "Text"])
# Build 0/1 indicator columns for each distinct value of "Text".
categories = df.select("Text").distinct().rdd.flatMap(lambda x: x).collect()
exprs = [F.when(F.col("Text") == category, 1).otherwise(0).alias(category)
for category in categories]
df.select("ID", *exprs).show()
In [17]:
print(df.select("Text").distinct().rdd.flatMap(lambda x: x).collect())
In [12]:
from pyspark.sql.functions import col
# Sort descending; the toy `df` above has no "Score" column, so this call
# assumes a DataFrame that actually contains one.
df.sort(col("Score").desc()).show(truncate=False)
In [4]:
df.show(truncate=False)
In [ ]:
%%time
import bus_times
import os
import define
#import analyze
import prepare
import feature_selection
import evaluate
from pyspark.ml.feature import StringIndexer
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row, SparkSession
from pyspark.sql.types import *
name = "datasets/buses_10000_filtered.csv"
# name = "hdfs://King:9000/user/bdata/mta_data/MTA-Bus-Time_.2014-08-01.txt"
response = "class"
spark_session = SparkSession.builder \
.master('local[*]')\
.appName("Sparkmach") \
.config("spark.driver.allowMultipleContexts", "true")\
.getOrCreate()
currentDir = os.getcwd()
#Piero
spark_session.sparkContext.addPyFile(currentDir + "/bus_times.py")
#Gusseppe
spark_session.sparkContext.addPyFile(currentDir + "/define.py")
spark_session.sparkContext.addPyFile(currentDir + "/prepare.py")
spark_session.sparkContext.addPyFile(currentDir + "/feature_selection.py")
spark_session.sparkContext.addPyFile(currentDir + "/evaluate.py")
rdd = spark_session.sparkContext.textFile(currentDir + '/datasets/MTA-Bus-Time_.2014-08-01.txt')
# rdd = spark_session.sparkContext.textFile(currentDir + '/datasets/test.txt')
# rdd = sc.textFile('hdfs://King:9000/user/bdata/mta_data/MTA-Bus-Time_.2014-10-31.txt')
classTuple = bus_times.mainFilter(rdd)
halfHourBucket = classTuple.map(lambda x: bus_times.toHalfHourBucket(list(x)))
bucket_schema = StructType([
    StructField("bus_id", StringType(), True),
    StructField("route_id", StringType(), True),
    StructField("next_stop_id", StringType(), True),
    StructField("direction", StringType(), True),
    StructField("half_hour_bucket", StringType(), True),
    StructField("class", StringType(), True),
])
# bucket_schema= StructType([StructField("bus_id",IntegerType(), True),StructField("route_id",StringType(), True),StructField("next_stop_id",StringType(), True),StructField("direction",IntegerType(), True),StructField("half_hour_bucket",FloatType(), True),StructField("class",FloatType(), True) ])
df = spark_session.createDataFrame(halfHourBucket, bucket_schema)
# Index the categorical string columns (a Pipeline-based alternative is
# sketched in a cell further below).
stringIndexer = StringIndexer(inputCol='route_id', outputCol='route_id_Index')
df = stringIndexer.fit(df).transform(df)
stringIndexer = StringIndexer(inputCol='next_stop_id', outputCol='next_stop_id_Index')
df = stringIndexer.fit(df).transform(df)
drop_list = ['route_id', 'next_stop_id']
df = df.select([column for column in df.columns if column not in drop_list])
# STEP 0: Define workflow parameters
definer = define.Define(spark_session, data_path=name, response=response, df=df).pipeline()
# STEP 1: Analyze data by plotting it
#analyze.Analyze(definer).pipeline()
# STEP 2: Prepare data by scaling, normalizing, etc.
preparer = prepare.Prepare(definer).pipeline()
# STEP 3: Feature selection
featurer = feature_selection.FeatureSelection(definer).pipeline()
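In [ ]:
# A hedged sketch, assuming a fresh `df` built from halfHourBucket and
# bucket_schema (i.e. before the indexing in the previous cell): the two
# chained StringIndexer calls plus the column drop expressed as a single
# Pipeline. Names are illustrative and the project modules are not involved.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

categorical_cols = ['route_id', 'next_stop_id']
indexers = [StringIndexer(inputCol=c, outputCol=c + '_Index') for c in categorical_cols]
indexed_df = Pipeline(stages=indexers).fit(df).transform(df).drop(*categorical_cols)
indexed_df.printSchema()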
In [4]:
%%time
evaluator = evaluate.Evaluate(definer, preparer, featurer).pipeline()
In [5]:
df.show(20)
In [1]:
spark_session
In [27]:
# stringIndexer = StringIndexer(inputCol='route_id', outputCol='route_id'+"_Index")
# df = stringIndexer.fit(times_df).transform(times_df)
# stringIndexer = StringIndexer(inputCol='next_stop_id', outputCol='next_stop_id'+"_Index")
# result = stringIndexer.fit(df).transform(df)
# drop_list = ['route_id', 'next_stop_id']
# df = df.select([column for column in df.columns if column not in drop_list])
# df.show()
In [ ]:
from pyspark.ml.classification import RandomForestClassificationModel
model1 = RandomForestClassificationModel.load("./models/name.ml")  # placeholder model path
model1
In [3]:
evaluator.Evaluate.test
In [ ]:
In [3]:
definer.data.schema.fields
Out[3]:
In [4]:
definer.data.dropna().count()
Out[4]:
In [6]:
tt = definer.data.dropna()
tt.count()
Out[6]:
In [1]:
definer.data.count()
In [35]:
from pyspark.sql.functions import col
df = df.withColumn("class", col("class").cast('float'))
df = df.withColumn("bus_id", col("class").cast('float'))
df = df.withColumn("direction", col("class").cast('float'))
df.schema
Out[35]:
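In [ ]:
# A hedged sketch: the three withColumn casts above written as a single loop.
for c in ['class', 'bus_id', 'direction']:
    df = df.withColumn(c, col(c).cast('float'))
df.printSchema()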
In [36]:
df.show(4)
In [ ]:
# Select-and-cast pattern kept for reference; `rawdata` and its columns are
# not defined anywhere in this notebook.
df = rawdata.select(col('house name'), rawdata.price.cast('float').alias('price'))
In [25]:
df.count()
Out[25]:
In [7]:
times_df.select('next_stop_id').distinct().count()
Out[7]:
In [6]:
times_df.select('route_id').distinct().count()
Out[6]:
In [70]:
# times_df.write.csv('datasets/bus_2014-08-01.csv', header=True)
In [50]:
# Read the tab-delimited raw feed with the CSV reader.
df = spark_session.read\
    .option("header", "true")\
    .option("mode", "DROPMALFORMED")\
    .option("delimiter", "\t")\
    .option("inferSchema", "true")\
    .csv('datasets/MTA-Bus-Time_.2014-08-01.txt')
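In [ ]:
# A hedged sketch: on the full-size feed, supplying an explicit schema skips
# the extra pass that inferSchema makes over the file. The field names below
# are illustrative placeholders, not the real MTA column names, so the read
# itself is left commented out.
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

feed_schema = StructType([
    StructField("vehicle_id", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
])
# raw_df = spark_session.read.schema(feed_schema)\
#     .option("header", "true")\
#     .option("delimiter", "\t")\
#     .csv('datasets/MTA-Bus-Time_.2014-08-01.txt')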
In [8]:
times_df.count()
Out[8]:
In [9]:
times_df_t = times_df.limit(40)
times_df_t.show(8)
In [20]:
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol='route_id', outputCol='route_id_Index')
result = stringIndexer.fit(times_df_t).transform(times_df_t)
stringIndexer = StringIndexer(inputCol='next_stop_id', outputCol='next_stop_id_Index')
result = stringIndexer.fit(result).transform(result)
result.show()
In [18]:
result.show()
In [23]:
drop_list = ['route_id', 'next_stop_id']
result = result.select([column for column in result.columns if column not in drop_list])
result.show()
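In [ ]:
# A hedged note: the list-comprehension select above can also be written with
# DataFrame.drop, which takes several column names and ignores any that are
# already gone.
result.drop('route_id', 'next_stop_id').printSchema()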
In [19]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
# Prepare training documents, which are labeled.
training = spark_session.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
(8, "e spark program", 1.0),
(9, "a e c l", 0.0),
(10, "spark compile", 1.0),
(11, "hadoop software", 0.0)
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
# Prepare test documents, which are unlabeled.
test = spark_session.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "mapreduce spark"),
(7, "apache hadoop")
], ["id", "text"])
# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
print(row)
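In [ ]:
# A hedged follow-up sketch: pair each parameter map in `paramGrid` with its
# average cross-validation metric (areaUnderROC for the
# BinaryClassificationEvaluator above) to see which setting won.
for params, metric in zip(paramGrid, cvModel.avgMetrics):
    print({p.name: v for p, v in params.items()}, '->', round(metric, 4))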
In [9]:
cvModel.bestModel.save('./models/bm.ml')
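In [ ]:
# A hedged note: re-running the save above fails once './models/bm.ml' exists;
# the MLWriter overwrite() flag avoids that.
cvModel.bestModel.write().overwrite().save('./models/bm.ml')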
In [ ]:
# Incomplete line kept as a note: only the best PipelineModel was saved above
# (loaded in the next cell); a full CrossValidatorModel was not persisted here.
# bm = CrossValidatorModel.load('./models/bm.ml')
In [20]:
pp = PipelineModel.load('./models/bm.ml')
pp
Out[20]:
In [22]:
pp.transform(test).show(10)
In [17]:
prediction.show(10)
In [ ]: