Getting the Enron dataset and indexing it in Elasticsearch

In the concept search sub-project, we'll be working with the Enron email dataset. To implement query expansion algorithms, it is probably easiest to work directly with the raw text files on your hard disk. However, because some query expansion techniques rely on search results, it is also convenient to have a copy of the data in a search engine. This notebook contains scripts that extract the dataset and index it in Elasticsearch, followed by examples of how to query Elasticsearch.

First, download the dataset from OneDrive (choose enron_mail_clean.tar.gz).

Next, set f to the path of the downloaded archive and out_dir to the directory where you want to extract the data.


In [16]:
import tarfile
import os
import sys

f = '/Users/wrvhage/Downloads/enron_mail_clean.tar.gz'
out_dir = '/Users/wrvhage/Data'

In [17]:
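# Extract only the text files stored in an all_documents folder
def extract_documents(members):
    for tarinfo in members:
        # Split the member name into its parent folder and file name
        parent, _ = os.path.split(tarinfo.name)
        if parent.endswith('all_documents'):
            yield tarinfo

# The with block closes the archive even if extraction fails
with tarfile.open(f) as tar:
    tar.extractall(path=out_dir, members=extract_documents(tar))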
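
To check which members the filter selects before extracting the full archive, the generator can be tried on a tiny in-memory archive. This is only a sketch: the member names below (person_a, person_b, inbox) and the helper build_demo_archive are made up for illustration, and they assume the <person>/all_documents/<file> layout that the later cells rely on.

```python
import io
import os
import tarfile


def extract_documents(members):
    # Same filter as above: keep members whose parent folder is all_documents
    for tarinfo in members:
        parent, _ = os.path.split(tarinfo.name)
        if parent.endswith('all_documents'):
            yield tarinfo


def build_demo_archive(names):
    # Hypothetical helper: build a gzipped tar archive in memory,
    # with one small file per name
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode='w:gz') as tar:
        for name in names:
            payload = b'hello'
            info = tarfile.TarInfo(name=name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    buf.seek(0)
    return buf


# Made-up member names, for illustration only
demo_names = [
    'enron_mail_clean/person_a/all_documents/1.txt',
    'enron_mail_clean/person_a/inbox/2.txt',
    'enron_mail_clean/person_b/all_documents/3.txt',
]

with tarfile.open(fileobj=build_demo_archive(demo_names), mode='r:gz') as demo_tar:
    selected = [m.name for m in extract_documents(demo_tar)]

# Only the two all_documents members should remain
print(selected)
```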

In [18]:
# convert data to json
import json

data_dir = os.path.join(out_dir, 'enron_mail_clean')

dump_dir = os.path.join(out_dir, 'enron_email_clean_json')
if not os.path.exists(dump_dir):
    os.makedirs(dump_dir)

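for person in os.listdir(data_dir):
    # One bulk file per person, with alternating action and document lines
    with open(os.path.join(dump_dir, person), 'w') as out_file:
        document_dir = os.path.join(data_dir, person, 'all_documents')
        for doc in os.listdir(document_dir):
            # Use doc_file rather than f, so the archive path stored in f is not overwritten
            with open(os.path.join(document_dir, doc), 'r') as doc_file:
                text = doc_file.read()

            # Action line: index this document under the id '<person>/<file name>'
            a = {'index': {'_index': 'enron', '_type': 'email', '_id': '{}/{}'.format(person, doc)}}
            out_file.write(json.dumps(a))
            out_file.write('\n')

            # Source line: the document itself
            d = {'text': text}
            out_file.write(json.dumps(d))
            out_file.write('\n')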
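
Each file written by the cell above follows the newline-delimited format of the Elasticsearch bulk API: an action line followed by the document source line. The sketch below builds one such pair; the helper name to_bulk_lines and the sample values are made up for illustration.

```python
import json


def to_bulk_lines(person, doc, text):
    # Hypothetical helper: returns the action line and the source line for one email
    action = {'index': {'_index': 'enron', '_type': 'email',
                        '_id': '{}/{}'.format(person, doc)}}
    source = {'text': text}
    return json.dumps(action) + '\n' + json.dumps(source) + '\n'


# Sample values, for illustration only
pair = to_bulk_lines('person_a', '1.txt', 'Line one\nLine two')
print(pair)

# json.dumps escapes the newline inside the email text, so each JSON
# object stays on a single line, as the bulk format requires
assert pair.count('\n') == 2
```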

Elasticsearch

Install Elasticsearch (instructions)

Start Elasticsearch by typing ./bin/elasticsearch in the directory where you installed it.

More info on getting started with Elasticsearch (including links to useful plugins).

Install the Python Elasticsearch Client:

pip install elasticsearch

In [19]:
from elasticsearch import Elasticsearch

es = Elasticsearch()

In [20]:
# create index: standard analyzer with English stop words, term vectors stored for the text field

config = {}
config['settings'] = {
    'analysis' : {
        'analyzer': {
            'default': {
                'type':'standard',
                'stopwords': '_english_',
            }
        }
    }
}

config['mappings'] = { 
    'email': {
        'properties': {
            'text': {
                'type': 'string', 
                'term_vector': 'with_positions_offsets_payloads'
            },
        }
    }
}
es.indices.create(index='enron', body=config)


WARNING:elasticsearch:PUT /enron [status:400 request:0.015s]
---------------------------------------------------------------------------
RequestError                              Traceback (most recent call last)
<ipython-input-20-757568f56401> in <module>()
     23     }
     24 }
---> 25 es.indices.create(index='enron', body=config)

/usr/local/lib/python2.7/site-packages/elasticsearch/client/utils.pyc in _wrapped(*args, **kwargs)
     67                 if p in kwargs:
     68                     params[p] = kwargs.pop(p)
---> 69             return func(*args, params=params, **kwargs)
     70         return _wrapped
     71     return _wrapper

/usr/local/lib/python2.7/site-packages/elasticsearch/client/indices.pyc in create(self, index, body, params)
    103             raise ValueError("Empty value passed for a required argument 'index'.")
    104         _, data = self.transport.perform_request('PUT', _make_path(index),
--> 105             params=params, body=body)
    106         return data
    107 

/usr/local/lib/python2.7/site-packages/elasticsearch/transport.pyc in perform_request(self, method, url, params, body)
    305 
    306             try:
--> 307                 status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
    308 
    309             except TransportError as e:

/usr/local/lib/python2.7/site-packages/elasticsearch/connection/http_urllib3.pyc in perform_request(self, method, url, params, body, timeout, ignore)
     91         if not (200 <= response.status < 300) and response.status not in ignore:
     92             self.log_request_fail(method, url, body, duration, response.status)
---> 93             self._raise_error(response.status, raw_data)
     94 
     95         self.log_request_success(method, full_url, url, body, response.status,

/usr/local/lib/python2.7/site-packages/elasticsearch/connection/base.pyc in _raise_error(self, status_code, raw_data)
    103             pass
    104 
--> 105         raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
    106 
    107 

RequestError: TransportError(400, u'index_already_exists_exception')
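
The RequestError above is raised because the enron index already existed when the cell was run. One way to make the cell safe to re-run is to check for the index first. The sketch below assumes the indices.exists and indices.create calls of the client version used in this notebook; the helper create_index_if_missing and the stand-in classes FakeIndices and FakeClient are hypothetical and exist only so the example runs without a server.

```python
def create_index_if_missing(client, index, body):
    # Hypothetical helper: only create the index when it does not exist yet.
    # Returns True if the index was created, False if it was already there.
    if client.indices.exists(index=index):
        return False
    client.indices.create(index=index, body=body)
    return True


class FakeIndices(object):
    # Stand-in for es.indices, so the sketch runs without a server
    def __init__(self):
        self.created = {}

    def exists(self, index):
        return index in self.created

    def create(self, index, body=None):
        self.created[index] = body


class FakeClient(object):
    # Stand-in for the Elasticsearch client
    def __init__(self):
        self.indices = FakeIndices()


demo_client = FakeClient()
first = create_index_if_missing(demo_client, 'enron', {})
second = create_index_if_missing(demo_client, 'enron', {})
print(first)   # the index is created on the first call
print(second)  # the second call leaves the existing index alone
```

With a running server, the call would presumably look like create_index_if_missing(es, 'enron', config).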

In [21]:
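# index data, one bulk request per person file
for p in os.listdir(dump_dir):
    # Use bulk_file rather than f, so the archive path stored in f is not overwritten
    with open(os.path.join(dump_dir, p), 'r') as bulk_file:
        data = bulk_file.read()

    es.bulk(index='enron', doc_type='email', body=data)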
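
The loop above sends each person's file as a single bulk request, which can become large for people with many emails. A possible alternative is to split the lines into smaller request bodies. The helper iter_bulk_batches and the demo lines below are made up for illustration; the batch size is an arbitrary choice.

```python
def iter_bulk_batches(lines, docs_per_batch):
    # Hypothetical helper: group bulk lines into request bodies.
    # Each document takes two lines (action + source), so pairs stay together.
    batch = []
    for line in lines:
        line = line.rstrip('\n')
        if not line:
            continue
        batch.append(line)
        if len(batch) == 2 * docs_per_batch:
            yield '\n'.join(batch) + '\n'
            batch = []
    if batch:
        # Remaining documents that did not fill a whole batch
        yield '\n'.join(batch) + '\n'


# Made-up bulk lines for three documents
demo_lines = [
    '{"index": {"_id": "a/1"}}\n', '{"text": "one"}\n',
    '{"index": {"_id": "a/2"}}\n', '{"text": "two"}\n',
    '{"index": {"_id": "a/3"}}\n', '{"text": "three"}\n',
]
batches = list(iter_bulk_batches(demo_lines, docs_per_batch=2))
print(len(batches))
```

Each batch could then be passed as the body of es.bulk(index='enron', doc_type='email', body=batch). The bulk response contains an errors flag that is worth checking, since a bulk request can succeed as a whole while individual documents fail.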

Example Elasticsearch Queries

More info:


In [22]:
# match all
query = {'query': {'match_all': {}}}
res = es.search(index='enron', doc_type='email', body=query)

print("Got %d Hits:" % res['hits']['total'])

# print(json.dumps(res, indent=4, separators=(',', ': ')))


Got 128103 Hits:
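
The total only counts the matches; the documents themselves are under res['hits']['hits'], which by default holds the first 10 hits. A minimal sketch of how they might be listed follows. The helper summarize_hits is hypothetical, and demo_res is a made-up response that only mimics the shape of a real search result.

```python
def summarize_hits(res, max_chars=60):
    # Return (id, score, start of text) for each hit in a search response
    rows = []
    for hit in res['hits']['hits']:
        snippet = hit['_source']['text'][:max_chars].replace('\n', ' ')
        rows.append((hit['_id'], hit['_score'], snippet))
    return rows


# Made-up response with the same shape as an Elasticsearch search result
demo_res = {
    'hits': {
        'total': 1,
        'hits': [
            {'_id': 'person_a/1.txt', '_score': 1.0,
             '_source': {'text': 'Subject: gas prices\nPlease see attached.'}},
        ],
    }
}
for row in summarize_hits(demo_res):
    print(row)
```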

In [26]:
# query string query (complex boolean queries are possible); see:
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html
query = {
    "query": {
        "query_string": {
            "query": "(natural AND gas) OR industrial"
        }
    }
}
res = es.search(index='enron', doc_type='email', body=query)

print("Got %d Hits:" % res['hits']['total'])

# print(json.dumps(res, indent=4, separators=(',', ': ')))


Got 4884 Hits:
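
Because query expansion comes down to adding terms to a query, a query_string query like the one above can also be assembled from lists of terms. The helper expand_query below is a hypothetical sketch, not part of the notebook; it assumes the terms contain no characters that are special in the query string syntax.

```python
def expand_query(required_terms, expansion_terms):
    # Hypothetical helper: all required terms must match (AND),
    # or any one of the expansion terms may match instead (OR)
    query = '({})'.format(' AND '.join(required_terms))
    for term in expansion_terms:
        query += ' OR {}'.format(term)
    return {'query': {'query_string': {'query': query}}}


# Rebuilds the query used in the cell above
expanded = expand_query(['natural', 'gas'], ['industrial'])
print(expanded['query']['query_string']['query'])
```

The resulting dictionary can be passed as the body of es.search in the same way as the hand-written query.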

In [23]:
# Term query
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-term-query.html
query = {
    "query": {
        "term" : { "text" : "natural" } 
    }
}
res = es.search(index='enron', doc_type='email', body=query)

print("Got %d Hits:" % res['hits']['total'])

# print(json.dumps(res, indent=4, separators=(',', ': ')))


Got 3967 Hits:

In [28]:
# significant terms aggregation 
# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-significantterms-aggregation.html
query = {
    "query": {
        "query_string": {
            "query": "(natural AND gas) OR industrial"
        }
    },
    "aggregations" : {
        "significantTerms" : {
            "significant_terms" : { "field" : "text", "size": 15 }
        }
    }
}
res = es.search(index='enron', doc_type='email', body=query, size=0)

print("Got %d Hits:" % res['hits']['total'])

# print(json.dumps(res, indent=4, separators=(',', ': ')))


Got 4884 Hits:
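
Since size=0 suppresses the hits, the interesting part of this response is the aggregation, stored under the name chosen in the query (significantTerms). The sketch below reads the terms from its buckets. The helper significant_terms is hypothetical, and the terms and numbers in demo_res are made up; they are not results from the Enron index.

```python
def significant_terms(res, agg_name='significantTerms'):
    # Return (term, score) pairs from a significant_terms aggregation
    buckets = res['aggregations'][agg_name]['buckets']
    return [(b['key'], b['score']) for b in buckets]


# Made-up response fragment; the terms and numbers are not real results
demo_res = {
    'aggregations': {
        'significantTerms': {
            'doc_count': 2,
            'buckets': [
                {'key': 'pipeline', 'doc_count': 2, 'score': 0.5, 'bg_count': 4},
                {'key': 'gas', 'doc_count': 1, 'score': 0.25, 'bg_count': 8},
            ],
        }
    }
}
print(significant_terms(demo_res))
```

Terms obtained this way are natural candidates for expanding the original query.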

In [29]:
# Delete enron index (uncomment if needed)
#es.indices.delete(index='enron', ignore=[400, 404])