Stateful Stream

This notebook demonstrates the Stateful Stream design pattern.

Anomaly detection

Data exploration of unusually late flights on a couple of days in May 2010.


In [1]:
%%bigquery df
SELECT 
  PARSE_DATETIME('%Y-%m-%d-%H%M',
                 CONCAT(CAST(date AS STRING), '-', FORMAT('%04d', departure_schedule))
                 ) AS scheduled_time,
   departure_delay AS delay
FROM `bigquery-samples.airline_ontime_data.flights`
WHERE departure_airport = 'DFW' AND 
      date BETWEEN '2010-05-10' AND '2010-05-11'

In [2]:
df.head()


Out[2]:
scheduled_time delay
0 2010-05-11 18:00:00 -10.0
1 2010-05-10 18:00:00 0.0
2 2010-05-11 09:05:00 -4.0
3 2010-05-10 09:05:00 3.0
4 2010-05-11 21:00:00 40.0

In [3]:
df = df.sort_values(by='scheduled_time').set_index('scheduled_time');
df.head()


Out[3]:
delay
scheduled_time
2010-05-10 05:20:00 -5.0
2010-05-10 05:40:00 -5.0
2010-05-10 05:40:00 -2.0
2010-05-10 05:45:00 -1.0
2010-05-10 05:45:00 -6.0

In [4]:
%pip install -q seaborn


WARNING: You are using pip version 19.2.3, however version 20.1.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
Note: you may need to restart the kernel to use updated packages.

In [5]:
import seaborn as sns
import numpy as np
sns.set(rc={'figure.figsize':(20, 5)})
ax = df['delay'].plot(linewidth=2);
ax.set_ylabel('delay (minutes)');


We can train and predict in batch with Pandas, using the .rolling() method.


In [6]:
def is_anomaly(xarr):
    outcome = xarr.iloc[-1] # the last (i.e., current) item
    
    # discard min & max value & current (last) item
    xarr = xarr.drop(index=[xarr.idxmin(), xarr.idxmax(), xarr.index[-1]])
    if len(xarr) < 3:
        # need at least three items to compute a valid standard deviation
        return False
    
    # Fit a model (4-sigma deviations)
    prediction = xarr.mean()
    acceptable_deviation = 4 * xarr.std()
    return np.abs(outcome - prediction) > acceptable_deviation

PERIOD = '2h'
windowed = df.copy()
windowed['anomaly'] = df['delay'].rolling(PERIOD).apply(is_anomaly, raw=False).astype('bool')
windowed.head()


Out[6]:
delay anomaly
scheduled_time
2010-05-10 05:20:00 -5.0 False
2010-05-10 05:40:00 -5.0 False
2010-05-10 05:40:00 -2.0 False
2010-05-10 05:45:00 -1.0 False
2010-05-10 05:45:00 -6.0 False

In [7]:
windowed['delay'].plot(linewidth=0.6);
anomalies = windowed[windowed['anomaly']]
ax = anomalies['delay'].plot(linewidth=2.0, style='o');
ax.set_ylabel('anomalous delays');
ax.set_ylim([-10, 300]);



In [ ]:
windowed['delay'].plot(linewidth=0.6);
anomalies = windowed[windowed['anomaly']]
ax = anomalies['delay'].plot(linewidth=2.0, style='o');
ax.set_ylabel('anomalous delays');
ax.set_ylim([-10, 300]);
ax.set_xlim(['2010-05-10 06:00:00', '2010-05-10 18:00:00']);

Stateful pipeline using Beam

To do this in real time, we can use Apache Beam, which lets us build a stateful pipeline. We demonstrate it below using the interactive runner (which works in notebooks).


In [8]:
df.to_csv('delays.csv', header=False)

In [9]:
!head delays.csv


2010-05-10 05:20:00,-5.0
2010-05-10 05:40:00,-5.0
2010-05-10 05:40:00,-2.0
2010-05-10 05:45:00,-1.0
2010-05-10 05:45:00,-6.0
2010-05-10 05:45:00,-2.0
2010-05-10 06:00:00,2.0
2010-05-10 06:00:00,-1.0
2010-05-10 06:00:00,-1.0
2010-05-10 06:05:00,-4.0

In [10]:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
import apache_beam.runners.interactive.interactive_beam as ib
import datetime

def parse_line(line):
    import datetime
    timestamp, delay = line.split(",")
    timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
    return beam.window.TimestampedValue(
        {
            'scheduled': timestamp,
            'delay': float(delay)
        },
        timestamp.timestamp() # unix timestamp
    )

p = beam.Pipeline(interactive_runner.InteractiveRunner())
data = (p 
        | 'files' >> beam.io.ReadFromText('delays.csv')
        | 'parse' >> beam.Map(parse_line))

ib.show(data)



In [11]:
## Assign elements to sliding 2-hour windows, with a new window starting every 10 minutes
WINDOW_INTERVAL = 2 * 60 * 60. # 2 hours, in seconds
PANE_INTERVAL = 10*60 # 10 minutes, in seconds

windowed = (data
        | 'window' >> beam.WindowInto(
                beam.window.SlidingWindows(WINDOW_INTERVAL, PANE_INTERVAL),
                accumulation_mode=beam.trigger.AccumulationMode.DISCARDING))
ib.show(windowed, include_window_info=True)



In [16]:
# Compute model parameters on windowed stream.
import pandas as pd
import numpy as np
    
class ModelFn(beam.CombineFn):
    def create_accumulator(self):
        return pd.DataFrame()

    def add_input(self, df, element):
        # each input element is a dict with 'scheduled' and 'delay' keys
        return pd.concat([df, pd.DataFrame([element])], ignore_index=True)

    def merge_accumulators(self, dfs):
        return pd.concat(dfs)

    def extract_output(self, df):
        if len(df) < 1:
            return {}
        df = df.sort_values(by='scheduled').reset_index(drop=True);
        orig = df['delay'].values
        xarr = np.delete(orig, [np.argmin(orig), np.argmax(orig)])
        if len(xarr) > 2:
            # need at least three items to compute a valid standard deviation
            prediction = np.mean(xarr)
            acceptable_deviation = 4 * np.std(xarr)
        else:
            prediction = 0.0
            acceptable_deviation = 60.0
        return {
            'asof_date': df['scheduled'].iloc[len(df)-1].strftime('%Y-%m-%dT%H:%M:%S'),
            'prediction': prediction,
            'acceptable_deviation': acceptable_deviation
        }

def is_anomaly(data, model_state):
    result = data.copy()
    if not isinstance(model_state, beam.pvalue.EmptySideInput):
        result['is_anomaly'] = np.abs(data['delay'] - model_state['prediction']) > model_state['acceptable_deviation']
        if result['is_anomaly']:
            print(result)
    return result

def is_latest_slice(element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
    # in a sliding window, find whether we are in the last pane
    secs = (window.max_timestamp().micros - timestamp.micros)/(1000*1000)
    if secs < PANE_INTERVAL:
        yield element
        
def to_csv(d):
    return ','.join([d['scheduled'].strftime('%Y-%m-%dT%H:%M:%S'), 
                     str(d['delay']), 
                     str(d['is_anomaly'])
                    ])

model_state = (windowed
        | 'model' >> beam.transforms.CombineGlobally(ModelFn()).without_defaults())

ib.show(model_state, include_window_info=False)



In [13]:
anomalies = (windowed 
        | 'latest_slice' >> beam.FlatMap(is_latest_slice)
        | 'find_anomaly' >> beam.Map(is_anomaly, beam.pvalue.AsSingleton(model_state)))
ib.show(anomalies, include_window_info=False)


Run as pipeline

Move the code into a standalone Python file (here, find_anomalies_model.py). We can change the ReadFromText() and WriteToText() calls in that file to read from and write to Pub/Sub for real-time behavior, roughly as sketched below.
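The streaming variant is not run in this notebook; the following is only a rough sketch of the swap. It reuses parse_line, ModelFn, is_anomaly, is_latest_slice, to_csv, WINDOW_INTERVAL, and PANE_INTERVAL from the cells above, and it assumes hypothetical Pub/Sub topic names and that each message carries the same "timestamp,delay" CSV line as delays.csv.

# Hedged sketch only (not executed here): the same pipeline with file I/O
# swapped for Pub/Sub. Topic names are hypothetical placeholders.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    data = (p
        | 'read' >> beam.io.ReadFromPubSub(
              topic='projects/YOUR_PROJECT/topics/flight_delays')  # hypothetical
        | 'decode' >> beam.Map(lambda msg: msg.decode('utf-8'))
        | 'parse' >> beam.Map(parse_line))  # reuses parse_line from above

    windowed = (data
        | 'window' >> beam.WindowInto(
              beam.window.SlidingWindows(WINDOW_INTERVAL, PANE_INTERVAL),
              accumulation_mode=beam.trigger.AccumulationMode.DISCARDING))

    model_state = (windowed
        | 'model' >> beam.CombineGlobally(ModelFn()).without_defaults())

    (windowed
        | 'latest_slice' >> beam.FlatMap(is_latest_slice)
        | 'find_anomaly' >> beam.Map(is_anomaly,
                                     beam.pvalue.AsSingleton(model_state))
        | 'to_csv' >> beam.Map(to_csv)
        | 'encode' >> beam.Map(lambda line: line.encode('utf-8'))
        | 'write' >> beam.io.WriteToPubSub(
              topic='projects/YOUR_PROJECT/topics/flight_anomalies'))  # hypothetical

With streaming=True set on the pipeline options, windows are produced continuously as messages arrive rather than once over a bounded file. The cells below run the file-based version as-is.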


In [21]:
!rm -rf beam-temp* anom*json* anom*csv
%run ./find_anomalies_model.py


{'scheduled': datetime.datetime(2010, 5, 10, 7, 0), 'delay': 250.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 10, 9, 10), 'delay': 90.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 10, 9, 35), 'delay': 77.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 10, 11, 50), 'delay': 89.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 10, 12, 10), 'delay': 224.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 10, 16, 5), 'delay': 82.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 10, 17, 20), 'delay': 119.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 10, 19, 20), 'delay': 89.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 10, 19, 35), 'delay': 121.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 10, 20, 0), 'delay': 120.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 11, 6, 35), 'delay': 100.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 11, 9, 0), 'delay': 186.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 11, 10, 35), 'delay': 128.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 11, 12, 30), 'delay': 75.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 11, 13, 25), 'delay': 119.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 11, 15, 10), 'delay': 122.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 11, 17, 0), 'delay': 126.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 11, 19, 40), 'delay': 123.0, 'is_anomaly': True}
{'scheduled': datetime.datetime(2010, 5, 11, 20, 15), 'delay': 134.0, 'is_anomaly': True}

In [22]:
!wc -l anom*csv*
!grep True anom*csv*


1487 anomalies.csv-00000-of-00001
2010-05-10T07:00:00,250.0,True
2010-05-10T09:10:00,90.0,True
2010-05-10T09:35:00,77.0,True
2010-05-10T11:50:00,89.0,True
2010-05-10T12:10:00,224.0,True
2010-05-10T16:05:00,82.0,True
2010-05-10T17:20:00,119.0,True
2010-05-10T19:20:00,89.0,True
2010-05-10T19:35:00,121.0,True
2010-05-10T20:00:00,120.0,True
2010-05-11T06:35:00,100.0,True
2010-05-11T09:00:00,186.0,True
2010-05-11T10:35:00,128.0,True
2010-05-11T12:30:00,75.0,True
2010-05-11T13:25:00,119.0,True
2010-05-11T15:10:00,122.0,True
2010-05-11T17:00:00,126.0,True
2010-05-11T19:40:00,123.0,True
2010-05-11T20:15:00,134.0,True

Streaming SQL

In BigQuery, this approach works on both historical and streaming data; a sketch of streaming rows into a table follows the query output below.


In [23]:
%%bigquery
WITH data AS (
  SELECT 
    PARSE_DATETIME('%Y-%m-%d-%H%M',
                   CONCAT(CAST(date AS STRING), '-', FORMAT('%04d', arrival_schedule))
                   ) AS scheduled_arrival_time,
     arrival_delay
  FROM `bigquery-samples.airline_ontime_data.flights`
  WHERE arrival_airport = 'DFW' AND SUBSTR(date, 0, 7) = '2010-05'
),

model_state AS (
  SELECT
    scheduled_arrival_time,
    arrival_delay,
    AVG(arrival_delay) OVER (time_window) AS prediction,
    4*STDDEV(arrival_delay) OVER (time_window) AS acceptable_deviation
  FROM data
  WINDOW time_window AS 
    (ORDER BY UNIX_SECONDS(TIMESTAMP(scheduled_arrival_time))
     RANGE BETWEEN 7200 PRECEDING AND 1 PRECEDING)
)

SELECT 
  *,
  (ABS(arrival_delay - prediction) > acceptable_deviation) AS is_anomaly 
FROM model_state
LIMIT 10


Out[23]:
scheduled_arrival_time arrival_delay prediction acceptable_deviation is_anomaly
0 2010-05-01 00:10:00 -3.0 NaN NaN None
1 2010-05-01 05:00:00 2.0 NaN NaN None
2 2010-05-01 05:00:00 -31.0 NaN NaN None
3 2010-05-01 05:05:00 -6.0 -14.500000 93.338095 False
4 2010-05-01 05:35:00 2.0 -11.666667 68.857340 False
5 2010-05-01 05:45:00 -18.0 -8.250000 62.513998 False
6 2010-05-01 06:00:00 -13.0 -10.200000 56.878819 False
7 2010-05-01 06:35:00 -1.0 -10.666667 51.079024 False
8 2010-05-01 06:45:00 -9.0 -9.285714 48.865218 False
9 2010-05-01 07:00:00 54.0 -9.250000 45.242205 True
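The query above runs over the static public sample table. To exercise the streaming half of the claim, rows would be streamed into a table of your own and the same query pointed at it. The sketch below is illustrative only; the project, dataset, and table names are hypothetical, and it assumes a table with the same columns, using the google-cloud-bigquery client's streaming inserts.

# Hedged sketch (hypothetical table; not executed here): stream new flight rows
# into a table so that re-running the anomaly query also covers fresh data.
from google.cloud import bigquery

client = bigquery.Client()
table_id = 'YOUR_PROJECT.your_dataset.flights'  # hypothetical table, same schema as the sample

rows = [
    {'date': '2010-05-12', 'arrival_airport': 'DFW',
     'arrival_schedule': 1430, 'arrival_delay': 95.0},
]
errors = client.insert_rows_json(table_id, rows)  # streaming insert
if errors:
    print('Streaming insert failed:', errors)

Re-running the query against that table then evaluates both historical and newly streamed rows in the same pass.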

Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.