Import some useful libraries ...


In [2]:
import sys
from pyspark.sql import Row, Window
import pyspark.sql.functions as SQLFunctions

This won't work ...


In [4]:
# Read the uploaded bike-share training file as an RDD of text lines and
# report how many lines it contains.
# NOTE(review): the surrounding notes say this approach does not work in this
# environment - presumably the relative path is not visible to `sc`; confirm.
bikedatafile = "bikeshare_train"
bike_data = sc.textFile(bikedatafile)
bike_data.cache()  # cache() marks the RDD for reuse and returns the same RDD
line_count = bike_data.count()
print("Line count is {0:d}".format(line_count))

... but this does

(I'm still not sure of the best way to get a DataFrame from a file that I uploaded, but this will suffice.)


In [6]:
# Build a single DataFrame from the two registered tables.
# NOTE(review): SQL `union` (without `all`) also removes duplicate rows, and
# it requires both tables to expose the same columns - confirm this is intended.
# To look at the test table only, use instead:
#   df = sqlContext.sql("select * from bikeshare_test")
df = sqlContext.sql("select * from bikeshare_train union select * from bikeshare_test")
print(df.count())
# printSchema() and show() write their own output and return None, so they
# are called directly; wrapping them in print would add a stray "None" line.
df.printSchema()
df.describe().show(n=10)

And now, some Feature extraction ...

Since the fit() functions all expect two input columns (a label column, and a feature column that combines all the encoded features together), we have some steps to take first. We use the VectorAssembler to take a group of separate integer columns and create a new column that is a vector combination of those columns.


In [8]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Columns that are combined into the single "features" vector column.
feature_cols = ["temp", "atemp", "humidity", "windspeed"]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features")
output = assembler.transform(df)
# Add a "label" column holding the count as a double (labels must be doubles),
# keeping every existing column alongside it.
output = output.select(output["count"].cast("double").alias("label"), "*")
# Show a few rows to check that the VectorAssembler did its job.
# show() prints the table itself and returns None, so it is not wrapped in print.
output.select("count", *(feature_cols + ["features"])).show(n=5, truncate=False)

Now we have a column for 'label' and a column for 'features'

... which means we have what we need to fit/learn a model.


In [10]:
from pyspark.ml.regression import LinearRegression
# Fit a linear regression on the assembled bike-share frame, limiting the
# solver to 5 iterations. No column names are passed, so the estimator uses
# its defaults - presumably the "label" and "features" columns built above.
lr = LinearRegression(maxIter=5)
model = lr.fit(output)
# Print the fitted model's textual representation.
print(model)

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

# Prepare training documents from a list of (id, text, label) tuples.
training = sqlContext.createDataFrame([
    (0L, "a b c d e spark", 1.0),
    (1L, "b d", 0.0),
    (2L, "spark f g h", 1.0),
    (3L, "hadoop mapreduce", 0.0)], ["id", "text", "label"])

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = sqlContext.createDataFrame([
    (4L, "spark i j k"),
    (5L, "l m n"),
    (6L, "mapreduce spark"),
    (7L, "apache hadoop")], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
    print(row)

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

# Prepare training documents from a list of (id, text, label) tuples.
training = sqlContext.createDataFrame([
    (0L, "a b c d e spark", 1.0),
    (1L, "b d", 0.0),
    (2L, "spark f g h", 1.0),
    (3L, "hadoop mapreduce", 0.0)], ["id", "text", "label"])

print "Pre-transformed training set ..."
print training.show()
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tr = tokenizer.transform(training)
print "Transformed training set ..."
print tr.show()
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
hashed = hashingTF.transform(tr)
print "Hashed data set ..."
print hashed.show()
# lr = LogisticRegression(maxIter=10, regParam=0.01)
# pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

In [13]:
from numpy import allclose
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import StringIndexer
# Index the "label" column of a two-row toy frame with StringIndexer.
# NOTE(review): this rebinds `df`, which held the bike-share data in earlier
# cells; re-running those cells after this one will pick up the toy frame.
df = sqlContext.createDataFrame([
    (1.0, Vectors.dense(1.0)),
    (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(df)
td = si_model.transform(df)
# show() prints the table itself and returns None, so it is not wrapped in print.
td.show()
# The rest of the doctest this example was taken from is kept for reference
# and is not executed:
# >>> gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="indexed")
# >>> model = gbt.fit(td)
# >>> allclose(model.treeWeights, [1.0, 0.1, 0.1, 0.1, 0.1])
# True
# >>> test0 = sqlContext.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
# >>> model.transform(test0).head().prediction
# 0.0
# >>> test1 = sqlContext.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
# >>> model.transform(test1).head().prediction
# 1.0

In [14]: