<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.

-->

Explore data from covidtracking.com

The data set is relatively small and serves as a demonstration of working with Beam in an interactive notebook environment.

There are two ways to get the data:

  • Fetch JSON data from the API.
  • Download the data directly as CSV files.

We'll build a batch Beam pipeline example that can use either method.


In [ ]:
import json
import requests

json_current='https://covidtracking.com/api/v1/states/current.json'
json_historical='https://covidtracking.com/api/v1/states/daily.json'

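def get_json_data(url):
  with requests.Session() as session:
    response = session.get(url)
    # Fail early on HTTP errors instead of parsing an error page.
    response.raise_for_status()
    data = json.loads(response.text)
  return data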

csv_current = 'https://covidtracking.com/api/v1/states/current.csv'
csv_historical = 'https://covidtracking.com/api/v1/states/daily.csv'

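def download_csv(url, filename):
  if not filename.endswith('.csv'):
    filename = filename + '.csv'
  with requests.Session() as session:
    response = session.get(url)
    # Fail early on HTTP errors instead of saving an error page as CSV.
    response.raise_for_status()
    with open(filename, 'wb') as f:
      f.write(response.content)
  return filename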

The cell below reads the current data into memory as JSON.


In [ ]:
current_data = get_json_data(json_current)

The cell below downloads the current data and stores it as a CSV file.


In [ ]:
csv_file_current = download_csv(csv_current, 'current')

Prepare some Apache Beam dependencies.


In [ ]:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

Create a Beam pipeline.


In [ ]:
p = beam.Pipeline(runner=InteractiveRunner())

You can create a PCollection from either the in-memory JSON data or the data in files.


In [ ]:
current_data_from_json = p | 'Create PCollection from json' >> beam.Create(current_data)
current_data_from_files = p | 'Create PCollection from files' >> beam.io.ReadFromText(csv_file_current, skip_header_lines=1)

The in-memory JSON data is already structured.


In [ ]:
ib.show(current_data_from_json)

The data read from files as plain text is not structured, so we'll have to parse it ourselves.

For a batch pipeline reading very large files, it's normal to read the source data from files and let Beam handle the workload.


In [ ]:
ib.show(current_data_from_files)

We'll parse the plain text into structured data with the Beam SDK.
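
As a rough standalone sketch of the per-line parsing step, the snippet below runs the same idea outside of Beam. The header, the row, and the `Row` name are hypothetical stand-ins for the downloaded file, not values taken from the real data set.

```python
from collections import namedtuple
from csv import reader

# Hypothetical header and row, standing in for lines of the downloaded CSV file.
header_line = 'state,positive,notes'
row_line = 'NY,12345,"recount, pending"'

# csv.reader accepts any iterable of lines, so a one-element list parses one line.
headers = next(reader([header_line]))
raw_values = next(reader([row_line]))

# Fields made only of digits become ints; everything else stays a string.
values = [int(v) if v.isdigit() else v for v in raw_values]

Row = namedtuple('Row', headers)
record = Row(*values)
print(record)  # Row(state='NY', positive=12345, notes='recount, pending')
```

Note that `str.isdigit` returns False for empty strings, negative numbers, and decimals, so with this conversion rule such fields remain strings.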


In [ ]:
from csv import reader

def read_headers(csv_file):
  with open(csv_file, 'r') as f:
    header_line = f.readline().strip()
  return next(reader([header_line]))

current_data_headers = read_headers(csv_file_current)

In [ ]:
from collections import namedtuple

UsCovidData = namedtuple('UsCovidData', current_data_headers)

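class UsCovidDataCsvReader(beam.DoFn):
  """Parses one CSV line into an instance of the given namedtuple schema."""
  def __init__(self, schema):
    self._schema = schema

  def process(self, element):
    # Fields made only of digits become ints; all others stay strings.
    values = [int(val) if val.isdigit() else val for val in next(reader([element]))]
    return [self._schema(*values)]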

In [ ]:
current_data = current_data_from_files | 'Parse' >> beam.ParDo(UsCovidDataCsvReader(UsCovidData))

In [ ]:
ib.show(current_data)

With Interactive Beam, you can collect a PCollection into a pandas dataframe. It's useful when you just want to play with small test data sets locally on a single machine.


In [ ]:
df = ib.collect(current_data)
df.describe()

Now let's take a deeper look into the data with the visualization feature of Interactive Beam and come up with some tasks.


In [ ]:
ib.show(current_data, visualize_data=True)

From the facets visualization above, we can see that NY currently has the most positive COVID cases. This is easy to spot because the data set is small (for demo purposes).

Now we can write a Beam transform to reach the same conclusion: which state currently has the highest positive count.


In [ ]:
from functools import total_ordering

@total_ordering
class UsCovidDataOrderByPositive:
  def __init__(self, data):
    self._data = data
  
  def __gt__(self, other):
    return self._data.positive > other._data.positive


def maximum_positive(values):
  return max(values) if values else None

max_positive = (current_data 
                | 'Data OrderByPositive' >> beam.Map(lambda data: UsCovidDataOrderByPositive(data))
                | 'Find Maximum Positive' >> beam.CombineGlobally(maximum_positive)
                | 'Convert Back to Data' >> beam.Map(lambda orderable_data: orderable_data._data))

In [ ]:
ib.show(max_positive)
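
When `beam.CombineGlobally` is given a plain callable, Beam may call it on subsets of the input and then again on the partial results, so the callable should be associative and commutative. The sketch below checks that property in plain Python for a maximum-by-field reduction. The `Record` type, the `max_by_positive` helper, and the numbers are hypothetical, and `max(..., key=...)` is used here instead of the wrapper class above.

```python
from collections import namedtuple

# Hypothetical records; the numbers are made up for illustration.
Record = namedtuple('Record', ['state', 'positive'])
records = [Record('AA', 10), Record('BB', 70), Record('CC', 40), Record('DD', 25)]

def max_by_positive(values):
  """Returns the record with the largest `positive`, or None if empty."""
  values = list(values)
  return max(values, key=lambda r: r.positive) if values else None

# Reduce everything at once.
direct = max_by_positive(records)

# Reduce two chunks separately, then reduce the partial results.
partials = [max_by_positive(records[:2]), max_by_positive(records[2:])]
combined = max_by_positive(partials)

print(direct == combined)  # True
```

Because the reduction returns one of its inputs, it can be applied to its own outputs, which is what makes it usable as a combiner.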

We can also compute the total number of positive cases in the US.


In [ ]:
total_positive = (current_data
                  | 'Positive Per State' >> beam.Map(lambda data: data.positive)
                  | 'Total Positive' >> beam.CombineGlobally(sum))

In [ ]:
ib.show(total_positive)
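
The parser above leaves any field that is not made only of digits as a string, so an empty `positive` field would reach `sum` as `''` and raise a `TypeError`. A minimal sketch of one way to guard against that is shown below. The `to_count` helper and the sample values are hypothetical, and treating missing values as 0 is an assumption that may not suit every analysis.

```python
def to_count(value):
  """Returns value as an int, treating empty or non-numeric fields as 0."""
  if isinstance(value, int):
    return value
  try:
    return int(value)
  except (TypeError, ValueError):
    return 0

# Hypothetical field values as the parser might leave them.
raw_positive = [120, '', 35, 'N/A', '-5']
total = sum(to_count(v) for v in raw_positive)
print(total)  # 150
```

In the pipeline, a helper like this could be applied in the `beam.Map` step that extracts `positive`, before the values are summed.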

Now let's look at some more complicated data: the historical data.

It contains data similar to the current data set, but for each day up to the current day.


In [ ]:
csv_file_historical = download_csv(csv_historical, 'historical')

In [ ]:
historical_data_from_files = p | 'Create PCollection for historical data from files' >> beam.io.ReadFromText(csv_file_historical, skip_header_lines=1)

In [ ]:
ib.show(historical_data_from_files)

In [ ]:
historical_data_headers = read_headers(csv_file_historical)

HistoricalUsCovidData = namedtuple('HistoricalUsCovidData', historical_data_headers)

historical_data = historical_data_from_files | 'Parse Historical' >> beam.ParDo(UsCovidDataCsvReader(HistoricalUsCovidData))

In [ ]:
ib.show(historical_data)

For demonstration, let's look only at NY across the whole timeline.


In [ ]:
class FilterByState(beam.DoFn):
  def __init__(self, state):
    self._state = state
    
  def process(self, element):
    if element.state == self._state:
      yield element

ny_data = historical_data | 'Filter NY' >> beam.ParDo(FilterByState('NY'))

Then we visualize the data to see if anything is worth investigating.


In [ ]:
ib.show(ny_data, visualize_data=True)

There happens to be a field named positiveIncrease. If there were not, we would need to write some transforms to derive the per-day increase in positive cases.
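
As a rough sketch of what such a derivation could look like, the function below computes per-day increases from cumulative counts in plain Python. The `daily_increase` name and the sample numbers are hypothetical. It assumes each row is a `(date, cumulative_positive)` pair with sortable dates, and that all rows belong to one state.

```python
def daily_increase(rows):
  """Derives per-day increases from (date, cumulative_positive) pairs.

  The first day has no previous day, so its increase is reported as None.
  """
  ordered = sorted(rows)  # tuples sort by date first
  result = []
  previous = None
  for date, positive in ordered:
    increase = None if previous is None else positive - previous
    result.append((date, increase))
    previous = positive
  return result

# Hypothetical cumulative counts, deliberately out of order.
sample = [(20200403, 150), (20200401, 100), (20200402, 130)]
print(daily_increase(sample))
# [(20200401, None), (20200402, 30), (20200403, 20)]
```

One way to use this in a pipeline would be to key each record by state, group with `beam.GroupByKey`, and map the function over each group. That wiring is not shown here.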

Now let's find the date with the largest positiveIncrease for NY.


In [ ]:
@total_ordering
class UsCovidDataOrderByPositiveIncrease:
  def __init__(self, data):
    self._data = data
  
  def __gt__(self, other):
    self_positive_increase = self._data.positiveIncrease if self._data.positiveIncrease else 0
    other_positive_increase = other._data.positiveIncrease if other._data.positiveIncrease else 0
    return self_positive_increase > other_positive_increase


def maximum_positive_increase(values):
  return max(values) if values else None

worst_day = (ny_data
             | 'Order By PositiveIncrease' >> beam.Map(lambda data: UsCovidDataOrderByPositiveIncrease(data))
             | 'Maximum Positive Increase' >> beam.CombineGlobally(maximum_positive_increase)
             | 'Convert Worst Day Back to Data' >> beam.Map(lambda orderable_data: orderable_data._data)
             | 'Extract Date' >> beam.Map(lambda data: data.date))

In [ ]:
ib.show(worst_day)
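
Because the parser turns all-digit fields into ints, the extracted date comes out as an integer rather than a date object. The sketch below converts such a value into a `datetime.date`. It assumes the date column uses the `YYYYMMDD` layout, which should be checked against the downloaded file. The `parse_yyyymmdd` helper and the sample value are hypothetical.

```python
from datetime import datetime

def parse_yyyymmdd(value):
  """Converts an integer such as 20200415 into a datetime.date."""
  return datetime.strptime(str(value), '%Y%m%d').date()

# Hypothetical value, not a result taken from the data set.
print(parse_yyyymmdd(20200415).isoformat())  # 2020-04-15
```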