In [1]:
%matplotlib inline
%pylab inline


Populating the interactive namespace from numpy and matplotlib

In [2]:
import warnings
warnings.filterwarnings('ignore')

In [3]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn import preprocessing
import seaborn as sns
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
# register() shows a progress bar for every dask computation in this notebook
ProgressBar().register()

In [4]:
IMG_DIR = '../../analysis'
DPI=120

In [20]:
mkdir data
cd data

In [ ]:
# http://stat-computing.org/dataexpo/2009/the-data.html
# !curl -O http://stat-computing.org/dataexpo/2009/2000.csv.bz2
# !curl -O http://stat-computing.org/dataexpo/2009/2001.csv.bz2
# !curl -O http://stat-computing.org/dataexpo/2009/2002.csv.bz2
# !ls -lh

In [26]:
# !bzip2 -d 2000.csv.bz2
# !bzip2 -d 2001.csv.bz2
# !bzip2 -d 2002.csv.bz2
# !ls -lh

In [14]:
# data_types = {'CRSElapsedTime': int, 'CRSDepTime': int, 'Year': int, 'Month': int, 'DayOfWeek': int, 'DayofMonth': int}
data_types = {'CRSDepTime': int, 'Year': int, 'Month': int, 'DayOfWeek': int, 'DayofMonth': int}


# http://dask.pydata.org/en/latest/dataframe-overview.html
%time df = dd.read_csv('./data/200*.csv', encoding='iso-8859-1', dtype=data_types, assume_missing=True)

# for live feed
# %time df = dd.read_csv('./data/2003.csv', encoding='iso-8859-1', dtype=data_types, assume_missing=True)


CPU times: user 78.1 ms, sys: 46.9 ms, total: 125 ms
Wall time: 181 ms
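
dd.read_csv only builds a lazy task graph, which is why the cell above finishes in milliseconds even though the underlying CSV files are large; nothing is actually read until a computation such as len(df) or head() runs. A minimal sketch (using the df defined above) to inspect the inferred schema and partitioning without triggering a full pass over the data:

In [ ]:
# Sketch: inspect the lazy dataframe without computing it.
print(df.npartitions)  # number of partitions dask will process in parallel
print(df.columns)      # column names parsed from the CSV header
print(df.dtypes)       # dtypes inferred from a sample plus the explicit dtype hints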

In [10]:
%time len(df)


[########################################] | 100% Completed | 49.6s
CPU times: user 1min 18s, sys: 32.9 s, total: 1min 51s
Wall time: 49.6 s
Out[10]:
16922186

In [15]:
# just 1% of data
df = df.sample(.01)
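
df.sample draws a random fraction of rows from each partition; in the dask version used here the fraction is the first positional argument. For a reproducible demo, the fraction can be passed by keyword together with a fixed seed, roughly as below (keyword names assume dask's DataFrame.sample(frac=..., random_state=...) signature):

In [ ]:
# Hedged sketch: reproducible 1% sample with a fixed seed.
df = df.sample(frac=0.01, random_state=42)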

In [16]:
%time len(df)


[########################################] | 100% Completed | 46.3s
CPU times: user 1min 22s, sys: 38.5 s, total: 2min
Wall time: 46.3 s
Out[16]:
169219

In [17]:
%time df.head()


[########################################] | 100% Completed |  3.0s
CPU times: user 2.28 s, sys: 656 ms, total: 2.94 s
Wall time: 2.99 s
Out[17]:
Year Month DayofMonth DayOfWeek DepTime CRSDepTime ArrTime CRSArrTime UniqueCarrier FlightNum ... TaxiIn TaxiOut Cancelled CancellationCode Diverted CarrierDelay WeatherDelay NASDelay SecurityDelay LateAircraftDelay
167644 2000 1 23 7 1029.0 1035 1144.0 1137.0 NW 743.0 ... 9.0 42.0 0.0 NaN 0.0 NaN NaN NaN NaN NaN
229822 2000 1 11 2 1810.0 1815 2037.0 2039.0 US 1484.0 ... 8.0 16.0 0.0 NaN 0.0 NaN NaN NaN NaN NaN
477385 2000 2 14 1 600.0 600 757.0 754.0 HP 2470.0 ... 4.0 20.0 0.0 NaN 0.0 NaN NaN NaN NaN NaN
84887 2000 1 4 2 952.0 952 1117.0 1114.0 UA 2504.0 ... 3.0 18.0 0.0 NaN 0.0 NaN NaN NaN NaN NaN
528202 2000 2 20 7 1755.0 1715 2224.0 2032.0 CO 157.0 ... 5.0 81.0 0.0 NaN 0.0 NaN NaN NaN NaN NaN

5 rows × 29 columns

Cleaning and fixing data


In [18]:
%time df = df.fillna(-1)


CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 29.7 ms
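
Like read_csv, fillna is lazy, which is why the cell reports almost no CPU time: the -1 placeholders are only materialized when a downstream computation runs. A quick sanity check, sketched with standard dask reductions, is to confirm that no NaNs remain once the fill is computed:

In [ ]:
# Hedged sketch: count remaining NaNs per column after the fill.
# The reduction is lazy; .compute() triggers one pass over the data.
remaining_nans = df.isnull().sum().compute()
print(remaining_nans[remaining_nans > 0])  # expected to be empty after fillna(-1)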

In [19]:
# Takes a while
# %time df.count().compute()

In [20]:
# Takes a while, but should be doable
# %time unique_origins = df['Origin'].unique().compute()

In [21]:
# once you compute you get a real pandas series
# type(unique_origins)

In [22]:
# unique_origins

In [23]:
# 2400 is not a valid time
df['CRSDepTime'] = df.apply(lambda row: 2359 if row['CRSDepTime'] == 2400 else row['CRSDepTime'], axis='columns')
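
The row-wise apply above walks every row just to cap a single column. An equivalent, usually faster formulation works on the column directly; this sketch assumes dask's Series.mask mirrors the pandas semantics (replace values where the condition is True):

In [ ]:
# Hedged alternative: column-wise fix instead of a row-wise apply.
df['CRSDepTime'] = df['CRSDepTime'].mask(df['CRSDepTime'] == 2400, 2359)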

In [24]:
# df.apply?

In [25]:
head = df.head()


[########################################] | 100% Completed |  4.1s

In [26]:
def create_timestamp(row):
    return pd.Timestamp('%s-%s-%s;%04d' % (row['Year'], row['Month'], row['DayofMonth'], row['CRSDepTime']))

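create_timestamp relies on pandas' string parser to understand the 'YYYY-M-D;HHMM' format. An alternative that avoids string parsing is to build the timestamp from numeric components, splitting CRSDepTime into hours and minutes; a sketch assuming pd.Timestamp's keyword constructor:

In [ ]:
# Hedged sketch: build the timestamp from numeric fields instead of a parsed string.
def create_timestamp_numeric(row):
    dep = int(row['CRSDepTime'])  # e.g. 1035 -> 10:35
    return pd.Timestamp(year=int(row['Year']), month=int(row['Month']),
                        day=int(row['DayofMonth']),
                        hour=dep // 100, minute=dep % 100)
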
In [27]:
# type(head)

In [28]:
# head

In [29]:
# create a sample for dask to figure out the data types
transformation_sample = head.apply(create_timestamp, axis='columns')

In [32]:
type(transformation_sample)


Out[32]:
pandas.core.series.Series

In [33]:
transformation_sample


Out[33]:
167644   2000-01-23 10:35:00
229822   2000-01-11 18:15:00
477385   2000-02-14 06:00:00
84887    2000-01-04 09:52:00
528202   2000-02-20 17:15:00
dtype: datetime64[ns]

In [34]:
# meta_information = {'@timestamp': pd.Timestamp}
meta_information = transformation_sample

df['@timestamp'] = df.apply(lambda row: pd.Timestamp('%s-%s-%s;%04d'%(row['Year'], row['Month'], row['DayofMonth'], row['CRSDepTime'])),
                            axis='columns',
                           meta=meta_information)
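
The meta argument tells dask what the result of the apply looks like (a datetime64[ns] Series) without executing the function across all partitions up front; passing the transformed head() sample is one way to provide that template. A more compact variant, sketched below, reuses create_timestamp from above and spells meta out as a (name, dtype) tuple, which dask also accepts:

In [ ]:
# Hedged sketch: same apply, with meta given as a (name, dtype) tuple.
df['@timestamp'] = df.apply(create_timestamp, axis='columns',
                            meta=('@timestamp', 'datetime64[ns]'))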

In [35]:
# df.head()

In [36]:
from pyelasticsearch import ElasticSearch, bulk_chunks

In [37]:
ES_HOST = 'http://localhost:9200/'
INDEX_NAME = "expo2009"
DOC_TYPE = "flight"
es = ElasticSearch(ES_HOST)

In [38]:
# https://pyelasticsearch.readthedocs.io/en/latest/api/#pyelasticsearch.ElasticSearch.bulk
def documents(records):
    for flight in records:
        yield es.index_op(flight)
        
def chunk_import(records):        
    # bulk_chunks() breaks your documents into smaller requests for speed:
    for chunk in bulk_chunks(documents(records=records),
                             docs_per_chunk=50000,
                             bytes_per_chunk=10000000):
        # We specify a default index and doc type here so we don't
        # have to repeat them in every operation:
        es.bulk(chunk, doc_type=DOC_TYPE, index=INDEX_NAME)
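
Before bulk-importing, it can help to create the index with an explicit date mapping for @timestamp so Elasticsearch (and Kibana) treat it as a time field instead of relying on dynamic mapping. A sketch, assuming the pyelasticsearch 1.x create_index/put_mapping API:

In [ ]:
# Hedged sketch, assuming pyelasticsearch's create_index()/put_mapping():
try:
    es.create_index(INDEX_NAME)
except Exception:
    pass  # index may already exist
es.put_mapping(INDEX_NAME, DOC_TYPE,
               {DOC_TYPE: {'properties': {'@timestamp': {'type': 'date'}}}})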

In [39]:
# should be 2 initially, or 0, depending on whether Kibana has run already
es.count('*')['count']


Out[39]:
2

In [40]:
df.npartitions


Out[40]:
28

In [41]:
begin_partition = 0
end_partition = df.npartitions

# begin_partition = 23
# end_partition = 25

In [42]:
for partition_nr in range(begin_partition, end_partition):
    print("Importing partition %d" % partition_nr)
    partition = df.get_partition(partition_nr)
    records = partition.compute().to_dict(orient='records')
    print("Importing into ES: %d" % len(records))
    chunk_import(records)
    cnt = es.count('*')['count']
    print("Datasets in ES: %d" % cnt)


Importing partition 0
[########################################] | 100% Completed |  5.8s
Importing into ES: 6402
Importing partition 1
[########################################] | 100% Completed |  5.7s
Importing into ES: 6389
Datasets in ES: 11931
Importing partition 2
[########################################] | 100% Completed |  4.7s
Importing into ES: 6400
Datasets in ES: 15856
Importing partition 3
[########################################] | 100% Completed |  4.9s
Importing into ES: 6399
Datasets in ES: 24466
Importing partition 4
[########################################] | 100% Completed |  6.4s
Importing into ES: 6390
Datasets in ES: 27716
Importing partition 5
[########################################] | 100% Completed |  7.4s
Importing into ES: 6400
Datasets in ES: 34758
Importing partition 6
[########################################] | 100% Completed |  5.0s
Importing into ES: 6368
Datasets in ES: 41542
Importing partition 7
[########################################] | 100% Completed |  5.1s
Importing into ES: 6322
Datasets in ES: 48952
Importing partition 8
[########################################] | 100% Completed |  5.0s
Importing into ES: 5759
Datasets in ES: 54822
Importing partition 9
[########################################] | 100% Completed |  4.9s
Importing into ES: 6385
Datasets in ES: 61849
Importing partition 10
[########################################] | 100% Completed |  4.8s
Importing into ES: 6372
Datasets in ES: 64273
Importing partition 11
[########################################] | 100% Completed |  5.1s
Importing into ES: 6375
Datasets in ES: 71291
Importing partition 12
[########################################] | 100% Completed |  6.3s
Importing into ES: 6358
Datasets in ES: 77252
Importing partition 13
[########################################] | 100% Completed |  4.9s
Importing into ES: 6379
Datasets in ES: 87819
Importing partition 14
[########################################] | 100% Completed |  4.5s
Importing into ES: 6375
Datasets in ES: 94465
Importing partition 15
[########################################] | 100% Completed |  5.2s
Importing into ES: 6386
Datasets in ES: 97385
Importing partition 16
[########################################] | 100% Completed |  4.4s
Importing into ES: 6343
Datasets in ES: 105156
Importing partition 17
[########################################] | 100% Completed |  6.3s
Importing into ES: 6310
Datasets in ES: 109966
Importing partition 18
[########################################] | 100% Completed |  2.0s
Importing into ES: 2393
Datasets in ES: 114114
Importing partition 19
[########################################] | 100% Completed |  5.1s
Importing into ES: 6370
Datasets in ES: 119475
Importing partition 20
[########################################] | 100% Completed |  4.5s
Importing into ES: 6365
Datasets in ES: 128372
Importing partition 21
[########################################] | 100% Completed |  5.4s
Importing into ES: 6403
Datasets in ES: 131099
Importing partition 22
[########################################] | 100% Completed |  4.5s
Importing into ES: 6368
Datasets in ES: 138170
Importing partition 23
[########################################] | 100% Completed |  4.1s
Importing into ES: 6392
Datasets in ES: 147700
Importing partition 24
[########################################] | 100% Completed |  4.4s
Importing into ES: 6366
Datasets in ES: 152583
Importing partition 25
[########################################] | 100% Completed |  5.7s
Importing into ES: 6312
Datasets in ES: 160501
Importing partition 26
[########################################] | 100% Completed |  6.1s
Importing into ES: 6330
Datasets in ES: 166212
Importing partition 27
[########################################] | 100% Completed |  1.4s
Importing into ES: 1808
Datasets in ES: 167414

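The counts printed during the loop can lag slightly behind what was just indexed, because Elasticsearch only exposes new documents after a refresh. A hedged sketch (assuming pyelasticsearch exposes a refresh() method) to compare the final document count against the sampled dataframe:

In [ ]:
# Hedged sketch: force a refresh, then compare the ES count with the dask dataframe.
es.refresh(INDEX_NAME)  # assumed pyelasticsearch method; makes newly indexed docs visible
print("Documents in ES:", es.count('*')['count'])
print("Rows in sampled dataframe:", len(df))
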
In [43]:
!mkdir feed

In [45]:
!curl -O http://stat-computing.org/dataexpo/2009/2003.csv.bz2


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 90.9M  100 90.9M    0     0  4432k      0  0:00:21  0:00:21 --:--:-- 5598k

In [46]:
!bzip2 -d 2003.csv.bz2

In [49]:
!mv 2003.csv feed

In [50]:
!ls -l feed


total 612056
-rwxrwxrwx 1 root root 626745242 Feb 10 17:07 2003.csv

In [ ]:


In [ ]:
# for live reload of data during demo
# execute this and repeat the steps from the dd.read_csv cell above
cd ..
mkdir feed
cd feed
!curl -O http://stat-computing.org/dataexpo/2009/2003.csv.bz2
!bzip2 -d 2003.csv.bz2