In [4]:
import findspark
findspark.init()  # locate the Spark installation before importing pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
In [5]:
sc = SparkContext('local', 'HandsOn PySpark')  # master URL and application name
In [6]:
visitors = [10,3,35,25,41,9,29]
In [7]:
df_visitors = sc.parallelize(visitors)
# df_visitors_yearly = df_visitors.map(lambda x: x * 365).collect()
# print(df_visitors_yearly)
In [8]:
df_visitors_yearly = df_visitors.map(lambda x: x * 365).collect()  # scale each daily count to a yearly figure and pull the results to the driver
In [10]:
print(type(df_visitors))  # parallelize() returns an RDD
print(df_visitors_yearly)
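In [ ]:
# Side note (an added sketch, not part of the original run): map() is a lazy
# transformation, so nothing is computed until an action such as collect() or take()
# fires. take() brings back only a few elements instead of the whole result set.
df_visitors.map(lambda x: x * 365).take(3)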
In [11]:
# !pip install --upgrade pyspark
In [13]:
# dir(df_visitors)
In [ ]:
# !cp /Users/director/Downloads/spark_rd.md .
In [14]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL My sample") \
    .master("local[*]") \
    .getOrCreate()
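In [ ]:
# A quick check (an added sketch): because a SparkContext named sc is already running,
# SparkSession.builder.getOrCreate() should reuse it rather than start a new one, so
# both handles point at the same context.
spark.sparkContext is sc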
In [15]:
text_file = spark.read.text("spark_rd.md")
In [16]:
text_file.count()
Out[16]:
In [17]:
text_file.first()
Out[17]:
In [18]:
lines_with_spark = text_file.filter(text_file.value.contains("Spark"))
In [19]:
lines_with_spark.show(3)
In [20]:
text_file.filter(text_file.value.contains("Spark")).count()
Out[20]:
In [21]:
data_src = "http://archive.ics.uci.edu/ml/machine-learning-databases/kddcup99-mld/kddcup.data.gz"
import urllib.request
In [22]:
# f = urllib.request.urlretrieve(data_src, "kddcup.data.gz")
In [23]:
sc
Out[23]:
In [24]:
raw_cup_data = sc.textFile("./kddcup.data.gz")
In [25]:
contains_normal = raw_cup_data.filter(lambda line: "normal." in line)
In [26]:
contains_normal.count()
Out[26]:
In [27]:
split_file = raw_cup_data.map(lambda line: line.split(","))
In [46]:
# split_file.take(2)
In [29]:
from time import time
In [30]:
sampled = raw_cup_data.sample(False, 0.1, 42)  # without replacement, 10% fraction, seed 42
In [31]:
contains_normal_sample = sampled.map(lambda x: x.split(",")).filter(lambda x: "normal." in x)  # the label field is "normal." with a trailing dot
In [32]:
t0 = time()
num_sampled = contains_normal_sample.count()
duration = time() - t0
print(duration)
In [33]:
contains_normal_full = raw_cup_data.map(lambda x: x.split(",")).filter(lambda x: "normal." in x)
In [34]:
t1 = time()
full_count = contains_normal_full.count()
duration = time() - t1
print(duration)
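In [ ]:
# Optional (an added sketch): both counts above re-read the gzipped file from disk.
# cache() marks the raw RDD for in-memory storage; it is lazy and takes effect on the
# next action, after which repeated passes reuse the cached partitions.
raw_cup_data.cache()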
In [35]:
data_in_memory = raw_cup_data.takeSample(False, 20, 42)  # takeSample returns 20 raw lines to the driver as a plain Python list
contains_normal_py = [line.split(",") for line in data_in_memory if "normal" in line]
In [36]:
len(contains_normal_py)
Out[36]:
In [37]:
# Split the sample into lines with "normal" and, below, lines without it
normal_sample = sampled.filter(lambda line: "normal" in line)
In [38]:
non_normal_sample = sampled.subtract(normal_sample)
In [39]:
sampled.count()
Out[39]:
In [40]:
normal_sample.count()
Out[40]:
In [41]:
non_normal_sample.count()
Out[41]:
In [42]:
# AGGREGATIONS
csv = raw_cup_data.map(lambda x: x.split(","))
normal_data = csv.filter(lambda x: x[41] == "normal.")  # column 41 is the connection label
# normal_data = csv.filter(lambda x: "normal" in x)
duration = normal_data.map(lambda x: int(x[0]))  # column 0 is the connection duration in seconds
In [43]:
duration.take(3)
Out[43]:
In [44]:
total_duration = duration.reduce(lambda x,y: x+y)
total_duration
Out[44]:
In [45]:
avg_value = total_duration/(normal_data.count())
avg_value
Out[45]:
In [47]:
duration_count = duration.aggregate(
    (0, 0),                                                      # initial (sum, count) accumulator
    (lambda acc, new_value: (acc[0] + new_value, acc[1] + 1)),   # seqOp: fold one value into a partition's accumulator
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # combOp: merge accumulators across partitions
)
duration_count[0] / duration_count[1]
Out[47]:
In [48]:
first_4 = [1,2,3,4]
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))         # fold one element into a (sum, count) pair
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))  # merge two (sum, count) pairs
In [49]:
res_4 = sc.parallelize(first_4).aggregate((0,0), seqOp, combOp)
res_4
Out[49]:
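In [ ]:
# Walking through the toy example (an added sketch): seqOp folds 1, 2, 3, 4 into a
# (sum, count) pair, so res_4 should be (10, 4); the mean follows directly.
res_4[0] / res_4[1]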
In [58]:
fives = [2,4,6,8,10]
res_5 = sc.parallelize(fives).aggregate((0,0), seqOp, combOp)  # (0,0) is the initial value; seqOp and combOp as defined above
res_5
Out[58]:
In [60]:
kv = csv.map(lambda x: (x[41], x))
kv.take(1)
Out[60]:
In [61]:
kv_duration = csv.map(lambda x: (x[41], float(x[0]))).reduceByKey(lambda x, y: x + y)  # total duration per label
kv_duration.collect()
Out[61]:
In [62]:
kv.countByKey()
Out[62]:
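In [ ]:
# countByKey() returns a plain Python dict on the driver. An added sketch of the same
# tally done as a distributed aggregation with reduceByKey, which keeps the work on
# the cluster until collect() is called.
csv.map(lambda x: (x[41], 1)).reduceByKey(lambda a, b: a + b).collect()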
In [63]:
# Computing Summary Statistics
from pyspark.mllib.linalg import Vectors
In [64]:
# rdd = sc.parallelize([Vectors.dense()])
In [65]:
from pyspark.mllib.stat import Statistics
In [66]:
duration = csv.map(lambda x: [int(x[0])])  # wrap each duration in a one-element list so colStats treats it as a vector
In [67]:
summary = Statistics.colStats(duration)
summary.mean()[0]
Out[67]:
In [68]:
from math import sqrt
sqrt(summary.variance()[0])  # standard deviation
Out[68]:
In [69]:
summary.max()
Out[69]:
In [70]:
summary.min()
Out[70]:
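In [ ]:
# An added sketch: the MultivariateStatisticalSummary returned by colStats also
# exposes count() and numNonzeros(), using the summary computed above.
summary.count(), summary.numNonzeros()[0]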
In [72]:
metrics = csv.map(lambda x: [x[0], x[4], x[5]])
spearman_corr = Statistics.corr(metrics, method="spearman")
spearman_corr
Out[72]:
In [73]:
pearson_corr = Statistics.corr(metrics, method="pearson")
pearson_corr
Out[73]:
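In [ ]:
# The metrics rows above hold the values as strings; they appear to be coerced to
# floats when converted to vectors, but an explicit cast is clearer. An added sketch
# with a hypothetical metrics_num RDD:
metrics_num = csv.map(lambda x: [float(x[0]), float(x[4]), float(x[5])])
Statistics.corr(metrics_num, method="pearson")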
In [74]:
from pyspark.mllib.linalg import Vectors
In [75]:
visitors_freq = Vectors.dense(0.13, 0.61, 0.8, 0.5, 0.3)
print(Statistics.chiSqTest(visitors_freq))  # goodness-of-fit test against a uniform expected distribution by default
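In [ ]:
# An added sketch: chiSqTest returns a ChiSqTestResult, so besides the printed summary
# its fields can be read directly.
chi = Statistics.chiSqTest(visitors_freq)
chi.statistic, chi.pValue, chi.degreesOfFreedom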
In [76]:
from pyspark.sql import Row, SQLContext
rows = csv.map(lambda p: Row(duration=int(p[0]), protocol=p[1], service=p[2]))
In [77]:
sql_context = SQLContext(sc)
In [78]:
df = sql_context.createDataFrame(rows)
df.registerTempTable("rdd")
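In [ ]:
# registerTempTable is deprecated on newer Spark releases; an added sketch of the
# equivalent call, assuming Spark 2.0+ where createOrReplaceTempView is available.
df.createOrReplaceTempView("rdd")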
In [83]:
sql_context.sql("""SELECT duration FROM rdd WHERE protocol = 'tcp' AND duration > 2000""").show(5)
In [82]:
df.select("duration").filter(df.duration > 2000).filter(df.protocol=="tcp").show(5)
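In [ ]:
# An added sketch of the same query written with column expressions from
# pyspark.sql.functions, filtering before the projection and combining both conditions.
from pyspark.sql.functions import col
df.filter((col("duration") > 2000) & (col("protocol") == "tcp")).select("duration").show(5)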
In [ ]: