In [1]:
# 設定Spark
import os
import sys
# SPARK_HOME="/opt/spark-1.4.1-bin-hadoop2.6"
# os.environ["SPARK_HOME"] = SPARK_HOME
# sys.path.append(os.path.join(SPARK_HOME, 'python'))
# sys.path.append(os.path.join(SPARK_HOME, 'python/lib/py4j-0.8.2.1-src.zip'))
#----
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
# import seaborn as sns
# sns.set(rc={"figure.figsize": (14, 5)},palette=sns.color_palette("Set1", 2))
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml.param import *
from pyspark.ml import *
# from pyspark import HiveContext
# ctx = HiveContext(sc)
ctx=sqlContext
資料來源是 2015痞客邦 PIXNET HACKATHON 活動中開放的資料集
文章資料集蘊含正常使用者所發佈的優質文章,及spam使用者所發佈的劣質廣告文章,資料收集時間為2015/4。 因為data使用規定,本demo無法直接提供資料,請於資料連結內下載。 資料欄位說明:
"post_at": 文章發佈時間 (Unix Timestamp),
"author": 作者 ID ,
"tags": [文章歸屬的標籤列表],
"title": 文章的標題,
"hits": 文章總人氣,
"content": 部落格本文
"comment_count": 多少人 comment 過,
"comment_ids": [留言者ID] ,
"category": 文章的分類,
"spam": 是否為 Spam 文章,1為spam,0為正常。
In [12]:
path = <<FILL-IN>> # the path of file: articles-half-a.json.
panda_df = pd.read_json(path) #如果無法讀入,可以試試python的json套件
df=ctx.createDataFrame(panda_df)
df=df.withColumn('spam2',df['spam'].astype(DoubleType())).drop('spam').withColumnRenamed('spam2','spam') # 1.4 bug
df=df.withColumn('post_at2',(df['post_at']/1000000000).astype(LongType())).drop('post_at').withColumnRenamed('post_at2','post_at') # 1.4 bug
df.printSchema()
In [13]:
# 觀察table內的數值
df.select('category','comment_count','hits','post_at','title','spam').show(2)
In [14]:
df.select('post_at','spam').show(3)
In [15]:
from datetime import datetime,tzinfo,timedelta
# 定義把utc時間轉成當天小時時間的function
get_hour = lambda x: (datetime.utcfromtimestamp(float(x)) + timedelta(hours=8)).hour
# 定義SQL UDF
hourOfDay = udf(get_hour, IntegerType())
In [16]:
df_hour=(df.withColumn('hour',hourOfDay(df['post_at'])) # 使用上述定義的UDF,增加hour欄位
.groupBy('spam','hour') # group by (spam , hour)
.agg(count('*').alias('count')) # 計算每個group內的數量並且命名為 'count'
.orderBy(asc('hour'))) # 照時間順序排序結果
df_hour.printSchema()
In [17]:
# spark dataframe轉成pandas dataframe
pandas_df = df_hour.toPandas()
# 轉一下格式方便畫圖
pandas_df = pandas_df.pivot_table(index='hour',columns='spam',values='count')
pandas_df.set_axis(1,['not spam','spam'])
# 使用pandas畫圖
ax=pandas_df.plot(kind='bar')
ax.set_title('count by hour')
ax.set_ylabel('count')
ax.set_xlabel('hour')
Out[17]:
In [18]:
# 加入時間特徵
df_ml = df.withColumn('hour',hourOfDay(df['post_at']).astype(DoubleType()))
本demo目的為Pipeline的使用,為了簡單起見,本demo只挑選幾個簡單的特徵:
我們使用線性分類器logistic regression,這種分類器只接受數值特徵,類別特徵必須先經過編碼轉換才能通過分類器。留言數和點擊數本身是數值特徵, 小時時間類別是類別特徵需要經過編碼轉成數值特徵。
Pipeline流程圖:
發文時間 ---- 類別數值化 ------------\
\
點擊數量 -------------------------- 結合 ---> 分類器
/
留言數量 --------------------------/
In [19]:
# 發文時間編碼
hour_encoder = OneHotEncoder(inputCol="hour", outputCol="hour_code")
# 結合所有特徵
assembler = VectorAssembler(inputCols=["hour_code","hits",'comment_count'], outputCol="features")
# 分類器
log_regressor = LogisticRegression(featuresCol="features",labelCol="spam")
#機器學習管線
pipeline = Pipeline(stages=[hour_encoder,assembler,log_regressor])
In [20]:
df_train,df_test=df_ml.randomSplit([7.,3.],123)
In [21]:
model = pipeline.fit(df_train)
In [22]:
# 用訓練好的pipeline進行預測測試資料集。
# 訓練好的pipeline是Transformer所以有transform方法。
df_pred = model.transform(df_test)
In [23]:
# 觀察transofrm對dataframe做了什麼事,所有pipeline設定的特徵和預測都加入dataframe了。
df_pred.printSchema()
In [24]:
pred_label = df_pred.select("prediction", "spam")
# 建立accuracy function,使用內建dataframe functions達成UDAF
accuracy = avg((pred_label.prediction==pred_label['spam']).astype(IntegerType())).alias("accuracy")
# 需要groupBy才能使用此function, 所以就group所有data吧
pred_label.groupBy().agg(accuracy).show()
In [25]:
# 檢討模型效能,觀察什麼樣的特徵導致錯誤預測。
df_pred.filter(df_pred['spam']!=df_pred['prediction'])\
.select('hour','hits','comment_count','spam','prediction')\
.show(10)
In [7]:
sc.stop()
In [ ]: