Migrating from Spark to BigQuery via Dataproc -- Part 5

  • Part 1: The original Spark code, now running on Dataproc (lift-and-shift).
  • Part 2: Replace HDFS with Google Cloud Storage. This enables job-specific clusters. (cloud-native)
  • Part 3: Automate everything, so that we can run in a job-specific cluster. (cloud-optimized)
  • Part 4: Load CSV into BigQuery, use BigQuery. (modernize)
  • Part 5: Using Cloud Functions, launch analysis every time there is a new file in the bucket. (serverless)

Catch-up cell

In [ ]:
%%bash
wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz
gunzip kddcup.data_10_percent.gz
BUCKET='cloud-training-demos-ml'  # CHANGE
gsutil cp kdd* gs://$BUCKET/
bq mk sparktobq
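
The schema declared in main.py below expects 42 comma-separated fields per row. A quick local spot check on the downloaded file can confirm that before anything is loaded. This is a minimal stdlib sketch; `column_counts` is a hypothetical helper, not part of the original notebook, and it only reads the first lines, so it is a spot check rather than a full validation.

```python
import gzip
from collections import Counter


def column_counts(path, max_lines=1000):
    """Count how many comma-separated fields appear per line in the first max_lines lines.

    Works on both the original .gz download and the gunzipped file.
    """
    opener = gzip.open if path.endswith(".gz") else open
    counts = Counter()
    with opener(path, "rt") as f:
        for i, line in enumerate(f):
            if i >= max_lines:
                break
            counts[len(line.rstrip("\n").split(","))] += 1
    return counts


# usage in the notebook (after the catch-up cell has run):
# column_counts('kddcup.data_10_percent')
# a single key of 42 is what the BigQuery schema in main.py assumes
```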

Create reporting function

In [ ]:
%%writefile main.py

from google.cloud import bigquery
import google.cloud.storage as gcs
import tempfile
import os

def create_report(BUCKET, gcsfilename, tmpdir):
    """Creates a report in gs://BUCKET/sparktobq/ based on the contents of gcsfilename (gs://bucket/some/dir/filename)."""
    # connect to BigQuery
    client = bigquery.Client()
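    # load_table_from_uri accepts a table ID string; unlike get_table(), this does not
    # fail when the table does not exist yet (CREATE_IF_NEEDED creates it)
    destination_table = 'sparktobq.kdd_cup'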
    # Specify table schema. Autodetect is not a good idea for production code
    job_config = bigquery.LoadJobConfig()
    schema = [
        bigquery.SchemaField("duration", "INT64"),
    ]
    for name in ['protocol_type', 'service', 'flag']:
        schema.append(bigquery.SchemaField(name, "STRING"))
    for name in 'src_bytes,dst_bytes,wrong_fragment,urgent,hot,num_failed_logins'.split(','):
        schema.append(bigquery.SchemaField(name, "INT64"))
    schema.append(bigquery.SchemaField("unused_10", "STRING"))
    schema.append(bigquery.SchemaField("num_compromised", "INT64"))
    schema.append(bigquery.SchemaField("unused_12", "STRING"))
    for name in 'su_attempted,num_root,num_file_creations'.split(','):
        schema.append(bigquery.SchemaField(name, "INT64")) 
    for fieldno in range(16, 41):
        schema.append(bigquery.SchemaField("unused_{}".format(fieldno), "STRING"))
    schema.append(bigquery.SchemaField("label", "STRING"))
    job_config.schema = schema

    # Load CSV data into BigQuery, replacing any rows that were there before
    job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    job_config.skip_leading_rows = 0
    job_config.source_format = bigquery.SourceFormat.CSV
    load_job = client.load_table_from_uri(gcsfilename, destination_table, job_config=job_config)
    print("Starting LOAD job {} for {}".format(load_job.job_id, gcsfilename))
    load_job.result()  # Waits for table load to complete.
    print("Finished LOAD job {}".format(load_job.job_id))
    # connections by protocol
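    sql = """
        SELECT protocol_type, COUNT(*) AS count
        FROM sparktobq.kdd_cup
        GROUP BY protocol_type
        ORDER BY count ASC
    """
    connections_by_protocol = client.query(sql).to_dataframe()
    connections_by_protocol.to_csv(os.path.join(tmpdir, "connections_by_protocol.csv"))
    print("Finished analyzing connections")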
    # attacks plot
    sql = """
        SELECT
          protocol_type,
          CASE label
            WHEN 'normal.' THEN 'no attack'
            ELSE 'attack'
          END AS state,
          COUNT(*) AS total_freq,
          ROUND(AVG(src_bytes), 2) AS mean_src_bytes,
          ROUND(AVG(dst_bytes), 2) AS mean_dst_bytes,
          ROUND(AVG(duration), 2) AS mean_duration,
          SUM(num_failed_logins) AS total_failed_logins,
          SUM(num_compromised) AS total_compromised,
          SUM(num_file_creations) AS total_file_creations,
          SUM(su_attempted) AS total_root_attempts,
          SUM(num_root) AS total_root_accesses
        FROM sparktobq.kdd_cup
        GROUP BY protocol_type, state
        ORDER BY 3 DESC
    """
    attack_stats = client.query(sql).to_dataframe()
    # subplots=True returns one Axes per numeric column; they all share one figure
    ax = attack_stats.plot.bar(x='protocol_type', subplots=True, figsize=(10, 25))
    ax[0].get_figure().savefig(os.path.join(tmpdir, 'report.png'))
    print("Finished analyzing attacks")

    # replace any previous report in gs://BUCKET/sparktobq/ with the new files
    bucket = gcs.Client().get_bucket(BUCKET)
    for blob in bucket.list_blobs(prefix='sparktobq/'):
        blob.delete()
    for fname in ['report.png', 'connections_by_protocol.csv']:
        bucket.blob('sparktobq/{}'.format(fname)).upload_from_filename(os.path.join(tmpdir, fname))
    print("Uploaded report based on {} to {}".format(gcsfilename, BUCKET))

def bigquery_analysis_cf(data, context):
    # check that trigger is for a file of interest
    bucket = data['bucket']
    name = data['name']
    if ('kddcup' in name) and not ('gz' in name):
        filename = 'gs://{}/{}'.format(bucket, data['name'])
        print(bucket, filename)
        with tempfile.TemporaryDirectory() as tmpdir:
            create_report(bucket, filename, tmpdir)
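
Building the schema across several loops makes it easy to miscount columns. One way to check it, sketched below with a hypothetical `kdd_schema_spec` helper that is not part of main.py, is to express the same column list as plain (name, type) pairs and assert the count and uniqueness. The pairs can then be turned into `bigquery.SchemaField(name, field_type)` objects.

```python
def kdd_schema_spec():
    """Return the (name, BigQuery type) pairs that create_report declares, in column order."""
    spec = [("duration", "INT64")]
    spec += [(name, "STRING") for name in ["protocol_type", "service", "flag"]]
    spec += [(name, "INT64") for name in
             "src_bytes,dst_bytes,wrong_fragment,urgent,hot,num_failed_logins".split(",")]
    spec.append(("unused_10", "STRING"))
    spec.append(("num_compromised", "INT64"))
    spec.append(("unused_12", "STRING"))
    spec += [(name, "INT64") for name in "su_attempted,num_root,num_file_creations".split(",")]
    spec += [("unused_{}".format(n), "STRING") for n in range(16, 41)]
    spec.append(("label", "STRING"))
    return spec


spec = kdd_schema_spec()
# one field per CSV column, and no duplicate names (BigQuery requires unique column names)
assert len(spec) == 42
assert len({name for name, _ in spec}) == len(spec)

# in main.py this would become:
# job_config.schema = [bigquery.SchemaField(name, ftype) for name, ftype in spec]
```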

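To see what the two queries in create_report compute without touching BigQuery, the sketch below applies the same grouping to a few made-up rows using only the standard library. The rows and the function names are hypothetical illustrations; `attack_stats` covers only a subset of the aggregates in the real query.

```python
from collections import Counter, defaultdict


def connections_by_protocol(rows):
    """Like: SELECT protocol_type, COUNT(*) ... GROUP BY protocol_type ORDER BY count ASC."""
    counts = Counter(row["protocol_type"] for row in rows)
    return sorted(counts.items(), key=lambda kv: kv[1])


def attack_stats(rows):
    """Like the attacks query: group by (protocol_type, state), where state is
    'no attack' for label 'normal.' and 'attack' otherwise, ordered by count descending."""
    groups = defaultdict(list)
    for row in rows:
        state = "no attack" if row["label"] == "normal." else "attack"
        groups[(row["protocol_type"], state)].append(row)
    result = []
    for (protocol, state), members in groups.items():
        n = len(members)
        result.append({
            "protocol_type": protocol,
            "state": state,
            "total_freq": n,
            # Python's round() may differ from BigQuery ROUND on exact .5 ties
            "mean_src_bytes": round(sum(r["src_bytes"] for r in members) / n, 2),
            "total_failed_logins": sum(r["num_failed_logins"] for r in members),
        })
    # ORDER BY 3 DESC sorts on the third selected column, total_freq
    return sorted(result, key=lambda r: r["total_freq"], reverse=True)


# hypothetical sample rows, not taken from the KDD data
sample = [
    {"protocol_type": "tcp", "label": "normal.", "src_bytes": 100, "num_failed_logins": 0},
    {"protocol_type": "tcp", "label": "smurf.", "src_bytes": 50, "num_failed_logins": 1},
    {"protocol_type": "udp", "label": "normal.", "src_bytes": 30, "num_failed_logins": 0},
]
print(connections_by_protocol(sample))
print(attack_stats(sample))
```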
In [ ]:
%%writefile requirements.txt
google-cloud-bigquery[pandas]
google-cloud-storage
matplotlib

In [ ]:
# verify the trigger filter used in the CF (bigquery_analysis_cf)
name = 'kddcup.data_10_percent'
if 'kddcup' in name and not ('gz' in name):
    print(True)
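
The same check can be wrapped in a small pure function and exercised against fake event payloads. The storage finalize event passed to bigquery_analysis_cf carries the object's `bucket` and `name`, which is all the filter looks at; the dictionaries below are hypothetical minimal payloads, not complete events, and `is_kdd_source` / `source_uri` are illustrative names. The sketch also shows why the function does not keep re-triggering itself: the report objects it writes under sparktobq/ in the same bucket do not contain 'kddcup'.

```python
def is_kdd_source(name):
    """Same condition as bigquery_analysis_cf: a 'kddcup' object that is not gzipped."""
    return ('kddcup' in name) and not ('gz' in name)


def source_uri(event):
    """Return the gs:// URI to analyze for a finalize event, or None to ignore it."""
    if is_kdd_source(event['name']):
        return 'gs://{}/{}'.format(event['bucket'], event['name'])
    return None


# hypothetical minimal payloads; real events carry many more fields
events = [
    {'bucket': 'my-bucket', 'name': 'kddcup.data_10_percent'},
    {'bucket': 'my-bucket', 'name': 'kddcup.data_10_percent.gz'},
    {'bucket': 'my-bucket', 'name': 'sparktobq/report.png'},
    {'bucket': 'my-bucket', 'name': 'sparktobq/connections_by_protocol.csv'},
]
for event in events:
    print(event['name'], '->', source_uri(event))
```

The substring test on 'gz' is coarse: it rejects any name containing "gz" anywhere. `name.endswith('.gz')` would be a stricter alternative if other object names might contain those letters.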

Test the report function locally (before deploying it)

In [ ]:
# test that the function works
import main as bq

BUCKET='cloud-training-demos-ml' # CHANGE
try:
    bq.create_report(BUCKET, 'gs://{}/kddcup.data_10_percent'.format(BUCKET), "/tmp")
except Exception as e:
    print(e)

Deploy the cloud function

In [ ]:
!gcloud functions deploy bigquery_analysis_cf --runtime python37 --trigger-resource $BUCKET --trigger-event google.storage.object.finalize

Deploying the function may take a while (up to about 2 minutes).

Try it out

Copy the file to the bucket:

In [ ]:
!gsutil rm -rf gs://$BUCKET/sparktobq
!gsutil cp kddcup.data_10_percent gs://$BUCKET/

Verify that the Cloud Function runs. You can check this, including its logs, in the Cloud Functions section of the GCP Console.

Once the function is complete (in about 30 seconds), see if the output folder contains the report:

In [ ]:
!gsutil ls gs://$BUCKET/sparktobq
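
Instead of re-running the listing by hand, a small polling loop can wait until both report objects appear. This is a minimal sketch: `wait_for_objects` is a hypothetical helper, and the listing function is passed in (along with the clock and sleep functions) so the loop can be tried without GCP. With google-cloud-storage, the listing could come from `Client.list_blobs(BUCKET, prefix='sparktobq/')`, as in the commented usage.

```python
import time


def wait_for_objects(list_names, expected, timeout=120, interval=5,
                     clock=time.monotonic, sleep=time.sleep):
    """Poll list_names() until every name in expected is present.

    Returns True once all expected names are listed, False if timeout seconds pass first.
    """
    deadline = clock() + timeout
    while True:
        missing = set(expected) - set(list_names())
        if not missing:
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


# usage in the notebook (needs GCP credentials):
# import google.cloud.storage as gcs
# lister = lambda: [b.name for b in gcs.Client().list_blobs(BUCKET, prefix='sparktobq/')]
# wait_for_objects(lister, ['sparktobq/report.png', 'sparktobq/connections_by_protocol.csv'])
```

Injecting `clock` and `sleep` keeps the default behavior identical to a plain `time.sleep` loop while making the timeout path testable.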

Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.