In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext('local', 'Hands on PySpark')
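The master string controls where Spark runs. A hedged variant (not part of the original run): 'local' uses a single worker thread, while 'local[*]' would use one thread per available core.
# sc = SparkContext('local[*]', 'Hands on PySpark')   # hypothetical alternative: one thread per core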
In [2]:
# sc.stop()
In [3]:
visitors = [10, 3, 35, 25, 41, 9, 29]
df_visitors = sc.parallelize(visitors)
df_visitors_yearly = df_visitors.map(lambda x: x*365).collect()
print(df_visitors_yearly)
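map is a transformation and stays lazy until an action runs; the collect above is what actually triggers the job. A minimal sketch on the same visitors RDD, using reduce as a different action:
yearly = df_visitors.map(lambda x: x * 365)   # lazy: only records the lineage
yearly.reduce(lambda a, b: a + b)             # action: runs the computation and returns the sum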
In [4]:
df_visitors.take(3)
Out[4]:
In [5]:
df_visitors_yearly
Out[5]:
In [6]:
import urllib.request
In [7]:
# try:
#     urllib.request.urlretrieve(
#         "https://archive.ics.uci.edu/ml/machine-learning-databases/kddcup99-mld/kddcup.data.gz",
#         "kddcup.data.gz")
# except Exception as e:
#     print("Could not download data", e)
In [8]:
# dd = urllib.request.urlretrieve("https://archive.ics.uci.edu/ml/machine-learning-databases/kddcup99-mld/kddcup.data.gz", "kddcup.data.gz")
In [9]:
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/kddcup99-mld/kddcup.data.gz
In [10]:
sc
Out[10]:
In [11]:
raw_data = sc.textFile("kddcup.data.gz")
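One caveat worth noting: gzip archives are not splittable, so textFile loads the whole file into a single partition. A small check, plus a hedged repartition (the partition count of 4 is an arbitrary example):
raw_data.getNumPartitions()            # expect 1 for a .gz file
# raw_data = raw_data.repartition(4)   # optional: spread the rows out, at the cost of a shuffle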
In [12]:
raw_data
Out[12]:
In [13]:
!ls -laS
In [14]:
split_file = raw_data.map(lambda line: line.split(","))
In [15]:
split_file.take(6)
Out[15]:
In [16]:
# raw_data.collect()  # left commented out: this would pull the entire dataset back to the driver
In [17]:
from time import time
In [18]:
sampled = raw_data.sample(False, 0.1, 42)
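# sample(withReplacement=False, fraction=0.1, seed=42): keep roughly 10% of the lines,
# drawn without replacement, with a fixed seed so the sample is reproducible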
In [19]:
# keep rows whose label field is "normal." (the KDD labels carry a trailing period)
contains_normal_sample = sampled.map(lambda x: x.split(",")).filter(lambda x: "normal." in x)
In [20]:
t0 = time()
num_sampled = contains_normal_sample.count()
duration = time() - t0
In [21]:
duration
Out[21]:
In [22]:
contains_normal = raw_data.map(lambda x: x.split(",")).filter(lambda x: "normal." in x)
t0 = time()
num_sampled = contains_normal.count()
duration = time() - t0
In [23]:
duration
Out[23]:
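Both timed counts re-read and re-parse the gzip file from disk. A hedged aside: if the dataset fits in memory, caching the RDD makes repeated actions over it much cheaper.
raw_data.cache()    # mark the RDD for in-memory storage
raw_data.count()    # the first action materialises the cache
raw_data.count()    # subsequent actions reuse the cached partitions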
In [24]:
data_in_memory = raw_data.takeSample(False, 10, 42)
In [25]:
contains_normal_py = [line.split(",") for line in data_in_memory if "normal" in line]
len(contains_normal_py)
Out[25]:
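takeSample is an action: unlike sample, it returns a plain Python list to the driver, which is why the ordinary list comprehension above works on data_in_memory. For instance, the first field of each sampled row is reachable with plain Python:
first_fields = [row.split(",")[0] for row in data_in_memory]   # duration field of the ten sampled rows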
In [26]:
normal_sample = sampled.filter(lambda line: "normal." in line)
In [27]:
non_normal_sample = sampled.subtract(normal_sample)
In [28]:
sampled.count()
Out[28]:
In [29]:
normal_sample.count()
Out[29]:
In [30]:
non_normal_sample.count()
Out[30]:
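subtract has to shuffle both RDDs to compute the difference. A hedged alternative that produces the same subset without a shuffle is to filter directly:
non_normal_filtered = sampled.filter(lambda line: "normal." not in line)
non_normal_filtered.count()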
In [31]:
feature_1 = sampled.map(lambda line: line.split(",")).map(lambda features: features[1]).distinct()
In [32]:
feature_2 = sampled.map(lambda line: line.split(",")).map(lambda features: features[2]).distinct()
In [33]:
f1 = feature_1.collect()
f2 = feature_2.collect()
In [34]:
f1
Out[34]:
In [35]:
f2
Out[35]:
In [36]:
# every combination of the two features (the cartesian product of their distinct values)
len(feature_1.cartesian(feature_2).collect())
Out[36]:
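Since cartesian pairs every value of feature_1 with every value of feature_2, the same number is available without collecting the pairs to the driver:
feature_1.count() * feature_2.count()   # |feature_1| x |feature_2| combinations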
In [37]:
csv = raw_data.map(lambda x: x.split(","))
normal_data = csv.filter(lambda x: x[41]=="normal.")
In [38]:
# Get total duration
duration = normal_data.map(lambda x: int(x[0]))
total_duration = duration.reduce(lambda x, y: x+y)
total_duration
Out[38]:
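A hedged aside: the built-in numeric actions give the same total more directly, and stats() returns count, mean, stdev, max and min in a single pass.
duration.sum()      # same total as the reduce above
duration.stats()    # count, mean, stdev, max, min in one pass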
In [39]:
# average duration
total_duration/(normal_data.count())
Out[39]:
In [48]:
duration_count = duration.aggregate(
    (0, 0),                                                     # zero value: (running sum, running count)
    lambda acc, new_value: (acc[0] + new_value, acc[1] + 1),    # seqOp: fold one value into a partition's accumulator
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])   # combOp: merge accumulators across partitions
)
In [49]:
duration_count[0]/duration_count[1]
Out[49]:
In [50]:
# Pivot: total duration (column 0) per label (column 41)
kv_duration = csv.map(lambda x: (x[41], float(x[0]))).reduceByKey(lambda x, y: x+y)
kv_duration.collect()
Out[50]:
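A hedged extension of the pivot above: joining the per-label sums with per-label counts yields the average duration per label, still computed on the executors until collect is called. The variable names below are illustrative only.
label_sums = csv.map(lambda x: (x[41], float(x[0]))).reduceByKey(lambda x, y: x + y)
label_counts = csv.map(lambda x: (x[41], 1)).reduceByKey(lambda x, y: x + y)
label_avgs = label_sums.join(label_counts).mapValues(lambda p: p[0] / p[1])
label_avgs.collect()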
In [51]:
kv = csv.map(lambda x: (x[41], x))
kv.take(1)
Out[51]:
In [52]:
kv.countByKey()
Out[52]:
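countByKey is an action that brings an ordinary Python dictionary of counts back to the driver, so it is only appropriate when the number of distinct keys (here, the attack labels) is small.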
In [ ]: