In [1]:
import pyspark
from pyspark import SparkContext, SparkConf
In [4]:
conf = SparkConf().setAppName("PySparkBasics").setMaster("local[*]")  # Spark requires an app name; any name works here
sc = SparkContext(conf=conf)
In [3]:
words = ["Hello", "this", "is", "an", "example"]
# Collect
print(sc.parallelize(words).map(lambda w: w).collect())
# Count
print(sc.parallelize(words).map(lambda w: w).count())
# flatMap (flattens each word into its individual characters)
print(sc.parallelize(words).flatMap(lambda w: w).collect())
# appending the words into a sentence
def form_sentence(words):
    result = ""
    for entry in words:
        result += entry
        result += " "
    return result

print(form_sentence(sc.parallelize(words).collect()))
In [ ]:
"""
Basic Statistics impemented here are as follows:
1. Correlation - linear relationship(statistical association) of two variables
This attribute is mainly used to measure the type of relationship
between two variables. Either positive or negative.
2. Hypothesis Testing -
3. Summarizer -
"""
In [7]:
# 8888 port
from pyspark.streaming import StreamingContext
# Helper for summing new values onto a running total (defined here but not used
# in the pipeline below).
def avg_sum(new_values, last_sum):
    if last_sum is None:
        last_sum = 0
    return sum(new_values, last_sum)
# sc = self.spark_context
# ss = SQLContext.getOrCreate(sc).sparkSession
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
lines = ssc.socketTextStream("localhost", 8888)
# (1, number)
numbers = lines.flatMap(lambda line: line.split(",")).map(lambda number: (1, float(number)))
# (n_elements, elements_sum)
total_sum = numbers.reduce(lambda tup1, tup2: (tup1[0]+tup2[0], tup1[1]+tup2[1]))
# Compute average: sum/n_elements
avgRDD = total_sum.map(lambda tup: tup[1]/tup[0])
total_sum.pprint()
avgRDD.pprint()
ssc.start() # Start the computation
ssc.awaitTerminationOrTimeout(120) # Wait up to 120 seconds for the computation to terminate
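In [ ]:
# A minimal sketch of a local data source for the stream above (an assumption,
# not part of the original notebook). Run it in a separate terminal/process
# *before* starting the StreamingContext; alternatively `nc -lk 8888` and
# typing comma-separated numbers works just as well.
import socket
import time
import random

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 8888))
server.listen(1)
conn, _ = server.accept()  # blocks until the socketTextStream receiver connects
for _ in range(10):
    # one line of comma-separated numbers per second, matching the parsing above
    line = ",".join(str(round(random.uniform(0, 100), 2)) for _ in range(5)) + "\n"
    conn.sendall(line.encode("utf-8"))
    time.sleep(1)
conn.close()
server.close()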
In [ ]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SQLContext
import sys
import requests
# create spark configuration
conf = SparkConf()
conf.setAppName("TwitterStreamApp").setMaster("local[*]")
# create spark context with the above configuration
# (only one SparkContext can be active at a time; stop any existing one with sc.stop() first)
sc = SparkContext(conf=conf)
# sc.setLogLevel("ERROR")
# create the Streaming Context from the above spark context with a 5 second batch interval
ssc = StreamingContext(sc, 5)
# setting a checkpoint to allow RDD recovery
ssc.checkpoint("checkpoint_TwitterApp")
# read data from port 5557
dataStream = ssc.socketTextStream("localhost", 5557)
if dataStream:
    # split each tweet into words
    words = dataStream.flatMap(lambda line: line.split(" "))
    # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag, 1)
    hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
    # adding the count of each hashtag to its last count (a stateful running count is sketched in the next cell)
    hashtags.pprint(10)
    # do processing for each RDD generated in each interval
# start the streaming computation
ssc.start()
# wait for the streaming to finish
ssc.awaitTermination()
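In [ ]:
# A sketch of the running hashtag count hinted at by the "adding the count of
# each hashtag to its last count" comment above, using updateStateByKey (the
# checkpoint set earlier makes this possible). The helper name
# aggregate_tags_count is an illustrative assumption; these lines would go
# next to hashtags.pprint(10), before ssc.start().
def aggregate_tags_count(new_values, total_sum):
    # add this batch's counts for a hashtag to its running total
    return sum(new_values) + (total_sum or 0)

tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
# print the hashtags seen in each batch together with their running totals
tags_totals.pprint(10)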
In [ ]:
# SparkDemo.py
# This code is copyright (c) 2017 by Laurent Weichberger.
# Authors: Laurent Weichberger, from Hortonworks and,
# from RAND Corp: James Liu, Russell Hanson, Scot Hickey,
# Angel Martinez, Asa Wilks, & Sascha Ishikawa
# This script does use Apache Spark. Enjoy...
# This code was designed to be run as: spark-submit SparkDemo.py
import time
import json
import socket
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Our filter function:
def filter_tweets(tweet):
    json_tweet = json.loads(tweet)
    if 'lang' in json_tweet:  # When the lang key was not present it caused issues
        if json_tweet['lang'] == 'fi':
            return True  # filter() requires a Boolean value
    return False
# SparkContext("local[1]") would not work with Streaming because at least 2 threads are required
sc = SparkContext("local[*]", "Twitter Demo")
ssc = StreamingContext(sc, 1) # 1 is the batch interval in seconds
IP = socket.gethostname()
Port = 5558
lines = ssc.socketTextStream(IP, Port)
# When your DStream in Spark receives data, it creates an RDD every batch interval.
# We use coalesce(1) to be sure that the final filtered RDD has only one partition,
# so that we have only one resulting part-00000 file in the directory.
# The method saveAsTextFile() should really be re-named saveInDirectory(),
# because that is the name of the directory in which the final part-00000 file is saved.
# We use time.time() to make sure there is always a newly created directory, otherwise
# it will throw an Exception.
# filter( filter_tweets )
# lines.foreachRDD( lambda rdd: rdd.collect())# coalesce(1).saveAsTextFile("./tweets/%f" % time.time())
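# A minimal sketch of the pipeline described in the comments above (an
# illustrative assumption, not the original script's final code): keep only
# the Finnish tweets, collapse each batch to a single partition, and save it
# under a fresh timestamped directory so saveAsTextFile() never collides.
filtered = lines.filter(filter_tweets)
filtered.foreachRDD(lambda rdd: rdd.coalesce(1).saveAsTextFile("./tweets/%f" % time.time()))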
lines.pprint()
# You must start the Spark StreamingContext, and await process termination…
ssc.start()
ssc.awaitTermination()
In [5]:
import socket
print(socket.gethostname())
In [ ]:
lines
In [ ]: