In [1]:
import multiprocessing as mp
from time import sleep
from queue import Empty
import sys

In [128]:
class Job(object):
    def __init__(self, function, args=None, kwargs=None, depends=None):
        self.function = function,
        self.args = args
        self.kwargs = kwargs
        self.depends = depends
        self.jobno = None

In [384]:
def job_runner(jobqueue, outputs, cores=1, jobno=1):
    """
    jobs: [(command, args)]
    outputs: {id: retval}
    """
    import sys
    def output(out):
        """Let's try and explicitly clear the dictionary before sending the output."""
        lastout = outputs.get() if not outputs.empty() else ''
        if out == lastout:
            return
        while not outputs.empty():
            # Clear the output object
            outputs.get()
        outputs.put(out)
    
    # Make sure we have Queue objects
    if not isinstance(jobqueue, mp.queues.Queue) \
            or not isinstance(outputs, mp.queues.Queue):
        raise ClusterError('jobqueue and outputs must be multiprocessing ' +
                           'Queue objects')

    jobno = int(jobno) if jobno else 1
    jobno = jobno-1 if jobno is not 0 else 0
    jobs = {}
    runners = {}
    started = []
    done = []
    pool = mp.Pool(cores)
    
    while True:
        if not jobqueue.empty():
            jobno += 1
            job = jobqueue.get_nowait()
            #function = job.function
            #args = job.args
            #depends = job.depends
            function, args, depends = job
            jobs[jobno] = {'func': function, 'args': args, 'kwargs': None, 'depends': depends, 'state': None, 'out': None}
            output(jobs)
        if jobs:
            for jobno, job_info in jobs.items():
                if job_info['state'] == 'done':
                    continue
                ready = True
                if job_info['depends']:
                    for depend in job_info['depends']:
                        if not depend in done:
                            ready = False
                if ready and not jobno in started:
                    
                    if job_info['args'] and job_info['kwargs']:
                        runners[jobno] = pool.apply_async(job_info['func'], (job_info['args'],), job_info['kwargs'])
                    elif job_info['args']:
                        runners[jobno] = pool.apply_async(job_info['func'], (job_info['args'],))
                    elif job_info['kwargs']:
                        runners[jobno] = pool.apply_async(job_info['func'], kwds=job_info['kwargs'])
                    else:
                        runners[jobno] = pool.apply_async(job_info['func'])
                    job_info['state'] = 'started'
                    started.append(jobno)
                    output(jobs)
                    sleep(0.5)  # Wait for a second to allow job to start
                if job_info['state'] == 'started' and not jobno in done and runners[jobno].ready():
                    job_info['out'] = runners[jobno].get()
                    job_info['state'] = 'done'
                    done.append(jobno)
                    output(jobs)
        sleep(0.5)

In [385]:
queue = mp.Queue()
outqueue = mp.Queue()

In [386]:
runner = mp.Process(target=job_runner, args=(queue, outqueue))

In [387]:
runner.start()

In [388]:
queue.empty()


Out[388]:
True

In [389]:
outqueue.empty()


Out[389]:
True

In [390]:
runner.is_alive()


Out[390]:
True

In [391]:
myjobs = {}

In [392]:
queue.put([jon, 'wjonred', None])

In [393]:
myjobs = update_jobs(myjobs, outqueue)

In [394]:
myjobs


Out[394]:
{1: {'args': 'wjonred',
  'depends': None,
  'func': <function __main__.jon>,
  'kwargs': None,
  'out': 'hi wjonred',
  'state': 'done'}}

In [395]:
outqueue.empty()


Out[395]:
True

In [396]:
queue.put((jon, 'bob', [4]))

In [397]:
queue.put((jon, 'joe', None))

In [398]:
outqueue.empty()


Out[398]:
True

In [399]:
myjobs = update_jobs(myjobs, outqueue)
myjobs


Out[399]:
{1: {'args': 'wjonred',
  'depends': None,
  'func': <function __main__.jon>,
  'kwargs': None,
  'out': 'hi wjonred',
  'state': 'done'},
 2: {'args': 'bob',
  'depends': [4],
  'func': <function __main__.jon>,
  'kwargs': None,
  'out': None,
  'state': None},
 3: {'args': 'joe',
  'depends': None,
  'func': <function __main__.jon>,
  'kwargs': None,
  'out': 'hi joe',
  'state': 'done'}}

Note that job 2 has not executed because it is waiting for job 4, which has not run yet


In [400]:
queue.put([jon, 'done', None])

In [401]:
myjobs = update_jobs(myjobs, outqueue)
myjobs


Out[401]:
{1: {'args': 'wjonred',
  'depends': None,
  'func': <function __main__.jon>,
  'kwargs': None,
  'out': 'hi wjonred',
  'state': 'done'},
 2: {'args': 'bob',
  'depends': [4],
  'func': <function __main__.jon>,
  'kwargs': None,
  'out': 'hi bob',
  'state': 'done'},
 3: {'args': 'joe',
  'depends': None,
  'func': <function __main__.jon>,
  'kwargs': None,
  'out': 'hi joe',
  'state': 'done'},
 4: {'args': 'done',
  'depends': None,
  'func': <function __main__.jon>,
  'kwargs': None,
  'out': 'hi done',
  'state': 'done'}}

In [402]:
runner.is_alive()


Out[402]:
True

In [403]:
outqueue.empty()


Out[403]:
True

In [404]:
myjobs = update_jobs(myjobs, outqueue)
myjobs


Out[404]:
{1: {'args': 'wjonred',
  'depends': None,
  'func': <function __main__.jon>,
  'kwargs': None,
  'out': 'hi wjonred',
  'state': 'done'},
 2: {'args': 'bob',
  'depends': [4],
  'func': <function __main__.jon>,
  'kwargs': None,
  'out': 'hi bob',
  'state': 'done'},
 3: {'args': 'joe',
  'depends': None,
  'func': <function __main__.jon>,
  'kwargs': None,
  'out': 'hi joe',
  'state': 'done'},
 4: {'args': 'done',
  'depends': None,
  'func': <function __main__.jon>,
  'kwargs': None,
  'out': 'hi done',
  'state': 'done'}}

Here is how to make a dependency queue with pool

We have to sleep a lot in this script to allow the queue to update.


In [2]:
def job_runner(cores, jobqueue, outputs):
    """
    jobs: [(command, args)]
    outputs: {id: retval}
    """
    import sys
    def output(out):
        """Let's try and explicitly clear the dictionary before sending the output."""
        lastout = outputs.get() if not outputs.empty() else ''
        if out == lastout:
            return
        while not outputs.empty():
            # Clear the output object
            outputs.get()
        outputs.put(out)
    jobno = 0
    jobs = {}
    runners = {}
    done = []
    pool = mp.Pool(cores)
    while True:
        if not jobqueue.empty():
            fun_args = jobqueue.get_nowait()
            if len(fun_args) == 1:
                function = fun_args[0]
                args = None
                depends = None
            elif len(fun_args) == 2:
                function, args = fun_args
                depends = None
            elif len(fun_args) == 3:
                function, args, depends = fun_args
            else:
                continue
            jobno += 1
            jobs[jobno] = {'func': function, 'args': args, 'depends': depends, 'done': False, 'out': None,
                           'started': False}
            output(jobs)
        if jobs:
            for jobno, job_info in jobs.items():
                if job_info['done']:
                    continue
                ready = True
                if job_info['depends']:
                    for depend in job_info['depends']:
                        if not depend in done:
                            ready = False
                if ready and not job_info['started']:
                    if job_info['args']:
                        runners[jobno] = pool.apply_async(job_info['func'], (job_info['args'],))
                    else:
                        runners[jobno] = pool.apply_async(job_info['func'])
                    job_info['started'] = True
                    output(jobs)
                    sleep(0.5)  # Wait for a second to allow job to start
                if job_info['started'] and not job_info['done'] and runners[jobno].ready():
                    job_info['out'] = runners[jobno].get()
                    job_info['done'] = True
                    done.append(jobno)
                    output(jobs)
                    #if job_info['depends']:
                    #    outputs.put(jobs.copy())
        sleep(0.5)

In [2]:
def update_jobs(jobdict, outqueue):
    sleep(2)
    while not outqueue.empty():    
        jobdict.update(outqueue.get_nowait())
    return jobdict

In [3]:
def jon(string='bob'):
    return 'hi ' + string

In [5]:
queue = mp.Queue()
outqueue = mp.Queue()

In [6]:
runner = mp.Process(target=job_runner, args=(3, queue, outqueue))

In [7]:
runner.start()

In [8]:
queue.empty()


Out[8]:
True

In [9]:
outqueue.empty()


Out[9]:
True

In [10]:
runner.is_alive()


Out[10]:
True

In [11]:
myjobs = {}

In [12]:
queue.put([jon, 'fred', None])

In [13]:
myjobs = update_jobs(myjobs, outqueue)

In [14]:
myjobs


Out[14]:
{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True}}

In [15]:
outqueue.empty()


Out[15]:
True

In [16]:
queue.put([jon, 'bob', [4]])

In [17]:
queue.put([jon, 'joe', None])

In [18]:
outqueue.empty()


Out[18]:
True

In [19]:
myjobs = update_jobs(myjobs, outqueue)
myjobs


Out[19]:
{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False},
 3: {'args': 'joe',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi joe',
  'started': True}}

Note that job 2 has not executed because it is waiting for job 4, which has not run yet


In [20]:
queue.put([jon, 'done', None])

In [21]:
myjobs = update_jobs(myjobs, outqueue)
myjobs


Out[21]:
{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi bob',
  'started': True},
 3: {'args': 'joe',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi joe',
  'started': True},
 4: {'args': 'done',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi done',
  'started': True}}

In [22]:
runner.is_alive()


Out[22]:
True

In [23]:
outqueue.empty()


Out[23]:
True

In [24]:
myjobs = update_jobs(myjobs, outqueue)
myjobs


Out[24]:
{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi bob',
  'started': True},
 3: {'args': 'joe',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi joe',
  'started': True},
 4: {'args': 'done',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi done',
  'started': True}}

Now job 2 and job 4 have both completed. Works perfectly.

Note: It takes a long time for the function to run jobs, so a lot of sleeping is required.

Here is the same idea, but with no pool, it is simpler


In [25]:
def job_runner(cores, jobqueue, outputs):
    """
    jobs: [(command, args)]
    outputs: {id: retval}
    """
    import sys
    def output(out):
        """Let's try and explicitly clear the dictionary before sending the output."""
        lastout = outputs.get() if not outputs.empty() else ''
        if out == lastout:
            return
        sleep(0.5)
        while not outputs.empty():
            # Clear the output object
            outputs.get()
        outputs.put(out)
    jobno = 0
    jobs = {}
    done = []
    while True:
        try:
            fun_args = jobqueue.get_nowait()
            if len(fun_args) == 1:
                function = fun_args[0]
                args = None
                depends = None
            elif len(fun_args) == 2:
                function, args = fun_args
                depends = None
            elif len(fun_args) == 3:
                function, args, depends = fun_args
            else:
                continue
            jobno += 1
            jobs[jobno] = {'func': function, 'args': args, 'depends': depends, 'done': False, 'out': None,
                           'started': False}
            output(jobs)
        except Empty:
            pass
        if jobs:
            for jobno, job_info in jobs.items():
                if job_info['done']:
                    continue
                ready = True
                if job_info['depends']:
                    for depend in job_info['depends']:
                        if not depend in done:
                            ready = False
                if ready:
                    if job_info['args']:
                        job_info['out'] = job_info['func'](job_info['args'])
                    else:
                        job_info['out'] = job_info['func']()
                    job_info['started'] = True
                    job_info['done'] = True
                    done.append(jobno)
                    if job_info['depends']:
                        output(jobs)
        sleep(1)

In [26]:
queue = mp.Queue()
outqueue = mp.Queue()

In [27]:
runner = mp.Process(target=job_runner, args=(3, queue, outqueue))

In [28]:
runner.start()

In [29]:
queue.empty()


Out[29]:
True

In [30]:
outqueue.empty()


Out[30]:
True

In [31]:
runner.is_alive()


Out[31]:
True

In [32]:
myjobs = {}

In [33]:
queue.put([jon, 'fred', None])

In [34]:
myjobs = update_jobs(myjobs, outqueue)

In [35]:
myjobs


Out[35]:
{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True}}

In [36]:
myjobs = update_jobs(myjobs, outqueue)
myjobs


Out[36]:
{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True}}

In [37]:
queue.put([jon, 'bob', [4]])

In [38]:
queue.put([jon, 'joe', None])

In [39]:
myjobs = update_jobs(myjobs, outqueue)
myjobs


Out[39]:
{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True}}

Note that the get command only gets the first dictionary in the stack, it needs to be run twice if two commands are put.


In [40]:
myjobs = update_jobs(myjobs, outqueue)
myjobs


Out[40]:
{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False},
 3: {'args': 'joe',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi joe',
  'started': True}}

In [41]:
queue.put([jon, 'done', None])

In [42]:
myjobs = update_jobs(myjobs, outqueue)
myjobs


Out[42]:
{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False},
 3: {'args': 'joe',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi joe',
  'started': True}}

Note that job 2 has not completed.


In [43]:
myjobs = update_jobs(myjobs, outqueue)
myjobs


Out[43]:
{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi bob',
  'started': True},
 3: {'args': 'joe',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi joe',
  'started': True},
 4: {'args': 'done',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi done',
  'started': True}}

Now it has though, we needed to get it a second time, as one get only fetches one successful loop.

Here is an earlier Pool example that does not work, good example of issues with dictionary sharing


In [44]:
def job_runner(cores, jobqueue, outputs):
    """
    jobs: [(command, args)]
    outputs: {id: retval}
    """
    import sys
    jobno = 0
    jobs = {}
    runners = {}
    done = []
    pool = mp.Pool()
    while True:
        try:
            fun_args = jobqueue.get_nowait()
            if len(fun_args) == 1:
                function = fun_args[0]
                args = None
                depends = None
            elif len(fun_args) == 2:
                function, args = fun_args
                depends = None
            elif len(fun_args) == 3:
                function, args, depends = fun_args
            else:
                continue
            jobno += 1
            jobs[jobno] = {'func': function, 'args': args, 'depends': depends, 'done': False, 'out': None,
                           'started': False}
            outputs.put(jobs.copy())
        except Empty:
            pass
        if jobs:
            for jobno, job_info in jobs.items():
                if job_info['done']:
                    continue
                ready = True
                if job_info['depends']:
                    for depend in job_info['depends']:
                        if not depend in done:
                            ready = False
                if ready:
                    if job_info['args']:
                        runners[jobno] = pool.apply_async(job_info['func'], (job_info['args'],))
                    else:
                        runners[jobno] = pool.apply_async(job_info['func'])
                    job_info['started'] = True
                if job_info['started'] and runners[jobno].ready():
                    job_info['out'] = runners[jobno].join()
                    job_info['done'] = True
                    done.append(jobno)
                    outputs.put(jobs.copy())
                    #if job_info['depends']:
                    #    outputs.put(jobs.copy())
        sleep(1)

In [45]:
queue = mp.Queue()
outqueue = mp.Queue()

In [46]:
runner = mp.Process(target=job_runner, args=(3, queue, outqueue))

In [47]:
runner.start()

In [48]:
queue.empty()


Out[48]:
True

In [49]:
outqueue.empty()


Out[49]:
True

In [50]:
runner.is_alive()


Out[50]:
True

In [51]:
queue.put([jon, 'fred', None])

In [52]:
outdict = {}

In [53]:
sleep(2)
try:
    j = outqueue.get_nowait()
except Empty as e:
    print(e)

In [54]:
j


Out[54]:
{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

In [55]:
if j:
    outdict.update(j)

In [56]:
outdict


Out[56]:
{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

Unexpected behavior

That should have completed, but it didn't, let's try to get a second time to see if it worked yet.


In [57]:
sleep(2)
try:
    j = outqueue.get_nowait()
except Empty as e:
    print(e)




In [58]:
j


Out[58]:
{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

Nope Still isn't completing why?


In [59]:
queue.put([jon, 'bob', [4]])

In [60]:
queue.put([jon, 'joe', None])

In [61]:
sleep(1)
j = outqueue.get_nowait()

In [62]:
if j:
    outdict.update(j)

In [63]:
outdict


Out[63]:
{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False}}

It worked here once... but it isn't anymore. I am not sure why. Even after though, it was overwritten with the incomplete entry... maybe the dictionary copying is part of the issue?


In [64]:
sleep(2)
j = outqueue.get_nowait()

In [65]:
if j:
    outdict.update(j)

In [66]:
outdict


Out[66]:
{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False},
 3: {'args': 'joe',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

Unexpected behavior

We have reverted back to the prior dictionary... the completed info for job 1 is gone.


In [67]:
queue.put([jon, 'done', None])

In [68]:
sleep(2)
j = outqueue.get_nowait()

In [69]:
if j:
    outdict.update(j)

In [70]:
outdict


Out[70]:
{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False},
 3: {'args': 'joe',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True},
 4: {'args': 'done',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

No jobs have completed now...


In [71]:
sleep(2)
try:
    j = outqueue.get_nowait()
except Empty as e:
    print(repr(e))


Empty()

In [72]:
j


Out[72]:
{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False},
 3: {'args': 'joe',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True},
 4: {'args': 'done',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

In [73]:
outqueue.qsize()


---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
<ipython-input-73-069b61adc484> in <module>()
----> 1 outqueue.qsize()

/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/queues.py in qsize(self)
    115     def qsize(self):
    116         # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
--> 117         return self._maxsize - self._sem._semlock._get_value()
    118 
    119     def empty(self):

NotImplementedError: 

class Queue(object):

def job_runner(pool, jobqueue, outputs):
    """
    jobs: [(command, args)]
    outputs: {id: retval}
    """
    import sys
    jobno = 0
    outqueue = {}
    jobs = {}
    done = []
    while True:
        try:
            fun_args = jobqueue.get_nowait()
            function, args, depends 
            jobs[jobno] = {'func': function, 'args': args, 'depends': depends, 'done': False, 'out': None}
            jobno += 1
        except multiprocessing.Empty:
            pass
        if jobs:
            for jobno, job_info in jobs.items():
                if job_info['depends']:
                    ready = True
                    for depend in depends:
                        if not depend in done:
                            ready = False
                    if ready:
                        job_info['out'] = job_info['func'](job_info['args'])
                        job_info['done'] = True
        output = function(args)
        outputs.put(output)
        sleep(1)

def submit(self, func, args, depends):
    pass

def __init__(self):
    self.queue = multiprocessing.Queue()
    self.outqueue = multiprocessing.Queue()
    self.pool = multiprocessing.Pool()
    self.jobno = 0
    self.jobs = {}
    self.runner = multiprocessing.Process(target=job_runner, args=(self.pool, self.queue, self.outqueue))

In [ ]: