The idea is to consume the big RSS feed from Satori as a stream, enrich each message with Spark Structured Streaming, and visualize it in a browser app.
The Satori stream gathers more than 6.5 million RSS feeds. The stream is consumed over a websocket with a Python function and written into the Kafka topic world-feed. Spark Structured Streaming then reads from the topic into a streaming dataframe, which is enriched with two pieces of information: the language of the message and the country in which the feed's server is located.
The stream aggregates the message count by a 15 minute time window, the server country and the language. All updates are written every 5 seconds into a second Kafka topic, enriched-feed.
The visualisation is done with a small node.js app that consumes the Kafka topic and sends the messages via websockets to all connected browsers, where a ReactJS app handles updating the charts.
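For orientation, here is a hypothetical example of a single message as it is written to the world-feed topic. The values are invented for illustration; the field names correspond to the schema applied further below.
In [ ]:
# purely illustrative record; real big-rss messages may carry additional fields
example_message = {
    "publishedTimestamp": "2017-10-01T12:34:56.000Z",
    "url": "http://example.com/articles/some-article",
    "feedURL": "http://example.com/feed.xml",
    "title": "Some article title",
    "description": "A short teaser text of the article."
}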
In [ ]:
from __future__ import print_function
import socket
import json
import sys
import threading
import time
from satori.rtm.client import make_client, SubscriptionMode
from kafka import KafkaProducer

def satori2kafka(channel, endpoint, appkey, topic, delay=1):
    # Kafka producer that forwards the Satori messages
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

    with make_client(
            endpoint=endpoint, appkey=appkey) as client:
        print('Connected!')

        mailbox = []
        got_message_event = threading.Event()

        class SubscriptionObserver(object):
            def on_subscription_data(self, data):
                for message in data['messages']:
                    mailbox.append(message)
                got_message_event.set()

        subscription_observer = SubscriptionObserver()
        client.subscribe(
            channel,
            SubscriptionMode.SIMPLE,
            subscription_observer)

        if not got_message_event.wait(10):
            print("Timeout while waiting for a message")
            sys.exit(1)

        while True:
            # send each buffered message exactly once
            while mailbox:
                message = mailbox.pop(0)
                msg = json.dumps(message, ensure_ascii=False)
                producer.send(topic, msg.encode())
                # do not send the messages too fast during development
                time.sleep(delay)
            time.sleep(0.1)  # avoid busy-waiting while the mailbox is empty
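To verify that messages actually arrive in Kafka, a short-lived consumer can be pointed at the topic. This is only a sketch using kafka-python against the broker and topic assumed throughout this notebook:
In [ ]:
from kafka import KafkaConsumer

# read a handful of records from 'world-feed' and print them
consumer = KafkaConsumer('world-feed',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=10000)
for i, record in enumerate(consumer):
    print(record.value.decode('utf-8'))
    if i >= 4:
        break
consumer.close()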
Simple usage of the Python langid library to identify the language in which the message is written. The function returns a two-letter ISO code for the identified language.
In [ ]:
import langid

def get_language_from_text(text):
    # classify() returns a (language, probability) tuple; we only need the language
    lang, prob = langid.classify(text)
    return lang
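A quick sanity check of the helper; langid ships with a pretrained model, so this works out of the box, although results for very short texts can be unreliable:
In [ ]:
# expected to print 'en' and 'de' for these sample sentences
print(get_language_from_text("The quick brown fox jumps over the lazy dog"))
print(get_language_from_text("Der schnelle braune Fuchs springt über den faulen Hund"))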
In [ ]:
from geolite2 import geolite2
import socket
from urllib.parse import urlparse

def get_country_from_url(url):
    # resolve the feed server's IP address and look up its country
    # in the bundled GeoLite2 database
    try:
        hostname = urlparse(url).hostname
        ip = socket.gethostbyname(hostname)
        result = geolite2.reader().get(ip)
        country_iso_code = result['country']['iso_code']
    except Exception:
        country_iso_code = "unknown"
    finally:
        geolite2.close()
    return country_iso_code
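A quick check of the lookup; the result depends on DNS resolution and the GeoLite2 data, so it may differ from machine to machine and falls back to "unknown" when the lookup fails:
In [ ]:
# example lookups; only the hostname of the URL is used
print(get_country_from_url("http://www.bbc.co.uk/news"))
print(get_country_from_url("not a valid url"))  # resolves to "unknown"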
With Spark Structured Streaming we connect to a Kafka topic and continuously append each message to a dataframe. Each Kafka record consists of a key, a value and a timestamp; the value contains our Satori message in JSON format. For further processing we apply the jsonSchema to the value field and create a new streaming dataframe that keeps the timestamp from the Kafka record together with the structured Satori message.
In [ ]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Since we know the data format already,
# let's define the schema to speed up processing
# (no need for Spark to infer the schema)
jsonSchema = StructType([
    StructField("publishedTimestamp", TimestampType(), True),
    StructField("url", StringType(), True),
    StructField("feedURL", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True)
])

# define 'parsed' as a structured stream from the
# kafka records in the topic 'world-feed'.
parsed = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "world-feed")
    .load()
    # keep the timestamp and the JSON from the field 'value' in a new field 'parsed_value'
    .select(col("timestamp"),
            from_json(col("value").cast("string"), jsonSchema).alias("parsed_value"))
)

# print the current schema
parsed.printSchema()

# get rid of the struct 'parsed_value' and keep only the fields beneath it
worldfeed = parsed.select("timestamp", "parsed_value.*")

# print the schema which is used for further processing
worldfeed.printSchema()

# show that the dataframe is streaming
worldfeed.isStreaming
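Before adding the enrichment it can help to peek at the parsed stream. A minimal debug sketch that writes a few batches to the console; stop it again before starting the real query:
In [ ]:
# temporary console sink for inspection only
debug = (
    worldfeed
    .writeStream
    .format("console")
    .option("truncate", "false")
    .trigger(processingTime="10 seconds")
    .start()
)
# debug.stop()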
The dataframe function withColumn() allows us to add a new column to a dataframe by applying a function to existing columns. For this, an existing function has to be converted to a UserDefinedFunction, which can then be applied to a distributed dataframe.
The get_country_from_url() function is too big to be serialized and is therefore loaded from a library. Be aware that any library used in such a function has to be made available on the worker nodes executing the job.
In [ ]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

# as get_country_from_url cannot be serialized, it is loaded from a library.
# Therefore worldfeed.location_lookup has to be installed on all worker nodes.
from worldfeed.location_lookup import get_country_from_url

# turn the helpers into UDFs
language_classify_udf = udf(get_language_from_text, StringType())
get_country_from_url_udf = udf(get_country_from_url, StringType())

# create a new dataframe with the additional columns 'language' and 'server_country'
enriched_df = (
    worldfeed
    .withColumn('language', language_classify_udf(worldfeed['description']))
    .withColumn('server_country', get_country_from_url_udf(worldfeed['feedURL']))
)

# print the new schema
enriched_df.printSchema()
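As noted above, the worldfeed.location_lookup module must exist on every executor. One way to do that, sketched here with a hypothetical archive path, is to ship the package via the SparkContext; the third-party libraries (langid, geolite2) themselves still have to be installed on each worker.
In [ ]:
# minimal sketch: ship the helper package to the executors at runtime.
# 'dist/worldfeed.zip' is a hypothetical archive of the worldfeed package.
spark.sparkContext.addPyFile("dist/worldfeed.zip")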
In [ ]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
    enriched_df
    # watermarking allows updating past timeframes with late arrivals;
    # after 30 minutes a timeframe is frozen and can be removed from memory
    .withWatermark("timestamp", "30 minutes")
    # aggregation happens by server_country, the language and a 15 minute timeframe
    .groupBy(
        enriched_df.server_country,
        enriched_df.language,
        window(enriched_df.timestamp, "15 minutes"))
    # the messages are counted
    .count()
    # the query result is written to a kafka topic,
    # therefore the output has to consist of a 'key' and a 'value'
    # key:
    .select(to_json(struct("server_country", "window")).alias("key"),
            # value: (json format of the mentioned fields)
            to_json(struct("window.start", "window.end", "server_country", "language", "count")).alias("value"))
    .writeStream
    # only write every 5 seconds
    .trigger(processingTime='5 seconds')
    # output to console for debugging
    # .format("console")
    # output to kafka
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "enriched-feed")
    .option("checkpointLocation", "./checkpoints")
    # end of kafka related output
    # only write the rows that were updated since the last trigger
    .outputMode("update")
    .queryName("worldfeed")
    .start()
)
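With output mode 'update', every trigger only emits the (server_country, language, window) combinations whose counts changed. A record on enriched-feed then roughly has the following shape; the values below are invented for illustration and the exact timestamp formatting is determined by Spark's to_json:
In [ ]:
# illustrative shape of one record in 'enriched-feed' (invented values)
example_key = {"server_country": "DE",
               "window": {"start": "2017-10-01T12:00:00.000Z",
                          "end": "2017-10-01T12:15:00.000Z"}}
example_value = {"start": "2017-10-01T12:00:00.000Z",
                 "end": "2017-10-01T12:15:00.000Z",
                 "server_country": "DE",
                 "language": "de",
                 "count": 42}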
In [ ]:
channel = "big-rss"
endpoint = "wss://open-data.api.satori.com"
appkey = "8e7f2BeFE8C8c6e8A4A41976a2dE5Fa9"
topic = "world-feed"
satori2kafka(channel, endpoint, appkey, topic)
# has to be manually cancelled
Make sure to start the node.js app in visualize. Command:
node server/server.js
Then point your browser to http://localhost:3001.
This node app uses browserify to compile the JS code for the browser. If changes are made to the JS and JSX files, the code needs to be compiled again.
To compile the JS files automatically every time a change is made to a JS or JSX file, start gulp. Command:
gulp
In [ ]:
# list the currently active streaming queries
spark.streams.active
# stop the aggregation query
query.stop()