First we import a testing library called fake-factory (imported as `faker`). I will use it to generate fake names for the API calls.
In [1]:
from faker import Factory
# Create the generator object used below to produce fake first names,
# last names and e-mail addresses.
fake = Factory.create()
In [3]:
# Show what the fake-factory library produces: one fake name and e-mail address.
sample_first = fake.first_name()
sample_last = fake.last_name()
sample_email = fake.email()
print('{} {} <{}>'.format(sample_first, sample_last, sample_email))
Bring in the `threading` and `requests` libraries, and set how many threads to start per pass and how many passes to run.
In [4]:
import threading
import requests
# Number of threads started on each pass of the outer loop; every thread
# makes one call_api() request.
NUM_THREADS = 10
# Number of passes of the outer loop, i.e. how many batches of threads are run.
NUM_ITERATIONS = 50
Define the function that will be called in each of the threads, then run it once just to see it working.
In [6]:
def call_api(name='Noname'):
    '''Call our API endpoint with the provided name and return its reply.

    Args:
        name: Value sent as the ``name`` field of the JSON request body.

    Returns:
        The decoded JSON body of the response when the request succeeded
        (``response.ok``); otherwise an empty dict.
    '''
    url = 'https://jdow62ruh9.execute-api.us-east-1.amazonaws.com/test/greeting'
    post_data = dict(name=name)
    response = requests.post(url, json=post_data)
    if not response.ok:
        # Return an empty dict rather than falling through and returning None,
        # so callers can use ``.get()`` on the result unconditionally.
        return {}
    return response.json()
# Call the endpoint once with a fake full name and show the reply.
full_name = '{} {}'.format(fake.first_name(), fake.last_name())
result = call_api(full_name)
greeting_text = result.get('greeting')
echoed_name = result.get('name')
print('{} {}'.format(greeting_text, echoed_name))
Finally we do the loops.
On each pass of the outer loop, the first inner loop starts all of the threads and the second inner loop waits for every one of them to finish. Then the outer loop goes around again.
In [7]:
# Run NUM_ITERATIONS batches; each batch launches NUM_THREADS concurrent
# call_api() requests and waits for all of them before the next batch starts.
for _batch in range(NUM_ITERATIONS):
    workers = []
    # Progress marker, one per batch. The trailing comma is the Python 2 idiom
    # for suppressing the newline; under Python 3 it is a harmless tuple
    # expression and each dot is printed on its own line.
    print('.'),
    # launch this batch's threads, each with its own fake first name
    for _slot in range(NUM_THREADS):
        caller_name = fake.first_name()
        worker = threading.Thread(target=call_api, args=(caller_name,))
        workers.append(worker)
        worker.start()
    # block until every thread in this batch has finished
    for worker in workers:
        worker.join()
print('Done')
In [ ]: