PySpark Examples

Basic Examples


In [1]:
import pyspark
from pyspark import SparkContext, SparkConf

In [4]:
conf = SparkConf().setMaster("local[*]")
sc = SparkContext(conf=conf)

In [3]:
words = ["Hello", "this", "is", "an", "example"]
# Collect
print(sc.parallelize(words).map(lambda w: w).collect())
# Count
print(sc.parallelize(words).map(lambda w: w).count())
# flatmap
print(sc.parallelize(words).flatMap(lambda w: w).collect())
# appending the words into a sentence
def form_sentence(words):
    result = ""
    for entry in words:
        result += entry
        result += " "
    return result
print(sc.parallelize(words).map(form_sentence).collect())


['Hello', 'this', 'is', 'an', 'example']
5
['H', 'e', 'l', 'l', 'o', 't', 'h', 'i', 's', 'i', 's', 'a', 'n', 'e', 'x', 'a', 'm', 'p', 'l', 'e']
['H e l l o ', 't h i s ', 'i s ', 'a n ', 'e x a m p l e ']
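As the last line shows, mapping form_sentence over the RDD applies it to each word separately, so it space-separates the characters of every word rather than joining the words. To build one sentence from all elements, fold across the RDD instead; a minimal sketch:

In [ ]:
# reduce() folds the RDD's string elements pairwise into a single sentence
print(sc.parallelize(words).reduce(lambda a, b: a + " " + b))
# Hello this is an example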

ML Examples

Basic Statistics


In [ ]:
"""
Basic Statistics impemented here are as follows:
1. Correlation - linear relationship(statistical association) of two variables
        This attribute is mainly used to measure the type of relationship 
        between two variables. Either positive or negative. 
2. Hypothesis Testing - 
3. Summarizer - 
"""

Streaming Receiver


In [7]:
# Reads comma-separated numbers from localhost:8888 and prints per-batch sum and average
from pyspark.streaming import StreamingContext

# Intended as an updateStateByKey() update function; unused in this example
def avg_sum(new_values, last_sum):
    if last_sum is None:
        last_sum = 0
    return sum(new_values, last_sum)

ssc = StreamingContext(sc, 1)  # 1-second batch interval
ssc.checkpoint("checkpoint")

lines = ssc.socketTextStream("localhost", 8888)
# (1, number)
numbers = lines.flatMap(lambda line: line.split(",")).map(lambda number: (1, float(number)))
# (n_elements, elements_sum)
total_sum = numbers.reduce(lambda tup1, tup2: (tup1[0]+tup2[0], tup1[1]+tup2[1]))
# Compute average: sum/n_elements
avgRDD = total_sum.map(lambda tup: tup[1]/tup[0])

total_sum.pprint()
avgRDD.pprint()

ssc.start()  # Start the computation
ssc.awaitTerminationOrTimeout(120)  # Wait for the computation to terminate


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-7-f20d03da0036> in <module>
     23 avgRDD.pprint()
     24 
---> 25 ssc.start()  # Start the computation
     26 ssc.awaitTerminationOrTimeout(120)  # Wait for the computation to terminate

C:\Spark\spark-2.3.1-bin-hadoop2.7\python\pyspark\streaming\context.py in start(self)
    194         Start the execution of the streams.
    195         """
--> 196         self._jssc.start()
    197         StreamingContext._activeContext = self
    198 

C:\Spark\spark-2.3.1-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

C:\Spark\spark-2.3.1-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o382693.start.
: java.lang.IllegalStateException: Only one StreamingContext may be started in this JVM. Currently running StreamingContext was started at
org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
java.lang.reflect.Method.invoke(Unknown Source)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Unknown Source)
	at org.apache.spark.streaming.StreamingContext$.org$apache$spark$streaming$StreamingContext$$assertNoOtherContextIsActive(StreamingContext.scala:738)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571)
	at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
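The Py4JJavaError above is Spark refusing to start a second StreamingContext in the same JVM: an earlier run of this cell left one active. A minimal sketch of the usual fix, stopping the active context while keeping the SparkContext, before re-running the cell:

In [ ]:
# Stop any StreamingContext left running by a previous execution of the cell,
# but keep the underlying SparkContext alive for reuse
active = StreamingContext.getActive()
if active is not None:
    active.stop(stopSparkContext=False)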

In [ ]:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SQLContext
import sys
import requests


# create spark configuration
conf = SparkConf()
conf.setAppName("TwitterStreamApp").setMaster("local[*]")
# create spark context with the above configuration
sc = SparkContext(conf=conf)
# sc.setLogLevel("ERROR")
# create the Streaming Context from the above spark context with a 5-second batch interval
ssc = StreamingContext(sc, 5)
# setting a checkpoint to allow RDD recovery
ssc.checkpoint("checkpoint_TwitterApp")
# read data from port 5557
dataStream = ssc.socketTextStream("localhost", 5557)

# socketTextStream() always returns a DStream, so no guard is needed here
# split each tweet into words
words = dataStream.flatMap(lambda line: line.split(" "))

# keep only the hashtag tokens, then map each hashtag to a (hashtag, 1) pair
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
# print the first ten pairs of each batch; a running total per hashtag
# would need stateful processing, as sketched after this cell
hashtags.pprint(10)

# start the streaming computation
ssc.start()
# wait for the streaming to finish
ssc.awaitTermination()
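The original comment mentioned adding each hashtag's count to its last count, but no stateful step exists in the cell. A minimal sketch with updateStateByKey, relying on the checkpoint directory set above (update_total is an illustrative name, and these lines must be registered before ssc.start(), i.e. inside the cell above):

In [ ]:
# Hypothetical running total per hashtag; requires the checkpoint set above
def update_total(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

tag_totals = hashtags.updateStateByKey(update_total)
tag_totals.pprint(10)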

In [ ]:
# SparkDemo.py
# This code is copyright (c) 2017 by Laurent Weichberger.
# Authors: Laurent Weichberger, from Hortonworks and,
# from RAND Corp: James Liu, Russell Hanson, Scot Hickey,
# Angel Martinez, Asa Wilks, & Sascha Ishikawa
# This script does use Apache Spark. Enjoy...
# This code was designed to be run as: spark-submit SparkDemo.py
 
import time
import json
import socket
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
 
# Our filter function:
def filter_tweets(tweet):
    json_tweet = json.loads(tweet)
    if 'lang' in json_tweet: # When the lang key was not present it caused issues
        if json_tweet['lang'] == 'fi':
            return True # filter() requires a Boolean value
    return False
 
# SparkContext("local[1]") would not work with Streaming because at least 2 threads are required
sc = SparkContext("local[*]", "Twitter Demo")
ssc = StreamingContext(sc, 1)  # 1 is the batch interval in seconds
IP = socket.gethostname()
Port = 5558
lines = ssc.socketTextStream(IP, Port)
 
# When your DStream in Spark receives data, it creates an RDD every batch interval.
# We use coalesce(1) to be sure that the final filtered RDD has only one partition,
# so that we have only one resulting part-00000 file in the directory.
# The method saveAsTextFile() should really be re-named saveInDirectory(),
# because that is the name of the directory in which the final part-00000 file is saved.
# We use time.time() to make sure there is always a newly created directory, otherwise
# it will throw an Exception.
# The filter/save pipeline is left commented out; only pprint() is active:
# lines.filter(filter_tweets).foreachRDD(
#     lambda rdd: rdd.coalesce(1).saveAsTextFile("./tweets/%f" % time.time()))
lines.pprint()
# You must start the Spark StreamingContext, and await process termination...
ssc.start()
ssc.awaitTermination()


-------------------------------------------
Time: 2019-02-12 19:17:49
-------------------------------------------

-------------------------------------------
Time: 2019-02-12 19:17:50
-------------------------------------------

... (identical empty batches repeat once per second) ...

-------------------------------------------
Time: 2019-02-12 19:18:05
-------------------------------------------
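Every batch above is empty, which means nothing was written to the socket during the run. A minimal sketch of a feeder to exercise the pipeline, run in a separate process or terminal before starting the stream (the port matches the cell above; the messages are illustrative):

In [ ]:
import socket
import time

# Hypothetical test feeder: listen where the receiver connects (hostname:5558)
# and write one line per second once the StreamingContext attaches
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((socket.gethostname(), 5558))
server.listen(1)
conn, _ = server.accept()  # blocks until the receiver connects
for i in range(10):
    conn.sendall(("tweet %d #test\n" % i).encode())
    time.sleep(1)
conn.close()
server.close()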


In [5]:
import socket

print(socket.gethostname())


Kris
