There are two ways to get data from Elasticsearch: query and streaming. Query is the preferred method for all searches that return in less than 1-5 minutes. To stream much more data (potentially all of it) one uses the scan method. To further optimize the scan, one can request only a subset of fields instead of full documents.
In [1]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import time
In [6]:
# Time window for the query, in epoch seconds.
start = 1505573241  # Sep 16 2017, ~9 AM
interval = 60 * 60  # window length: one hour, in seconds
end = start + interval
In [7]:
# Client for the MWT2/UChicago Elasticsearch cluster.
# NOTE(review): 60 s request timeout — presumably raised from the default to
# tolerate slow scroll responses; confirm against cluster behavior.
es_chicago = Elasticsearch(['atlas-kibana.mwt2.org:9200'],timeout=60)
# Query body: documents in the [start, end] time window (epoch seconds),
# restricted to latency-type records with ATLAS as both source and
# destination VO. "_source" limits the returned fields to the four listed,
# which reduces transfer volume during the scan.
query_chicago = {
"_source": [ "timestamp", "srcSite", "destSite", "delay_mean" ],
"query": {
"bool": {
"must": [
{ "range": { "timestamp": { "gte": start, "lte": end, "format": "epoch_second" } } },
{ "term" : {"_type" : "latency"}},
{ "term" : {"srcVO" : "ATLAS"}},
{ "term" : {"destVO" : "ATLAS"}}
] } }
}
In [14]:
# Stream all matching documents with the scan helper. filter_path trims each
# HTTP response to the scroll bookkeeping plus the document _source, cutting
# transfer and parsing overhead.
scroll_chicago = scan(client=es_chicago, query=query_chicago, index="network_weather-2017*", scroll='5m', timeout='5m', size=10000, filter_path=['_scroll_id', '_shards', 'hits.hits._source'])
counter = 0  # documents consumed so far
start_time = time.time()
for entry in scroll_chicago:
    if not counter % 100000:  # progress line every 100k documents
        print("Processing event number ", counter)
    counter += 1
# Read the clock once so the printed duration and the rate agree, and guard
# against division by zero when the scan completes within clock resolution.
elapsed = time.time() - start_time
rate = counter / elapsed if elapsed > 0 else float('inf')
print("%s entries processed in %i seconds. Rate was: %f doc/s." % (counter, elapsed, rate))
In [ ]: