• threadpool map hands out one task at a time, assigning each to whichever thread is free (a minimal sketch is just below)
  • tasks are not assigned to threads ahead of time
  • the assignment depends on how long each task takes to run
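
A minimal sketch of that behaviour (the tagged_sleep helper is made up for illustration; diagnostic_map below records the same information in more detail):

from concurrent import futures
import threading
import time

def tagged_sleep(t):
    # sleep for t seconds and report which worker thread picked the task up
    time.sleep(t)
    return t, threading.current_thread().name

with futures.ThreadPoolExecutor(2) as ex:
    for t, name in ex.map(tagged_sleep, [3, 1, 1, 1]):
        print(t, 'ran on', name)
# with 2 workers, one thread stays busy with the 3-second task while the
# other picks up all three 1-second tasks as it becomes free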

In [11]:
from concurrent import futures
import time
import sys
import threading
import toolz
from datetime import datetime

def diagnostic_map(mapper, fn, data):
    """Apply fn to each element of data via mapper, recording which thread ran each task and when."""
    def wrapper(x):
        thread = threading.get_ident()
        start = str(datetime.now().time())  # wall-clock timestamp for display
        start2 = time.time()                # for measuring this task's duration
        result = fn(x)
        end = str(datetime.now().time())
        total_time = time.time() - start2
        
        d = dict(result=result, thread=thread, start=start, end=end, total_time=total_time)
        
        return d
    
    start = time.time()
    out = list(mapper(wrapper, data))
    total_time = time.time() - start
    
    result = [d['result'] for d in out]

    for d in out:
        del d['result']
    diag = dict(total_time=total_time, tasks=out)
    
    return result, diag

def thread_info(diag):
    """Print the per-thread task breakdown recorded by diagnostic_map."""
    print('Total time: {}'.format(diag['total_time']))
    tasks = diag['tasks']
    for i,d in enumerate(tasks):
        d['index'] = i
        
    by_thread = toolz.groupby(lambda x: x['thread'], tasks)

    for thread in by_thread:
        print('### Thread: ', thread)

        for task in by_thread[thread]:
            print('Index: {}'.format(task['index']))
            print('Time: {}--{}'.format(task['start'], task['end']))
            print('Total time: {}'.format(task['total_time']))
            print('')

In [8]:
in_tasks = [1, 10, 2, 3, 4, 5]

In [9]:
result, diag = diagnostic_map(map, time.sleep, in_tasks)
thread_info(diag)


### Thread:  140735190237184
Index: 0
Time: 15:09:49.694203--15:09:50.694417
Total time: 1.0002198219299316

Index: 1
Time: 15:09:50.694472--15:10:00.695154
Total time: 10.000715017318726

Index: 2
Time: 15:10:00.695219--15:10:02.698314
Total time: 2.0031449794769287

Index: 3
Time: 15:10:02.698402--15:10:05.703637
Total time: 3.0052950382232666

Index: 4
Time: 15:10:05.703752--15:10:09.709083
Total time: 4.005376100540161

Index: 5
Time: 15:10:09.709193--15:10:14.710507
Total time: 5.001398086547852


In [13]:
%%time
with futures.ThreadPoolExecutor(6) as ex: 
    result, diag = diagnostic_map(ex.map, time.sleep, in_tasks)


CPU times: user 4.64 ms, sys: 3.34 ms, total: 7.97 ms
Wall time: 10 s

In [14]:
thread_info(diag)


Total time: 10.004420042037964
### Thread:  123145325469696
Index: 0
Time: 15:13:14.654960--15:13:15.659452
Total time: 1.0045280456542969

### Thread:  123145330724864
Index: 1
Time: 15:13:14.656013--15:13:24.658673
Total time: 10.002716064453125

### Thread:  123145351745536
Index: 5
Time: 15:13:14.656904--15:13:19.659410
Total time: 5.002537965774536

### Thread:  123145341235200
Index: 3
Time: 15:13:14.656561--15:13:17.661792
Total time: 3.0052831172943115

### Thread:  123145346490368
Index: 4
Time: 15:13:14.656745--15:13:18.662054
Total time: 4.005357980728149

### Thread:  123145335980032
Index: 2
Time: 15:13:14.656317--15:13:16.661559
Total time: 2.005284070968628


In [18]:
import scs
num_problems = 20
m = 1000 # size of L1 problem

data = [scs.examples.l1(m, seed=i) for i in range(num_problems)]

In [19]:
def solve(x):
    # each element of data is the argument tuple for scs.solve
    return scs.solve(*x, verbose=False)

In [28]:
%%time
workers = 2 # number of threads/processes
with futures.ThreadPoolExecutor(workers) as ex: 
    result, diag = diagnostic_map(ex.map, solve, data)


CPU times: user 48.7 s, sys: 303 ms, total: 49 s
Wall time: 25.2 s

In [30]:
thread_info(diag)


Total time: 25.186974048614502
### Thread:  123145325469696
Index: 0
Time: 15:29:38.602435--15:29:41.325301
Total time: 2.7228751182556152

Index: 3
Time: 15:29:41.325433--15:29:43.737189
Total time: 2.411777973175049

Index: 5
Time: 15:29:43.737322--15:29:46.269704
Total time: 2.5323970317840576

Index: 7
Time: 15:29:46.269817--15:29:48.668321
Total time: 2.398531913757324

Index: 9
Time: 15:29:48.668455--15:29:51.360616
Total time: 2.6921770572662354

Index: 11
Time: 15:29:51.360745--15:29:53.971491
Total time: 2.610764980316162

Index: 13
Time: 15:29:53.971609--15:29:56.361483
Total time: 2.3898909091949463

Index: 15
Time: 15:29:56.361594--15:29:58.965721
Total time: 2.6041507720947266

Index: 17
Time: 15:29:58.965846--15:30:01.463124
Total time: 2.497299909591675

Index: 19
Time: 15:30:01.463264--15:30:03.788873
Total time: 2.3256280422210693

### Thread:  123145330724864
Index: 1
Time: 15:29:38.602939--15:29:40.968389
Total time: 2.365457057952881

Index: 2
Time: 15:29:40.968497--15:29:43.303341
Total time: 2.3348639011383057

Index: 4
Time: 15:29:43.303468--15:29:45.637890
Total time: 2.3344411849975586

Index: 6
Time: 15:29:45.638014--15:29:48.007919
Total time: 2.369925022125244

Index: 8
Time: 15:29:48.008041--15:29:50.520509
Total time: 2.512489080429077

Index: 10
Time: 15:29:50.520635--15:29:53.120910
Total time: 2.600296974182129

Index: 12
Time: 15:29:53.121039--15:29:55.408197
Total time: 2.2871780395507812

Index: 14
Time: 15:29:55.408319--15:29:57.814547
Total time: 2.4062459468841553

Index: 16
Time: 15:29:57.814662--15:30:00.257504
Total time: 2.4428610801696777

Index: 18
Time: 15:30:00.257613--15:30:02.865872
Total time: 2.6082839965820312
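
The 2-worker run packs roughly 49 s of CPU time into about 25 s of wall time, which suggests scs spends most of its time in compiled code that releases the GIL, so the two threads really do solve problems in parallel. For GIL-bound pure-Python work the usual alternative is a process pool; a rough sketch, not run here (it assumes solve and the problem data pickle cleanly, which can be awkward for functions defined interactively in a notebook):

with futures.ProcessPoolExecutor(workers) as ex:
    # same calling convention as the thread pool, but arguments and results
    # are pickled and shipped to worker processes, so diagnostic_map's local
    # wrapper closure cannot be used here
    result = list(ex.map(solve, data))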

aggregate and broadcast

  • tasks are like SCS workspace objects
  • make the tasks a list of ints, where each int is the size of an array
  • each task sleeps for a second and creates an array of that size
  • aggregate the arrays (all at the end, or as they come in?)
  • make them all the same size
  • put them in a dict?
  • when does the summing begin? (see the as_completed sketch at the end of this section)

In [29]:
import numpy as np
from concurrent import futures
import time

data = [10**7]*8

In [38]:
def foo(n):
    # sleep for a second, then create an n-element random array
    time.sleep(1)
    return np.random.randn(n)

In [40]:
%%time
result = sum(map(foo, data))


CPU times: user 3.78 s, sys: 174 ms, total: 3.95 s
Wall time: 12 s

In [41]:
%%time
with futures.ThreadPoolExecutor(2) as ex: 
    result = sum(ex.map(foo, data))


CPU times: user 3.74 s, sys: 186 ms, total: 3.93 s
Wall time: 6.38 s

In [49]:
def summer(g):
    # consume the results iterator, printing as each element arrives;
    # the accumulation is commented out because the demo below maps
    # time.sleep, whose results are None
    total = 0.0
    for i in g:
        print('got new element.')
        #total = total + i
    return total

In [50]:
%%time
data = [10]+[1]*8

with futures.ThreadPoolExecutor(2) as ex: 
    result = summer(ex.map(time.sleep, data))


got new element.
got new element.
got new element.
got new element.
got new element.
got new element.
got new element.
got new element.
got new element.
CPU times: user 3.74 ms, sys: 10.8 ms, total: 14.6 ms
Wall time: 10 s
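
Because ex.map yields results in submission order, nothing comes out of the iterator until the 10-second task at the front finishes, even though the 1-second tasks are long done. To start aggregating as soon as any task finishes, futures.as_completed yields each future the moment it completes; a sketch on the same data:

with futures.ThreadPoolExecutor(2) as ex:
    fs = [ex.submit(time.sleep, t) for t in data]
    for f in futures.as_completed(fs):
        # futures arrive in completion order, so the 1-second tasks show up
        # while the 10-second task is still running; for the array example
        # above, this is where total = total + f.result() would go
        print('got new element.')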

In [ ]: