# In [33]:  (IPython cell prompt left over from the notebook export)
import boto3
import getopt
import datetime as dt
from datetime import datetime
import time
import multiprocessing
import json
import random
import uuid
# Event templates. Both start out empty and are replaced with the parsed JSON
# template files when set_event_templates() runs.
click_data_template = {}
impression_data_template = {}

# ## Configurations BEGIN
# Streams this script is allowed to touch; get_streams() filters on this list.
# Alternative target set: ["clicks", "impressions"]
TEST_STREAM_LIST = ['QA-Test']
TEST_CLICKS_STREAMS = ["QA-Test", "clicks"]  # write clicks only to these streams
TEST_IMPRESSIONS_STREAMS = ["QA-Test", "impressions"]  # write impressions only to these streams
WRITE_INTERVAL_SECONDS = 0.01  # pause between writes, per process
PROCESSES_PER_STREAM = 4 * multiprocessing.cpu_count()  # writer processes per stream and event type
# ## Configurations END
def put_events_on_queue(q, clicks=0, lite_clicks=0, impressions=0, append_kill_signal=True):
    """Load ``q`` with single-record event batches, then optional stop markers.

    Batches are enqueued in a fixed order: regular clicks, lite clicks, then
    impressions. Clicks and lite clicks are built from the click template,
    impressions from the impression template.

    Args:
        q: Any object with a ``put(item)`` method (a multiprocessing queue here).
        clicks: Number of regular click batches to enqueue.
        lite_clicks: Number of lite click batches to enqueue.
        impressions: Number of impression batches to enqueue.
        append_kill_signal: When true, finish by enqueuing
            ``PROCESSES_PER_STREAM`` ``None`` markers; a worker stops when it
            dequeues one.
    """
    def enqueue_batches(how_many, from_impression_template, is_lite):
        # The template global is looked up per batch, so nothing is touched
        # when ``how_many`` is zero.
        for _ in range(how_many):
            if from_impression_template:
                template = impression_data_template
            else:
                template = click_data_template
            q.put(create_event_batch(1, template, lite_click=is_lite))

    enqueue_batches(clicks, False, False)
    enqueue_batches(lite_clicks, False, True)
    enqueue_batches(impressions, True, False)

    if not append_kill_signal:
        return
    for _ in range(PROCESSES_PER_STREAM):
        q.put(None)
def create_event_batch(count, event_template, lite_click):
    """Return a list of ``count`` freshly generated event records.

    Every record comes from ``new_event_record_from`` called with the same
    template and ``lite_click`` flag. A ``count`` of zero or less yields an
    empty list.
    """
    return [
        new_event_record_from(event_template, lite_click=lite_click)
        for _ in range(count)
    ]
def new_event_record_from(event_template, lite_click=False):
    """Build one Kinesis ``put_records`` entry from an event template.

    The template is copied (shallow) and never modified. The copy gets a
    ``traffic_type_id`` (1 for a click template, 3 for an impression template,
    2 whenever ``lite_click`` is true), a random ``subid_4`` in [1000, 2000),
    the current UTC time in ``request_date`` / ``request_date_utc``, a fresh
    UUID for whichever of ``click_id`` / ``impression_id`` it contains, and a
    random ``client_id`` in [1000, 1250).

    Args:
        event_template: Mapping holding a ``click_id`` or ``impression_id``
            key; ``click_id`` wins when both are present.
        lite_click: Mark the event as a lite click (``traffic_type_id`` 2).

    Returns:
        dict with ``Data`` (the event serialized as JSON) and ``PartitionKey``
        (a random UUID string).

    Raises:
        ValueError: If the template has neither ``click_id`` nor
            ``impression_id``.
    """
    if 'click_id' in event_template:
        default_traffic_type = 1
    elif 'impression_id' in event_template:
        default_traffic_type = 3
    else:
        # Previously a placeholder string was assigned here and the function
        # then failed with an unrelated TypeError on the first item assignment.
        raise ValueError(
            "event template must contain a 'click_id' or 'impression_id' key")

    # Copy the template that was actually passed in rather than a module
    # global, so the function honours its argument.
    new_event = event_template.copy()
    new_event['traffic_type_id'] = 2 if lite_click else default_traffic_type
    new_event['subid_4'] = random.randrange(1000, 2000)

    # Aware UTC "now"; formats exactly like the deprecated datetime.utcnow().
    current_time = datetime.now(dt.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
    new_event['request_date'] = current_time
    new_event['request_date_utc'] = current_time

    if 'impression_id' in new_event:
        new_event['impression_id'] = str(uuid.uuid1())
    if 'click_id' in new_event:
        new_event['click_id'] = str(uuid.uuid1())
    new_event['client_id'] = random.randrange(1000, 1250)

    return {'Data': json.dumps(new_event), 'PartitionKey': str(uuid.uuid1())}
def get_streams():
    """Return the names of existing Kinesis streams listed in TEST_STREAM_LIST.

    All pages of the ``list_streams`` result are read, so a matching stream is
    found even when the account has more streams than fit in one response.
    Names are returned in the order the service lists them.
    """
    kinesis = boto3.client('kinesis')
    matching = []
    for page in kinesis.get_paginator('list_streams').paginate():
        matching.extend(
            stream_name for stream_name in page['StreamNames']
            if stream_name in TEST_STREAM_LIST)
    return matching
def get_shard_ids_for(stream_name, tuples_with_stream_names=False):
    """Return the shard ids of ``stream_name``.

    All pages of the ``describe_stream`` result are read, so shards beyond the
    first response are included too.

    Args:
        stream_name: Name of the Kinesis stream to describe.
        tuples_with_stream_names: When true, each entry is a
            ``(stream_name, shard_id)`` tuple instead of a bare shard id.

    Returns:
        list of shard ids (or tuples), in the order the service reports them.
    """
    kinesis = boto3.client('kinesis')
    shard_ids = []
    pages = kinesis.get_paginator('describe_stream').paginate(StreamName=stream_name)
    for page in pages:
        for shard in page['StreamDescription']['Shards']:
            shard_id = shard['ShardId']
            if tuples_with_stream_names:
                shard_ids.append((stream_name, shard_id))
            else:
                shard_ids.append(shard_id)
    return shard_ids
def get_streams_and_shard_ids():
    """Return ``(stream_name, shard_id)`` tuples for every shard of every test stream."""
    pairs = []
    for name in get_streams():
        # TODO: alter this to read and write on other streams
        if name not in TEST_STREAM_LIST:
            continue
        pairs += get_shard_ids_for(name, tuples_with_stream_names=True)
    return pairs
def stream_write_worker(stream_name, shard_ids, process_name, event_q, event_results_q):
    """Worker loop: take record batches off ``event_q`` and write them to Kinesis.

    The worker sleeps ``WRITE_INTERVAL_SECONDS`` before each dequeue. It stops
    when it dequeues ``None``. Every dequeued item, including the stop marker
    and batches whose write failed, is acknowledged with ``task_done()`` so a
    ``join()`` on ``event_q`` cannot block on an item this worker took.

    Args:
        stream_name: Kinesis stream to write to.
        shard_ids: Accepted for interface compatibility; not used by the worker.
        process_name: Label used in log output.
        event_q: Joinable queue of record lists (``put_records`` ``Records``
            payloads), terminated by ``None``.
        event_results_q: Queue that receives one log line per completed
            ``put_records`` call.
    """
    kinesis = boto3.client('kinesis')
    write_count = 0
    while True:
        time.sleep(WRITE_INTERVAL_SECONDS)
        event_data = event_q.get()
        if event_data is None:
            event_q.task_done()
            print("[DEBUG] {} thread recieved END SIGNAL after writing {} lines to {}".format(
                process_name, write_count, stream_name))
            break
        try:
            response = kinesis.put_records(
                Records=event_data,
                StreamName=stream_name,
            )
        except Exception as error:
            # Best-effort writer: report the failure and keep draining the
            # queue. Letting the exception escape would end this process with
            # the item unacknowledged and leave the parent's join() waiting
            # forever.
            print('[ERROR] Write Process {} failed writing to {}: {}'.format(
                process_name, stream_name, error))
        else:
            status = response['ResponseMetadata']['HTTPStatusCode']
            if status != 200:
                print('[ERROR] Write Process {} got response of {}'.format(
                    process_name, status))
            # put_records can succeed at the HTTP level while rejecting
            # individual records; the response reports how many were rejected.
            failed_records = response.get('FailedRecordCount', 0)
            if failed_records:
                print('[ERROR] Write Process {} had {} records rejected by {}'.format(
                    process_name, failed_records, stream_name))
            log_line = "Write Response {}, {} at {}".format(
                process_name, status, datetime.now())
            write_count += 1
            event_results_q.put(log_line)
        finally:
            event_q.task_done()
def set_event_templates():
    """Load both event templates from JSON files in the current working directory.

    Rebinds the module globals ``click_data_template`` (from
    ``click_template.json``) and ``impression_data_template`` (from
    ``impression_template.json``) to the parsed file contents.
    """
    global click_data_template, impression_data_template

    with open('click_template.json', 'r') as click_file:
        click_text = click_file.read()
    click_data_template = json.loads(click_text)

    with open('impression_template.json') as impression_file:
        impression_text = impression_file.read()
    impression_data_template = json.loads(impression_text)
def _start_write_worker(stream_name, shard_ids, process_name, event_q, event_results_q):
    """Start one daemonized ``stream_write_worker`` process and return it."""
    worker = multiprocessing.Process(
        target=stream_write_worker,
        args=(stream_name, shard_ids, process_name, event_q, event_results_q)
    )
    worker.daemon = True
    worker.start()
    return worker


def write_kinesis_info(clicks=0, lite_clicks=0, impressions=0):
    """Generate test events and write them to the test streams in parallel.

    Loads the event templates, fills one queue with click and lite-click
    batches and another with impression batches, then starts
    ``PROCESSES_PER_STREAM`` writer processes per stream and event type for
    every stream returned by ``get_streams()``. Click writers are only started
    for streams in ``TEST_CLICKS_STREAMS`` and impression writers only for
    streams in ``TEST_IMPRESSIONS_STREAMS``. Streams that report no shards are
    skipped. The call blocks until every queue that has at least one writer is
    drained, and all started processes are terminated before returning.

    Args:
        clicks: Number of regular click events to write.
        lite_clicks: Number of lite click events to write.
        impressions: Number of impression events to write.

    Returns:
        The fixed list ``[1, 2, 3, 4]``; it carries no information about the
        run and the caller in this file ignores it.
    """
    set_event_templates()
    processes = []
    try:
        click_q = multiprocessing.JoinableQueue(clicks + lite_clicks + PROCESSES_PER_STREAM)
        impression_q = multiprocessing.JoinableQueue(impressions + PROCESSES_PER_STREAM)
        event_results_q = multiprocessing.JoinableQueue(
            clicks + lite_clicks + impressions + PROCESSES_PER_STREAM)
        print("[LOG] setting up queues with {} clicks, {} lite_clicks, and {} impressions".format(
            clicks, lite_clicks, impressions))
        # Clicks and lite clicks share click_q, so the stop markers are only
        # appended by the lite-click call that follows the click call.
        if clicks > 0:
            put_events_on_queue(click_q, clicks=clicks, append_kill_signal=False)
        if lite_clicks > 0:
            put_events_on_queue(click_q, lite_clicks=lite_clicks)
        if impressions > 0:
            put_events_on_queue(impression_q, impressions=impressions)
        # NOTE: a watchdog process (sentinal_process) used to be started at
        # this point; it is currently disabled.

        click_workers = 0
        impression_workers = 0
        for stream_name in get_streams():
            print('[LOG] setting up for stream name {}'.format(stream_name.upper()))
            shard_ids = get_shard_ids_for(stream_name)
            if not shard_ids:
                # The original used a bare `next` here, which is a no-op, so
                # workers were started for shardless streams anyway.
                print("[WARN] write process found ZERO shards for {}".format(stream_name))
                continue
            for process_number in range(PROCESSES_PER_STREAM):
                if stream_name in TEST_CLICKS_STREAMS:  # clicks limited to these streams
                    click_process_name = "{}_click_process-{}".format(
                        stream_name, process_number)
                    processes.append(_start_write_worker(
                        stream_name, shard_ids, click_process_name, click_q, event_results_q))
                    click_workers += 1
                if stream_name in TEST_IMPRESSIONS_STREAMS:  # impressions limited to these streams
                    impression_process_name = "{}_impression_process-{}".format(
                        stream_name, process_number)
                    processes.append(_start_write_worker(
                        stream_name, shard_ids, impression_process_name,
                        impression_q, event_results_q))
                    impression_workers += 1
        print("[LOG] Started {} processes".format(len(processes)))

        # Joining a queue that holds items but has no consumer would block
        # forever, so only wait on queues that some writer is draining.
        waits = (
            ("impression", impression_q, impression_workers, impressions),
            ("click", click_q, click_workers, clicks + lite_clicks),
        )
        for label, event_q, worker_count, pending in waits:
            if worker_count:
                event_q.join()
                continue
            if pending:
                print("[WARN] no write process consumes the {} queue; "
                      "{} queued events were not written".format(label, pending))
            # Do not let interpreter shutdown wait on flushing data that no
            # process will ever read.
            event_q.cancel_join_thread()
        print("[LOG] all processes completed")
    finally:
        for process in processes:
            process.terminate()
    return [1, 2, 3, 4]
def sentinal_process(plist, queues):
    """Watchdog: terminate every process in ``plist`` once both queues have been seen empty.

    ``queues[0]`` and ``queues[1]`` are polled once per second. An "empty"
    observation is remembered, so a queue that was empty on an earlier poll
    still counts even if it has items again later. When both queues have been
    observed empty, a warning is printed, ``terminate()`` is called on every
    entry of ``plist`` and the function returns.
    """
    seen_empty = [False, False]
    while True:
        for slot in (0, 1):
            if queues[slot].empty():
                seen_empty[slot] = True
        if all(seen_empty):
            print("[WARN] Queues empty. "
                  "Sentinal process killing all remaining processes and exiting.")
            for proc in plist:
                proc.terminate()
            return
        time.sleep(1)
def read_shard_stream(shard_iterator, process_name):
    """Poll one shard and print every record read from it.

    Up to 25 records are fetched per request, each record's ``Data`` is
    decoded as UTF-8 JSON and printed prefixed with ``process_name``, and the
    loop pauses 0.2 seconds between requests. Polling ends when the service
    stops handing back a next iterator, which it does once a closed shard has
    been read to its end; on an open shard the loop runs until the process is
    stopped.

    Args:
        shard_iterator: ``get_shard_iterator`` response; its ``ShardIterator``
            value is the starting position.
        process_name: Label printed in front of every record.
    """
    client = boto3.client('kinesis')
    iterator = shard_iterator["ShardIterator"]
    while True:
        records = client.get_records(ShardIterator=iterator, Limit=25)
        for record in records['Records']:
            data = json.loads(record['Data'].decode('utf-8'))
            print("{} {}".format(process_name, data))  # data['click_id']))
        # A missing or null NextShardIterator means there is nothing further
        # to read from this shard; indexing it directly raised KeyError or
        # sent an invalid iterator on the next request.
        iterator = records.get("NextShardIterator")
        if iterator is None:
            print("[DEBUG] {} reached the end of its shard".format(process_name))
            break
        time.sleep(0.2)
def get_kinesis_info(clicks=0, lite_clicks=0, impressions=0):
    """Start one reader process per test-stream shard and wait for all of them.

    Each reader begins at records timestamped 20 seconds before its iterator
    was requested and runs ``read_shard_stream``. The ``clicks``,
    ``lite_clicks`` and ``impressions`` arguments are accepted but not used.
    """
    kinesis = boto3.client('kinesis')
    readers = []
    for stream_name, shard_id in get_streams_and_shard_ids():
        print('[DEBUG] setting up GET process for {}, {}'.format(stream_name, shard_id))
        start_at = datetime.utcnow() - dt.timedelta(seconds=20)
        iterator_response = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType='AT_TIMESTAMP',
            Timestamp=start_at,
        )
        reader_name = "{}_{}".format(stream_name, shard_id)
        reader = multiprocessing.Process(
            target=read_shard_stream,
            args=(iterator_response, reader_name),
        )
        reader.start()
        readers.append(reader)

    for reader in readers:
        reader.join()
def main(argv):
    """Parse command-line style arguments and run the reader or the writer.

    Options:
        -c N, --clicks=N        number of click events to write
        -l N, --lite-clicks=N   number of lite click events to write
                                (``--lite_clicks=N`` is accepted as well)
        -i N, --impressions=N   number of impression events to write
        -r, --reader            read from the test streams, then exit with 0

    Without ``-r`` the writer runs only when at least one count is non-zero.
    An unknown option or a non-integer count prints the usage line and exits
    with status 2.

    Args:
        argv: Argument list without the program name (e.g. ``sys.argv[1:]``).
    """
    # The file imports sys only under its __main__ guard, so import it here to
    # keep main() usable when the module is imported.
    import sys

    usage = "-c 100 -l 125 -i 50 or --clicks=100 --lite-clicks=125 --impressions=50"
    try:
        # The original list was missing a comma, which fused "lite_clicks="
        # and "impressions=" into a single unusable long option.
        opts, _args = getopt.getopt(
            argv, "rc:l:i:",
            ["reader", "clicks=", "lite-clicks=", "lite_clicks=", "impressions="])
    except getopt.GetoptError:
        print(usage)
        sys.exit(2)

    clicks = 0
    lite_clicks = 0
    impressions = 0
    reader = False
    try:
        for opt, arg in opts:
            if opt in ('-i', '--impressions'):
                impressions = int(arg)
            elif opt in ('-c', '--clicks'):
                clicks = int(arg)
            elif opt in ('-l', '--lite-clicks', '--lite_clicks'):
                lite_clicks = int(arg)
            elif opt in ('-r', '--reader'):
                reader = True
    except ValueError:
        # A count that is not an integer is a usage error, not a crash.
        print(usage)
        sys.exit(2)

    if reader:
        get_kinesis_info(clicks=clicks, lite_clicks=lite_clicks, impressions=impressions)
        sys.exit(0)
    if lite_clicks or clicks or impressions:
        write_kinesis_info(clicks=clicks, lite_clicks=lite_clicks, impressions=impressions)
if __name__ == "__main__":
    import sys

    # iPython testing: run the writer with a fixed argument list.
    main([
        "-l", "10",
        "-i", "10",
    ])
    # Local command line testing (disabled): call main(sys.argv[1:]) in an
    # endless loop, sleeping 15 seconds between runs.
    # while True:
    #     main(sys.argv[1:])
    #     time.sleep(15)