Migrating from Spark to BigQuery via Dataproc -- Part 1

  • Part 1: The original Spark code, now running on Dataproc (lift-and-shift).
  • Part 2: Replace HDFS by Google Cloud Storage. This enables job-specific-clusters. (cloud-native)
  • Part 3: Automate everything, so that we can run in a job-specific cluster. (cloud-optimized)
  • Part 4: Load CSV into BigQuery, use BigQuery. (modernize)
  • Part 5: Using Cloud Functions, launch analysis every time there is a new file in the bucket. (serverless)

Copy data to HDFS

The Spark code in this notebook is based loosely on the code accompanying this post by Dipanjan Sarkar. I am using it to illustrate migrating a Spark analytics workload to BigQuery via Dataproc.

The data itself comes from the 1999 KDD competition. Let's grab 10% of the data to use as an illustration.

In [1]:
!wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz

In [2]:
!hadoop fs -put kddcup* /

In [3]:
!hadoop fs -ls /

Reading in data

The data are CSV files. In Spark, these can be read using textFile and splitting rows on commas.

In [4]:
from pyspark.sql import SparkSession, SQLContext, Row

spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext
data_file = "hdfs:///kddcup.data_10_percent.gz"
raw_rdd = sc.textFile(data_file).cache()


In [5]:
csv_rdd = raw_rdd.map(lambda row: row.split(","))
parsed_rdd = csv_rdd.map(lambda r: Row(

[Row(dst_bytes=5450, duration=0, flag='SF', hot=0, label='normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type='tcp', service='http', src_bytes=181, su_attempted='0', urgent=0, wrong_fragment=0),
 Row(dst_bytes=486, duration=0, flag='SF', hot=0, label='normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type='tcp', service='http', src_bytes=239, su_attempted='0', urgent=0, wrong_fragment=0),
 Row(dst_bytes=1337, duration=0, flag='SF', hot=0, label='normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type='tcp', service='http', src_bytes=235, su_attempted='0', urgent=0, wrong_fragment=0),
 Row(dst_bytes=1337, duration=0, flag='SF', hot=0, label='normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type='tcp', service='http', src_bytes=219, su_attempted='0', urgent=0, wrong_fragment=0),
 Row(dst_bytes=2032, duration=0, flag='SF', hot=0, label='normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type='tcp', service='http', src_bytes=217, su_attempted='0', urgent=0, wrong_fragment=0)]

Spark analysis

One way to analyze data in Spark is to call methods on a dataframe.

In [6]:
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(parsed_rdd)
connections_by_protocol = df.groupBy('protocol_type').count().orderBy('count', ascending=False)

|protocol_type| count|
|         icmp|283602|
|          tcp|190065|
|          udp| 20354|

Another way is to use Spark SQL

In [7]:
attack_stats = sqlContext.sql("""
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as total_freq,
                             ROUND(AVG(src_bytes), 2) as mean_src_bytes,
                             ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
                             ROUND(AVG(duration), 2) as mean_duration,
                             SUM(num_failed_logins) as total_failed_logins,
                             SUM(num_compromised) as total_compromised,
                             SUM(num_file_creations) as total_file_creations,
                             SUM(su_attempted) as total_root_attempts,
                             SUM(num_root) as total_root_acceses
                           FROM connections
                           GROUP BY protocol_type, state
                           ORDER BY 3 DESC

|protocol_type|    state|total_freq|mean_src_bytes|mean_dst_bytes|mean_duration|total_failed_logins|total_compromised|total_file_creations|total_root_attempts|total_root_acceses|
|         icmp|   attack|    282314|        932.14|           0.0|          0.0|                  0|                0|                   0|                0.0|                 0|
|          tcp|   attack|    113252|       9880.38|        881.41|        23.19|                 57|             2269|                  76|                1.0|               152|
|          tcp|no attack|     76813|       1439.31|       4263.97|        11.08|                 18|             2776|                 459|               17.0|              5456|
|          udp|no attack|     19177|         98.01|         89.89|      1054.63|                  0|                0|                   0|                0.0|                 0|
|         icmp|no attack|      1288|         91.47|           0.0|          0.0|                  0|                0|                   0|                0.0|                 0|
|          udp|   attack|      1177|          27.5|          0.23|          0.0|                  0|                0|                   0|                0.0|                 0|

In [8]:
%matplotlib inline
ax = attack_stats.toPandas().plot.bar(x='protocol_type', subplots=True, figsize=(10,25))

