Streaming selected data from Elasticsearch

There are two ways to get data from Elasticsearch: querying and streaming. A query is the preferred method for all searches that return in less than 1-5 minutes. To stream much more data (potentially everything that matches) one uses the scan helper. To further optimize the scan, one can request only a subset of fields instead of the full documents.
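The key difference is that a plain query returns at most one page of hits, while scan keeps paging through the scroll API until everything has been streamed. A minimal offline sketch of that paging behavior, using fake pages of hypothetical hits (no cluster needed):

```python
# A plain query returns at most `size` hits in one response; scan pages
# through the full result set. This sketch mimics that with fake pages.

def fake_scroll_pages():
    """Yield fake scroll responses, 3 hits per page (hypothetical data)."""
    hits = [{"_source": {"delay_mean": i}} for i in range(8)]
    for i in range(0, len(hits), 3):
        yield hits[i:i + 3]

def scan_like(pages):
    """Flatten scroll pages into one stream of hits, like helpers.scan."""
    for page in pages:
        for hit in page:
            yield hit

streamed = list(scan_like(fake_scroll_pages()))
print(len(streamed))  # all 8 hits, not just the first page
```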

imports


In [1]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import time

time range to read


In [6]:
start = 1505573241 # Sep 16  9:... AM    
interval = 3600 #seconds     
end = start + interval
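To sanity-check the window, the epoch seconds can be rendered as readable timestamps with the standard library. Note the comment above gives local (US Central) time, which in September is five hours behind the UTC shown here:

```python
from datetime import datetime, timezone

start = 1505573241
end = start + 3600

# Render the window boundaries in UTC for verification.
print(datetime.fromtimestamp(start, tz=timezone.utc).isoformat())
print(datetime.fromtimestamp(end, tz=timezone.utc).isoformat())
```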

create the query. Defining the body does not execute it yet.


In [7]:
es_chicago = Elasticsearch(['atlas-kibana.mwt2.org:9200'], timeout=60)

query_chicago = {
    "_source": [   "timestamp", "srcSite", "destSite", "delay_mean"  ],
    "query": {
        "bool": {
        "must": [
            { "range": {  "timestamp": { "gte": start,  "lte": end,  "format": "epoch_second"  }  } },
            { "term" : {"_type" : "latency"}},
            { "term" : {"srcVO" : "ATLAS"}},
            { "term" : {"destVO" : "ATLAS"}}
        ]  }  }
}
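If the same query is needed for several time windows, the body can be factored into a small helper. This is just a sketch reusing the field names from the query above:

```python
def latency_query(start, end,
                  fields=("timestamp", "srcSite", "destSite", "delay_mean")):
    """Build the bool/range/term query body for a given epoch-second window."""
    return {
        "_source": list(fields),
        "query": {
            "bool": {
                "must": [
                    {"range": {"timestamp": {"gte": start, "lte": end,
                                             "format": "epoch_second"}}},
                    {"term": {"_type": "latency"}},
                    {"term": {"srcVO": "ATLAS"}},
                    {"term": {"destVO": "ATLAS"}},
                ]
            }
        },
    }

q = latency_query(1505573241, 1505573241 + 3600)
print(q["query"]["bool"]["must"][0]["range"]["timestamp"])
```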

now we actually stream the data


In [14]:
scroll_chicago = scan(client=es_chicago,
                      query=query_chicago,
                      index="network_weather-2017*",
                      scroll='5m',
                      timeout='5m',
                      size=10000,
                      filter_path=['_scroll_id', '_shards', 'hits.hits._source'])

counter = 0
start_time = time.time()
for entry in scroll_chicago:
    if not counter%100000:  
        print("Processing event number ", counter)
    #print (entry)
#     break
    counter += 1

print("%s entries processed in %i seconds. Rate was: %f doc/s." % (counter, time.time() - start_time, counter/(time.time() - start_time)))


Processing event number  0
Processing event number  100000
Processing event number  200000
232401 entries processed in 16 seconds. Rate was: 14502.876678 doc/s.
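Since `_source` is restricted to the four requested fields, each streamed entry is small and the loop body can unpack it directly. A minimal offline sketch with hypothetical entries shaped like the scan output:

```python
# Hypothetical entries shaped like the scan output above.
entries = [
    {"_source": {"timestamp": 1505573300, "srcSite": "MWT2",
                 "destSite": "AGLT2", "delay_mean": 4.2}},
    {"_source": {"timestamp": 1505573360, "srcSite": "AGLT2",
                 "destSite": "MWT2", "delay_mean": 3.7}},
]

# Unpack only the fields of interest from each hit.
rows = [(e["_source"]["srcSite"], e["_source"]["destSite"],
         e["_source"]["delay_mean"]) for e in entries]
print(rows)
```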
