In [1]:
%%bigquery df
SELECT
PARSE_DATETIME('%Y-%m-%d-%H%M',
CONCAT(CAST(date AS STRING), '-', FORMAT('%04d', departure_schedule))
) AS scheduled_time,
departure_delay AS delay
FROM `bigquery-samples.airline_ontime_data.flights`
WHERE departure_airport = 'DFW' AND
date BETWEEN '2010-05-10' AND '2010-05-11'
In [2]:
df.head()
Out[2]:
In [3]:
df = df.sort_values(by='scheduled_time').set_index('scheduled_time');
df.head()
Out[3]:
In [4]:
%pip install -q seaborn
In [5]:
import seaborn as sns
import numpy as np
sns.set(rc={'figure.figsize':(20, 5)})
ax = df['delay'].plot(linewidth=2);
ax.set_ylabel('delay (minutes)');
We can train & predict on batch using Pandas using the .rolling() method
In [6]:
def is_anomaly(xarr):
outcome = xarr[-1] # the last item
# discard min & max value & current (last) item
xarr = xarr.drop(index=[xarr.idxmin(), xarr.idxmax(), xarr.index[-1]])
if len(xarr) < 3:
# need at least three items to compute a valid standard deviation
return False
# Fit a model (4-sigma deviations)
prediction = xarr.mean()
acceptable_deviation = 4 * xarr.std()
return np.abs(outcome - prediction) > acceptable_deviation
PERIOD = '2h'
windowed = df.copy()
windowed['anomaly'] = df['delay'].rolling(PERIOD).apply(is_anomaly, raw=False).astype('bool')
windowed.head()
Out[6]:
In [7]:
windowed['delay'].plot(linewidth=0.6);
anomalies = windowed[windowed['anomaly']]
ax = anomalies['delay'].plot(linewidth=2.0, style='o');
ax.set_ylabel('anamolous delays');
ax.set_ylim([-10, 300]);
In [ ]:
windowed['delay'].plot(linewidth=0.6);
anomalies = windowed[windowed['anomaly']]
ax = anomalies['delay'].plot(linewidth=2.0, style='o');
ax.set_ylabel('anamolous delays');
ax.set_ylim([-10, 300]);
ax.set_xlim(['2010-05-10 06:00:00', '2010-05-10 18:00:00']);
In [8]:
df.to_csv('delays.csv', header=False)
In [9]:
!head delays.csv
In [10]:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
import apache_beam.runners.interactive.interactive_beam as ib
import datetime
def parse_line(line):
import datetime
timestamp, delay = line.split(",")
timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
return beam.window.TimestampedValue(
{
'scheduled': timestamp,
'delay': float(delay)
},
timestamp.timestamp() # unix timestamp
)
p = beam.Pipeline(interactive_runner.InteractiveRunner())
data = (p
| 'files' >> beam.io.ReadFromText('delays.csv')
| 'parse' >> beam.Map(parse_line))
ib.show(data)
In [11]:
## Time window into 2 hour windows, triggered every minute
WINDOW_INTERVAL = 2 * 60 * 60. # 2 hours, in seconds
PANE_INTERVAL = 10*60 # 10 minutes, in seconds
windowed = (data
| 'window' >> beam.WindowInto(
beam.window.SlidingWindows(WINDOW_INTERVAL, PANE_INTERVAL),
accumulation_mode=beam.trigger.AccumulationMode.DISCARDING))
ib.show(windowed, include_window_info=True)
In [16]:
# Compute model parameters on windowed stream.
import pandas as pd
import numpy as np
class ModelFn(beam.CombineFn):
def create_accumulator(self):
return pd.DataFrame()
def add_input(self, df, window):
return df.append(window, ignore_index=True)
def merge_accumulators(self, dfs):
return pd.concat(dfs)
def extract_output(self, df):
if len(df) < 1:
return {}
df = df.sort_values(by='scheduled').reset_index(drop=True);
orig = df['delay'].values
xarr = np.delete(orig, [np.argmin(orig), np.argmax(orig)])
if len(xarr) > 2:
# need at least three items to compute a valid standard deviation
prediction = np.mean(xarr)
acceptable_deviation = 4 * np.std(xarr)
else:
prediction = 0.0
acceptable_deviation = 60.0
return {
'asof_date': df['scheduled'].iloc[len(df)-1].strftime('%Y-%m-%dT%H:%M:%S'),
'prediction': prediction,
'acceptable_deviation': acceptable_deviation
}
def is_anomaly(data, model_state):
result = data.copy()
if not isinstance(model_state, beam.pvalue.EmptySideInput):
result['is_anomaly'] = np.abs(data['delay'] - model_state['prediction']) > model_state['acceptable_deviation']
if result['is_anomaly']:
print(result)
return result
def is_latest_slice(element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
# in a sliding window, find whether we are in the last pane
secs = (window.max_timestamp().micros - timestamp.micros)/(1000*1000)
if secs < PANE_INTERVAL:
yield element
def to_csv(d):
return ','.join([d['scheduled'].strftime('%Y-%m-%dT%H:%M:%S'),
str(d['delay']),
str(d['is_anomaly'])
])
model_state = (windowed
| 'model' >> beam.transforms.CombineGlobally(ModelFn()).without_defaults())
ib.show(model_state, include_window_info=False)
In [13]:
anomalies = (windowed
| 'latest_slice' >> beam.FlatMap(is_latest_slice)
| 'find_anomaly' >> beam.Map(is_anomaly, beam.pvalue.AsSingleton(model_state)))
ib.show(anomalies, include_window_info=False)
In [21]:
!rm -rf beam-temp* anom*json* anom*csv
%run ./find_anomalies_model.py
In [22]:
!wc -l anom*csv*
!grep True anom*csv*
In [23]:
%%bigquery
WITH data AS (
SELECT
PARSE_DATETIME('%Y-%m-%d-%H%M',
CONCAT(CAST(date AS STRING), '-', FORMAT('%04d', arrival_schedule))
) AS scheduled_arrival_time,
arrival_delay
FROM `bigquery-samples.airline_ontime_data.flights`
WHERE arrival_airport = 'DFW' AND SUBSTR(date, 0, 7) = '2010-05'
),
model_state AS (
SELECT
scheduled_arrival_time,
arrival_delay,
AVG(arrival_delay) OVER (time_window) AS prediction,
4*STDDEV(arrival_delay) OVER (time_window) AS acceptable_deviation
FROM data
WINDOW time_window AS
(ORDER BY UNIX_SECONDS(TIMESTAMP(scheduled_arrival_time))
RANGE BETWEEN 7200 PRECEDING AND 1 PRECEDING)
)
SELECT
*,
(ABS(arrival_delay - prediction) > acceptable_deviation) AS is_anomaly
FROM model_state
LIMIT 10
Out[23]:
Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License