`map` takes one task at a time, assigning it to whichever thread is free.
In [11]:
from concurrent import futures
import time
import sys
import threading
import toolz
from datetime import datetime
def diagnostic_map(mapper, fn, data):
    """Apply ``fn`` to every item of ``data`` through ``mapper`` and record timings.

    ``mapper`` is any callable with the calling convention of the built-in
    ``map`` (for example ``map`` itself or ``Executor.map``). It is called once
    as ``mapper(wrapper, data)`` and its output is fully consumed.

    Returns a ``(result, diag)`` pair:

    * ``result`` -- list of ``fn(x)`` values, in the order ``mapper`` yields them.
    * ``diag`` -- dict with ``total_time`` (elapsed seconds for the whole map)
      and ``tasks``, one dict per item holding ``thread`` (the
      ``threading.get_ident()`` of the thread that ran it), ``start`` / ``end``
      (wall-clock time-of-day strings) and ``total_time`` (elapsed seconds).
    """
    def wrapper(x):
        # Runs on whichever thread ``mapper`` uses; records who ran ``x`` and when.
        thread = threading.get_ident()
        start = str(datetime.now().time())
        # perf_counter is monotonic and high-resolution, so durations are not
        # distorted by system clock adjustments the way time.time() can be.
        t0 = time.perf_counter()
        result = fn(x)
        end = str(datetime.now().time())
        total_time = time.perf_counter() - t0
        return dict(result=result, thread=thread, start=start, end=end,
                    total_time=total_time)

    t0 = time.perf_counter()
    out = list(mapper(wrapper, data))
    total_time = time.perf_counter() - t0

    # Split each record into the user-facing result and its diagnostics.
    result = []
    for record in out:
        result.append(record.pop('result'))
    return result, dict(total_time=total_time, tasks=out)
def thread_info(diag):
    """Print a per-thread timing report for a ``diagnostic_map`` run.

    ``diag`` is the second value returned by ``diagnostic_map``: a dict with
    ``total_time`` and ``tasks`` (dicts with ``thread``, ``start``, ``end`` and
    ``total_time`` keys). Tasks are grouped by thread, with threads in order of
    first appearance and tasks in their original order. Each task is labelled
    with its position in ``diag['tasks']``. ``diag`` is not modified.
    """
    print('Total time: {}'.format(diag['total_time']))

    # Group (index, task) pairs by thread id instead of writing an 'index' key
    # into the caller's task dicts. Dicts keep insertion order, matching the
    # ordering of toolz.groupby.
    by_thread = {}
    for index, task in enumerate(diag['tasks']):
        by_thread.setdefault(task['thread'], []).append((index, task))

    for thread, entries in by_thread.items():
        print('### Thread: ', thread)
        for index, task in entries:
            print('Index: {}'.format(index))
            print('Time: {}--{}'.format(task['start'], task['end']))
            print('Total time: {}'.format(task['total_time']))
            print('')
In [8]:
# Sleep durations in seconds; each entry is handed to time.sleep in the demos below.
in_tasks = [1, 10, 2, 3, 4, 5]
In [9]:
# Serial baseline: the built-in map calls time.sleep on the calling thread,
# one item after another.
result, diag = diagnostic_map(mapper=map, fn=time.sleep, data=in_tasks)
thread_info(diag=diag)
In [13]:
%%time
with futures.ThreadPoolExecutor(6) as ex:
result, diag = diagnostic_map(ex.map, time.sleep, in_tasks)
In [14]:
# Per-thread breakdown of the thread-pool run above.
thread_info(diag=diag)
In [18]:
import scs
num_problems = 20
m = 1000  # size of L1 problem (size argument passed to scs.examples.l1)
# One problem instance per seed 0 .. num_problems - 1.
data = [scs.examples.l1(m, seed=k) for k in range(num_problems)]
In [19]:
def solve(x):
    """Run ``scs.solve`` on one problem with solver output silenced.

    The items of ``x`` are forwarded positionally to ``scs.solve``. Presumably
    ``x`` is one entry of ``data`` built by ``scs.examples.l1`` -- TODO confirm.
    """
    solver_args = tuple(x)
    return scs.solve(*solver_args, verbose=False)
In [28]:
%%time
workers = 2 # number of threads/processes
with futures.ThreadPoolExecutor(workers) as ex:
result, diag = diagnostic_map(ex.map, solve, data)
In [30]:
# Per-thread breakdown of the SCS run above.
thread_info(diag=diag)
In [29]:
import numpy as np
from concurrent import futures
import time
# Eight tasks, each asking foo for 10**7 samples.
data = 8 * [10 ** 7]
In [38]:
def foo(n, delay=1):
    """Sleep for ``delay`` seconds, then return ``n`` standard-normal samples.

    Combines a blocking wait with a NumPy computation. It is used below to
    compare serial ``map`` with ``ThreadPoolExecutor.map``.

    Parameters
    ----------
    n : int
        Number of samples to draw.
    delay : float, optional
        Seconds to sleep before sampling (default 1, the previously
        hard-coded value, so existing ``map(foo, data)`` calls are unchanged).

    Returns
    -------
    numpy.ndarray
        1-D array of length ``n`` from ``np.random.randn``.
    """
    time.sleep(delay)
    return np.random.randn(n)
In [40]:
%%time
result = sum(map(foo, data))
In [41]:
%%time
with futures.ThreadPoolExecutor(2) as ex:
result = sum(ex.map(foo, data))
In [49]:
def summer(g):
    """Consume the iterable ``g``, printing a line as each element arrives.

    Despite the name, nothing is accumulated and ``0.0`` is always returned.
    The function is only used to observe *when* elements are produced. With
    ``ex.map(time.sleep, ...)`` every element is ``None``, which could not be
    added to a float anyway.
    """
    message = 'got new element.'
    for _element in g:
        print(message)
    return 0.0
In [50]:
%%time
data = [10]+[1]*8
with futures.ThreadPoolExecutor(2) as ex:
result = summer(ex.map(time.sleep, data))
In [ ]: