In [1]:
import findspark
findspark.init()

from pyspark import SparkContext
# 'local' runs Spark in-process on a single thread; the second argument is the app name
sc = SparkContext('local', 'Hands on PySpark')

In [2]:
# uncomment to stop an existing SparkContext before creating a new one
# sc.stop()

In [3]:
visitors = [10, 3, 35, 25, 41, 9, 29]
# parallelize returns an RDD (despite the df_ prefix, this is not a DataFrame)
df_visitors = sc.parallelize(visitors)
# scale each daily count to a yearly figure; collect() returns the results to the driver
df_visitors_yearly = df_visitors.map(lambda x: x * 365).collect()
print(df_visitors_yearly)


[3650, 1095, 12775, 9125, 14965, 3285, 10585]

In [4]:
df_visitors.take(3)


Out[4]:
[10, 3, 35]

In [5]:
df_visitors_yearly


Out[5]:
[3650, 1095, 12775, 9125, 14965, 3285, 10585]

In [6]:
import urllib.request

In [7]:
# try:
#     urllib.request.urlretrieve("https://archive.ics.uci.edu/ml/machine-learning-databases/kddcup99-mld/kddcup.data.gz", "kddcup.data.gz")
# except Exception as e:
#     print("Could not download data", e)

In [8]:
# dd = urllib.request.urlretrieve("https://archive.ics.uci.edu/ml/machine-learning-databases/kddcup99-mld/kddcup.data.gz", "kddcup.data.gz")

In [9]:
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/kddcup99-mld/kddcup.data.gz


--2020-01-17 18:26:18--  https://archive.ics.uci.edu/ml/machine-learning-databases/kddcup99-mld/kddcup.data.gz
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 18115902 (17M) [application/x-httpd-php]
Saving to: ‘kddcup.data.gz.1’

kddcup.data.gz.1    100%[===================>]  17.28M  1.56MB/s    in 17s     

2020-01-17 18:26:36 (1.01 MB/s) - ‘kddcup.data.gz.1’ saved [18115902/18115902]
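
Note: wget saved the download as kddcup.data.gz.1 because a kddcup.data.gz already existed in the working directory (both files appear in the ls listing below); the cells that follow read the pre-existing kddcup.data.gz.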


In [10]:
sc


Out[10]:

SparkContext

Spark UI

Version
v2.4.4
Master
local
AppName
Hands on PySpark

In [11]:
raw_data = sc.textFile("kddcup.data.gz")

In [12]:
raw_data


Out[12]:
kddcup.data.gz MapPartitionsRDD[4] at textFile at NativeMethodAccessorImpl.java:0
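
textFile reads the gzip archive transparently and, like every RDD transformation, is lazy: the repr above is an unevaluated MapPartitionsRDD, and nothing is read from disk until an action runs. A minimal illustrative check (it forces a full pass over the file, so it is not free):

# count() is an action: it makes Spark read and decompress the entire file
raw_data.count()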

In [13]:
!ls -laS


total 73880
-rw-r--r--   1 director  staff  18115902 26 Jun  2007 kddcup.data.gz
-rw-r--r--   1 director  staff  18115902 26 Jun  2007 kddcup.data.gz.1
-rw-r--r--   1 director  staff      3913 17 Jan 18:26 hands_on_pyspark.ipynb
-rw-r--r--   1 director  staff      1648  6 Feb  2019 GraphFrames_Pyspark.ipynb
-rw-r--r--@  1 director  staff      1304  6 Feb  2019 streams.py
-rw-r--r--@  1 director  staff       963  6 Feb  2019 streaming_kafka.py
drwxr-xr-x  20 director  staff       640 23 Dec 14:23 ..
drwxr-xr-x   9 director  staff       288 17 Jan 18:26 .
drwxr-xr-x   4 director  staff       128 17 Jan 17:40 .ipynb_checkpoints

In [14]:
split_file = raw_data.map(lambda line: line.split(","))

In [15]:
split_file.take(6)


Out[15]:
[['0',
  'tcp',
  'http',
  'SF',
  '215',
  '45076',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '1',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '0',
  '0',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'],
 ['0',
  'tcp',
  'http',
  'SF',
  '162',
  '4528',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '2',
  '2',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '1',
  '1',
  '1.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'],
 ['0',
  'tcp',
  'http',
  'SF',
  '236',
  '1228',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '1',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '2',
  '2',
  '1.00',
  '0.00',
  '0.50',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'],
 ['0',
  'tcp',
  'http',
  'SF',
  '233',
  '2032',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '2',
  '2',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '3',
  '3',
  '1.00',
  '0.00',
  '0.33',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'],
 ['0',
  'tcp',
  'http',
  'SF',
  '239',
  '486',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '3',
  '3',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '4',
  '4',
  '1.00',
  '0.00',
  '0.25',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'],
 ['0',
  'tcp',
  'http',
  'SF',
  '238',
  '1282',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '4',
  '4',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '5',
  '5',
  '1.00',
  '0.00',
  '0.20',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.']]
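
Each record is a KDD Cup 99 connection: 41 comma-separated features followed by the attack label (here 'normal.') at index 41. A hedged sketch for projecting out just the label column (the labels name is ours):

# pull the label field (index 41) from each parsed record
labels = split_file.map(lambda fields: fields[41])
labels.take(3)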

In [16]:
# raw_data.collect()

In [17]:
from time import time

In [18]:
# sample(withReplacement=False, fraction=0.1, seed=42): an RDD of roughly 10% of the records
sampled = raw_data.sample(False, 0.1, 42)

In [19]:
# keep records whose label field is "normal." -- the label carries a trailing
# period, so the bare string "normal" would never match a list element
contains_normal_sample = sampled.map(lambda x: x.split(",")).filter(lambda x: "normal." in x)

In [20]:
t0 = time()
num_sampled = contains_normal_sample.count()
duration = time() - t0

In [21]:
duration


Out[21]:
29.59354019165039

In [22]:
contains_normal = raw_data.map(lambda x: x.split(",")).filter(lambda x: "normal." in x)
t0 = time()
num_normal = contains_normal.count()
duration = time() - t0

In [23]:
duration


Out[23]:
53.96395492553711
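
The full count takes roughly twice as long as the 10% sample, because every action re-reads and re-parses the gzip file from scratch. If further actions were planned against the same RDD, caching it would amortize that cost; a hedged, optional sketch:

# keep the parsed records in memory so subsequent actions skip the file scan
contains_normal.cache()
contains_normal.count()   # the first action pays the parse cost and fills the cache
contains_normal.count()   # later actions read from memory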

In [24]:
data_in_memory = raw_data.takeSample(False, 10, 42)

In [25]:
contains_normal_py = [line.split(",") for line in data_in_memory if "normal" in line]
len(contains_normal_py)


Out[25]:
1
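
Note the difference: sample is a transformation that returns a new RDD, whereas takeSample is an action that returns a plain Python list to the driver, so the list comprehension above runs in local Python rather than in Spark.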

In [26]:
normal_sample = sampled.filter(lambda line: "normal." in line)

In [27]:
non_normal_sample = sampled.subtract(normal_sample)

In [28]:
sampled.count()


Out[28]:
490705

In [29]:
normal_sample.count()


Out[29]:
97404

In [30]:
non_normal_sample.count()


Out[30]:
393301
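
The two subsets add back up to the sample (97404 + 393301 = 490705), confirming that subtract partitioned it cleanly. A hedged sanity check (it recomputes all three counts, so it is not free):

# the normal and non-normal subsets should exactly partition the sample
assert normal_sample.count() + non_normal_sample.count() == sampled.count()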

In [31]:
feature_1 = sampled.map(lambda line: line.split(",")).map(lambda features: features[1]).distinct()

In [32]:
feature_2 = sampled.map(lambda line: line.split(",")).map(lambda features: features[2]).distinct()
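
Both cells above re-split every line from scratch. A hedged alternative is to parse once and reuse the result (the parsed name is ours):

# parse once, then project the protocol (index 1) and service (index 2) columns
parsed = sampled.map(lambda line: line.split(","))
feature_1 = parsed.map(lambda f: f[1]).distinct()
feature_2 = parsed.map(lambda f: f[2]).distinct()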

In [33]:
f1 = feature_1.collect()
f2 = feature_2.collect()

In [34]:
f1


Out[34]:
['tcp', 'udp', 'icmp']

In [35]:
f2


Out[35]:
['http',
 'finger',
 'auth',
 'domain_u',
 'smtp',
 'ftp',
 'telnet',
 'eco_i',
 'ntp_u',
 'ecr_i',
 'other',
 'private',
 'pop_3',
 'ftp_data',
 'daytime',
 'remote_job',
 'supdup',
 'name',
 'ssh',
 'domain',
 'gopher',
 'time',
 'rje',
 'ctf',
 'mtp',
 'X11',
 'urp_i',
 'pm_dump',
 'IRC',
 'exec',
 'bgp',
 'nnsp',
 'iso_tsap',
 'http_443',
 'login',
 'shell',
 'printer',
 'efs',
 'courier',
 'uucp',
 'kshell',
 'klogin',
 'whois',
 'echo',
 'discard',
 'systat',
 'netstat',
 'hostnames',
 'csnet_ns',
 'pop_2',
 'sunrpc',
 'uucp_path',
 'nntp',
 'netbios_ns',
 'netbios_ssn',
 'netbios_dgm',
 'imap4',
 'sql_net',
 'vmnet',
 'link',
 'Z39_50',
 'ldap',
 'urh_i',
 'tftp_u',
 'red_i',
 'tim_i']

In [36]:
# all combinations of f1 and f2
len(feature_1.cartesian(feature_2).collect())


Out[36]:
198
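
That is 3 protocols times 66 services, hence the 198 combinations. (feature_1.cartesian(feature_2).count() would give the same number without collecting every pair to the driver.)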

In [37]:
csv = raw_data.map(lambda x: x.split(","))
# field 41 is the attack label; keep only connections labeled "normal."
normal_data = csv.filter(lambda x: x[41] == "normal.")

In [38]:
# Total duration (field 0, in seconds) over all normal connections
duration = normal_data.map(lambda x: int(x[0]))
total_duration = duration.reduce(lambda x, y: x + y)
total_duration


Out[38]:
211895753

In [39]:
# average duration per normal connection, in seconds
total_duration/(normal_data.count())


Out[39]:
217.82472416710442

Computations using Aggregate
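
aggregate computes the sum and the count in a single pass: the zero value seeds a (sum, count) pair, the first function (seqOp) folds each record into a partition-local pair, and the second (combOp) merges the pairs across partitions.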


In [48]:
duration_count = duration.aggregate(
    (0, 0),                                                      # zero value: (running sum, running count)
    (lambda acc, new_value: (acc[0] + new_value, acc[1] + 1)),   # seqOp: fold a record into a partition's pair
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # combOp: merge pairs across partitions
)

In [49]:
duration_count[0]/duration_count[1]


Out[49]:
217.82472416710442

In [50]:
# Sum of durations per attack label: a key-value "group by" via reduceByKey
kv_duration = csv.map(lambda x: (x[41], float(x[0]))).reduceByKey(lambda x, y: x + y)
kv_duration.collect()


Out[50]:
[('normal.', 211895753.0),
 ('buffer_overflow.', 2751.0),
 ('loadmodule.', 326.0),
 ('perl.', 124.0),
 ('neptune.', 2.0),
 ('smurf.', 0.0),
 ('guess_passwd.', 144.0),
 ('pod.', 0.0),
 ('teardrop.', 0.0),
 ('portsweep.', 24257982.0),
 ('ipsweep.', 13049.0),
 ('land.', 0.0),
 ('ftp_write.', 259.0),
 ('back.', 284.0),
 ('imap.', 72.0),
 ('satan.', 500.0),
 ('phf.', 18.0),
 ('nmap.', 0.0),
 ('multihop.', 1288.0),
 ('warezmaster.', 301.0),
 ('warezclient.', 627563.0),
 ('spy.', 636.0),
 ('rootkit.', 1008.0)]
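
To turn these per-label totals into per-label averages, the same sum-and-count trick works per key. A hedged sketch using aggregateByKey (the variable names are ours):

# build a (sum, count) pair per label in one pass, mirroring aggregate() above
sum_count = csv.map(lambda x: (x[41], float(x[0]))).aggregateByKey(
    (0.0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)
avg_duration_by_label = sum_count.mapValues(lambda p: p[0] / p[1])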

In [51]:
kv = csv.map(lambda x: (x[41], x))
kv.take(1)


Out[51]:
[('normal.',
  ['0',
   'tcp',
   'http',
   'SF',
   '215',
   '45076',
   '0',
   '0',
   '0',
   '0',
   '0',
   '1',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '1',
   '1',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '1.00',
   '0.00',
   '0.00',
   '0',
   '0',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   'normal.'])]

In [52]:
kv.countByKey()


Out[52]:
defaultdict(int,
            {'normal.': 972781,
             'buffer_overflow.': 30,
             'loadmodule.': 9,
             'perl.': 3,
             'neptune.': 1072017,
             'smurf.': 2807886,
             'guess_passwd.': 53,
             'pod.': 264,
             'teardrop.': 979,
             'portsweep.': 10413,
             'ipsweep.': 12481,
             'land.': 21,
             'ftp_write.': 8,
             'back.': 2203,
             'imap.': 12,
             'satan.': 15892,
             'phf.': 4,
             'nmap.': 2316,
             'multihop.': 7,
             'warezmaster.': 20,
             'warezclient.': 1020,
             'spy.': 2,
             'rootkit.': 10})
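
countByKey is an action that materializes the counts as a dictionary on the driver, which is fine for the 23 labels here; with many distinct keys, a distributed equivalent is safer. A hedged sketch:

# count per key on the workers, collecting only the small final result
kv.map(lambda pair: (pair[0], 1)).reduceByKey(lambda x, y: x + y).collect()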
