In [1]:
%pylab inline


Populating the interactive namespace from numpy and matplotlib

In [2]:
import warnings
warnings.filterwarnings('ignore')

In [3]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn import preprocessing
import seaborn as sns
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
# install a global progress bar for all dask computations
ProgressBar().register()

In [4]:
IMG_DIR = '../../analysis'
DPI = 120

In [5]:
# data_types = {'CRSElapsedTime': int, 'CRSDepTime': int, 'Year': int, 'Month': int, 'DayOfWeek': int, 'DayofMonth': int}
data_types = {'CRSDepTime': int, 'Year': int, 'Month': int, 'DayOfWeek': int, 'DayofMonth': int}


# http://dask.pydata.org/en/latest/dataframe-overview.html
# %time df = dd.read_csv('../../data/raw/*.csv', encoding='iso-8859-1', dtype=data_types, assume_missing=True, blocksize=1024 * 1024)
# %time df = dd.read_csv('../../data/raw/*.csv', encoding='iso-8859-1', dtype=data_types, assume_missing=True)
%time df = dd.read_csv('../../data/raw/2002.csv', encoding='iso-8859-1', dtype=data_types, assume_missing=True)


CPU times: user 78.1 ms, sys: 250 ms, total: 328 ms
Wall time: 359 ms
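
The sub-second wall time shows that dd.read_csv is lazy: dask only samples
the files to infer dtypes and builds a task graph; no real I/O happens until
something calls compute(). A quick check of the resulting schema (a sketch
using the column names from the dataset above):

# dtypes combine dask's sampling with the explicit overrides above;
# assume_missing=True widens unspecified integer columns to float64
# so missing values have somewhere to go
df[['Year', 'Month', 'DayofMonth', 'CRSDepTime']].dtypes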

In [6]:
# %time df.head()

In [7]:
%time df = df.fillna(-1)


CPU times: user 15.6 ms, sys: 15.6 ms, total: 31.2 ms
Wall time: 27.2 ms
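
fillna finishes in milliseconds for the same reason: it merely appends a step
to the task graph. Forcing a small piece of work makes the deferred I/O
visible (a sketch):

# computes just the first partition, triggering real parsing and the fillna
df.get_partition(0).head()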

In [8]:
# Takes a while
# %time df.count().compute()

In [9]:
# Takes a while, but should be doable
# %time unique_origins = df['Origin'].unique().compute()

In [10]:
# once you compute, you get a real pandas Series
# type(unique_origins)

In [11]:
# unique_origins

In [12]:
# 2400 is not a valid HHMM time; clamp it to 2359 so it parses as 23:59
df['CRSDepTime'] = df.apply(lambda row: 2359 if row['CRSDepTime'] == 2400 else row['CRSDepTime'], axis='columns')
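
The row-wise apply works, but it crosses into Python for every single row. A
vectorized mask should behave identically and run much faster (a sketch, not
what was executed above):

# same semantics without per-row Python calls:
# wherever CRSDepTime == 2400, substitute 2359
df['CRSDepTime'] = df['CRSDepTime'].mask(df['CRSDepTime'] == 2400, 2359)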

In [13]:
# df.apply?

In [14]:
head = df.head()


[########################################] | 100% Completed | 23.1s

In [15]:
def create_timestamp(row):
    return pd.Timestamp('%s-%s-%s;%04d' % (row['Year'], row['Month'], row['DayofMonth'], row['CRSDepTime']))
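
For reference, the same timestamps could be built per partition rather than
per row, which is typically far faster. A hypothetical vectorized variant,
assuming the integer dtypes forced above:

# concatenate one string column per partition and parse it in a single pass
ts = dd.to_datetime(df['Year'].astype(int).astype(str) + '-'
                    + df['Month'].astype(int).astype(str) + '-'
                    + df['DayofMonth'].astype(int).astype(str) + ' '
                    + df['CRSDepTime'].astype(int).map('{:04d}'.format),
                    format='%Y-%m-%d %H%M')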

In [16]:
# type(head)

In [17]:
# head

In [18]:
# create a sample for dask to figure out the data types
transformation_sample = head.apply(create_timestamp, axis='columns')

In [19]:
type(transformation_sample)


Out[19]:
pandas.core.series.Series

In [20]:
transformation_sample[0]


Out[20]:
Timestamp('2002-01-13 22:35:00')

In [21]:
# meta_information = {'@timestamp': pd.Timestamp}
meta_information = transformation_sample

df['@timestamp'] = df.apply(create_timestamp, axis='columns', meta=meta_information)
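
Computing a sample just to derive meta works, but dask also accepts a
(name, dtype) tuple, which avoids the extra pass over head(). An equivalent
sketch:

df['@timestamp'] = df.apply(create_timestamp, axis='columns',
                            meta=('@timestamp', 'datetime64[ns]'))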

In [22]:
# df.head()

In [23]:
from pyelasticsearch import ElasticSearch, bulk_chunks

In [24]:
ES_HOST = 'http://localhost:9200/'
INDEX_NAME = "expo2009"
DOC_TYPE = "flight"
es = ElasticSearch(ES_HOST)

In [29]:
# https://pyelasticsearch.readthedocs.io/en/latest/api/#pyelasticsearch.ElasticSearch.bulk
def documents(records):
    for flight in records:
        yield es.index_op(flight)

def chunk_import(records):
    # bulk_chunks() breaks your documents into smaller requests for speed:
    for chunk in bulk_chunks(documents(records=records),
                             docs_per_chunk=50000,
                             bytes_per_chunk=10000000):
        # We specify a default index and doc type here so we don't
        # have to repeat them in every operation:
        es.bulk(chunk, doc_type=DOC_TYPE, index=INDEX_NAME)
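
Nothing above tells Elasticsearch that '@timestamp' is a date, so the type is
left to dynamic mapping. Declaring it up front is safer; a sketch assuming
pyelasticsearch's create_index and put_mapping with a minimal mapping body:

# one-time setup before the first bulk call
es.create_index(INDEX_NAME)
es.put_mapping(INDEX_NAME, DOC_TYPE,
               {DOC_TYPE: {'properties': {'@timestamp': {'type': 'date'}}}})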

In [26]:
# should be 2 initially, or 0, depending on whether Kibana has already run
es.count('*')['count']


Out[26]:
12140825

In [27]:
df.npartitions


Out[27]:
9
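
Each partition maps to one chunk of the source CSV. Their row counts can be
inspected directly, though like count() this scans the entire file (a sketch,
commented out because it takes a while):

# df.map_partitions(len).compute()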

In [28]:
begin_partition = 0
end_partition = df.npartitions

# begin_partition = 23
# end_partition = 25

In [31]:
from time import time

In [32]:
for partition_nr in range(df.npartitions):
    if partition_nr >= end_partition:
        break
    if partition_nr < begin_partition:
        continue
    t0 = time()
    print("Importing partition %d" % partition_nr)
    partition = df.get_partition(partition_nr)
    records = partition.compute().to_dict(orient='records')
    print("Importing into ES: %d" % len(records))
    chunk_import(records)
    cnt = es.count('*')['count']
    print("Datasets in ES: %d" % cnt)
    print("Time loading: %s" % round(time() - t0, 3))


Importing partition 0
[########################################] | 100% Completed |  2min 39.9s
Importing into ES: 637003
Datasets in ES: 12285083
Time loading: 358.014
Importing partition 1
[########################################] | 100% Completed |  2min 42.5s
Importing into ES: 636510
Datasets in ES: 12921153
Time loading: 363.954
Importing partition 2
[########################################] | 100% Completed |  2min 35.7s
Importing into ES: 640252
Datasets in ES: 13559346
Importing partition 3
[########################################] | 100% Completed |  2min 30.4s
Importing into ES: 636826
Datasets in ES: 14200553
Importing partition 4
[########################################] | 100% Completed |  2min 38.7s
Importing into ES: 639237
Datasets in ES: 14839229
Time loading: 368.125
Importing partition 5
[########################################] | 100% Completed |  2min 36.3s
Importing into ES: 636553
Datasets in ES: 15474202
Importing partition 6
[########################################] | 100% Completed |  2min 34.7s
Importing into ES: 631200
Datasets in ES: 16106026
Importing partition 7
[########################################] | 100% Completed |  2min 55.5s
Importing into ES: 632999
Datasets in ES: 16738214
Time loading: 362.899
Importing partition 8
[########################################] | 100% Completed | 50.0s
Importing into ES: 180779
Datasets in ES: 16917753
Time loading: 106.667
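
The counts printed during the loop can lag slightly behind the true total,
because Elasticsearch only makes documents searchable after a refresh.
Forcing one before a final count gives an exact figure; a sketch assuming
pyelasticsearch's refresh:

es.refresh(INDEX_NAME)
es.count('*')['count']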

In [ ]: