Pivot review data with Elastic data frames

This notebook shows how review data can be pivoted with Elastic data frames to reveal insights into reviewer behaviour. The use case and data are from Mark Harwood's talk on entity-centric indexing.

An alternative version of this notebook uses Python pandas to produce the same results.


In [1]:
import bz2
import matplotlib.pyplot as plt
import csv
import time
import pandas as pd

from elasticsearch import helpers
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError

Connect to Elasticsearch

First connect to Elasticsearch. This assumes the cluster is reachable at localhost:9200; change the connection parameters in the next cell if needed (see https://elasticsearch-py.readthedocs.io/en/master/api.html).


In [2]:
es = Elasticsearch()

Read data to Elasticsearch

Note that this deletes and recreates the indices.


In [3]:
index_name = "anonreviews"
index_name_pivot = 'anonreviews_pivot'
index_settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "reviewerId": {"type": "keyword"},
            "vendorId": {"type": "keyword"},
            "date": {"type": "date", "format": "yyyy-MM-dd HH:mm"},
            "rating": {"type": "integer"}
        }
    }
}
es.indices.delete(index=index_name, ignore=[400, 404])
es.indices.create(index=index_name, body=index_settings)


Out[3]:
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'anonreviews'}

In [4]:
actions = []
bulk_batch_size = 10000
n = 0

# Stream rows from the compressed CSV and index them in bulk batches
csv_handle = bz2.open('./anonreviews.csv.bz2', 'rt')
csv_reader = csv.DictReader(csv_handle)
for row in csv_reader:
    n += 1
    action = { "_index": index_name, "_source": row }
    actions.append(action)

    if n % bulk_batch_size == 0:
        helpers.bulk(es, actions)
        actions = []

# Index whatever remains in the final partial batch
if len(actions) > 0:
    helpers.bulk(es, actions)
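The cell above accumulates actions and flushes them every 10,000 rows. The same batching logic can be sketched as a small self-contained generator (the helper name `chunked` is my own, not part of the elasticsearch library):

```python
def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # final partial batch
        yield batch

# Each batch could then be passed to helpers.bulk(es, batch)
batches = list(chunked(range(25), 10))
```

Note that `helpers.bulk` also accepts a generator of actions directly, so the manual batching is an optimisation choice rather than a requirement.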

Flush to ensure all docs are indexed, then summarise indexing.


In [5]:
es.indices.flush(index=index_name)
es.count(index=index_name)


Out[5]:
{'count': 578805,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}

Aggregate and Pivot data

Pivot the data to produce a summary for each reviewer.

In pandas, we do the following:

aggregations = {
    'rating':'mean',
    'vendorId':'nunique',
    'reviewerId':'count'
}

grouped = reviews.groupby('reviewerId').agg(aggregations)
grouped.columns=['avg_rating', 'dc_vendorId', 'count']
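As a self-contained illustration of that pandas snippet on a few made-up rows (here using pandas named aggregation, which sets the output column names directly instead of reassigning `grouped.columns`):

```python
import pandas as pd

# A handful of synthetic reviews (values are made up for illustration)
reviews = pd.DataFrame({
    "reviewerId": ["1", "1", "2"],
    "vendorId": ["a", "b", "a"],
    "rating": [5, 4, 0],
})

# Mean rating, distinct vendors, and review count per reviewer
grouped = reviews.groupby("reviewerId").agg(
    avg_rating=("rating", "mean"),
    dc_vendorId=("vendorId", "nunique"),
    count=("rating", "count"),
)
```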

In Elasticsearch we can use data frames to achieve the same transformation. First preview the transformation:


In [6]:
request = {
  "source": {
    "index": index_name
  },
  "dest": {
    "index": index_name_pivot
  },
  "pivot": {
    "group_by": {
      "reviewerId": {
        "terms": {
          "field": "reviewerId"
        }
      }
    },
    "aggregations": {
      "avg_rating": {
        "avg": {
          "field": "rating"
        }
      },
      "dc_vendorId": {
        "cardinality": {
          "field": "vendorId"
        }
      },
      "count": {
        "value_count": {
          "field": "_id"
        }
      }
    }
  }
}

response = es.transport.perform_request('POST', '/_data_frame/transforms/_preview', body=request)

In [7]:
response['preview'][0:10]


Out[7]:
[{'reviewerId': '0', 'avg_rating': 5.0, 'count': 1.0, 'dc_vendorId': 1.0},
 {'reviewerId': '1', 'avg_rating': 5.0, 'count': 9.0, 'dc_vendorId': 7.0},
 {'reviewerId': '10', 'avg_rating': 5.0, 'count': 1.0, 'dc_vendorId': 1.0},
 {'reviewerId': '100', 'avg_rating': 5.0, 'count': 1.0, 'dc_vendorId': 1.0},
 {'reviewerId': '1000', 'avg_rating': 4.5, 'count': 4.0, 'dc_vendorId': 4.0},
 {'reviewerId': '10000', 'avg_rating': 5.0, 'count': 1.0, 'dc_vendorId': 1.0},
 {'reviewerId': '100000', 'avg_rating': 5.0, 'count': 1.0, 'dc_vendorId': 1.0},
 {'reviewerId': '100001', 'avg_rating': 5.0, 'count': 1.0, 'dc_vendorId': 1.0},
 {'reviewerId': '100002', 'avg_rating': 5.0, 'count': 1.0, 'dc_vendorId': 1.0},
 {'reviewerId': '100003', 'avg_rating': 5.0, 'count': 1.0, 'dc_vendorId': 1.0}]

Create the Elastic data frame transform


In [8]:
# First delete the old pivot index if it exists
es.indices.delete(index=index_name_pivot, ignore=[400, 404])

# Stop and delete any old jobs (ignore if they don't exist)
try:
    es.transport.perform_request('POST', '/_data_frame/transforms/anonreviews_pivot/_stop')
    es.transport.perform_request('DELETE', '/_data_frame/transforms/anonreviews_pivot')
except NotFoundError:
    pass
    
# Now create data frame job (called anonreviews_pivot)
es.transport.perform_request('PUT', '/_data_frame/transforms/anonreviews_pivot', body=request)


Out[8]:
{'acknowledged': True}

In [9]:
# Start job
es.transport.perform_request('POST', '/_data_frame/transforms/anonreviews_pivot/_start')


Out[9]:
{'acknowledged': True}

In [10]:
# Poll for progress
while True:
    response = es.transport.perform_request('GET', '/_data_frame/transforms/anonreviews_pivot/_stats')
    
    if response['transforms'][0]['state']['task_state'] == 'stopped':
        print(response['transforms'][0]['state']['progress'])
        break
    if 'progress' in response['transforms'][0]['state']:
        print(response['transforms'][0]['state']['progress'])
    time.sleep(5)


{'total_docs': 578805, 'docs_remaining': 509542, 'percent_complete': 11.966551774777344}
{'total_docs': 578805, 'docs_remaining': 452708, 'percent_complete': 21.78574822263111}
{'total_docs': 578805, 'docs_remaining': 399715, 'percent_complete': 30.941336028541564}
{'total_docs': 578805, 'docs_remaining': 351594, 'percent_complete': 39.255189571617386}
{'total_docs': 578805, 'docs_remaining': 302940, 'percent_complete': 47.66112939591054}
{'total_docs': 578805, 'docs_remaining': 255684, 'percent_complete': 55.82553709798637}
{'total_docs': 578805, 'docs_remaining': 211096, 'percent_complete': 63.52899508470038}
{'total_docs': 578805, 'docs_remaining': 172708, 'percent_complete': 70.16128056944912}
{'total_docs': 578805, 'docs_remaining': 136928, 'percent_complete': 76.34298252433894}
{'total_docs': 578805, 'docs_remaining': 86007, 'percent_complete': 85.14059139088295}
{'total_docs': 578805, 'docs_remaining': 16588, 'percent_complete': 97.13409524796779}
{'total_docs': 578805, 'docs_remaining': 0, 'percent_complete': 100.0}

In [11]:
def hits_to_df(response, create_index=True):
    """Convert search hits to a DataFrame, optionally indexed by reviewerId."""
    hits = []
    index = []
    for hit in response['hits']['hits']:
        hits.append(hit['_source'])
        index.append(hit['_source']['reviewerId'])
    if create_index:
        return pd.DataFrame(hits, index=index)
    else:
        return pd.DataFrame(hits)
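As a quick self-contained check, this helper only needs the hits structure of a search response, so it can be exercised on a hand-built response (the hit values below are made up):

```python
import pandas as pd

def hits_to_df(response, create_index=True):
    """Convert search hits to a DataFrame, optionally indexed by reviewerId."""
    hits = []
    index = []
    for hit in response['hits']['hits']:
        hits.append(hit['_source'])
        index.append(hit['_source']['reviewerId'])
    return pd.DataFrame(hits, index=index if create_index else None)

# A hand-built response in the shape Elasticsearch returns (values made up)
fake_response = {
    "hits": {"hits": [
        {"_source": {"reviewerId": "10392", "avg_rating": 0.0,
                     "count": 94.0, "dc_vendorId": 1.0}},
    ]}
}

df = hits_to_df(fake_response)
```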

Find 'haters'

Reviewers who give more than five zero-star reviews to a single vendor.


In [12]:
q = "dc_vendorId:1 AND count:>5 AND avg_rating:0"
sort = "count:desc"

response = es.search(index='anonreviews_pivot', q=q, sort=sort, size=100)
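For reference, the same Lucene query string can be written as an explicit query DSL body (a sketch only, not run against a cluster here; the field names come from the pivot index above):

```python
# dc_vendorId:1 AND count:>5 AND avg_rating:0 as query DSL
dsl_query = {
    "bool": {
        "filter": [
            {"term": {"dc_vendorId": 1}},       # exactly one distinct vendor
            {"range": {"count": {"gt": 5}}},    # more than five reviews
            {"term": {"avg_rating": 0}},        # all zero-star
        ]
    }
}

# es.search(index='anonreviews_pivot',
#           body={"query": dsl_query, "sort": [{"count": "desc"}], "size": 100})
```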

In [13]:
hits_to_df(response)


Out[13]:
avg_rating count dc_vendorId reviewerId
10392 0.0 94.0 1.0 10392
17033 0.0 51.0 1.0 17033
21046 0.0 25.0 1.0 21046
11479 0.0 20.0 1.0 11479
27448 0.0 19.0 1.0 27448
17602 0.0 15.0 1.0 17602
8185 0.0 15.0 1.0 8185
13984 0.0 10.0 1.0 13984
228129 0.0 9.0 1.0 228129
25267 0.0 8.0 1.0 25267
53432 0.0 8.0 1.0 53432
19813 0.0 7.0 1.0 19813
135506 0.0 6.0 1.0 135506
11987 0.0 6.0 1.0 11987

For example, reviewer 10392 gave 94 zero-star reviews to vendor 122:


In [14]:
q = "reviewerId:10392"

response = es.search(index='anonreviews', q=q, size=5) # top 5 only

In [15]:
hits_to_df(response, False)


Out[15]:
date rating reviewerId vendorId
0 2006-06-11 09:14 0 10392 122
1 2006-06-13 14:19 0 10392 122
2 2006-06-15 21:03 0 10392 122
3 2006-06-17 09:22 0 10392 122
4 2006-05-17 06:52 0 10392 122

Find 'fanboys'

Reviewers who give more than five five-star reviews to a single vendor.


In [16]:
q = "dc_vendorId:1 AND count:>5 AND avg_rating:5"
sort = "count:desc"

response = es.search(index='anonreviews_pivot', q=q, sort=sort, size=100)

In [17]:
hits_to_df(response)


Out[17]:
avg_rating count dc_vendorId reviewerId
183751 5.0 73.0 1.0 183751
260225 5.0 69.0 1.0 260225
205864 5.0 35.0 1.0 205864
345080 5.0 23.0 1.0 345080
179944 5.0 22.0 1.0 179944
345082 5.0 21.0 1.0 345082
345068 5.0 20.0 1.0 345068
345081 5.0 20.0 1.0 345081
345069 5.0 19.0 1.0 345069
345070 5.0 18.0 1.0 345070
345083 5.0 18.0 1.0 345083
345086 5.0 18.0 1.0 345086
345084 5.0 17.0 1.0 345084
345085 5.0 17.0 1.0 345085
264635 5.0 13.0 1.0 264635
321206 5.0 12.0 1.0 321206
12539 5.0 11.0 1.0 12539
159035 5.0 10.0 1.0 159035
114661 5.0 9.0 1.0 114661
39655 5.0 8.0 1.0 39655
22515 5.0 7.0 1.0 22515
180082 5.0 7.0 1.0 180082
58447 5.0 7.0 1.0 58447
160018 5.0 6.0 1.0 160018
168143 5.0 6.0 1.0 168143
180085 5.0 6.0 1.0 180085
28814 5.0 6.0 1.0 28814
30474 5.0 6.0 1.0 30474
35048 5.0 6.0 1.0 35048
393237 5.0 6.0 1.0 393237
75010 5.0 6.0 1.0 75010

Reviewer 183751 gave 73 five-star reviews to vendor 190:


In [18]:
q = "reviewerId:183751"

response = es.search(index='anonreviews', q=q, size=5) # top 5 only

In [19]:
hits_to_df(response, False)


Out[19]:
date rating reviewerId vendorId
0 2006-09-22 16:36 5 183751 190
1 2006-09-22 16:36 5 183751 190
2 2006-09-22 16:35 5 183751 190
3 2006-09-22 15:53 5 183751 190
4 2006-09-22 15:53 5 183751 190