This notebook shows how data can be pivoted with Elastic data frames to reveal insights into the behaviour of reviewers. The use case and data are from Mark Harwood's talk on entity-centric indexing.
An alternative version of this notebook uses Python pandas to create the same results.
In [1]:
import bz2
import matplotlib.pyplot as plt
import csv
import time
import pandas as pd
from elasticsearch import helpers
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
First connect to Elasticsearch. This assumes access is via localhost:9200; change the next line to use different connection parameters (see https://elasticsearch-py.readthedocs.io/en/master/api.html).
In [2]:
# No arguments are passed, so the client's default connection settings are used
# (this notebook assumes Elasticsearch is reachable at localhost:9200).
es = Elasticsearch()
In [3]:
# Names of the raw reviews index and of the pivoted index.
index_name = 'anonreviews'
index_name_pivot = "anonreviews_pivot"

# Field mappings for the raw review documents.
review_properties = {
    "reviewerId": {"type": "keyword"},
    "vendorId": {"type": "keyword"},
    "date": {"type": "date", "format": "yyyy-MM-dd HH:mm"},
    "rating": {"type": "integer"},
}

# Index settings (one shard, no replicas) combined with the mappings above.
index_settings = {
    "settings": {"number_of_shards": 1, "number_of_replicas": 0},
    "mappings": {"properties": review_properties},
}

# Remove any existing index (400/404 responses are ignored), then create it afresh.
es.indices.delete(index=index_name, ignore=[400, 404])
es.indices.create(index=index_name, body=index_settings)
Out[3]:
In [4]:
# Stream the compressed CSV into Elasticsearch in batches.
bulk_batch_size = 10000  # number of documents sent per bulk request
actions = []
n = 0  # running count of rows read from the CSV

# The `with` block guarantees the bz2 handle is closed, even if a bulk request
# raises. newline='' lets the csv module do its own newline handling, as the
# csv documentation recommends for file objects.
with bz2.open('./anonreviews.csv.bz2', 'rt', newline='') as csv_handle:
    for row in csv.DictReader(csv_handle):
        n += 1
        # Each CSV row becomes the _source of one document in index_name.
        actions.append({"_index": index_name, "_source": row})

        # Send a full batch and start collecting the next one.
        if n % bulk_batch_size == 0:
            helpers.bulk(es, actions)
            actions = []

# Send whatever is left over from the final, partial batch.
if actions:
    helpers.bulk(es, actions)
Flush to ensure all docs are indexed, then count the documents in the index to summarise the indexing.
In [5]:
# Flush the index, as before.
es.indices.flush(index=index_name)
# Explicitly refresh as well: refresh is the operation that makes recently
# indexed documents visible to search, so the count below cannot miss
# documents from the last bulk requests.
es.indices.refresh(index=index_name)
# Last expression of the cell: the document count is displayed as the output.
es.count(index=index_name)
Out[5]:
In pandas, we do the following:
# One aggregation per source column: mean of rating, number of unique
# vendorId values, and count of reviewerId entries.
aggregations = {
'rating':'mean',
'vendorId':'nunique',
'reviewerId':'count'
}
# NOTE(review): `reviews` is not defined in this notebook; presumably it is the
# reviews DataFrame loaded in the pandas version of this notebook — confirm.
grouped = reviews.groupby('reviewerId').agg(aggregations)
# Rename the aggregated columns to the names used by the Elasticsearch pivot.
grouped.columns=['avg_rating', 'dc_vendorId', 'count']
In Elasticsearch we can use data frames to achieve the same transformation. First preview the transformation:
In [6]:
# Group the source documents by reviewer.
group_by_reviewer = {"reviewerId": {"terms": {"field": "reviewerId"}}}

# Values computed for each reviewer group: average rating, cardinality of
# vendorId, and a value count over _id.
reviewer_aggregations = {
    "avg_rating": {"avg": {"field": "rating"}},
    "dc_vendorId": {"cardinality": {"field": "vendorId"}},
    "count": {"value_count": {"field": "_id"}},
}

# Full transform definition: read from index_name, write to index_name_pivot.
request = {
    "source": {"index": index_name},
    "dest": {"index": index_name_pivot},
    "pivot": {
        "group_by": group_by_reviewer,
        "aggregations": reviewer_aggregations,
    },
}

# Send the definition to the preview endpoint and keep the response for display.
response = es.transport.perform_request(
    'POST',
    '/_data_frame/transforms/_preview',
    body=request,
)
In [7]:
# Show only the first ten previewed rows.
response['preview'][:10]
Out[7]:
In [8]:
# The transform id is the pivot index name, so build every path from
# index_name_pivot instead of repeating the literal name. This keeps the
# cleanup in step with the destination index declared in `request`.
transform_path = '/_data_frame/transforms/' + index_name_pivot

# First delete old index if it exists (400/404 responses are ignored)
es.indices.delete(index=index_name_pivot, ignore=[400, 404])

# Stop and delete any old jobs (ignore if they don't exist)
try:
    # A transform must be stopped before it can be deleted. Without
    # wait_for_completion the stop is asynchronous, and the DELETE below could
    # hit a transform that is still stopping.
    es.transport.perform_request(
        'POST',
        transform_path + '/_stop',
        params={'wait_for_completion': 'true'},
    )
    es.transport.perform_request('DELETE', transform_path)
except NotFoundError:
    # No previous transform with this id: nothing to clean up.
    pass

# Now create data frame job (its id is the value of index_name_pivot)
es.transport.perform_request('PUT', transform_path, body=request)
Out[8]:
In [9]:
# Start job. The transform id equals index_name_pivot, so build the path from
# that variable rather than repeating the literal name.
es.transport.perform_request('POST', '/_data_frame/transforms/' + index_name_pivot + '/_start')
Out[9]:
In [10]:
# Poll for progress every 5 seconds until the transform has stopped.
# The transform id equals index_name_pivot, so the path is built from it.
stats_path = '/_data_frame/transforms/' + index_name_pivot + '/_stats'
while True:
    response = es.transport.perform_request('GET', stats_path)
    state = response['transforms'][0]['state']

    # 'progress' is not always present in the state, so only print it when it
    # is reported. This also holds for the final 'stopped' state.
    if 'progress' in state:
        print(state['progress'])

    task_state = state['task_state']
    if task_state == 'stopped':
        break
    if task_state == 'failed':
        # A failed transform never reaches 'stopped'; raise rather than poll forever.
        raise RuntimeError('data frame transform failed: {}'.format(state.get('reason')))

    time.sleep(5)
In [11]:
def hits_to_df(response, create_index=True, index_field='reviewerId'):
    """Convert an Elasticsearch search response into a pandas DataFrame.

    Parameters
    ----------
    response : dict
        Search response. Each entry of ``response['hits']['hits']`` must
        contain a ``'_source'`` dict, which becomes one row of the frame.
    create_index : bool, default True
        If True, label the rows with each hit's ``index_field`` value.
        If False, the frame keeps pandas' default integer index.
    index_field : str, default 'reviewerId'
        Name of the ``_source`` field used for the row labels. Only read when
        ``create_index`` is True.

    Returns
    -------
    pandas.DataFrame
        One row per hit, one column per ``_source`` field.

    Raises
    ------
    KeyError
        If ``create_index`` is True and a hit's ``_source`` has no
        ``index_field`` entry.
    """
    sources = [hit['_source'] for hit in response['hits']['hits']]

    if not create_index:
        # The index field is not needed here, so hits without it are fine.
        return pd.DataFrame(sources)

    row_labels = [source[index_field] for source in sources]
    return pd.DataFrame(sources, index=row_labels)
In [12]:
# Query-string filter on the pivoted fields: dc_vendorId of 1, count above 5,
# avg_rating of 0.
q = "dc_vendorId:1 AND count :>5 AND avg_rating:0"
# Highest counts first.
sort = "count:desc"
# Search the pivot index via index_name_pivot rather than a repeated literal.
response = es.search(index=index_name_pivot, q=q, sort=sort, size=100)
In [13]:
# Display the hits as a DataFrame, with rows labelled by reviewerId.
hits_to_df(response, create_index=True)
Out[13]:
For example, reviewer 10392 gives 94 zero-star reviews to vendor 122.
In [14]:
# Look up this reviewer's individual reviews in the raw index. index_name is
# used rather than a repeated literal.
q = "reviewerId:10392"
response = es.search(index=index_name, q=q, size=5) # top 5 only
In [15]:
# Display the raw reviews with the default integer index.
hits_to_df(response, create_index=False)
Out[15]:
In [16]:
# Query-string filter on the pivoted fields: dc_vendorId of 1, count above 5,
# avg_rating of 5.
q = "dc_vendorId:1 AND count :>5 AND avg_rating:5"
# Highest counts first.
sort = "count:desc"
# Search the pivot index via index_name_pivot rather than a repeated literal.
response = es.search(index=index_name_pivot, q=q, sort=sort, size=100)
In [17]:
# Display the hits as a DataFrame, with rows labelled by reviewerId.
hits_to_df(response, create_index=True)
Out[17]:
Reviewer 183751 gives 73 five-star reviews to vendor 190.
In [18]:
# Look up this reviewer's individual reviews in the raw index. index_name is
# used rather than a repeated literal.
q = "reviewerId:183751"
response = es.search(index=index_name, q=q, size=5) # top 5 only
In [19]:
# Display the raw reviews with the default integer index.
hits_to_df(response, create_index=False)
Out[19]: