In [1]:
%matplotlib inline
In [2]:
import warnings
warnings.filterwarnings('ignore')
In [3]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn import preprocessing
import seaborn as sns
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
ProgressBar().register()
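In [ ]:
# optional aside (a sketch, not part of the original run): instead of registering the
# ProgressBar globally, it can be scoped to a single computation as a context manager
# with ProgressBar():
#     result = some_dask_object.compute()  # some_dask_object is a placeholder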
In [4]:
IMG_DIR = '../../analysis'
DPI=120
In [20]:
mkdir data
cd data
In [ ]:
# http://stat-computing.org/dataexpo/2009/the-data.html
# !curl -O http://stat-computing.org/dataexpo/2009/2000.csv.bz2
# !curl -O http://stat-computing.org/dataexpo/2009/2001.csv.bz2
# !curl -O http://stat-computing.org/dataexpo/2009/2002.csv.bz2
# !ls -lh
In [26]:
# !bzip2 -d 2000.csv.bz2
# !bzip2 -d 2001.csv.bz2
# !bzip2 -d 2002.csv.bz2
# !ls -lh
In [14]:
# data_types = {'CRSElapsedTime': int, 'CRSDepTime': int, 'Year': int, 'Month': int, 'DayOfWeek': int, 'DayofMonth': int}
data_types = {'CRSDepTime': int, 'Year': int, 'Month': int, 'DayOfWeek': int, 'DayofMonth': int}
# http://dask.pydata.org/en/latest/dataframe-overview.html
%time df = dd.read_csv('./data/200*.csv', encoding='iso-8859-1', dtype=data_types, assume_missing=True)
# for live feed
# %time df = dd.read_csv('./data/2003.csv', encoding='iso-8859-1', dtype=data_types, assume_missing=True)
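In [ ]:
# optional sanity check (a sketch, not from the original run): with assume_missing=True,
# integer columns that are not listed in data_types are read as floats
# df.dtypes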
In [10]:
%time len(df)
Out[10]:
In [15]:
# just 1% of data
df = df.sample(frac=0.01)
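In [ ]:
# hedged aside: for a reproducible demo sample the random state could be pinned,
# e.g. df.sample(frac=0.01, random_state=42)  # 42 is just an illustrative seed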
In [16]:
%time len(df)
Out[16]:
In [17]:
%time df.head()
Out[17]:
In [18]:
%time df = df.fillna(-1)
In [19]:
# Takes a while
# %time df.count().compute()
In [20]:
# Takes a while, but should be doable
# %time unique_origins = df['Origin'].unique().compute()
In [21]:
# once you compute you get a real pandas series
# type(unique_origins)
In [22]:
# unique_origins
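In [ ]:
# sketch for illustration: .compute() materializes the lazy dask result into a plain
# pandas object, so unique_origins above would be a pandas.Series
# isinstance(df['Origin'].unique().compute(), pd.Series)  # expected: True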
In [23]:
# 2400 is not a valid time
df['CRSDepTime'] = df.apply(lambda row: 2359 if row['CRSDepTime'] == 2400 else row['CRSDepTime'], axis='columns')
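In [ ]:
# hedged alternative (not the approach used above): the same fix can be expressed
# without a row-wise apply, which is usually much cheaper on dask dataframes
# df['CRSDepTime'] = df['CRSDepTime'].replace(2400, 2359)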
In [24]:
# df.apply?
In [25]:
head = df.head()
In [26]:
def create_timestamp(row):
    return pd.Timestamp('%s-%s-%s;%04d' % (row['Year'], row['Month'], row['DayofMonth'], row['CRSDepTime']))
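In [ ]:
# hedged alternative sketch (the original cell keeps the ';' separator above): build an
# unambiguous 'YYYY-MM-DD HH:MM' string by splitting the HHMM departure time explicitly
# def create_timestamp_explicit(row):  # hypothetical helper, not used below
#     hhmm = int(row['CRSDepTime'])
#     return pd.Timestamp('%04d-%02d-%02d %02d:%02d' % (int(row['Year']), int(row['Month']),
#                                                        int(row['DayofMonth']), hhmm // 100, hhmm % 100))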
In [27]:
# type(head)
In [28]:
# head
In [29]:
# create a sample for dask to figure out the data types
transformation_sample = head.apply(create_timestamp, axis='columns')
In [32]:
type(transformation_sample)
Out[32]:
In [33]:
transformation_sample
Out[33]:
In [34]:
# meta_information = {'@timestamp': pd.Timestamp}
meta_information = transformation_sample
df['@timestamp'] = df.apply(lambda row: pd.Timestamp('%s-%s-%s;%04d' % (row['Year'], row['Month'], row['DayofMonth'], row['CRSDepTime'])),
                            axis='columns',
                            meta=meta_information)
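In [ ]:
# hedged aside: instead of a sample Series, dask's apply also accepts a lightweight
# (name, dtype) tuple as meta, e.g.
# meta_information = ('@timestamp', 'datetime64[ns]')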
In [35]:
# df.head()
In [36]:
from pyelasticsearch import ElasticSearch, bulk_chunks
In [37]:
ES_HOST = 'http://localhost:9200/'
INDEX_NAME = "expo2009"
DOC_TYPE = "flight"
es = ElasticSearch(ES_HOST)
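In [ ]:
# hedged sketch, assuming pyelasticsearch's create_index/put_mapping helpers: creating the
# index with an explicit date mapping for '@timestamp' before the bulk import helps
# Elasticsearch/Kibana treat it as a time field
# es.create_index(INDEX_NAME)
# es.put_mapping(INDEX_NAME, DOC_TYPE,
#                {DOC_TYPE: {'properties': {'@timestamp': {'type': 'date'}}}})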
In [38]:
# https://pyelasticsearch.readthedocs.io/en/latest/api/#pyelasticsearch.ElasticSearch.bulk
def documents(records):
    for flight in records:
        yield es.index_op(flight)

def chunk_import(records):
    # bulk_chunks() breaks your documents into smaller requests for speed:
    for chunk in bulk_chunks(documents(records=records),
                             docs_per_chunk=50000,
                             bytes_per_chunk=10000000):
        # We specify a default index and doc type here so we don't
        # have to repeat them in every operation:
        es.bulk(chunk, doc_type=DOC_TYPE, index=INDEX_NAME)
In [39]:
# should initially be 0, or 2 if Kibana has already run
es.count('*')['count']
Out[39]:
In [40]:
df.npartitions
Out[40]:
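In [ ]:
# hedged aside: the size of each bulk import below can be tuned by repartitioning first,
# e.g. df = df.repartition(npartitions=10)  # 10 is just an illustrative value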
In [41]:
begin_partition = 0
end_partition = df.npartitions
# begin_partition = 23
# end_partition = 25
In [42]:
for partition_nr in range(df.npartitions):
    if partition_nr >= end_partition:
        break
    if partition_nr < begin_partition:
        continue
    print("Importing partition %d" % (partition_nr))
    partition = df.get_partition(partition_nr)
    records = partition.compute().to_dict(orient='records')
    print("Importing into ES: %d" % (len(records)))
    chunk_import(records)
    cnt = es.count('*')['count']
    print("Datasets in ES: %d" % (cnt))
In [43]:
!mkdir feed
In [45]:
!curl -O http://stat-computing.org/dataexpo/2009/2003.csv.bz2
In [46]:
!bzip2 -d 2003.csv.bz2
In [49]:
!mv 2003.csv feed
In [50]:
!ls -l feed
In [ ]:
# for live reload of data during demo
# execute this and repeat steps from dd.read_csv in Cell 8
cd ..
mkdir feed
cd feed
!curl -O http://stat-computing.org/dataexpo/2009/2003.csv.bz2
!bzip2 -d 2003.csv.bz2