In [4]:
import findspark
findspark.init()  # must run before any pyspark import so the Spark installation is discoverable

from pyspark import SparkContext
from pyspark.sql import SparkSession

In [5]:
sc = SparkContext('local', 'HandsOn PySpark')

In [6]:
visitors = [10,3,35,25,41,9,29]

In [7]:
df_visitors = sc.parallelize(visitors)
# df_visitors_yearly = df_visitors.map(lambda x:  x * 365).collect()
# print(df_visitors_yearly)

In [8]:
df_visitors_yearly = df_visitors.map(lambda x:  x * 365).collect()

In [10]:
type(df_visitors)
print(df_visitors_yearly)


[3650, 1095, 12775, 9125, 14965, 3285, 10585]
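
The same RDD pipeline can be extended with a reduce step to total the projected yearly visits. A minimal sketch (not part of the original run), assuming the df_visitors RDD from above:

In [ ]:
# Sketch: sum the projected yearly visits in a single pass.
# Assumes the df_visitors RDD created above is still in scope.
total_yearly = df_visitors.map(lambda x: x * 365).reduce(lambda a, b: a + b)
print(total_yearly)  # 55480 for the visitor counts used here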

In [11]:
# !pip install --upgrade pyspark

In [13]:
# dir(df_visitors)

In [ ]:
# !cp /Users/director/Downloads/spark_rd.md .

In [14]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL My sample") \
    .master("local[*]") \
    .getOrCreate()

In [15]:
text_file = spark.read.text("spark_rd.md")

In [16]:
text_file.count()


Out[16]:
32

In [17]:
text_file.first()


Out[17]:
Row(value='# Apache Spark')

In [18]:
lines_with_spark = text_file.filter(text_file.value.contains("Spark"))

In [19]:
lines_with_spark.show(3)


+--------------------+
|               value|
+--------------------+
|      # Apache Spark|
|Spark is a unifie...|
|rich set of highe...|
+--------------------+
only showing top 3 rows
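
The same filter can also be written with a column expression instead of attribute access. A small sketch (not executed in the original run), assuming the text_file DataFrame loaded above:

In [ ]:
# Sketch: equivalent filter using pyspark.sql.functions.col
from pyspark.sql.functions import col
text_file.filter(col("value").contains("Spark")).show(3)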


In [20]:
text_file.filter(text_file.value.contains("Spark")).count()


Out[20]:
10

In [21]:
data_src = "http://archive.ics.uci.edu/ml/machine-learning-databases/kddcup99-mld/kddcup.data.gz"
import urllib.request

In [22]:
# f = urllib.request.urlretrieve(data_src, "kddcup.data.gz")

In [23]:
sc


Out[23]:

SparkContext

Spark UI
Version: v2.4.4
Master: local
AppName: HandsOn PySpark

In [24]:
raw_cup_data = sc.textFile("./kddcup.data.gz")

In [25]:
contains_normal = raw_cup_data.filter(lambda line: "normal." in line)

In [26]:
contains_normal.count()


Out[26]:
972781

In [27]:
split_file = raw_cup_data.map(lambda line: line.split(","))

In [46]:
# split_file.take(2)

In [29]:
from time import time

In [30]:
sampled = raw_cup_data.sample(False, 0.1, 42)  # withReplacement=False, fraction=0.1 (10%), seed=42

In [31]:
contains_normal_sample = sampled.map(lambda x: x.split(",")).filter(lambda x: "normal" in x)

In [32]:
t0 = time()
num_sampled = contains_normal_sample.count()
duration = time() - t0
print(duration)


18.889207124710083

In [33]:
contains_normal_full = raw_cup_data.map(lambda x: x.split(",")).filter(lambda x: "normal" in x)

In [34]:
t1 = time()
full_count = contains_normal_full.count()
duration = time() - t1
print(duration)


36.53693389892578
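
Both counts re-read and re-split the gzipped file, so persisting the parsed RDD is a common way to speed up repeated actions. A minimal sketch (timings will differ), assuming raw_cup_data from the cells above:

In [ ]:
# Sketch: cache the parsed RDD so later actions reuse it instead of re-parsing.
parsed = raw_cup_data.map(lambda x: x.split(",")).cache()
parsed.count()                                   # first action materializes the cache
parsed.filter(lambda x: "normal" in x).count()   # reuses the cached partitions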

In [35]:
data_in_memory = raw_cup_data.takeSample(False, 20, 42)
contains_normal_py = [line.split(",") for line in data_in_memory if "normal" in line]

In [36]:
len(contains_normal_py)


Out[36]:
3

In [37]:
# Lines without Normal
normal_sample = sampled.filter(lambda line: "normal" in line)

In [38]:
non_normal_sample = sampled.subtract(normal_sample)

In [39]:
sampled.count()


Out[39]:
490705

In [40]:
normal_sample.count()


Out[40]:
97404

In [41]:
non_normal_sample.count()


Out[41]:
393301
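
Because the split is content-based, the two pieces add back up to the sample size (97404 + 393301 = 490705). A quick sanity-check sketch (not from the original run), assuming the RDDs above; note that subtract() drops every copy of a matching element, so the identity is not guaranteed for arbitrary RDDs:

In [ ]:
# Sketch: verify that normal and non-normal lines partition the sample.
print(normal_sample.count() + non_normal_sample.count() == sampled.count())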

In [42]:
# AGGREGATIONS
csv = raw_cup_data.map(lambda x: x.split(","))
normal_data = csv.filter(lambda x: x[41] == "normal.")
# normal_data = csv.filter(lambda x: "normal" in x)
duration = normal_data.map(lambda x: int(x[0]))

In [43]:
duration.take(3)


Out[43]:
[0, 0, 0]

In [44]:
total_duration = duration.reduce(lambda x,y: x+y)
total_duration


Out[44]:
211895753

In [45]:
avg_value = total_duration/(normal_data.count())
avg_value


Out[45]:
217.82472416710442

FASTER AVERAGE COMPUTATIONS WITH AGGREGATE


In [47]:
duration_count = duration.aggregate(
    (0,0),
    (lambda db, new_value: (db[0] + new_value, db[1] + 1)),
    (lambda db1, db2: (db1[0] + db2[0],db1[1] + db2[1]))
)

duration_count[0]/duration_count[1]


Out[47]:
217.82472416710442

In [48]:
first_4 = [1,2,3,4]
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))        # fold one element into a (sum, count) pair
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))  # merge two (sum, count) pairs

In [49]:
res_4 = sc.parallelize(first_4).aggregate((0,0), seqOp, combOp)
res_4


Out[49]:
(10, 4)

In [58]:
fives = [2,4,6,8,10]
res_5 = sc.parallelize(fives).aggregate((0,0), seqOp, combOp)  # (0,0) = initial value, seqOp folds elements, combOp merges partitions
res_5


Out[58]:
(30, 5)
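
aggregate() applies seqOp within each partition and combOp across partitions, so the result is independent of how the data is split. A small sketch (not executed here), assuming sc, fives, seqOp and combOp from the cells above:

In [ ]:
# Sketch: force two partitions; seqOp folds each partition into a (sum, count)
# pair and combOp merges the per-partition pairs.
sc.parallelize(fives, 2).aggregate((0, 0), seqOp, combOp)  # -> (30, 5)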

PIVOT TABLING WITH KEY-VALUE PAIRED DATA POINTS

Grouping data by key-values


In [60]:
kv = csv.map(lambda x: (x[41], x))
kv.take(1)


Out[60]:
[('normal.',
  ['0',
   'tcp',
   'http',
   'SF',
   '215',
   '45076',
   '0',
   '0',
   '0',
   '0',
   '0',
   '1',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '1',
   '1',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '1.00',
   '0.00',
   '0.00',
   '0',
   '0',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   'normal.'])]

In [61]:
kv_duration = csv.map(lambda x: (x[41], float(x[0]))).reduceByKey(lambda x,y: x + y)
kv_duration.collect()


Out[61]:
[('normal.', 211895753.0),
 ('buffer_overflow.', 2751.0),
 ('loadmodule.', 326.0),
 ('perl.', 124.0),
 ('neptune.', 2.0),
 ('smurf.', 0.0),
 ('guess_passwd.', 144.0),
 ('pod.', 0.0),
 ('teardrop.', 0.0),
 ('portsweep.', 24257982.0),
 ('ipsweep.', 13049.0),
 ('land.', 0.0),
 ('ftp_write.', 259.0),
 ('back.', 284.0),
 ('imap.', 72.0),
 ('satan.', 500.0),
 ('phf.', 18.0),
 ('nmap.', 0.0),
 ('multihop.', 1288.0),
 ('warezmaster.', 301.0),
 ('warezclient.', 627563.0),
 ('spy.', 636.0),
 ('rootkit.', 1008.0)]
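
Carrying a (sum, count) pair through the reduce gives a per-label average in one shuffle. A sketch (not part of the original run), assuming the csv RDD from the cells above:

In [ ]:
# Sketch: average duration per label via reduceByKey on (sum, count) pairs.
sum_count = csv.map(lambda x: (x[41], (float(x[0]), 1))) \
               .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
avg_by_label = sum_count.mapValues(lambda p: p[0] / p[1])
avg_by_label.take(3)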

In [62]:
kv.countByKey()


Out[62]:
defaultdict(int,
            {'normal.': 972781,
             'buffer_overflow.': 30,
             'loadmodule.': 9,
             'perl.': 3,
             'neptune.': 1072017,
             'smurf.': 2807886,
             'guess_passwd.': 53,
             'pod.': 264,
             'teardrop.': 979,
             'portsweep.': 10413,
             'ipsweep.': 12481,
             'land.': 21,
             'ftp_write.': 8,
             'back.': 2203,
             'imap.': 12,
             'satan.': 15892,
             'phf.': 4,
             'nmap.': 2316,
             'multihop.': 7,
             'warezmaster.': 20,
             'warezclient.': 1020,
             'spy.': 2,
             'rootkit.': 10})
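
countByKey() returns the whole tally to the driver as a dict; for a quick "top labels" view the same count can stay distributed. A sketch (not executed here), assuming the kv pair RDD from above:

In [ ]:
# Sketch: label frequencies as an RDD, sorted by count, top 5 only.
kv.mapValues(lambda _: 1) \
  .reduceByKey(lambda a, b: a + b) \
  .sortBy(lambda pair: pair[1], ascending=False) \
  .take(5)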

Data Analysis with MLlib


In [63]:
# Computing Summary Statistics
from pyspark.mllib.linalg import Vectors

In [64]:
# rdd = sc.parallelize([Vectors.dense()])

In [65]:
from pyspark.mllib.stat import Statistics

In [66]:
duration = csv.map(lambda x: [int(x[0])])

In [67]:
summary = Statistics.colStats(duration)
summary.mean()[0]


Out[67]:
48.34243046395863

In [68]:
from math import sqrt
sqrt(summary.variance()[0])  # standard deviation


Out[68]:
723.3298112546713

In [69]:
summary.max()


Out[69]:
array([58329.])

In [70]:
summary.min()


Out[70]:
array([0.])
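
The same colStats summary exposes a few other fields as well. A small sketch (not from the original run), assuming the summary object computed above:

In [ ]:
# Sketch: remaining MultivariateStatisticalSummary fields.
print(summary.count())        # number of rows
print(summary.numNonzeros())  # non-zero count per column
print(summary.normL1())       # L1 norm per column
print(summary.normL2())       # L2 norm per column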

Using Pearson and Spearman to discover correlation


In [72]:
metrics = csv.map(lambda x: [x[0], x[4], x[5]])
spearman_corr = Statistics.corr(metrics, method="spearman")
spearman_corr


Out[72]:
array([[ 1.        ,  0.00890383,  0.30144701],
       [ 0.00890383,  1.        , -0.19510495],
       [ 0.30144701, -0.19510495,  1.        ]])

In [73]:
pearson_corr = Statistics.corr(metrics, method="pearson")
pearson_corr


Out[73]:
array([[1.00000000e+00, 4.12205545e-02, 2.03915936e-02],
       [4.12205545e-02, 1.00000000e+00, 2.39337570e-04],
       [2.03915936e-02, 2.39337570e-04, 1.00000000e+00]])
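
The rows and columns of the matrix follow the order of the fields used to build metrics, which in the KDD Cup 99 schema are duration, src_bytes and dst_bytes. A small sketch (not executed here) to label the output, assuming pearson_corr from above:

In [ ]:
# Sketch: print each correlation row next to its feature name.
cols = ["duration", "src_bytes", "dst_bytes"]
for name, row in zip(cols, pearson_corr):
    print(name, row)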

Testing your Hypotheses on large datasets


In [74]:
from pyspark.mllib.linalg import Vectors

In [75]:
visitors_freq = Vectors.dense(0.13,0.61,0.8,0.5,0.3)
print(Statistics.chiSqTest(visitors_freq))


Chi squared test summary:
method: pearson
degrees of freedom = 4 
statistic = 0.5852136752136753 
pValue = 0.9646925263439344 
No presumption against null hypothesis: observed follows the same distribution as expected..
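
chiSqTest() also accepts a contingency matrix to test independence between two categorical variables. A sketch with made-up counts (not part of the original run):

In [ ]:
# Sketch: independence test on a 2x2 contingency table of invented counts.
from pyspark.mllib.linalg import Matrices
contingency = Matrices.dense(2, 2, [13.0, 47.0, 25.0, 15.0])
print(Statistics.chiSqTest(contingency))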

SPARK SQL


In [76]:
from pyspark.sql import Row, SQLContext
rows = csv.map(lambda p: Row(duration=int(p[0]), protocol=p[1], service=p[2]))

In [77]:
sql_context = SQLContext(sc)

In [78]:
df = sql_context.createDataFrame(rows)
df.registerTempTable("rdd")  # deprecated in Spark 2.x; createOrReplaceTempView is the newer equivalent

In [83]:
sql_context.sql("""SELECT duration FROM rdd WHERE protocol = 'tcp' AND duration > 2000""").show(5)


+--------+
|duration|
+--------+
|   10217|
|   11610|
|   13724|
|   10934|
|   12026|
+--------+
only showing top 5 rows

Using Spark DSL to build queries for structured data operations


In [82]:
df.select("duration").filter(df.duration > 2000).filter(df.protocol=="tcp").show(5)


+--------+
|duration|
+--------+
|   10217|
|   11610|
|   13724|
|   10934|
|   12026|
+--------+
only showing top 5 rows
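
The DSL also covers aggregations directly on the DataFrame. A small sketch (not executed in the original run), assuming df from the cells above:

In [ ]:
# Sketch: average duration per protocol with groupBy/agg.
from pyspark.sql import functions as F
df.groupBy("protocol").agg(F.avg("duration").alias("avg_duration")).show()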


In [ ]: