In [1]:
import multiprocessing as mp
from time import sleep
from queue import Empty
import sys
In [128]:
class Job(object):
    def __init__(self, function, args=None, kwargs=None, depends=None):
        self.function = function  # no trailing comma: a stray comma here would store a 1-tuple
        self.args = args
        self.kwargs = kwargs
        self.depends = depends
        self.jobno = None
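For reference, a Job for this machinery might be built like so (hypothetical usage: this notebook never actually instantiates Job, and the runners below take plain tuples/lists instead; jon is the test function defined further down):

job = Job(jon, args=('fred',), depends=[1, 2])  # run jon('fred') once jobs 1 and 2 finish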
In [384]:
class ClusterError(Exception):
    """Raised when job_runner is handed arguments of the wrong type."""

def job_runner(jobqueue, outputs, cores=1, jobno=1):
    """
    jobqueue: Queue of (function, args, depends) jobs
    outputs: Queue holding the latest {jobno: job_info} dictionary
    """
    def output(out):
        """Explicitly clear the queue before sending the output."""
        lastout = outputs.get() if not outputs.empty() else ''
        if out == lastout:
            return
        while not outputs.empty():
            # Clear the output object
            outputs.get()
        outputs.put(out)
    # Make sure we have Queue objects
    if not isinstance(jobqueue, mp.queues.Queue) \
            or not isinstance(outputs, mp.queues.Queue):
        raise ClusterError('jobqueue and outputs must be multiprocessing '
                           'Queue objects')
    jobno = int(jobno) if jobno else 1
    jobno = jobno - 1 if jobno != 0 else 0  # '!=', not 'is not': identity tests on ints are unreliable
    jobs = {}
    runners = {}
    started = []
    done = []
    pool = mp.Pool(cores)
    while True:
        if not jobqueue.empty():
            jobno += 1
            job = jobqueue.get_nowait()
            function, args, depends = job
            jobs[jobno] = {'func': function, 'args': args, 'kwargs': None,
                           'depends': depends, 'state': None, 'out': None}
            output(jobs)
        if jobs:
            # jno, not jobno: reusing the counter as the loop variable would clobber it
            for jno, job_info in jobs.items():
                if job_info['state'] == 'done':
                    continue
                ready = True
                if job_info['depends']:
                    for depend in job_info['depends']:
                        if depend not in done:
                            ready = False
                if ready and jno not in started:
                    # kwargs is always None here, so only the args branches can fire
                    if job_info['args'] and job_info['kwargs']:
                        runners[jno] = pool.apply_async(job_info['func'], (job_info['args'],), job_info['kwargs'])
                    elif job_info['args']:
                        runners[jno] = pool.apply_async(job_info['func'], (job_info['args'],))
                    elif job_info['kwargs']:
                        runners[jno] = pool.apply_async(job_info['func'], kwds=job_info['kwargs'])
                    else:
                        runners[jno] = pool.apply_async(job_info['func'])
                    job_info['state'] = 'started'
                    started.append(jno)
                    output(jobs)
                    sleep(0.5)  # Give the job a moment to start
                if job_info['state'] == 'started' and jno not in done and runners[jno].ready():
                    job_info['out'] = runners[jno].get()
                    job_info['state'] = 'done'
                    done.append(jno)
                    output(jobs)
        sleep(0.5)
In [385]:
queue = mp.Queue()
outqueue = mp.Queue()
In [386]:
runner = mp.Process(target=job_runner, args=(queue, outqueue))
In [387]:
runner.start()
In [388]:
queue.empty()
Out[388]:
In [389]:
outqueue.empty()
Out[389]:
In [390]:
runner.is_alive()
Out[390]:
In [391]:
myjobs = {}
In [392]:
queue.put([jon, 'fred', None])
In [393]:
myjobs = update_jobs(myjobs, outqueue)
In [394]:
myjobs
Out[394]:
In [395]:
outqueue.empty()
Out[395]:
In [396]:
queue.put((jon, 'bob', [4]))
In [397]:
queue.put((jon, 'joe', None))
In [398]:
outqueue.empty()
Out[398]:
In [399]:
myjobs = update_jobs(myjobs, outqueue)
myjobs
Out[399]:
Note that job 2 has not executed because it depends on job 4, which has not run yet.
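The readiness test behind this behavior boils down to the following standalone sketch (illustration only, not code from this notebook):

def is_ready(job_info, done):
    # Ready only when every job number this job depends on has finished
    return all(dep in done for dep in (job_info['depends'] or []))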
In [400]:
queue.put([jon, 'done', None])
In [401]:
myjobs = update_jobs(myjobs, outqueue)
myjobs
Out[401]:
In [402]:
runner.is_alive()
Out[402]:
In [403]:
outqueue.empty()
Out[403]:
In [404]:
myjobs = update_jobs(myjobs, outqueue)
myjobs
Out[404]:
In [2]:
def job_runner(cores, jobqueue, outputs):
    """
    jobqueue: Queue of (function[, args[, depends]]) jobs
    outputs: Queue holding the latest {jobno: job_info} dictionary
    """
    def output(out):
        """Explicitly clear the queue before sending the output."""
        lastout = outputs.get() if not outputs.empty() else ''
        if out == lastout:
            return
        while not outputs.empty():
            # Clear the output object
            outputs.get()
        outputs.put(out)
    jobno = 0
    jobs = {}
    runners = {}
    done = []
    pool = mp.Pool(cores)
    while True:
        if not jobqueue.empty():
            fun_args = jobqueue.get_nowait()
            if len(fun_args) == 1:
                function = fun_args[0]
                args = None
                depends = None
            elif len(fun_args) == 2:
                function, args = fun_args
                depends = None
            elif len(fun_args) == 3:
                function, args, depends = fun_args
            else:
                continue
            jobno += 1
            jobs[jobno] = {'func': function, 'args': args, 'depends': depends,
                           'done': False, 'out': None, 'started': False}
            output(jobs)
        if jobs:
            for jno, job_info in jobs.items():
                if job_info['done']:
                    continue
                ready = True
                if job_info['depends']:
                    for depend in job_info['depends']:
                        if depend not in done:
                            ready = False
                if ready and not job_info['started']:
                    if job_info['args']:
                        runners[jno] = pool.apply_async(job_info['func'], (job_info['args'],))
                    else:
                        runners[jno] = pool.apply_async(job_info['func'])
                    job_info['started'] = True
                    output(jobs)
                    sleep(0.5)  # Give the job a moment to start
                if job_info['started'] and not job_info['done'] and runners[jno].ready():
                    job_info['out'] = runners[jno].get()
                    job_info['done'] = True
                    done.append(jno)
                    output(jobs)
        sleep(0.5)
In [2]:
def update_jobs(jobdict, outqueue):
    """Give the runner time to publish, then drain every queued snapshot."""
    sleep(2)
    while not outqueue.empty():
        jobdict.update(outqueue.get_nowait())
    return jobdict
In [3]:
def jon(string='bob'):
    return 'hi ' + string
In [5]:
queue = mp.Queue()
outqueue = mp.Queue()
In [6]:
runner = mp.Process(target=job_runner, args=(3, queue, outqueue))
In [7]:
runner.start()
In [8]:
queue.empty()
Out[8]:
In [9]:
outqueue.empty()
Out[9]:
In [10]:
runner.is_alive()
Out[10]:
In [11]:
myjobs = {}
In [12]:
queue.put([jon, 'fred', None])
In [13]:
myjobs = update_jobs(myjobs, outqueue)
In [14]:
myjobs
Out[14]:
In [15]:
outqueue.empty()
Out[15]:
In [16]:
queue.put([jon, 'bob', [4]])
In [17]:
queue.put([jon, 'joe', None])
In [18]:
outqueue.empty()
Out[18]:
In [19]:
myjobs = update_jobs(myjobs, outqueue)
myjobs
Out[19]:
Note that job 2 has not executed because it depends on job 4, which has not run yet.
In [20]:
queue.put([jon, 'done', None])
In [21]:
myjobs = update_jobs(myjobs, outqueue)
myjobs
Out[21]:
In [22]:
runner.is_alive()
Out[22]:
In [23]:
outqueue.empty()
Out[23]:
In [24]:
myjobs = update_jobs(myjobs, outqueue)
myjobs
Out[24]:
Now jobs 2 and 4 have both completed. Works perfectly.
Note: the runner takes a while to pick up and finish jobs, so a lot of sleeping is required between checks.
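Because results only appear after the runner's own sleep cycles, the fixed sleep in update_jobs is fragile. A hypothetical alternative (a sketch, not used in this notebook) would poll until the expected number of jobs report done, with a timeout:

def wait_for_done(jobdict, outqueue, njobs, timeout=30):
    """Drain snapshots until njobs report done or the timeout expires."""
    from time import time
    end = time() + timeout
    while time() < end:
        while not outqueue.empty():
            jobdict.update(outqueue.get_nowait())
        if sum(1 for job in jobdict.values() if job['done']) >= njobs:
            break
        sleep(0.5)
    return jobdict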
In [25]:
def job_runner(cores, jobqueue, outputs):
    """
    jobqueue: Queue of (function[, args[, depends]]) jobs
    outputs: Queue holding the latest {jobno: job_info} dictionary
    Note: this version runs each job synchronously inside the polling loop,
    so cores is unused and a long job blocks everything behind it.
    """
    def output(out):
        """Explicitly clear the queue before sending the output."""
        lastout = outputs.get() if not outputs.empty() else ''
        if out == lastout:
            return
        sleep(0.5)
        while not outputs.empty():
            # Clear the output object
            outputs.get()
        outputs.put(out)
    jobno = 0
    jobs = {}
    done = []
    while True:
        try:
            fun_args = jobqueue.get_nowait()
            if len(fun_args) == 1:
                function = fun_args[0]
                args = None
                depends = None
            elif len(fun_args) == 2:
                function, args = fun_args
                depends = None
            elif len(fun_args) == 3:
                function, args, depends = fun_args
            else:
                continue
            jobno += 1
            jobs[jobno] = {'func': function, 'args': args, 'depends': depends,
                           'done': False, 'out': None, 'started': False}
            output(jobs)
        except Empty:
            pass
        if jobs:
            for jno, job_info in jobs.items():
                if job_info['done']:
                    continue
                ready = True
                if job_info['depends']:
                    for depend in job_info['depends']:
                        if depend not in done:
                            ready = False
                if ready:
                    # Run the job synchronously, right here in the loop
                    if job_info['args']:
                        job_info['out'] = job_info['func'](job_info['args'])
                    else:
                        job_info['out'] = job_info['func']()
                    job_info['started'] = True
                    job_info['done'] = True
                    done.append(jno)
                    if job_info['depends']:
                        output(jobs)
        sleep(1)
In [26]:
queue = mp.Queue()
outqueue = mp.Queue()
In [27]:
runner = mp.Process(target=job_runner, args=(3, queue, outqueue))
In [28]:
runner.start()
In [29]:
queue.empty()
Out[29]:
In [30]:
outqueue.empty()
Out[30]:
In [31]:
runner.is_alive()
Out[31]:
In [32]:
myjobs = {}
In [33]:
queue.put([jon, 'fred', None])
In [34]:
myjobs = update_jobs(myjobs, outqueue)
In [35]:
myjobs
Out[35]:
In [36]:
myjobs = update_jobs(myjobs, outqueue)
myjobs
Out[36]:
In [37]:
queue.put([jon, 'bob', [4]])
In [38]:
queue.put([jon, 'joe', None])
In [39]:
myjobs = update_jobs(myjobs, outqueue)
myjobs
Out[39]:
Note that one call only picks up the first snapshot in the queue; it has to be run twice when two commands are put.
In [40]:
myjobs = update_jobs(myjobs, outqueue)
myjobs
Out[40]:
In [41]:
queue.put([jon, 'done', None])
In [42]:
myjobs = update_jobs(myjobs, outqueue)
myjobs
Out[42]:
Note that job 2 has not completed.
In [43]:
myjobs = update_jobs(myjobs, outqueue)
myjobs
Out[43]:
Now it has, though we needed to fetch a second time: one get only retrieves the snapshot from one successful loop.
In [44]:
def job_runner(cores, jobqueue, outputs):
    """
    jobqueue: Queue of (function[, args[, depends]]) jobs
    outputs: Queue of {jobno: job_info} snapshots
    """
    jobno = 0
    jobs = {}
    runners = {}
    done = []
    pool = mp.Pool(cores)
    while True:
        try:
            fun_args = jobqueue.get_nowait()
            if len(fun_args) == 1:
                function = fun_args[0]
                args = None
                depends = None
            elif len(fun_args) == 2:
                function, args = fun_args
                depends = None
            elif len(fun_args) == 3:
                function, args, depends = fun_args
            else:
                continue
            jobno += 1
            jobs[jobno] = {'func': function, 'args': args, 'depends': depends,
                           'done': False, 'out': None, 'started': False}
            outputs.put(jobs.copy())
        except Empty:
            pass
        if jobs:
            for jno, job_info in jobs.items():
                if job_info['done']:
                    continue
                ready = True
                if job_info['depends']:
                    for depend in job_info['depends']:
                        if depend not in done:
                            ready = False
                if ready:
                    # Note: nothing checks job_info['started'] here, so a
                    # ready-but-unfinished job is resubmitted on every pass
                    if job_info['args']:
                        runners[jno] = pool.apply_async(job_info['func'], (job_info['args'],))
                    else:
                        runners[jno] = pool.apply_async(job_info['func'])
                    job_info['started'] = True
                if job_info['started'] and runners[jno].ready():
                    # BUG: AsyncResult has no join(); this raises AttributeError
                    # and kills the runner (see the note after this cell)
                    job_info['out'] = runners[jno].join()
                    job_info['done'] = True
                    done.append(jno)
                    outputs.put(jobs.copy())
        sleep(1)
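One likely culprit in this version: pool.apply_async returns an AsyncResult, which has a get() method but no join() (join belongs to the Pool itself), so runners[jno].join() raises AttributeError inside the runner process and the loop dies. That would explain the stalls observed below. The completion branch presumably wants:

if job_info['started'] and runners[jno].ready():
    job_info['out'] = runners[jno].get()   # get(), not join()
    job_info['done'] = True
    done.append(jno)
    outputs.put(jobs.copy())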
In [45]:
queue = mp.Queue()
outqueue = mp.Queue()
In [46]:
runner = mp.Process(target=job_runner, args=(3, queue, outqueue))
In [47]:
runner.start()
In [48]:
queue.empty()
Out[48]:
In [49]:
outqueue.empty()
Out[49]:
In [50]:
runner.is_alive()
Out[50]:
In [51]:
queue.put([jon, 'fred', None])
In [52]:
outdict = {}
In [53]:
sleep(2)
try:
    j = outqueue.get_nowait()
except Empty as e:
    print(e)
In [54]:
j
Out[54]:
In [55]:
if j:
    outdict.update(j)
In [56]:
outdict
Out[56]:
In [57]:
sleep(2)
try:
    j = outqueue.get_nowait()
except Empty as e:
    print(e)
In [58]:
j
Out[58]:
Nope, still isn't completing. Why?
In [59]:
queue.put([jon, 'bob', [4]])
In [60]:
queue.put([jon, 'joe', None])
In [61]:
sleep(1)
j = outqueue.get_nowait()
In [62]:
if j:
    outdict.update(j)
In [63]:
outdict
Out[63]:
It worked here once... but it isn't working anymore, and I am not sure why. Even when it did, the result was later overwritten with the incomplete entry... maybe the dictionary copying is part of the issue?
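One thing to keep in mind: mp.Queue serializes each object as it is queued (via a feeder thread), so every outputs.put(jobs.copy()) is an independent snapshot, delivered in FIFO order; a single get always hands back the oldest one, so a stale snapshot can overwrite newer state in outdict. A minimal standalone illustration (hypothetical, not from this session):

q = mp.Queue()
q.put({1: 'started'})    # older snapshot
q.put({1: 'done'})       # newer snapshot
state = {}
state.update(q.get())    # FIFO: the stale 'started' snapshot arrives first
state.update(q.get())    # only now does 'done' replace it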
In [64]:
sleep(2)
j = outqueue.get_nowait()
In [65]:
if j:
    outdict.update(j)
In [66]:
outdict
Out[66]:
In [67]:
queue.put([jon, 'done', None])
In [68]:
sleep(2)
j = outqueue.get_nowait()
In [69]:
if j:
    outdict.update(j)
In [70]:
outdict
Out[70]:
No jobs have completed now...
In [71]:
sleep(2)
try:
    j = outqueue.get_nowait()
except Empty as e:
    print(repr(e))
In [72]:
j
Out[72]:
In [73]:
outqueue.qsize()
Out[73]:
In [ ]:
class Queue(object):
    """Unfinished sketch: wrap the queues, pool, and runner in one object."""
    def job_runner(pool, jobqueue, outputs):
        """
        jobqueue: Queue of (function, args, depends) jobs
        outputs: Queue of per-job results
        """
        jobno = 0
        jobs = {}
        done = []
        while True:
            try:
                fun_args = jobqueue.get_nowait()
                function, args, depends = fun_args
                jobno += 1
                jobs[jobno] = {'func': function, 'args': args, 'depends': depends,
                               'done': False, 'out': None}
            except Empty:  # Empty lives in the queue module, not multiprocessing
                pass
            if jobs:
                for jno, job_info in jobs.items():
                    if job_info['done']:
                        continue
                    ready = True
                    if job_info['depends']:
                        for depend in job_info['depends']:
                            if depend not in done:
                                ready = False
                    if ready:
                        # Run synchronously and publish the result
                        job_info['out'] = job_info['func'](job_info['args'])
                        job_info['done'] = True
                        done.append(jno)
                        outputs.put(job_info['out'])
            sleep(1)

    def submit(self, func, args, depends):
        pass

    def __init__(self):
        self.queue = mp.Queue()
        self.outqueue = mp.Queue()
        self.pool = mp.Pool()
        self.jobno = 0
        self.jobs = {}
        # Note: an mp.Pool cannot be pickled into a child process,
        # so passing self.pool here would fail as written
        self.runner = mp.Process(target=job_runner,
                                 args=(self.pool, self.queue, self.outqueue))
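The class is clearly unfinished. Under the caveats noted in its comments, submit might be wired up along these lines (a hypothetical completion, using the [func, args, depends] list form the earlier runners expect):

    def submit(self, func, args=None, depends=None):
        # Hypothetical: enqueue the job and hand back its expected job number
        self.jobno += 1
        self.queue.put([func, args, depends])
        return self.jobno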
In [ ]: