PySpark Examples

Basic Examples

In [1]:
import pyspark
from pyspark import SparkContext, SparkConf

In [4]:
conf = SparkConf().setMaster("local[*]")
sc = SparkContext(conf=conf)

In [3]:
words = ["Hello", "this", "is", "an", "example"]
# Collect
print(sc.parallelize(words).map(lambda w: w).collect())
# Count
print(sc.parallelize(words).map(lambda w: w).count())
# flatmap
print(sc.parallelize(words).flatMap(lambda w: w).collect())
# appending the words into a sentence
def form_sentence(words):
    result = ""
    for entry in words:
        result += entry
        result += " "
    return result

['Hello', 'this', 'is', 'an', 'example']
['H', 'e', 'l', 'l', 'o', 't', 'h', 'i', 's', 'i', 's', 'a', 'n', 'e', 'x', 'a', 'm', 'p', 'l', 'e']
['H e l l o ', 't h i s ', 'i s ', 'a n ', 'e x a m p l e ']

ML Examples

Basic Statistics

In [ ]:
Basic Statistics impemented here are as follows:
1. Correlation - linear relationship(statistical association) of two variables
        This attribute is mainly used to measure the type of relationship 
        between two variables. Either positive or negative. 
2. Hypothesis Testing - 
3. Summarizer - 

Streaming Receiver

In [7]:
# 8888 port 
from pyspark.streaming import StreamingContext

def avg_sum(new_values, last_sum):
    if last_sum == None:
        last_sum = 0
    return sum(new_values, last_sum)

# sc = self.spark_context
# ss = SQLContext.getOrCreate(sc).sparkSession
ssc = StreamingContext(sc, 1)

lines = ssc.socketTextStream("localhost", 8888)
# (1, number)
numbers = lines.flatMap(lambda line: line.split(",")).map(lambda number: (1, float(number)))
# (n_elements, elements_sum)
total_sum = numbers.reduce(lambda tup1, tup2: (tup1[0]+tup2[0], tup1[1]+tup2[1]))
# Compute average: sum/n_elements
avgRDD = tup: tup[1]/tup[0])


ssc.start()  # Start the computation
ssc.awaitTerminationOrTimeout(120)  # Wait for the computation to terminate

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-7-f20d03da0036> in <module>
     23 avgRDD.pprint()
---> 25 ssc.start()  # Start the computation
     26 ssc.awaitTerminationOrTimeout(120)  # Wait for the computation to terminate

C:\Spark\spark-2.3.1-bin-hadoop2.7\python\pyspark\streaming\ in start(self)
    194         Start the execution of the streams.
    195         """
--> 196         self._jssc.start()
    197         StreamingContext._activeContext = self

C:\Spark\spark-2.3.1-bin-hadoop2.7\python\lib\\py4j\ in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id,
   1259         for temp_arg in temp_args:

C:\Spark\spark-2.3.1-bin-hadoop2.7\python\lib\\py4j\ in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o382693.start.
: java.lang.IllegalStateException: Only one StreamingContext may be started in this JVM. Currently running StreamingContext was started
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
java.lang.reflect.Method.invoke(Unknown Source)
py4j.commands.CallCommand.execute( Source)
	at org.apache.spark.streaming.StreamingContext$.org$apache$spark$streaming$StreamingContext$$assertNoOtherContextIsActive(StreamingContext.scala:738)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(
	at py4j.reflection.ReflectionEngine.invoke(
	at py4j.Gateway.invoke(
	at py4j.commands.AbstractCommand.invokeMethod(
	at py4j.commands.CallCommand.execute(
	at Source)

In [ ]:
import pyspark
from pyspark import SparkContext, SparkConf

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SQLContext
import sys
import requests

# create spark configuration
conf = SparkConf()
# create spark context with the above configuration
sc = SparkContext(conf=conf)
# sc.setLogLevel("ERROR")
# create the Streaming Context from the above spark context with interval size 2 seconds
ssc = StreamingContext(sc, 5)
# setting a checkpoint to allow RDD recovery
# read data from port 9009
dataStream = ssc.socketTextStream("localhost", 5557)

if dataStream:
    # split each tweet into words
    words = dataStream.flatMap(lambda line: line.split(" "))

    # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1)
    hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
    # adding the count of each hashtag to its last count
    # do processing for each RDD generated in each interval

# start the streaming computation
# wait for the streaming to finish

In [ ]:
# This code is copyright (c) 2017 by Laurent Weichberger.
# Authors: Laurent Weichberger, from Hortonworks and,
# from RAND Corp: James Liu, Russell Hanson, Scot Hickey,
# Angel Martinez, Asa Wilks, & Sascha Ishikawa
# This script does use Apache Spark. Enjoy...
# This code was designed to be run as: spark-submit
import time
import json
import socket
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Our filter function:
def filter_tweets(tweet):
    json_tweet = json.loads(tweet)
    if 'lang' in json_tweet: # When the lang key was not present it caused issues
        if json_tweet['lang'] == 'fi':
            return True # filter() requires a Boolean value
    return False
# SparkContext(“local[1]”) would not work with Streaming bc 2 threads are required
sc = SparkContext("local[*]", "Twitter Demo")
ssc = StreamingContext(sc, 1) #10 is the batch interval in seconds
IP = socket.gethostname()
Port = 5558
lines = ssc.socketTextStream(IP, Port)
# When your DStream in Spark receives data, it creates an RDD every batch interval.
# We use coalesce(1) to be sure that the final filtered RDD has only one partition,
# so that we have only one resulting part-00000 file in the directory.
# The method saveAsTextFile() should really be re-named saveInDirectory(),
# because that is the name of the directory in which the final part-00000 file is saved.
# We use time.time() to make sure there is always a newly created directory, otherwise
# it will throw an Exception.
# filter( filter_tweets )
# lines.foreachRDD( lambda rdd: rdd.collect())# coalesce(1).saveAsTextFile("./tweets/%f" % time.time()) 
# You must start the Spark StreamingContext, and await process termination…

In [5]:
import socket



