สำรวจความนิยมของเหล่าไอดอล BNK48 แบบ real-time จาก Twitter's hashtags ด้วย Python

ตัวอย่างแสดงการ streaming และจัดการข้อมูลด้วย PySpark (2.2.0)

Techical part 0 -- Workflow

ส่วน workflow หลักๆ ก็เหมือนกับการทำ streaming ด้วย Python networking interface ปกตินะครับมาดู โดยมีการเปลี่ยนแปลงโดยการใช้ PySpark ในขั้นตอน Technical part 2 และ Technicla part 3

  1. ดึงข้อมูลจาก Twitter ด้วย Tweepy
  2. Streaming และการจัดการข้อมูล DataFrame ด้วย Pyspark
  3. จัดลำดับ hashtags และทำหน้าจอแสดงผลแบบ real-time อย่างง่ายๆ

โดยการใช้ PySpark จะมีประโยชน์เมื่อต้องจัดการข้อมูลขนาดใหญ่เนื่องจากสามารถทำ distributed computation ได้

Techical part 1 -- ดึงข้อมูลจาก Twitter ด้วย Tweepy

ทำการสร้าง script (streamingTwitterTags.py) เพื่อเชื่อมต่อและดึงข้อมูลจาก Twitter อย่างที่แสดงใน blog หลักครับ

Technical part 2 -- Streaming และการจัดการข้อมูล DataFrame ด้วย Pyspark

สำหรับการ streaming data บน Jupyter Notebook โดยใช้ PySpark ทำได้ดังส่วนข้างล่างครับ

มีกระบวนการโดยสรุปก็คือ

  • บรรทัดที่ 6: สร้าง SparkSession
  • บรรทัดที่ 9: เชื่อมต่อ streaming socket โดยให้กรอกค่า "host" และ ค่า "port" ที่เราตั้งไว้ใน streamingTwitterTags.py
  • บรรทัดที่ 11-23: จัดการข้อมูล twitter streaming โดยค่าบนบรรทัดที่ 22 คือ PySpark DataFrame ที่เรียงลำดับ hashtags ทั้งหมดจากมากลงไปหาน้อยโดยเก็บข้อมูลจาก tweets ทั้งหมดที่มีคำว่า '#bnk48' อยู่ (ตั้งไว้ใน streamingTwitterTags.py)
  • บรรทัดที่ 26-27: บันทึกเวลาและเริ่มการ streaming โดยให้ผลลัพธ์บันทึกไว้ใน table ชื่อ "topTagsTable"

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, split, explode, substring, count
from datetime import datetime

# create SparkSession
spark = SparkSession.builder.appName('streamTwitterTags').getOrCreate()

# connect and get tweets
tweets = spark.readStream.format("socket").option("host", "127.0.0.1").option("port", 5555).load()

# convert to lowercase and split the words
words_train = tweets.select(split(lower(tweets.value), " ").alias("value"))

# 'explode' a list of words into rows with single word
words_df = words_train.select(explode(words_train.value).alias('word_explode'))

# keep only rows that the word starts with '#'
hashtags = words_df.filter(substring(words_df.word_explode,0,1)=='#')
hashtags = hashtags.select(hashtags.word_explode.alias('hashtag'))

# count hashtags
count_hashtags = hashtags.groupBy('hashtag').count()
count_hashtags_order = count_hashtags.orderBy(count_hashtags['count'].desc())

# start streaming
start_time = datetime.now()
query = count_hashtags_order.writeStream.outputMode("complete").format("memory").queryName("topTagsTable").start()

เมื่อเรามีทั้ง script สำหรับทำการ streaming tweets (streamingTwitterTags.py) และ script สำหรับจัดการข้อมูลบน Jupyter Notebook แล้ว เราก็พร้อมที่จะทดสอบระบบ โดยเริ่มจากสั่ง run script สำหรับ streaming ก่อน

twitterStreaming_run

จากนั้นก็ run code PySpark บน Jupyter Notebook เท่านี้ code ของเราเริ่มดึง tweets จาก Twitter มาจัดการสร้าง DataFrame ถ้าอยากจะรู้ว่า ณ ขณะนั้นๆ DataFrame ของเราหน้าตาเป็นยังไงก็เรียกดูได้ด้วยคำสั่ง


In [ ]:
spark.sql("select * from topTagsTable").limit(11).show()

ก็จะเป็นการเรียกข้อมูล hashtags 11 ลำดับแรกขึ้นมา ขั้นตอนนี้อาจต้องรอให้เรา run code ไปสักพัก (ไม่กี่วินาทีก็พอ) ก่อนเพื่อให้มีข้อมูลใน DataFrame มาแสดง ไม่งั้นเราก็จะเห็น DataFrame เปล่าๆ

การเห็น DataFrame ออกมาแบบนี้ก็ดีในระดับนึง แต่จะดียิ่งขึ้นถ้าเรานำข้อมูลที่ streaming นี้มาวาด chart ที่ช่วยแสดงผลแบบ real-time

Technical part 3 -- จัดลำดับ hashtags และทำหน้าจอแสดงผลแบบ real-time อย่างง่ายๆ

เมื่อเรามีตัว PySpark DataFrame แล้ว วิธีที่จะแสดงผลอย่างง่ายก็คือ แปลงข้อมูลเป็น Pandas DataFrame แล้วใช้ที่สุด Pandas+Matplotlib ช่วยทำ chart ครับ โดยเราอยากให้ chart ของเราคอย update เรื่อยๆ ตามช่วงเวลาที่เรากำหนดไว้

ตัว code ก็มีประมาณนี้ครับ


In [2]:
import time
from datetime import timedelta
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocatorหลังหลัง
from IPython import display

%matplotlib inline

# set streaming period
stream_period = 10  # in minutes
finish_time = start_time + timedelta(minutes=stream_period)

# interactively query in-memory table
while datetime.now() < finish_time:
    
    # set wait time between iteration
    time.sleep(10)    
    
    # get top hashtags
    top_hashtags_sql = spark.sql("select * from topTagsTable").limit(11)
    
    # convert top hashtags DataFrame to Pandas DataFrame
    top_hashtags = top_hashtags_sql.toPandas()
    
    # number of '#bnk48'
    bnk48_count = top_hashtags[top_hashtags['hashtag']=='#bnk48']['count'].values

    # create bar chart ranking top ten hashtags related to '#bnk48'
    fig, ax = plt.subplots(1,1,figsize=(10,6)) 
    top_hashtags[top_hashtags['hashtag']!='#bnk48'].plot(kind='bar', x='hashtag', y='count', legend=False, ax=ax)
    ax.set_title("Top 10 hashtags related to #BNK48 (%d counts)" % bnk48_count, fontsize=18)
    ax.set_xlabel("Hashtag", fontsize=18)
    ax.set_ylabel("Count", fontsize=18)
    ax.set_xticklabels(ax.get_xticklabels(), {"fontsize":14}, rotation=30)
    ax.yaxis.set_major_locator(MaxNLocator(integer=True)) # show only integer yticks
    plt.yticks(fontsize=14)
    
    # clear previous output, print start time and current time, and plot the current chart 
    display.clear_output(wait=True)
    print("start time:", start_time.strftime('%Y-%m-%d %H:%M:%S'))
    print("current time:", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    plt.show()


start time: 2018-01-20 13:03:13
current time: 2018-01-20 13:13:20

ปล. stream เสร็จแล้วก็อย่าลืมไปปิดตัว terminal ที่รัน python script (streamingTwitterTags.py) เชื่อมต่อกับ Twitter API ด้วยนะครับ