In [ ]:
# Catch-up cell. Run only if you did not complete the previous notebooks in this sequence.
# Downloads the KDD Cup 1999 10% dataset and stages it in your GCS bucket.
!wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz
BUCKET='cloud-training-demos-ml' # CHANGE to your bucket name
!pip install google-compute-engine  # gsutil dependency in some notebook environments
!gsutil cp kdd* gs://$BUCKET/
In [ ]:
BUCKET='cloud-training-demos-ml' # CHANGE
!gsutil ls gs://$BUCKET/kdd*
In [ ]:
%%writefile spark_analysis.py
# Use a non-interactive matplotlib backend so the plotting steps below work
# when this script runs headless (no display), as it will on a cluster.
import matplotlib
matplotlib.use('Agg')

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--bucket", help="GCS bucket for input and output")
args = parser.parse_args()
BUCKET = args.bucket
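Optional: since `parse_args()` is the only logic in the file so far, you can sanity-check the flag handling right away (this cell is a suggested addition, not part of the original sequence):

In [ ]:
# prints the usage string and exits before any Spark code runs
!python spark_analysis.py --help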
In [ ]:
%%writefile -a spark_analysis.py
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext
data_file = "gs://{}/kddcup.data_10_percent.gz".format(BUCKET)
raw_rdd = sc.textFile(data_file).cache()
#raw_rdd.take(5)  # uncomment to inspect a few raw lines interactively
In [ ]:
%%writefile -a spark_analysis.py
csv_rdd = raw_rdd.map(lambda row: row.split(","))

# Keep only the fields this analysis uses; the dataset has 41 features plus a
# trailing label, so several column indices are intentionally skipped.
parsed_rdd = csv_rdd.map(lambda r: Row(
    duration=int(r[0]),
    protocol_type=r[1],
    service=r[2],
    flag=r[3],
    src_bytes=int(r[4]),
    dst_bytes=int(r[5]),
    wrong_fragment=int(r[7]),
    urgent=int(r[8]),
    hot=int(r[9]),
    num_failed_logins=int(r[10]),
    num_compromised=int(r[12]),
    su_attempted=int(r[14]),  # parsed as int because it is summed in the SQL below
    num_root=int(r[15]),
    num_file_creations=int(r[16]),
    label=r[-1]
))
#parsed_rdd.take(5)  # uncomment to inspect a few parsed rows interactively
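To exercise the parsing logic without a cluster, note that `Row` is plain Python and can be tried on a made-up record. The sample line below is fabricated for illustration (41 zero-padded features plus a label), and the cell assumes `pyspark` is pip-installed in the notebook environment:

In [ ]:
from pyspark.sql import Row
# Fabricated 42-field record (41 features + trailing label), for illustration only
sample = "0,tcp,http,SF,181,5450" + ",0" * 35 + ",normal."
r = sample.split(",")
print(Row(duration=int(r[0]), protocol_type=r[1], src_bytes=int(r[4]), label=r[-1]))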
In [ ]:
%%writefile -a spark_analysis.py
# SparkSession.createDataFrame replaces the deprecated SQLContext path
df = spark.createDataFrame(parsed_rdd)
connections_by_protocol = df.groupBy('protocol_type').count().orderBy('count', ascending=False)
connections_by_protocol.show()
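When the script runs, this table will have one row per protocol; the KDD Cup traffic contains just three protocol types (icmp, tcp, and udp).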
In [ ]:
%%writefile -a spark_analysis.py
df.createOrReplaceTempView("connections")
attack_stats = spark.sql("""
SELECT
protocol_type,
CASE label
WHEN 'normal.' THEN 'no attack'
ELSE 'attack'
END AS state,
COUNT(*) as total_freq,
ROUND(AVG(src_bytes), 2) as mean_src_bytes,
ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
ROUND(AVG(duration), 2) as mean_duration,
SUM(num_failed_logins) as total_failed_logins,
SUM(num_compromised) as total_compromised,
SUM(num_file_creations) as total_file_creations,
SUM(su_attempted) as total_root_attempts,
SUM(num_root) as total_root_accesses
FROM connections
GROUP BY protocol_type, state
ORDER BY 3 DESC
""")
attack_stats.show()
In [ ]:
%%writefile -a spark_analysis.py
# subplots=True returns one Axes per numeric column, so ax is an array
ax = attack_stats.toPandas().plot.bar(x='protocol_type', subplots=True, figsize=(10, 25))
In [ ]:
%%writefile -a spark_analysis.py
ax[0].get_figure().savefig('report.png')
#!gsutil rm -rf gs://$BUCKET/sparktobq/
#!gsutil cp report.png gs://$BUCKET/sparktobq/
In [ ]:
%%writefile -a spark_analysis.py
import google.cloud.storage as gcs
# Delete any previous outputs under sparktobq/, then upload the new report
bucket = gcs.Client().get_bucket(BUCKET)
for blob in bucket.list_blobs(prefix='sparktobq/'):
    blob.delete()
bucket.blob('sparktobq/report.png').upload_from_filename('report.png')
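Deleting the existing blobs under `sparktobq/` plays the same role as the commented-out `gsutil rm -rf` earlier, but using the `google-cloud-storage` client keeps the cleanup inside the script instead of shelling out.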
In [ ]:
%%writefile -a spark_analysis.py
connections_by_protocol.write.format("csv").mode("overwrite").save(
"gs://{}/sparktobq/connections_by_protocol".format(BUCKET))
In [ ]:
BUCKET='cloud-training-demos-ml' # CHANGE
print('Writing to {}'.format(BUCKET))
!python spark_analysis.py --bucket=$BUCKET
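Note: running the script with plain `python` requires PySpark installed locally (`pip install pyspark`). On a Dataproc cluster you would submit it as a job instead; a sketch of the command, with placeholder cluster and region names, is `gcloud dataproc jobs submit pyspark spark_analysis.py --cluster=<your-cluster> --region=<your-region> -- --bucket=$BUCKET`.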
In [ ]:
!gsutil ls gs://$BUCKET/sparktobq/**
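Spark saves the CSV output as a directory of part-* shards rather than a single file. To spot-check the contents (shard names are Spark-generated):

In [ ]:
!gsutil cat gs://$BUCKET/sparktobq/connections_by_protocol/part-* | head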
Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.