In [68]:
import seaborn as sns
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
In [69]:
%matplotlib inline
In [70]:
# `SQLContext` is imported explicitly so this cell does not depend on the
# launcher having injected the class into the notebook namespace.
from pyspark.sql import SQLContext

# NOTE(review): `sc` is not created anywhere in this notebook; presumably the
# kernel was started through PySpark, which provides it -- confirm.
ssc = SQLContext(sc)

# Load the tweet corpus from Parquet and mark it for caching, since the cells
# below run more than one action on it (count, toPandas).
tweets = ssc.read.parquet("/tmp/tweet-corpus")
tweets.cache()
Out[70]:
In [71]:
# Number of rows in the corpus. `cache()` above is lazy, so this action is
# presumably what first materialises the cached data -- confirm if timing matters.
tweets.count()
Out[71]:
In [72]:
# Give the two Spark columns their working names, then collect the whole
# corpus to the driver as a pandas DataFrame and preview the first rows.
df = tweets.toDF("tokens", "label").toPandas()
df.head()
Out[72]:
In [73]:
from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.classification import LabeledPoint
In [74]:
# Dimensionality of the hashed feature space. The same value sizes the
# all-zero initial weight vector when the SGD model is trained further down.
coeff = 3000
hashingTf = HashingTF(numFeatures=coeff)
In [75]:
#vectors = sc.parallelize([hashingTf.transform(tokens) for tokens in df._1])
#idf = IDF().fit(vectors)
In [76]:
def featurize(tokens):
    """Map a sequence of tokens to its hashed term-frequency vector.

    Thin wrapper around the notebook-level ``hashingTf`` transformer; the
    result is whatever ``hashingTf.transform`` returns for ``tokens``.
    """
    hashed = hashingTf.transform(tokens)
    return hashed
#def tfidf(tokens): return idf.transform(tf(tokens))
In [77]:
# Build one LabeledPoint per tweet: the label plus the hashed term-frequency
# vector of its tokens. Iterating over the two columns directly avoids the
# per-row Series construction that `DataFrame.apply(axis=1)` performs.
df['lpoint'] = [
    LabeledPoint(label, featurize(tokens))
    for tokens, label in zip(df['tokens'], df['label'])
]

# Preview only: rendering the whole frame would dump every tweet and vector
# into the notebook output.
df.head()
Out[77]:
Create train/test split
In [78]:
# Fixed seed so the split (and every metric computed from it) is reproducible
# across kernel restarts.
SPLIT_SEED = 42
split_rng = np.random.RandomState(SPLIT_SEED)

# Boolean mask: each row lands in the training set with probability 0.80.
msk = split_rng.rand(len(df)) < 0.80

# Explicit copies, so that columns added to `test` later on (e.g. `pred`) are
# written to an independent frame rather than to a slice of `df`, which would
# trigger pandas' SettingWithCopyWarning.
train = df[msk].copy()
test = df[~msk].copy()
In [79]:
# Class balance of the training split. The title tells this figure apart from
# the otherwise identical plot of the test split.
_ = sns.countplot(x="label", data=train).set_title("Label distribution (train)")
In [80]:
# Class balance of the held-out split. The title tells this figure apart from
# the otherwise identical plot of the training split.
_ = sns.countplot(x="label", data=test).set_title("Label distribution (test)")
In [81]:
from pyspark.mllib.feature import PCA
In [82]:
# Preview the first rows instead of rendering the entire frame.
df.head()
Out[82]:
In [83]:
#lpoints = df['lpoint']
#rdd = sc.parallelize(lpoints.map(lambda point: point.features).tolist())
#pca = PCA(3).fit(rdd)
#df['pca'] = df.apply(lambda row: pca.transform(row['lpoint'].features), axis=1)
#df['pca_0'] = df.apply(lambda row: row['pca'][0], axis=1)
#df['pca_1'] = df.apply(lambda row: row['pca'][1], axis=1)
#viz = df[['label', 'pca_0', 'pca_1']]
In [84]:
#_ = sns.pairplot(viz, vars=['pca_0', 'pca_1'], hue="label", size=6.0)
In [85]:
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.classification import LabeledPoint
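Let's build an RDD from the `LabeledPoint`s of the training split. (Note: with the IDF step commented out above, the vectors are hashed term frequencies rather than TF-IDF.)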
In [86]:
# Hand the training LabeledPoints to Spark as an RDD; this is what the MLlib
# trainer below consumes.
train_rdd = sc.parallelize(train['lpoint'].tolist())
Now train the logistic regression estimator.
In [87]:
# Fit a logistic-regression model with SGD, starting from an all-zero weight
# vector whose length equals the hashed feature dimension (`coeff`).
# NOTE: the name `lr` is rebound to a pyspark.ml estimator further down, so
# re-running the prediction cell after that point will not see this model.
lr = LogisticRegressionWithSGD.train(
    train_rdd,
    iterations=200,
    initialWeights=Vectors.zeros(coeff))
In [88]:
# Score every held-out tweet on the driver with the trained SGD model and
# store the prediction next to its features.
test['pred'] = [lr.predict(point.features) for point in test['lpoint']]
In [89]:
# Preview the first scored rows instead of rendering the entire frame.
test.head()
Out[89]:
In [90]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics
In [91]:
# Pair each prediction (cast to float) with the true label and distribute the
# pairs as an RDD for the MLlib evaluators below.
# NOTE(review): `pred` comes from `lr.predict` without the model's threshold
# being cleared, so these look like hard class predictions rather than raw
# scores -- confirm that is what the ROC/PR areas should be based on.
scoreAndLabels = sc.parallelize([
    (float(pred), point.label)
    for pred, point in zip(test['pred'], test['lpoint'])
])
In [92]:
# Wrap the (score, label) pairs in MLlib's binary-classification evaluator.
binary_metrics = BinaryClassificationMetrics(scoreAndLabels)
In [93]:
# Area under the precision-recall curve for the first SGD model.
binary_metrics.areaUnderPR
Out[93]:
In [94]:
# Area under the ROC curve for the first SGD model.
binary_metrics.areaUnderROC
Out[94]:
In [95]:
# The same pairs, now read as (prediction, label), feed the multiclass evaluator.
mult_metrics = MulticlassMetrics(scoreAndLabels)
In [96]:
# Called without a label argument.
# NOTE(review): presumably this is the overall precision across all classes
# rather than a per-class value -- confirm for the MLlib version in use.
mult_metrics.precision()
Out[96]:
In [97]:
# Called without a label argument.
# NOTE(review): presumably this is the overall recall across all classes
# rather than a per-class value -- confirm for the MLlib version in use.
mult_metrics.recall()
Out[97]:
In [98]:
# Larger of mean(label) and 1 - mean(label) on the held-out split.
# NOTE(review): reads as a majority-class baseline accuracy, which assumes the
# labels are encoded as 0/1 -- confirm.
max(test.label.mean(), 1 - test.label.mean())
Out[98]:
In [99]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
# Imported under an alias: a bare `HashingTF` import here would shadow the
# `pyspark.mllib.feature.HashingTF` class imported near the top of the
# notebook, so re-running the `hashingTf = HashingTF(coeff)` cell afterwards
# would pick up the wrong class.
from pyspark.ml.feature import HashingTF as MLHashingTF

# DataFrame-based pipeline: hash the `tokens` column into `features`, then fit
# a logistic regression on them.
# NOTE: `lr` is rebound here from the trained MLlib model (see the SGD
# training cell) to an unfitted pyspark.ml estimator.
lr = LogisticRegression()
tf = MLHashingTF(inputCol="tokens", outputCol="features")
pipeline = Pipeline(stages=[tf, lr])
In [100]:
# Spark DataFrame built from the full pandas frame.
# NOTE(review): `pdf` is not referenced again in the visible cells -- confirm
# it is still needed.
pdf = ssc.createDataFrame(df)
In [101]:
# Feature/label rows for cross-validation, built from the TRAINING split only.
# Building them from all of `df` would let the held-out tweets influence the
# cross-validated weights that later seed the second SGD model, which is then
# scored on those same tweets.
# `ssc` (the context created at the top of this notebook) is used instead of
# `sqlContext`, which is not defined anywhere in the notebook, so every
# DataFrame comes from the same context.
dataset = ssc.createDataFrame(
    [(point.features, point.label) for point in train['lpoint']],
    ["features", "label"])

# Spark versions of the train/test splits for the pipeline. Only the listed
# columns of `test` are converted.
ptrain = ssc.createDataFrame(train)
ptest = ssc.createDataFrame(test[['tokens', 'label', 'lpoint']])
In [102]:
# Fit the hashing + logistic-regression pipeline on the training split.
model = pipeline.fit(ptrain)
In [103]:
# Apply the fitted pipeline to the held-out split; the result carries the
# `prediction` column selected in the next cell.
prediction = model.transform(ptest)
In [104]:
# Keep only tokens, true label and predicted label, and collect them to the
# driver as a pandas DataFrame.
result = prediction.select("tokens", "label", "prediction").toPandas()
In [106]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
In [1]:
# Use a dedicated estimator instead of the notebook-level `lr`, which is bound
# to an MLlib model in some cells and to a pyspark.ml estimator in others,
# depending on execution order.
cv_estimator = LogisticRegression()

# Candidate iteration budgets. A grid of [0, 1] would only compare an
# untrained model against a single optimisation step.
grid = ParamGridBuilder().addGrid(cv_estimator.maxIter, [10, 50, 100]).build()
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=cv_estimator, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)

# Kept as the last expression of the cell so the metric is displayed rather
# than computed and discarded.
evaluator.evaluate(cvModel.transform(dataset))
In [117]:
# Weight vector of the best cross-validated model; it seeds the SGD run below.
# NOTE(review): `weights` may be deprecated on newer Spark releases in favour
# of `coefficients` -- confirm for the version in use.
weights = cvModel.bestModel.weights
In [128]:
# Retrain the SGD model, this time warm-started from the cross-validated
# weights instead of zeros, and score the held-out tweets with it.
lr_new = LogisticRegressionWithSGD.train(
    train_rdd,
    iterations=200,
    initialWeights=weights)
test['pred_new'] = [lr_new.predict(point.features) for point in test['lpoint']]
In [129]:
# Pair each prediction of the warm-started model (cast to float) with the true
# label and distribute the pairs as an RDD. This rebinds `scoreAndLabels`.
# NOTE(review): as with the first model, `pred_new` looks like a hard class
# prediction rather than a raw score -- confirm this is intended for ROC/PR.
scoreAndLabels = sc.parallelize([
    (float(pred), point.label)
    for pred, point in zip(test['pred_new'], test['lpoint'])
])
In [130]:
# Binary evaluator for the warm-started model; rebinds `binary_metrics`.
binary_metrics = BinaryClassificationMetrics(scoreAndLabels)
In [131]:
# Area under the ROC curve for the warm-started model.
binary_metrics.areaUnderROC
Out[131]:
In [132]:
# Area under the precision-recall curve for the warm-started model.
binary_metrics.areaUnderPR
Out[132]:
In [133]:
# Multiclass evaluator for the warm-started model; rebinds `mult_metrics`.
mult_metrics = MulticlassMetrics(scoreAndLabels)
In [134]:
# Called without a label argument.
# NOTE(review): presumably the overall precision of the warm-started model
# across all classes -- confirm for the MLlib version in use.
mult_metrics.precision()
Out[134]:
In [ ]: