In [10]:
import boto3
import botocore
import os
import sys
from datetime import datetime
import csv
from collections import defaultdict
import multiprocessing
import uuid
import time

S3DOWNLOAD_PROCESS_COUNT = multiprocessing.cpu_count() * 5


def get_previous_quarter_hour_end():
    now = datetime.now()
    now_mins = int(now.strftime('%M'))
    quarter_hour_increments = str(now_mins//15)
    window_end_mins = str(int(quarter_hour_increments)*15)
    if len(window_end_mins) < 2:
        window_end_mins += "0" + window_end_mins
    return window_end_mins


def get_filenames_from_manifest(bucket_resource):
    filenames_from_manifest = []
    item_prefix = 'tmp/traffic-streams-us-west-2/manifests/pending/'
    # window_end_mins = get_previous_quarter_hour_end()
    # item_prefix += datetime.now().strftime("%Y%m%d{}".format(window_end_mins))
    print("[DEBUG] Getting files with prefix {}".format(item_prefix))
    # Currently all of 'today', not the last 15 min window
    for obj in bucket_resource.objects.filter(Prefix=item_prefix):
        try:
            body = obj.get()['Body'].read()
            for line in body.decode("utf-8").split('\n'):
                line_list = line.split(',')
                if len(line_list) > 1:
                    filenames_from_manifest.append(line_list[1])
        except Exception as e:
            print(e)
    return filenames_from_manifest


def activity_file_processor(file_q, results_q):
    while True:
        print(".  ", end="")
        sys.stdout.flush()
        stats = {'row_count': 0, "clicks": 0, "lite-clicks": 0}
        try:
            s3dl_filename = "s3_temp{}.dl".format(str(uuid.uuid1()))
            s3 = boto3.client('s3')
            q_item = file_q.get()
            if q_item is None:
                q_item.task_done()
                print("[DEBUG] Q empty. Process terminating.")
                break  # None is the termination signal for End Of Stream
            bucket_name, filename = q_item.split('/', 1)
            s3.download_file(bucket_name, filename, s3dl_filename)
            with open(s3dl_filename, 'r') as s3file:
                s3_csv_reader = csv.reader(s3file, delimiter=',', quotechar='"')
                for row in s3_csv_reader:
                    stats['row_count'] += 1
                    if 'lite-click' in filename:  # first to catch it before 'clicks' does
                        stats['lite-clicks'] += 1
                    elif 'click' in filename:
                        stats['clicks'] += 1
                    elif 'impression' in filename:
                        stats['impressions'] += 1
            os.remove(s3dl_filename)
        except AttributeError:
            print("[EXCEPTION] Attrubite error caught (q closed during processing?)")
        except botocore.exceptions.ClientError:
            print("[EXCEPTION] S3 File [{}] not found?".format(filename))
        except FileNotFoundError:
            print("[EXCEPTION] S3 downloaded temp file could not be found. (delete failure?)")
        finally:
            results_q.put(stats)
            file_q.task_done()


def tally_stats(results_q):
    results_dict = defaultdict(int)
    while True:
        if results_q.empty():
            print("[DEBUG] results q after tally {}".format(results_q))
            break
        result = results_q.get()
        print(results_dict)
        for key in result.keys():
            results_dict[key] += result[key]
    return results_dict


def get_s3_info():
    file_q = multiprocessing.JoinableQueue()
    event_accumulator_q = multiprocessing.JoinableQueue()
    ttl_stats = {}
    filenames_from_manifest = []
    process_list = []  # s3 activity file processors
    s3client = boto3.client('s3')
    buckets = s3client.list_buckets()['Buckets']
    for bucket in buckets:
        if bucket["Name"].endswith('traffic-streams-us-west-2'):
            print("[DEBUG] Checking manifest files in s3 bucket {} ".format(bucket["Name"]))
            s3 = boto3.resource('s3')
            bucket_resource = s3.Bucket(bucket["Name"])
            filenames_from_manifest = get_filenames_from_manifest(bucket_resource)
            event_file_count = 0
            print("[DEBUG] Adding event files to working Q.")
            for filename in filenames_from_manifest:
                if event_file_count > 10:
                    break
                event_file_count += 1
                print(".", end="")
                file_q.put(filename)
            print("\n[DEBUG] Found {} event files to process".format(event_file_count))
            if event_file_count == 0:
                print("[INFO] No event files found to process. Exiting.")
                break
            for i in range(0, S3DOWNLOAD_PROCESS_COUNT):
                file_q.put(None)  # None signals to a process to terminate, q empty
            for i in range(0, S3DOWNLOAD_PROCESS_COUNT):
                proc = multiprocessing.Process(
                    target=activity_file_processor,
                    args=[file_q, event_accumulator_q])
                # proc.daemon = True
                proc.start()
                process_list.append(proc)
    print("[INFO] Started {} S3 download threads".format(len(process_list)))
    # for proc in process_list:
    #     proc.join()
    if file_q:
        file_q.join()
    sentinal = multiprocessing.Process(
        target=sentinal_process, args=[file_q, process_list]).start()
    if sentinal:
        sentinal.join()
    ttl_stats = tally_stats(event_accumulator_q)
    return ttl_stats


def sentinal_process(q, process_list):
    live = True
    print("[INFO] Sentinal process has started.")
    while live:
        time.sleep(1)
        if q.empty():
            print("\n[INFO] Sentinal noted that the file Q is empty. Killing all processes.")
            time.sleep(3)
            q.close()
            for proc in process_list:
                proc.terminate()
                live = False


if __name__ == "__main__":
    print(get_s3_info())


The history saving thread hit an unexpected error (OperationalError('unable to open database file',)).History will not be written to the database.
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2885, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-10-0733ed273c1b>", line 157, in <module>
    print(get_s3_info())
  File "<ipython-input-10-0733ed273c1b>", line 96, in get_s3_info
    event_accumulator_q = multiprocessing.JoinableQueue()
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/context.py", line 106, in JoinableQueue
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/queues.py", line 281, in __init__
    Queue.__init__(self, maxsize, ctx=ctx)
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/queues.py", line 47, in __init__
    self._wlock = ctx.Lock()
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/context.py", line 66, in Lock
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/synchronize.py", line 163, in __init__
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/synchronize.py", line 60, in __init__
OSError: [Errno 24] Too many open files

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 1827, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'OSError' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/IPython/core/ultratb.py", line 1120, in get_records
  File "/usr/local/lib/python3.5/site-packages/IPython/core/ultratb.py", line 301, in wrapped
  File "/usr/local/lib/python3.5/site-packages/IPython/core/ultratb.py", line 346, in _fixed_getinnerframes
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/inspect.py", line 1453, in getinnerframes
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/inspect.py", line 1410, in getframeinfo
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/inspect.py", line 672, in getsourcefile
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/inspect.py", line 701, in getmodule
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/inspect.py", line 685, in getabsfile
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/posixpath.py", line 361, in abspath
OSError: [Errno 24] Too many open files
ERROR: Internal Python error in the inspect module.
Below is the traceback from this internal error.


Unfortunately, your original traceback can not be constructed.