In [2]:
import sys
from pyspark.sql import Row, Window
import pyspark.sql.functions as SQLFunctions
In [4]:
# Load the raw bike share training data as a cached RDD of text lines
bikedatafile = "bikeshare_train"
bike_data = sc.textFile(bikedatafile).cache()
line_count = bike_data.count()
print "Line count is %d" % line_count
In [6]:
# df = sqlContext.sql("select * from bikeshare_test")
df = sqlContext.sql("select * from bikeshare_train union select * from bikeshare_test")
print df.count()
print df.printSchema()
print df.describe().show(n=10)
The fit() methods all expect two input columns: a label column and a features column that combines all of the encoded features into a single vector. So there are a few steps to take first. Use VectorAssembler to take a group of separate numeric columns and create a new column that is the vector combination of those columns.
In [8]:
from pyspark.ml.feature import VectorAssembler
# Combine the separate numeric weather columns into one "features" vector column
assembler = VectorAssembler(
    inputCols=["temp", "atemp", "humidity", "windspeed"],
    outputCol="features")
output = assembler.transform(df)
# Cast the count column from integer to double, since the label must be a double
output = output.select((output["count"]*1.0).alias("label"), "*")
# Print a few rows to see if the VectorAssembler did its job
output.select("count", "temp", "atemp", "humidity", "windspeed", "features").show(n=5, truncate=False)
In [10]:
from pyspark.ml.regression import LinearRegression
# Fit a simple linear regression: predict the label (rental count) from the
# assembled weather features
lr = LinearRegression(maxIter=5)
model = lr.fit(output)
print model
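A quick look inside the fitted model can sanity-check the fit. A minimal sketch, assuming a Spark release where LinearRegressionModel exposes coefficients and intercept (older 1.x releases name the first attribute weights):
# Inspect the fitted parameters (assumes Spark 1.6+; on older releases use
# model.weights instead of model.coefficients)
print model.coefficients
print model.intercept
# Score the training data to compare labels against predictions
model.transform(output).select("label", "prediction").show(n=5)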
In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
# Prepare training documents from a list of (id, text, label) tuples.
training = sqlContext.createDataFrame([
    (0L, "a b c d e spark", 1.0),
    (1L, "b d", 0.0),
    (2L, "spark f g h", 1.0),
    (3L, "hadoop mapreduce", 0.0)], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Fit the pipeline to training documents.
model = pipeline.fit(training)
# Prepare test documents, which are unlabeled (id, text) tuples.
test = sqlContext.createDataFrame([
    (4L, "spark i j k"),
    (5L, "l m n"),
    (6L, "mapreduce spark"),
    (7L, "apache hadoop")], ["id", "text"])
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
    print(row)
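To put a number on the fitted pipeline, one option is to score a labeled DataFrame and hand it to an evaluator. A minimal sketch using BinaryClassificationEvaluator, scoring the training set purely to illustrate the API (an honest estimate would need held-out data):
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Area under the ROC curve on the (tiny) training set; illustrative only,
# since evaluating on training data overstates real performance
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
scored = model.transform(training)
print "Training AUC = %f" % evaluator.evaluate(scored)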
In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
# Prepare training documents from a list of (id, text, label) tuples.
training = sqlContext.createDataFrame([
    (0L, "a b c d e spark", 1.0),
    (1L, "b d", 0.0),
    (2L, "spark f g h", 1.0),
    (3L, "hadoop mapreduce", 0.0)], ["id", "text", "label"])
print "Pre-transformed training set ..."
print training.show()
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tr = tokenizer.transform(training)
print "Transformed training set ..."
print tr.show()
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
hashed = hashingTF.transform(tr)
print "Hashed data set ..."
print hashed.show()
# lr = LogisticRegression(maxIter=10, regParam=0.01)
# pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
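The two commented-out lines above are the remaining stage. As a sketch, fitting the logistic regression directly on the hashed features reproduces what Pipeline.fit() did in the previous cell:
# Fit the final stage by hand on the hashed features; this is what the
# Pipeline did behind the scenes in the earlier cell
lr = LogisticRegression(maxIter=10, regParam=0.01)
lrModel = lr.fit(hashed)
lrModel.transform(hashed).select("id", "text", "prediction").show()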
In [13]:
from numpy import allclose
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import StringIndexer
df = sqlContext.createDataFrame([
    (1.0, Vectors.dense(1.0)),
    (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(df)
td = si_model.transform(df)
td.show()
# >>> gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="indexed")
# >>> model = gbt.fit(td)
# >>> allclose(model.treeWeights, [1.0, 0.1, 0.1, 0.1, 0.1])
# True
# >>> test0 = sqlContext.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
# >>> model.transform(test0).head().prediction
# 0.0
# >>> test1 = sqlContext.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
# >>> model.transform(test1).head().prediction
# 1.0