Visualizing origins and language of RSS feeds

The idea is to consume the big-rss feed as a stream from Satori, enrich each message with Spark Structured Streaming and visualize the result in a browser app.

The Satori stream aggregates more than 6.5 million RSS feeds. The stream is consumed over a websocket by a Python function and written into the Kafka topic world-feed. Spark Structured Streaming then reads from the topic into a streaming dataframe, which is enriched with two pieces of information:

  1. The country of the server the message is coming from
  2. The language of the message

The stream aggregates the message count by a 15-minute time window, the country code and the language. All updates are written every 5 seconds into a second Kafka topic, enriched-feed.

The visualization is done with a small node.js app that consumes the Kafka topic and sends the messages via websockets to all connected browsers, where a ReactJS app updates the charts.
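
For orientation, a record in the enriched-feed topic has roughly the following shape (the field names follow the streaming query defined below; the values here are made up for illustration):

example_value = {
    "start": "...",              # start of the 15 minute window
    "end": "...",                # end of the window
    "server_country": "US",      # country code of the feed server
    "language": "en",            # detected language of the message
    "count": 42                  # messages in this window/country/language group
}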

System Architecture

Implementation

satori2Kafka

A generic function that consumes a Satori stream identified by channel, endpoint and appkey and writes each message to a Kafka topic. The messages are in JSON format. The stream is slowed down for development purposes.


In [ ]:
from __future__ import print_function
import socket
import json
import sys
import threading
import time
from satori.rtm.client import make_client, SubscriptionMode
from kafka import KafkaProducer

def satori2kafka(channel, endpoint, appkey, topic, delay=1):
    # Kafka
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    
    with make_client(
            endpoint=endpoint, appkey=appkey) as client:
        print('Connected!')
        mailbox = []
        got_message_event = threading.Event()

        class SubscriptionObserver(object):
            def on_subscription_data(self, data):
                for message in data['messages']:
                    mailbox.append(message)
                    got_message_event.set()

        subscription_observer = SubscriptionObserver()
        client.subscribe(
            channel,
            SubscriptionMode.SIMPLE,
            subscription_observer)

        if not got_message_event.wait(10):
            print("Timeout while waiting for a message")
            sys.exit(1)

        while True:
            # drain the mailbox; popping avoids re-sending messages
            # that have already been forwarded
            while mailbox:
                message = mailbox.pop(0)
                msg = json.dumps(message, ensure_ascii=False)
                producer.send(topic, msg.encode())
                # do not send the messages too fast during development
                time.sleep(delay)

helper functions for data enrichment

Language Identification

Simple usage of the Python langid library to identify which language the message is written in. The function returns a two-letter ISO code for the identified language.


In [ ]:
import langid

def get_language_from_text(text):
    lang, prob = langid.classify(text)
    return lang
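
A quick check of the helper (a sketch; langid ships with a pretrained model, so the classification works out of the box):


In [ ]:
# typical results are 'en' and 'de' for these two sentences
print(get_language_from_text("The quick brown fox jumps over the lazy dog"))
print(get_language_from_text("Der schnelle braune Fuchs springt über den faulen Hund"))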

Country Identification

With the hostname extracted from the URL, the IP address can be resolved by a nameserver lookup on the local machine. The geolite2 library then allows a lookup of the IP address in the MaxMind geolocation database. The function returns the ISO country code.


In [ ]:
from geolite2 import geolite2
import socket
from urllib.parse import urlparse


def get_country_from_url(url):
    try:
        hostname = urlparse(url).netloc
        ip = socket.gethostbyname(hostname)
        result = geolite2.reader().get(ip)
        country_iso_code = result['country']['iso_code']
    except Exception:
        # DNS failure, malformed URL or missing GeoIP record
        country_iso_code = "unknown"
    finally:
        # the reader is reopened on the next call
        geolite2.close()
    return country_iso_code
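
A quick check of the helper (a sketch; the returned code depends on the local DNS resolution and the bundled GeoLite2 data, so the output may vary):


In [ ]:
# resolve the host of a feed URL and look up the country of its server
print(get_country_from_url("https://www.bbc.co.uk/news"))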

Spark Structured Streaming

Read Stream

With Spark Structured Streaming we connect to a Kafka topic and continuously append each message to a dataframe. Each Kafka record consists of a key, a value and a timestamp. The value contains our Satori message in JSON format. For further processing we apply the jsonSchema to the value field and create a new streaming dataframe that keeps the timestamp from the Kafka record together with the parsed Satori message.


In [ ]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Since we know the data format already, 
# let's define the schema to speed up processing 
# (no need for Spark to infer schema)
jsonSchema = StructType([ StructField("publishedTimestamp", TimestampType(), True), 
                          StructField("url", StringType(), True),
                          StructField("feedURL", StringType(), True),
                          StructField("title", StringType(), True),
                          StructField("description", StringType(), True)
                        ])
# define 'parsed' as a structured stream from the 
# kafka records in the topic 'world-feed'.
parsed = (
  spark
    .readStream                       
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "world-feed")
    .load()
    # keep timestamp and the json from the field value in a new field 'parsed_value'
    .select(col("timestamp"),from_json(col("value").cast("string"),jsonSchema).alias("parsed_value"))
)
# print the current schema 
parsed.printSchema()

# get rid of the struct 'parsed_value' and keep only the fields beneath.
worldfeed = parsed.select("timestamp","parsed_value.*")

# print the schema which is used for further processing
worldfeed.printSchema()

# show that the dataframe is streaming
worldfeed.isStreaming
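
As a quick sanity check (a sketch using Spark's built-in memory sink; the query name worldfeed_debug is arbitrary), a few parsed rows can be inspected before the enrichment step:


In [ ]:
# write the parsed stream into an in-memory table for a quick look
debug_query = (
  worldfeed
    .writeStream
    .format("memory")
    .queryName("worldfeed_debug")
    .outputMode("append")
    .start()
)

# after a few micro-batches the table can be queried like any other
spark.sql("SELECT * FROM worldfeed_debug LIMIT 5").show()

# stop the debug query again before starting the real one
debug_query.stop()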

data enrichment

The dataframe function withColumn() allows us to add a new column to a dataframe by applying a function to existing columns. For this, an existing Python function has to be converted into a user-defined function (UDF). This UDF can then be applied to a distributed dataframe.

The get_country_from_url() function cannot be serialized directly, so it is loaded from a library instead. Be aware that any library used in such a function has to be available on the worker nodes executing the job.


In [ ]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

# as get_country_from_url cannot be serialized, it is loaded from a library;
# the package worldfeed.location_lookup therefore has to be installed on all worker nodes.

from worldfeed.location_lookup import get_country_from_url

# transform the helpers to UDFs.
language_classify_udf = udf(get_language_from_text, StringType())
get_country_from_url_udf = udf(get_country_from_url, StringType())

# create a new dataframe with the additional columns 'language' and 'server_country'
enriched_df = (
  worldfeed
    .withColumn('language', language_classify_udf(worldfeed['description']))
    .withColumn('server_country', get_country_from_url_udf(worldfeed['feedURL']))
)

# print the new schema
enriched_df.printSchema()
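
If the worldfeed package cannot be installed on the worker nodes, one alternative (a sketch; the archive name worldfeed.zip is an assumption) is to ship it to the executors via addPyFile:


In [ ]:
# distribute a zip of the local 'worldfeed' package to all executors
# so the UDF can import worldfeed.location_lookup there as well
spark.sparkContext.addPyFile("worldfeed.zip")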

start the streaming query

Based on the enriched_df dataframe, a query is defined which aggregates the data, reshapes it into a Kafka-compatible key/value format and writes it to a Kafka topic.


In [ ]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
  enriched_df
    # watermarking allows past time windows to be updated with late arrivals;
    # after 30 minutes a window is frozen and can be removed from memory
    .withWatermark("timestamp", "30 minutes")

    # aggregation happens by server_country, the language and a 15 minute time window
    .groupBy(
      enriched_df.server_country,
      enriched_df.language, 
      window(enriched_df.timestamp, "15 minutes"))
    # the messages are counted
    .count()
    
    # the query result is written to a kafka topic,
    # therefore the output has to consist of a 'key' and a 'value'
    # key:
    .select(to_json(struct("server_country", "window")).alias("key"),
    # value: (json format the mentioned fields)
        to_json(struct("window.start","window.end","server_country", "language", "count")).alias("value"))
    .writeStream
    # only write every 5 seconds
    .trigger(processingTime='5 seconds')

    # output to console for debug
    # .format("console")

    # output to kafka 
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "enriched-feed")
    .option("checkpointLocation", "./checkpoints")
    # End kafka related output
    # only write the rows that were updated since the last trigger
    .outputMode("update") 
    .queryName("worldfeed")
    .start()
)

start the satori2kafka stream


In [ ]:
channel = "big-rss"
endpoint = "wss://open-data.api.satori.com"
appkey = "8e7f2BeFE8C8c6e8A4A41976a2dE5Fa9"
topic = "world-feed"

satori2kafka(channel, endpoint, appkey, topic)
# has to be manually cancelled
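
Since this call blocks until it is cancelled, one alternative in a notebook (a sketch, not part of the original setup) is to run the forwarder in a daemon thread:


In [ ]:
import threading

# run the forwarder in the background so the notebook stays usable;
# the daemon thread ends when the kernel is shut down
feed_thread = threading.Thread(
    target=satori2kafka,
    args=(channel, endpoint, appkey, topic),
    daemon=True)
feed_thread.start()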

start the node.js app

Make sure to start the node.js app in the visualize folder. Command:

node server/server.js

Then point your browser to http://localhost:3001

Development notes

The node app uses browserify to compile the JS code for the browser. If changes are made to the JS and JSX files, the code needs to be compiled again.

To compile the JS files automatically every time a JS or JSX file changes, start gulp. Command:

gulp

helpers

Check whether there are any active streaming queries. query.stop() terminates the query; only one instance of the same query can run at a time.


In [ ]:
spark.streams.active
query.stop()
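
Two further attributes of the streaming query can be handy for monitoring (standard StreamingQuery API, shown here as a sketch):


In [ ]:
# current state of the query (e.g. whether data is being processed)
query.status

# metrics of the most recently completed micro-batch
query.lastProgress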