In [33]:
import boto3
import getopt
import datetime as dt
from datetime import datetime
import time
import multiprocessing
import json
import random
import uuid

# Mutable module-level event templates; populated from the JSON template
# files by set_event_templates() before any writer processes start.
click_data_template = {}
impression_data_template = {}

# ## Configurations BEGIN

TEST_STREAM_LIST = ["QA-Test"]  # only streams in this list are ever touched
# TEST_STREAM_LIST = ["clicks", "impressions"]
TEST_CLICKS_STREAMS = ['QA-Test', 'clicks']  # write clicks only to these streams
TEST_IMPRESSIONS_STREAMS = ['QA-Test', 'impressions']  # write impressions only to these streams
WRITE_INTERVAL_SECONDS = .01  # pause between writes, per process
PROCESSES_PER_STREAM = multiprocessing.cpu_count() * 4  # workers per stream, per event type

# ## Configurations END

def put_events_on_queue(q, clicks=0, lite_clicks=0, impressions=0, append_kill_signal=True):
    """Load single-record event batches onto *q*, optionally followed by
    one None sentinel per worker so consumers know when to stop.

    Args:
        q: queue to fill with record batches.
        clicks: number of regular click batches to enqueue.
        lite_clicks: number of lite-click batches to enqueue.
        impressions: number of impression batches to enqueue.
        append_kill_signal: when True, append PROCESSES_PER_STREAM
            None sentinels after the events.
    """
    for _ in range(clicks):
        q.put(create_event_batch(1, click_data_template, lite_click=False))
    for _ in range(lite_clicks):
        q.put(create_event_batch(1, click_data_template, lite_click=True))
    for _ in range(impressions):
        q.put(create_event_batch(1, impression_data_template, lite_click=False))
    if append_kill_signal:
        for _ in range(PROCESSES_PER_STREAM):
            q.put(None)


def create_event_batch(count, event_template, lite_click):
    """Return a list of *count* Kinesis records built from *event_template*."""
    return [new_event_record_from(event_template, lite_click=lite_click)
            for _ in range(count)]


def new_event_record_from(event_template, lite_click=False):
    """Build one Kinesis PutRecords entry from *event_template*.

    The template must contain either a 'click_id' or an 'impression_id'
    key; a copy of it is stamped with randomized test values and fresh
    timestamps/ids.

    Args:
        event_template: dict template for a click or impression event.
        lite_click: when True, mark the event as a lite click
            (traffic_type_id 2) instead of a regular click (1).

    Returns:
        dict with 'Data' (JSON-encoded event) and 'PartitionKey' keys,
        ready for kinesis.put_records().

    Raises:
        ValueError: if the template has neither recognized id key.
    """
    # BUGFIX: copy the template that was actually passed in — the original
    # copied the module-level globals regardless of the argument.
    if 'click_id' in event_template:
        new_event = event_template.copy()
        new_event['traffic_type_id'] = 1
    elif 'impression_id' in event_template:
        new_event = event_template.copy()
        new_event['traffic_type_id'] = 3
    else:
        # BUGFIX: previously assigned a *string* here, which then crashed
        # with TypeError on item assignment below; fail fast instead.
        raise ValueError("borked template detection: template has neither "
                         "'click_id' nor 'impression_id'")
    if lite_click:
        new_event['traffic_type_id'] = 2  # override the default of id 1
    new_event['subid_4'] = random.randrange(1000, 2000)
    current_time = datetime.strftime(datetime.utcnow(), '%Y-%m-%dT%H:%M:%S.%fZ')
    new_event['request_date'] = current_time
    new_event['request_date_utc'] = current_time
    if 'impression_id' in new_event:
        new_event['impression_id'] = str(uuid.uuid1())
    if 'click_id' in new_event:
        new_event['click_id'] = str(uuid.uuid1())
    new_event['client_id'] = random.randrange(1000, 1250)
    return {'Data': json.dumps(new_event), 'PartitionKey': str(uuid.uuid1())}


def get_streams():
    """Return the Kinesis stream names that appear in TEST_STREAM_LIST."""
    client = boto3.client('kinesis')
    all_names = client.list_streams()['StreamNames']
    return [name for name in all_names if name in TEST_STREAM_LIST]


def get_shard_ids_for(stream_name, tuples_with_stream_names=False):
    """List the shard ids of *stream_name*.

    Args:
        stream_name: Kinesis stream to describe.
        tuples_with_stream_names: when True, return
            (stream_name, shard_id) tuples instead of bare shard ids.
    """
    client = boto3.client('kinesis')
    description = client.describe_stream(StreamName=stream_name)
    shards = description['StreamDescription']['Shards']
    if tuples_with_stream_names:
        return [(stream_name, shard['ShardId']) for shard in shards]
    return [shard['ShardId'] for shard in shards]


def get_streams_and_shard_ids():
    """Return (stream_name, shard_id) tuples for every test stream."""
    pairs = []
    for name in get_streams():
        # TODO: alter this to read and write on other streams
        if name in TEST_STREAM_LIST:
            pairs.extend(get_shard_ids_for(name, tuples_with_stream_names=True))
    return pairs


def stream_write_worker(stream_name, shard_ids, process_name, event_q, event_results_q):
    """Worker loop: drain record batches from event_q and write them to
    the named Kinesis stream until a None sentinel is received.

    Args:
        stream_name: Kinesis stream to write to.
        shard_ids: shard ids for the stream (not used by put_records;
            kept for interface compatibility with callers).
        process_name: label used in log output.
        event_q: JoinableQueue of record batches; None means "stop".
        event_results_q: queue receiving one log line per write attempt.
    """
    kinesis = boto3.client('kinesis')
    write_count = 0
    while True:
        # Throttle each worker so the stream isn't hammered.
        time.sleep(WRITE_INTERVAL_SECONDS)
        event_data = event_q.get()
        if event_data is None:  # sentinel: no more work for this worker
            event_q.task_done()
            # BUGFIX: "recieved" -> "received" in the log message.
            print("[DEBUG] {} thread received END SIGNAL after writing {} lines to {}".format(
                process_name, write_count, stream_name))
            break
        response = kinesis.put_records(
            Records=event_data,
            StreamName=stream_name,
        )
        status = response['ResponseMetadata']['HTTPStatusCode']
        if status != 200:
            print('[ERROR] Write Process {} got response of {}'.format(
                  process_name, status))
        log_line = "Write Response {}, {} at {}".format(
            process_name, status, datetime.now())
        write_count += 1
        event_results_q.put(log_line)
        event_q.task_done()


def set_event_templates():
    """Load the click and impression JSON templates from the working
    directory into the module-level template globals."""
    global click_data_template, impression_data_template
    with open('click_template.json') as infile:
        click_data_template = json.load(infile)
    with open('impression_template.json') as infile:
        impression_data_template = json.load(infile)


def write_kinesis_info(clicks=0, lite_clicks=0, impressions=0):
    """Queue the requested test events and fan them out to the configured
    Kinesis streams with a pool of writer processes per stream/event type.

    Args:
        clicks: number of regular click events to write.
        lite_clicks: number of lite click events to write.
        impressions: number of impression events to write.

    Returns:
        A fixed placeholder list (kept for interface compatibility).
    """
    set_event_templates()
    processes = []  # defined before try so the finally block is always safe
    try:
        # Queue capacity leaves room for one None sentinel per worker.
        click_q = multiprocessing.JoinableQueue(clicks + lite_clicks + PROCESSES_PER_STREAM)
        impression_q = multiprocessing.JoinableQueue(impressions + PROCESSES_PER_STREAM)
        event_results_q = multiprocessing.JoinableQueue(
            clicks + lite_clicks + impressions + PROCESSES_PER_STREAM)
        print("[LOG] setting up queues with {} clicks, {} lite_clicks, and {} impressions".format(
            clicks, lite_clicks, impressions))
        if clicks > 0:
            # Hold the kill signals until lite clicks are queued too.
            put_events_on_queue(click_q, clicks=clicks, append_kill_signal=False)
        if lite_clicks > 0:
            put_events_on_queue(click_q, lite_clicks=lite_clicks)
        if impressions > 0:
            put_events_on_queue(impression_q, impressions=impressions)
        # sentinal = multiprocessing.Process(
        #     target=sentinal_process, args=(processes, [click_q, impression_q]))
        # sentinal.start()
        for stream_name in get_streams():
            print('[LOG] setting up for stream name {}'.format(stream_name.upper()))
            shard_ids = get_shard_ids_for(stream_name)
            if len(shard_ids) == 0:
                # BUGFIX: this check used a bare `next` inside the process
                # loop, which is a no-op expression. Warn once per stream;
                # workers are still started, matching original behavior.
                print("[WARN] write process found ZERO shards for {}".format(stream_name))
            for process_number in range(0, PROCESSES_PER_STREAM):
                if stream_name in TEST_CLICKS_STREAMS:  # clicks limited to these streams
                    click_process_name = "{}_click_process-{}".format(
                        stream_name, process_number)
                    click_process = multiprocessing.Process(
                        target=stream_write_worker,
                        args=(stream_name, shard_ids, click_process_name, click_q, event_results_q)
                    )
                    click_process.daemon = True
                    click_process.start()
                    processes.append(click_process)
                if stream_name in TEST_IMPRESSIONS_STREAMS:  # impressions limited to these streams
                    impression_process_name = "{}_impression_process-{}".format(
                        stream_name, process_number)
                    impression_process = multiprocessing.Process(
                        target=stream_write_worker,
                        args=(stream_name,
                              shard_ids, impression_process_name, impression_q, event_results_q)
                    )
                    impression_process.daemon = True
                    impression_process.start()
                    processes.append(impression_process)
        print("[LOG] Started {} processes".format(len(processes)))
        # join() returns once every queued batch has been task_done'd.
        impression_q.join()
        click_q.join()
        print("[LOG] all processes completed")
    finally:
        # Workers block forever on an empty queue; make sure they die.
        for process in processes:
            if process is not None:
                process.terminate()
    return [1, 2, 3, 4]


def sentinal_process(plist, queues):
    """Watchdog: once each queue has been observed empty at least once,
    terminate every process in *plist* and exit.

    Args:
        plist: processes to terminate when the queues drain.
        queues: two-item sequence of queues to monitor.
    """
    seen_empty = [False, False]
    while True:
        # Latch: once a queue has been seen empty it stays flagged.
        seen_empty = [was or q.empty() for was, q in zip(seen_empty, queues)]
        if all(seen_empty):
            print("[WARN] Queues empty. \
                    Sentinal process killing all remaining processes and exiting.")
            for proc in plist:
                proc.terminate()
            break
        time.sleep(1)


def read_shard_stream(shard_iterator, process_name):
    """Poll one shard forever, printing every decoded record.

    Args:
        shard_iterator: response dict from kinesis.get_shard_iterator.
        process_name: label prefixed to each printed record.
    """
    client = boto3.client('kinesis')
    # NOTE: this local held an iterator token, not a shard id — renamed.
    iterator = shard_iterator["ShardIterator"]
    while True:
        batch = client.get_records(ShardIterator=iterator, Limit=25)
        iterator = batch["NextShardIterator"]
        for item in batch['Records']:
            payload = json.loads(item['Data'].decode('utf-8'))
            print("{} {}".format(process_name, payload))
        time.sleep(0.2)


def get_kinesis_info(clicks=0, lite_clicks=0, impressions=0):
    """Spawn one reader process per shard across all test streams and
    wait for them all.

    The count arguments are accepted for interface parity with
    write_kinesis_info but are not used here.
    """
    kinesis = boto3.client('kinesis')
    readers = []
    for stream_name, shard_id in get_streams_and_shard_ids():
        print('[DEBUG] setting up GET process for {}, {}'.format(stream_name, shard_id))
        # Start reading slightly in the past so recent writes are included.
        iterator = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType='AT_TIMESTAMP',
            Timestamp=datetime.utcnow() - dt.timedelta(seconds=20)
        )
        reader = multiprocessing.Process(
            target=read_shard_stream,
            args=(iterator, "{}_{}".format(stream_name, shard_id))
        )
        reader.start()
        readers.append(reader)

    for reader in readers:
        reader.join()


def main(argv):
    """Parse command-line options and dispatch to the reader or writer.

    Options:
        -r / --reader         read mode (exits when done)
        -c / --clicks N       number of regular clicks to write
        -l / --lite-clicks N  number of lite clicks to write
        -i / --impressions N  number of impressions to write
    """
    import sys  # local import: module level only imports sys under __main__
    try:
        # BUGFIX: the long-option list was missing a comma, so
        # "lite_clicks=" "impressions=" concatenated into the single bogus
        # option "lite_clicks=impressions=". Names now match the
        # documented --lite-clicks spelling.
        opts, args = getopt.getopt(argv, "rc:l:i:",
                                   ["reader", "clicks=", "lite-clicks=", "impressions="])
    except getopt.GetoptError:
        print("-c 100 -l 125 -i 50 or --clicks=100 --lite-clicks=125 --impressions=50")
        sys.exit(2)
    clicks = 0
    lite_clicks = 0
    impressions = 0
    for opt, arg in opts:
        if opt in ['-i', '--impressions']:
            impressions = int(arg)
        if opt in ['-c', '--clicks']:  # BUGFIX: was 'clicks' (missing --)
            clicks = int(arg)
        if opt in ['-l', '--lite-clicks']:
            lite_clicks = int(arg)
        if opt in ['-r', '--reader']:
            # Reader mode only sees counts parsed before -r (getopt order).
            get_kinesis_info(clicks=clicks, lite_clicks=lite_clicks, impressions=impressions)
            exit(0)

    if lite_clicks or clicks or impressions:
        write_kinesis_info(clicks=clicks, lite_clicks=lite_clicks, impressions=impressions)


if __name__ == "__main__":
    import sys
    # Hard-coded args for notebook runs: 10 lite clicks + 10 impressions.
    main(["-l", "10", "-i", "10"])  # iPython testing
#     while True:
#         main(sys.argv[1:])  # local command line testing
#         time.sleep(15)


[LOG] setting up queues with 0 clicks, 10 lite_clicks, and 10 impressions
[LOG] setting up for stream name QA-TEST
[DEBUG] QA-Test_click_process-10 thread recieved END SIGNAL after writing 0 lines to QA-Test
[DEBUG] QA-Test_click_process-11 thread recieved END SIGNAL after writing 0 lines to QA-Test
[DEBUG] QA-Test_impression_process-10 thread recieved END SIGNAL after writing 0 lines to QA-Test
[DEBUG] QA-Test_impression_process-11 thread recieved END SIGNAL after writing 0 lines to QA-Test
[DEBUG] QA-Test_click_process-12 thread recieved END SIGNAL after writing 0 lines to QA-Test
[DEBUG] QA-Test_impression_process-12 thread recieved END SIGNAL after writing 0 lines to QA-Test
[DEBUG] QA-Test_impression_process-13 thread recieved END SIGNAL after writing 0 lines to QA-Test
[DEBUG] QA-Test_click_process-13 thread recieved END SIGNAL after writing 0 lines to QA-Test
[DEBUG] QA-Test_click_process-14 thread recieved END SIGNAL after writing 0 lines to QA-Test
[DEBUG] QA-Test_click_process-15 thread recieved END SIGNAL after writing 0 lines to QA-Test
[DEBUG] QA-Test_impression_process-14 thread recieved END SIGNAL after writing 0 lines to QA-Test
[DEBUG] QA-Test_impression_process-15 thread recieved END SIGNAL after writing 0 lines to QA-Test
[LOG] Started 32 processes
[DEBUG] QA-Test_impression_process-0 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_click_process-3 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_click_process-0 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_impression_process-1 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_click_process-4 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_impression_process-3 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_click_process-6 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_impression_process-6 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_click_process-7 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_impression_process-7 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_click_process-1 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_impression_process-2 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_impression_process-5 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_click_process-2 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_click_process-5 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_click_process-9 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_click_process-8 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_impression_process-4 thread recieved END SIGNAL after writing 1 lines to QA-Test
[DEBUG] QA-Test_impression_process-9 thread recieved END SIGNAL after writing 1 lines to QA-Test
[LOG] all processes completed