Migrating from Spark to BigQuery via Dataproc -- Part 3

  • Part 1: The original Spark code, now running on Dataproc (lift-and-shift).
  • Part 2: Replace HDFS with Google Cloud Storage. This enables job-specific clusters. (cloud-native)
  • Part 3: Automate everything, so that we can run in a job-specific cluster. (cloud-optimized)
  • Part 4: Load the CSV into BigQuery and do the analysis in BigQuery. (modernize)
  • Part 5: Using Cloud Functions, launch the analysis every time there is a new file in the bucket. (serverless)

Catch up: data to GCS


In [ ]:
# Catch-up cell. Run this if you did not complete the previous notebooks in this sequence.
!wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz
BUCKET='cloud-training-demos-ml'  # CHANGE
!pip install google-compute-engine
!gsutil cp kdd* gs://$BUCKET/

In [ ]:
BUCKET='cloud-training-demos-ml'  # CHANGE
!gsutil ls gs://$BUCKET/kdd*

Create a Python file

Put all the code in a Python file. Comment out display-only code such as take(); the show() calls can stay, since they simply print to the job output. Make changeable settings like BUCKET come from command-line arguments parsed with argparse, rather than hard-coding them.


In [ ]:
%%writefile spark_analysis.py

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--bucket", help="bucket for input and output")
args = parser.parse_args()

BUCKET = args.bucket
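
Note: as written, a missing --bucket flag leaves args.bucket as None, and the job only fails later with a less obvious error. Marking the flag as required (an optional hardening, not part of the original script) surfaces the problem immediately:

    parser.add_argument("--bucket", help="bucket for input and output", required=True)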

In [ ]:
%%writefile -a spark_analysis.py

from pyspark.sql import SparkSession, SQLContext, Row

spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext
data_file = "gs://{}/kddcup.data_10_percent.gz".format(BUCKET)
raw_rdd = sc.textFile(data_file).cache()
#raw_rdd.take(5)

In [ ]:
%%writefile -a spark_analysis.py

csv_rdd = raw_rdd.map(lambda row: row.split(","))
# Keep a subset of the KDD Cup 99 fields; the last column is the attack label.
parsed_rdd = csv_rdd.map(lambda r: Row(
    duration=int(r[0]), 
    protocol_type=r[1],
    service=r[2],
    flag=r[3],
    src_bytes=int(r[4]),
    dst_bytes=int(r[5]),
    wrong_fragment=int(r[7]),
    urgent=int(r[8]),
    hot=int(r[9]),
    num_failed_logins=int(r[10]),
    num_compromised=int(r[12]),
    su_attempted=r[14],
    num_root=int(r[15]),
    num_file_creations=int(r[16]),
    label=r[-1]
    )
)
#parsed_rdd.take(5)

In [ ]:
%%writefile -a spark_analysis.py

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(parsed_rdd)
connections_by_protocol = df.groupBy('protocol_type').count().orderBy('count', ascending=False)
connections_by_protocol.show()

In [ ]:
%%writefile -a spark_analysis.py

df.registerTempTable("connections")
attack_stats = sqlContext.sql("""
                           SELECT 
                             protocol_type, 
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as total_freq,
                             ROUND(AVG(src_bytes), 2) as mean_src_bytes,
                             ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
                             ROUND(AVG(duration), 2) as mean_duration,
                             SUM(num_failed_logins) as total_failed_logins,
                             SUM(num_compromised) as total_compromised,
                             SUM(num_file_creations) as total_file_creations,
                             SUM(su_attempted) as total_root_attempts,
                             SUM(num_root) as total_root_accesses
                           FROM connections
                           GROUP BY protocol_type, state
                           ORDER BY 3 DESC
                           """)
attack_stats.show()
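
Note: SQLContext and registerTempTable are the legacy Spark 1.x-era APIs; they still work here. On Spark 2.0 and later, the same two steps can be written against the SparkSession the script already creates. A sketch of the equivalent, shown for reference only and not part of the generated file:

    df = spark.createDataFrame(parsed_rdd)
    df.createOrReplaceTempView("connections")
    attack_stats = spark.sql(...)  # same SQL query as above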

In [ ]:
%%writefile -a spark_analysis.py

import matplotlib
matplotlib.use('Agg')  # non-interactive backend so the plot renders without a display
ax = attack_stats.toPandas().plot.bar(x='protocol_type', subplots=True, figsize=(10,25))

Write out report

Make sure to copy the output to GCS so that we can safely delete the cluster. This has to be pure Python, so replace the shell commands with equivalent Python code using the google-cloud-storage client library.


In [ ]:
%%writefile -a spark_analysis.py

ax[0].get_figure().savefig('report.png');
#!gsutil rm -rf gs://$BUCKET/sparktobq/
#!gsutil cp report.png gs://$BUCKET/sparktobq/

In [ ]:
%%writefile -a spark_analysis.py

import google.cloud.storage as gcs
bucket = gcs.Client().get_bucket(BUCKET)
for blob in bucket.list_blobs(prefix='sparktobq/'):
    blob.delete()
bucket.blob('sparktobq/report.png').upload_from_filename('report.png')

In [ ]:
%%writefile -a spark_analysis.py

connections_by_protocol.write.format("csv").mode("overwrite").save(
    "gs://{}/sparktobq/connections_by_protocol".format(BUCKET))

Test automation

Run it standalone


In [ ]:
BUCKET='cloud-training-demos-ml'  # CHANGE
print('Writing to {}'.format(BUCKET))
!python spark_analysis.py --bucket=$BUCKET

In [ ]:
!gsutil ls gs://$BUCKET/sparktobq/**
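
Run on a job-specific cluster

With the analysis in a standalone Python file, the whole workflow can now be scripted: create a cluster, submit the job, and delete the cluster once it finishes. The cell below is only a sketch -- the cluster name (sparktobq), the region (us-central1), and the single-node configuration are assumptions; adjust them for your project.


In [ ]:
# Sketch only: cluster name, region, and machine configuration are assumptions -- change as needed.
!gcloud dataproc clusters create sparktobq --region=us-central1 --single-node
!gcloud dataproc jobs submit pyspark spark_analysis.py --cluster=sparktobq --region=us-central1 -- --bucket=$BUCKET
!gcloud dataproc clusters delete sparktobq --region=us-central1 --quiet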

Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.