In [1]:
%%file consumer.py
import sys
import socket
from collections import Counter
# Interface and port to listen on, taken from the command line.
HOST = sys.argv[1]
PORT = int(sys.argv[2])

# Context managers guarantee both sockets are closed on exit or error.
with socket.socket() as s:
    s.bind((HOST, PORT))
    s.listen(4)
    # Only a single client is served; accept() blocks until it connects.
    connection, address = s.accept()
    with connection:
        c = Counter()
        # Bytes of a word that was cut off at the end of the previous chunk.
        pending = b''
        while True:
            chunk = connection.recv(64)
            if not chunk:
                # recv() returns b'' once the peer has closed the connection;
                # without this check the loop would spin forever.
                break
            data = pending + chunk
            words = data.split()
            if words and not data[-1:].isspace():
                # The chunk ended mid-word: hold the fragment back so it is
                # joined with the start of the next chunk instead of being
                # counted as a separate word.
                pending = words.pop()
            else:
                pending = b''
            if words:
                # Decode only complete words so a multi-byte character split
                # across two chunks is never decoded in halves.
                c.update(w.decode('utf-8', errors='replace') for w in words)
                print(c.most_common(5))
        if pending:
            # Flush the final word if the stream did not end with whitespace.
            c.update([pending.decode('utf-8', errors='replace')])
            print(c.most_common(5))
In [2]:
%%file client.py
import socket
import time
import sys
# Address of the listening consumer, taken from the command line.
HOST = sys.argv[1]
PORT = int(sys.argv[2])

# The context manager closes the socket even when sendall() raises because
# the consumer went away.
with socket.socket() as s:
    s.connect((HOST, PORT))
    # Replay the text endlessly so the consumer always has input.
    while True:
        # Reopen the file for every pass, closing the previous handle instead
        # of leaking one per iteration.
        with open('data/Ulysses.txt') as source:
            for line in source:
                s.sendall(line.encode())
                # Throttle the stream to one line per second.
                time.sleep(1)
In [1]:
from pyspark import SparkConf, SparkContext

# getOrCreate reuses the context that is already running in this kernel, so
# re-executing the cell no longer fails with "Cannot run multiple
# SparkContexts at once". A new context still uses every local core.
sc = SparkContext.getOrCreate(SparkConf().setMaster('local[*]'))
In [5]:
# Load the text as an RDD with one element per line.
lines = sc.textFile('data/Ulysses.txt')

# Word count: split lines into words, pair each word with 1, then add the
# ones together per word.
counts = (
    lines
    .flatMap(lambda text: text.split())
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
)

# Negating the count turns takeOrdered's ascending order into "largest first".
counts.takeOrdered(5, key=lambda pair: -pair[1])
Out[5]:
In [3]:
%%file file_consumer.py
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Local Spark context with logging reduced to warnings.
sc = SparkContext('local[*]')
sc.setLogLevel("WARN")

# Streaming context with a batch interval of 2 (seconds).
ssc = StreamingContext(sc, 2)

# Stream of text lines read from the directory given as the first argument.
lines = ssc.textFileStream(sys.argv[1])

# Per-batch word count, built one transformation at a time.
words = lines.flatMap(lambda text: text.split())
pairs = words.map(lambda token: (token, 1))
counts = pairs.reduceByKey(lambda left, right: left + right)
counts.pprint()

# Start processing and block until the stream is terminated.
ssc.start()
ssc.awaitTermination()
In [4]:
%%file socket_consumer.py
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Local Spark context with logging reduced to warnings.
sc = SparkContext('local[*]')
sc.setLogLevel("WARN")

# Streaming context with a batch interval of 2 (seconds).
ssc = StreamingContext(sc, 2)

# Host and port of the text source come from the command line.
host = sys.argv[1]
port = int(sys.argv[2])
lines = ssc.socketTextStream(host, port)

# Per-batch word count, built one transformation at a time.
words = lines.flatMap(lambda text: text.split())
pairs = words.map(lambda token: (token, 1))
counts = pairs.reduceByKey(lambda left, right: left + right)
counts.pprint()

# Start processing and block until the stream is terminated.
ssc.start()
ssc.awaitTermination()
In [6]:
%%file stateful_socket_consumer.py
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def updateFunc(new, last):
    """Return the running total for a key after adding the latest values.

    Args:
        new: Iterable of numeric values to add; it is summed.
        last: Previous total, or None when there is no previous total
            (treated as 0).

    Returns:
        sum(new) plus the previous total.
    """
    previous = 0 if last is None else last
    return sum(new) + previous
# Local Spark context with logging reduced to warnings.
sc = SparkContext('local[*]')
sc.setLogLevel("WARN")

# Streaming context with a batch interval of 2 (seconds); checkpoint data is
# written to the "checkpoint" directory.
ssc = StreamingContext(sc, 2)
ssc.checkpoint("checkpoint")

# Host and port of the text source come from the command line.
host = sys.argv[1]
port = int(sys.argv[2])
lines = ssc.socketTextStream(host, port)

# Word count whose totals are carried across batches by updateFunc, with each
# batch's result sorted by word.
words = lines.flatMap(lambda text: text.split())
pairs = words.map(lambda token: (token, 1))
totals = pairs.updateStateByKey(updateFunc)
counts = totals.transform(lambda rdd: rdd.sortByKey())
counts.pprint()

# Start processing and block until the stream is terminated.
ssc.start()
ssc.awaitTermination()
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]: