%matplotlib inline
import warnings

import numpy as np
import matplotlib.pyplot as plt
from sklearn import preprocessing
import seaborn as sns
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
progress = ProgressBar().register()

IMG_DIR = '../../analysis'

mkdir data
cd data

# !curl -O
# !curl -O
# !curl -O
# !ls -lh

# !bzip2 -d 2000.csv.bz2
# !bzip2 -d 2001.csv.bz2
# !bzip2 -d 2002.csv.bz2
# !ls -lh

# data_types = {'CRSElapsedTime': int, 'CRSDepTime': int, 'Year': int, 'Month': int, 'DayOfWeek': int, 'DayofMonth': int}
data_types = {'CRSDepTime': int, 'Year': int, 'Month': int, 'DayOfWeek': int, 'DayofMonth': int}

%time df = dd.read_csv('./data/200*.csv', encoding='iso-8859-1', dtype=data_types, assume_missing=True)

# for live feed
# %time df = dd.read_csv('./data/2003.csv', encoding='iso-8859-1', dtype=data_types, assume_missing=True)

%time len(df)

# just 1% of data
df = df.sample(.01)

%time len(df)

%time df.head()

Year Month DayofMonth DayOfWeek DepTime CRSDepTime ArrTime CRSArrTime UniqueCarrier FlightNum ... TaxiIn TaxiOut Cancelled CancellationCode Diverted CarrierDelay WeatherDelay NASDelay SecurityDelay LateAircraftDelay
167644 2000 1 23 7 1029.0 1035 1144.0 1137.0 NW 743.0 ... 9.0 42.0 0.0 NaN 0.0 NaN NaN NaN NaN NaN
229822 2000 1 11 2 1810.0 1815 2037.0 2039.0 US 1484.0 ... 8.0 16.0 0.0 NaN 0.0 NaN NaN NaN NaN NaN
477385 2000 2 14 1 600.0 600 757.0 754.0 HP 2470.0 ... 4.0 20.0 0.0 NaN 0.0 NaN NaN NaN NaN NaN
84887 2000 1 4 2 952.0 952 1117.0 1114.0 UA 2504.0 ... 3.0 18.0 0.0 NaN 0.0 NaN NaN NaN NaN NaN
528202 2000 2 20 7 1755.0 1715 2224.0 2032.0 CO 157.0 ... 5.0 81.0 0.0 NaN 0.0 NaN NaN NaN NaN NaN

5 rows × 29 columns

Cleaning and fixing data

%time df = df.fillna(-1)

# Takes a while
# %time df.count().compute()

# Takes a while, but should be doable
# %time unique_origins = df['Origin'].unique().compute()

# once you compute you get a real pandas series
# type(unique_origins)

# unique_origins

# 2400 is not a valid time
df['CRSDepTime'] = df.apply(lambda row: 2359 if row['CRSDepTime'] == 2400 else row['CRSDepTime'], axis='columns')

# df.apply?

head = df.head()

[########################################] | 100% Completed |  4.1s

def create_timestamp (row):
    """Build the scheduled departure timestamp of one flight record.

    Parameters
    ----------
    row : mapping (e.g. a pandas Series or dict)
        Must provide 'Year', 'Month', 'DayofMonth' and 'CRSDepTime', where
        'CRSDepTime' encodes the local time as the number HHMM (e.g. 600 is
        06:00 and 1035 is 10:35). Integer-valued floats are accepted as well.

    Returns
    -------
    pd.Timestamp
        The departure date and time with minute resolution.

    Raises
    ------
    ValueError
        If the components do not form a valid date/time (e.g. 2400).
    """
    # Split HHMM numerically instead of formatting a string and relying on
    # lenient date parsing; this also works when the columns arrive as floats.
    hour, minute = divmod(int(row['CRSDepTime']), 100)
    return pd.Timestamp(year=int(row['Year']), month=int(row['Month']),
                        day=int(row['DayofMonth']), hour=hour, minute=minute)

# type(head)

# head

# create a sample for dask to figure out the data types
# `head` is the small preview taken with `df.head()`; applying the conversion
# row by row (axis='columns') yields one timestamp per previewed flight.
transformation_sample = head.apply(create_timestamp, axis='columns')

167644   2000-01-23 10:35:00
229822   2000-01-11 18:15:00
477385   2000-02-14 06:00:00
84887    2000-01-04 09:52:00
528202   2000-02-20 17:15:00
dtype: datetime64[ns]

# meta_information = {'@timestamp': pd.Timestamp}
# The pandas sample computed on `head` tells dask the dtype of the result up
# front, so it does not have to infer it.
meta_information = transformation_sample

# Add the scheduled departure time as the '@timestamp' column. The conversion
# reuses `create_timestamp` rather than repeating its body in a lambda.
df['@timestamp'] = df.apply(create_timestamp, axis='columns', meta=meta_information)

# df.head()

# Elasticsearch client plus the helper used below to split bulk imports.
from pyelasticsearch import ElasticSearch, bulk_chunks

# Connection settings: server URL, and the index / document type that the
# bulk import below writes to.
ES_HOST = 'http://localhost:9200/'
INDEX_NAME = "expo2009"
DOC_TYPE = "flight"
# Client object shared by all import cells.
es = ElasticSearch(ES_HOST)

def documents(records):
    """Lazily turn flight records into Elasticsearch index operations.

    Each element of ``records`` is passed to ``es.index_op`` and the resulting
    operation is yielded one at a time, so nothing is built up front.
    """
    for record in records:
        operation = es.index_op(record)
        yield operation
def chunk_import(records, docs_per_chunk=500, bytes_per_chunk=10000):
    """Bulk-index ``records`` into Elasticsearch in several smaller requests.

    Parameters
    ----------
    records : iterable
        Flight records; each one is wrapped into an index operation by
        ``documents``.
    docs_per_chunk : int, optional
        Maximum number of operations sent in one bulk request.
    bytes_per_chunk : int, optional
        Approximate upper bound for the payload size of one bulk request.

    NOTE(review): the original cell lost the closing arguments of the
    ``bulk_chunks`` call. The defaults here follow the pyelasticsearch
    documentation example - confirm they match the intended values.
    """
    operations = documents(records=records)
    # bulk_chunks() breaks your documents into smaller requests for speed:
    for chunk in bulk_chunks(operations,
                             docs_per_chunk=docs_per_chunk,
                             bytes_per_chunk=bytes_per_chunk):
        # We specify a default index and doc type here so we don't
        # have to repeat them in every operation:
        es.bulk(chunk, doc_type=DOC_TYPE, index=INDEX_NAME)

# should be 2 or 0 initially, depending on whether Kibana has already run


In [40]:


# Half-open range [begin_partition, end_partition) of dask partitions to
# import. Narrow it to resume an interrupted import.
begin_partition = 0
end_partition = df.npartitions

# begin_partition = 23
# end_partition = 25

# Walk the requested partitions directly; the range is clamped to the
# partitions that actually exist.
for partition_nr in range(max(begin_partition, 0), min(end_partition, df.npartitions)):
    print ("Importing partition %d"%(partition_nr))
    # Compute only this partition and turn it into a list of dicts, one per row.
    partition = df.get_partition(partition_nr)
    records = partition.compute().to_dict(orient='records')
    print ("Importing into ES: %d"%(len(records)))
    # Send the records to Elasticsearch before reporting the new total.
    chunk_import(records)
    cnt = es.count('*')['count']
    print ("Datasets in ES: %d"%(cnt))

Importing partition 0
Importing into ES: 6402
Importing partition 1
Datasets in ES: 11931
Datasets in ES: 11931
Importing partition 2
Datasets in ES: 15856
Datasets in ES: 15856
Importing partition 3
Importing into ES: 6399
Datasets in ES: 24466
Datasets in ES: 24466
Datasets in ES: 24466
Importing partition 4
Datasets in ES: 27716
Datasets in ES: 27716
Importing partition 5
Datasets in ES: 34758
Datasets in ES: 34758
Importing partition 6
Importing into ES: 6368
Datasets in ES: 41542
Datasets in ES: 41542
Datasets in ES: 41542
Importing partition 7
Importing into ES: 6322
Datasets in ES: 48952
Datasets in ES: 48952
Datasets in ES: 48952
Importing partition 8
Datasets in ES: 54822
Datasets in ES: 54822
Importing partition 9
Datasets in ES: 61849
Datasets in ES: 61849
Importing partition 10
Importing into ES: 6372
Datasets in ES: 64273
Datasets in ES: 64273
Datasets in ES: 64273
Importing partition 11
Datasets in ES: 71291
Datasets in ES: 71291
Importing into ES: 6358
Datasets in ES: 77252
Importing partition 13
Importing into ES: 6379
Datasets in ES: 87819
Datasets in ES: 87819
Datasets in ES: 87819
Importing partition 14
Datasets in ES: 94465
Datasets in ES: 94465
Importing partition 15
Datasets in ES: 97385
Datasets in ES: 97385
Importing into ES: 6343
Datasets in ES: 105156
Importing partition 17
Datasets in ES: 109966
Datasets in ES: 109966
Importing partition 18
Datasets in ES: 114114
Datasets in ES: 114114
Importing partition 19
Datasets in ES: 119475
Datasets in ES: 119475
Importing into ES: 6365
Datasets in ES: 128372
[                                        ] | 0% Completed |  0.0s
Importing into ES: 6403
Datasets in ES: 131099
Importing partition 22
Datasets in ES: 138170
Datasets in ES: 138170
Importing partition 23
Datasets in ES: 147700
Datasets in ES: 147700
Importing partition 24
Datasets in ES: 152583
Datasets in ES: 152583
Importing partition 25
Importing into ES: 6312
Datasets in ES: 160501
Datasets in ES: 160501
Datasets in ES: 160501
Importing partition 26
Importing into ES: 6330
Datasets in ES: 166212
Datasets in ES: 166212
Datasets in ES: 166212
Importing partition 27
Importing into ES: 1808
Datasets in ES: 167414

# Create the directory that will hold the data for the live feed.
!mkdir feed

# NOTE(review): the download URL is missing from this command; `curl -O`
# needs one. Presumably it fetched the 2003.csv.bz2 archive that is unpacked
# in the next cell - restore the URL before re-running.
!curl -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 90.9M  100 90.9M    0     0  4432k      0  0:00:21  0:00:21 --:--:-- 5598k

# Decompress the downloaded archive in place.
!bzip2 -d 2003.csv.bz2

# Move the extracted CSV into the feed directory.
!mv 2003.csv feed

In [50]:
# Check that the extracted CSV arrived in the feed directory.
!ls -l feed

total 612056
-rwxrwxrwx 1 root root 626745242 Feb 10 17:07 2003.csv

In [ ]:
# for live reload of data during demo
# execute this and repeat steps from dd.read_csv in Cell 8
# The directory changes below use IPython automagic (`cd`, `mkdir`).
cd ..
mkdir feed
cd feed
# NOTE(review): the download URL is missing from this command; the next line
# expects the file 2003.csv.bz2 to exist - restore the URL before re-running.
!curl -O
!bzip2 -d 2003.csv.bz2