The simplest way to spawn a second process is to instantiate a Process object with a target function and call start() to let it begin working.


In [1]:
import multiprocessing


def worker():
    """worker function"""
    print('Worker')


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()


Worker
Worker
Worker
Worker
Worker

It is usually more useful to be able to spawn a process with arguments to tell it what work to do. Unlike with threading, the arguments passed to a multiprocessing Process must be serializable using pickle. This example passes each worker a number to be printed.


In [4]:
import multiprocessing


def worker(num):
    """worker function"""
    print('Worker:', num)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()


Worker: 1
Worker: 2
Worker: 3
Worker: 4
Worker: 0
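
The pickle requirement can be checked before a process is started. The sketch below is not part of the original example: is_picklable() is a hypothetical helper that simply tries pickle.dumps() on a candidate argument. Whether the arguments are actually pickled depends on how the platform starts child processes, so treat it as a rough pre-flight check rather than a guarantee.

```python
import pickle


def is_picklable(obj):
    """Return True if obj survives pickle.dumps() (hypothetical helper)."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


if __name__ == '__main__':
    # Plain data such as numbers, strings, and tuples can be pickled.
    print(is_picklable((1, 'text', [2.0])))
    # A lambda has no importable name, so pickle rejects it.
    print(is_picklable(lambda x: x))
```

Running the check on each element of args before calling start() turns a confusing failure in the child into an early, readable one in the parent.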

Importable Target Functions

One difference between the threading and multiprocessing examples is the extra protection for __main__ used in the multiprocessing examples. Due to the way the new processes are started, the child process needs to be able to import the script containing the target function. Wrapping the main part of the application in a check for __main__ ensures that it is not run recursively in each child as the module is imported. Another approach is to import the target function from a separate script. For example, multiprocessing_import_main.py uses a worker function defined in a second module.


In [ ]:
# %load multiprocessing_import_worker.py
def worker():
    """worker function"""
    print('Worker')
    return

In [5]:
import multiprocessing
import multiprocessing_import_worker

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(
            target=multiprocessing_import_worker.worker,
        )
        jobs.append(p)
        p.start()


Worker
Worker
Worker
Worker
Worker
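
How strictly the __main__ check is needed depends on how children are started: with the spawn method the child imports the main module afresh, while with fork it begins as a copy of the parent. As a small sketch, the standard functions get_all_start_methods() and get_start_method() report what the current platform offers. The helper name start_method_report() is made up for this illustration.

```python
import multiprocessing


def start_method_report():
    """Summarize which start methods this platform offers."""
    return {
        'available': multiprocessing.get_all_start_methods(),
        # allow_none=True reports None instead of fixing a default
        # when no start method has been selected yet.
        'selected': multiprocessing.get_start_method(allow_none=True),
    }


if __name__ == '__main__':
    report = start_method_report()
    print('available:', report['available'])
    print('selected :', report['selected'])
```

Keeping the __main__ check in place regardless of the reported method keeps a script portable across platforms.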

Determining the Current Process

Passing arguments to identify or name the process is cumbersome and unnecessary. Each Process instance has a name with a default value that can be changed as the process is created. Naming processes is useful for keeping track of them, especially in applications with multiple types of processes running simultaneously.


In [7]:
import multiprocessing
import time


def worker():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')


def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')


if __name__ == '__main__':
    service = multiprocessing.Process(
        name='my_service',
        target=my_service,
    )
    worker_1 = multiprocessing.Process(
        name='worker 1',
        target=worker,
    )
    worker_2 = multiprocessing.Process(  # default name
        target=worker,
    )

    worker_1.start()
    worker_2.start()
    service.start()


worker 1 Starting
Process-18 Starting
my_service Starting
worker 1 Exiting
Process-18 Exiting
my_service Exiting
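
When several processes of the same kind are created, the names can be generated instead of typed out. The following is a minimal sketch under assumed names: make_named_workers() and the 'resizer' prefix are hypothetical, and only Process(name=...), the name attribute, and current_process() come from multiprocessing.

```python
import multiprocessing


def work():
    print(multiprocessing.current_process().name, 'running')


def make_named_workers(prefix, count):
    """Build (but do not start) processes named prefix-0, prefix-1, ..."""
    return [
        multiprocessing.Process(name='{}-{}'.format(prefix, i), target=work)
        for i in range(count)
    ]


if __name__ == '__main__':
    workers = make_named_workers('resizer', 3)
    # The name is a plain attribute, so it can still be changed
    # before start() is called.
    workers[0].name = 'resizer-primary'
    for w in workers:
        w.start()
    for w in workers:
        w.join()
```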

Daemon Processes

By default, the main program will not exit until all of the children have exited. There are times when starting a background process that runs without blocking the main program from exiting is useful, such as in services where there may not be an easy way to interrupt the worker, or where letting it die in the middle of its work does not lose or corrupt data (for example, a task that generates “heart beats” for a service monitoring tool).

To mark a process as a daemon, set its daemon attribute to True. The default is for processes to not be daemons.


In [8]:
import multiprocessing
import time
import sys


def daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(2)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


def non_daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()


Starting: daemon 24598
Starting: non-daemon 24601
Exiting : non-daemon 24601
Exiting : daemon 24598
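
The daemon flag can also be supplied as a keyword argument to the Process constructor (available since Python 3.3), which avoids the separate assignment. Either way, it has to be set before start() is called. The sketch below assumes a hypothetical helper, make_background(), and an illustrative heartbeat() task.

```python
import multiprocessing
import time


def heartbeat(interval, count):
    """Print a few heartbeats; safe to kill at any point."""
    for i in range(count):
        print('heartbeat', i)
        time.sleep(interval)


def make_background(target, *args):
    """Create (but do not start) a daemon process for target."""
    return multiprocessing.Process(target=target, args=args, daemon=True)


if __name__ == '__main__':
    hb = make_background(heartbeat, 0.1, 10)
    hb.start()
    time.sleep(0.35)  # the main program does its own work here
    print('main program done; daemon alive:', hb.is_alive())
    # No join(): a daemon is terminated when the main program exits.
```

When this runs as a standalone script, the heartbeat output stops as soon as the main program finishes. In an interactive session the parent keeps running, so the daemon gets to finish its loop.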

Waiting for Processes

To wait until a process has completed its work and exited, use the join() method.


In [9]:
import multiprocessing
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

    d.join()
    n.join()


Starting: daemon
Starting: non-daemon
Exiting : non-daemon
Exiting : daemon

By default, join() blocks indefinitely. It is also possible to pass a timeout argument (a float representing the number of seconds to wait for the process to become inactive). If the process does not complete within the timeout period, join() returns anyway.


In [10]:
import multiprocessing
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    n.start()

    d.join(1)
    print('d.is_alive()', d.is_alive())
    n.join()


Starting: daemon
Starting: non-daemon
Exiting : non-daemon
d.is_alive() True
Exiting : daemon
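
With several processes, passing the same timeout to each join() call lets the total wait grow with the number of processes. One way around that, sketched here with a hypothetical helper named join_all(), is to compute a single deadline and give each join() only the time that remains.

```python
import multiprocessing
import time


def join_all(processes, timeout):
    """Wait up to timeout seconds in total; return names still running.

    Works with any objects that offer join(timeout), is_alive(), and
    name, such as multiprocessing.Process instances.
    """
    deadline = time.monotonic() + timeout
    for p in processes:
        remaining = max(0.0, deadline - time.monotonic())
        p.join(remaining)
    return [p.name for p in processes if p.is_alive()]


def nap(seconds):
    time.sleep(seconds)


if __name__ == '__main__':
    procs = [
        multiprocessing.Process(name='nap-{}'.format(s), target=nap, args=(s,))
        for s in (0.1, 1.0)
    ]
    for p in procs:
        p.start()
    print('still running:', join_all(procs, timeout=0.4))
    for p in procs:
        p.join()
```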

Terminating Processes

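Although it is better to use the poison pill method of signaling to a process that it should exit (see Passing Messages to Processes, later in this chapter), if a process appears hung or deadlocked it can be useful to be able to kill it forcibly. Calling terminate() on a process object kills the child process. Because terminate() returns without waiting for the child to die, it is important to join() the process afterward so that the process management code has time to update the status of the object to reflect the termination.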


In [11]:
import multiprocessing
import time


def slow_worker():
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')


if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE:', p, p.is_alive())

    p.start()
    print('DURING:', p, p.is_alive())

    p.terminate()
    print('TERMINATED:', p, p.is_alive())

    p.join()
    print('JOINED:', p, p.is_alive())


BEFORE: <Process(Process-25, initial)> False
DURING: <Process(Process-25, started)> True
TERMINATED: <Process(Process-25, started)> True
JOINED: <Process(Process-25, stopped[SIGTERM])> False
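
A child that handles or ignores SIGTERM will survive terminate(). A possible pattern, shown here as a sketch rather than a recommendation from the original text, is to wait briefly and then escalate to kill(). The helper name stop() and the grace period are assumptions, and Process.kill() requires Python 3.7 or later.

```python
import multiprocessing
import time


def stop(process, grace=1.0):
    """Terminate process, escalating to kill() if it is still alive.

    Returns the exit code. kill() requires Python 3.7 or later.
    """
    process.terminate()
    process.join(grace)  # give it time to exit and update its status
    if process.is_alive():
        process.kill()   # SIGKILL on Unix cannot be caught or ignored
        process.join()
    return process.exitcode


def hang():
    time.sleep(60)


if __name__ == '__main__':
    p = multiprocessing.Process(target=hang)
    p.start()
    print('exit code:', stop(p))
```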

Process Exit Status

The status code produced when the process exits can be accessed via the exitcode attribute. The ranges allowed are listed in the table below.

Multiprocessing Exit Codes

Exit Code    Meaning
== 0         no error was produced
> 0          the process had an error, and exited with that code
< 0          the process was killed with a signal of -1 * exitcode

In [12]:
import multiprocessing
import sys
import time


def exit_error():
    sys.exit(1)


def exit_ok():
    return


def return_value():
    return 1


def raises():
    raise RuntimeError('There was an error!')


def terminated():
    time.sleep(3)


if __name__ == '__main__':
    jobs = []
    funcs = [
        exit_error,
        exit_ok,
        return_value,
        raises,
        terminated,
    ]
    for f in funcs:
        print('Starting process for', f.__name__)
        j = multiprocessing.Process(target=f, name=f.__name__)
        jobs.append(j)
        j.start()

    jobs[-1].terminate()
    for j in jobs:
        j.join()
        print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))


Starting process for exit_error
Starting process for exit_ok
Starting process for return_value
Starting process for raises
Starting process for terminated
Process raises:
Traceback (most recent call last):
  File "/home/scott/anaconda3/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/home/scott/anaconda3/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-12-79ca24877756>", line 19, in raises
    raise RuntimeError('There was an error!')
RuntimeError: There was an error!
     exit_error.exitcode = 1
        exit_ok.exitcode = 0
   return_value.exitcode = 0
         raises.exitcode = 1
     terminated.exitcode = -15
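
The table can be turned into a small lookup for log messages. The following sketch assumes a hypothetical helper, describe_exitcode(), and uses the standard signal.Signals enumeration to name the signal for negative codes. An exitcode of None means the process has not terminated yet.

```python
import signal


def describe_exitcode(code):
    """Translate a Process.exitcode value into a short description."""
    if code is None:
        return 'still running or not started'
    if code == 0:
        return 'no error'
    if code > 0:
        return 'exited with error code {}'.format(code)
    try:
        name = signal.Signals(-code).name
    except ValueError:
        # Not a signal number known on this platform.
        name = 'signal {}'.format(-code)
    return 'killed by {}'.format(name)


if __name__ == '__main__':
    for code in (None, 0, 1, -15):
        print(code, '->', describe_exitcode(code))
```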

Logging

When debugging concurrency issues, it can be useful to have access to the internals of the objects provided by multiprocessing. There is a convenient module-level function to enable logging called log_to_stderr(). It sets up a logger object using logging and adds a handler so that log messages are sent to the standard error channel.


In [14]:
import multiprocessing
import logging
import sys


def worker():
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()


[INFO/Process-31] child process calling self.run()
Doing some work
[INFO/Process-31] process shutting down
[DEBUG/Process-31] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-31] running the remaining "atexit" finalizers
[INFO/Process-31] process exiting with exitcode 0

By default, the logging level is set to NOTSET so no messages are produced. Pass a different level to initialize the logger to the level of detail desired.

To manipulate the logger directly (change its level setting or add handlers), use get_logger().


In [15]:
import multiprocessing
import logging
import sys


def worker():
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()


[INFO/Process-32] child process calling self.run()
[INFO/Process-32] child process calling self.run()
Doing some work
[INFO/Process-32] process shutting down
[INFO/Process-32] process shutting down
[INFO/Process-32] process exiting with exitcode 0
[INFO/Process-32] process exiting with exitcode 0
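
Each message appears twice in the output above, most likely because log_to_stderr() had already been called earlier in the same interpreter session: every call attaches another handler to the same logger. A minimal guard, assuming a hypothetical helper named enable_logging(), checks for existing handlers first.

```python
import logging
import multiprocessing


def enable_logging(level=logging.INFO):
    """Attach the stderr handler only once, then set the level."""
    logger = multiprocessing.get_logger()
    if not logger.handlers:
        multiprocessing.log_to_stderr()
    logger.setLevel(level)
    return logger


if __name__ == '__main__':
    enable_logging()
    enable_logging(logging.DEBUG)  # no second handler is added
    print('handlers:', len(multiprocessing.get_logger().handlers))
```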

Subclassing Process

Although the simplest way to start a job in a separate process is to use Process and pass a target function, it is also possible to use a custom subclass.


In [16]:
import multiprocessing


class Worker(multiprocessing.Process):

    def run(self):
        print('In {}'.format(self.name))
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()


[INFO/Worker-33] child process calling self.run()
[INFO/Worker-33] child process calling self.run()
[INFO/Worker-34] child process calling self.run()
In Worker-33
[INFO/Worker-34] child process calling self.run()
[INFO/Worker-33] process shutting down
[INFO/Worker-33] process shutting down
[INFO/Worker-33] process exiting with exitcode 0
[INFO/Worker-35] child process calling self.run()
[INFO/Worker-35] child process calling self.run()
[INFO/Worker-33] process exiting with exitcode 0
In Worker-35
[INFO/Worker-36] child process calling self.run()
[INFO/Worker-37] child process calling self.run()
[INFO/Worker-35] process shutting down
[INFO/Worker-36] child process calling self.run()
[INFO/Worker-35] process shutting down
[INFO/Worker-37] child process calling self.run()
[INFO/Worker-35] process exiting with exitcode 0
In Worker-37
[INFO/Worker-35] process exiting with exitcode 0
[INFO/Worker-37] process shutting down
[INFO/Worker-37] process shutting down
[INFO/Worker-37] process exiting with exitcode 0
[INFO/Worker-37] process exiting with exitcode 0
In Worker-36
[INFO/Worker-36] process shutting down
[INFO/Worker-36] process shutting down
[INFO/Worker-36] process exiting with exitcode 0
[INFO/Worker-36] process exiting with exitcode 0
In Worker-34
[INFO/Worker-34] process shutting down
[INFO/Worker-34] process shutting down
[INFO/Worker-34] process exiting with exitcode 0
[INFO/Worker-34] process exiting with exitcode 0
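
A subclass becomes more useful once it carries its own configuration. In the sketch below, the Countdown class and its start_value argument are made up for illustration. The one firm rule is that an overriding __init__() must call the base class initializer before anything else is done to the process.

```python
import multiprocessing


class Countdown(multiprocessing.Process):
    """Hypothetical subclass that carries its own configuration."""

    def __init__(self, start_value):
        # The base initializer must run before the process can be started.
        super().__init__()
        self.start_value = start_value

    def run(self):
        # Executed in the child process once start() is called.
        for n in range(self.start_value, 0, -1):
            print('{}: {}'.format(self.name, n))


if __name__ == '__main__':
    jobs = [Countdown(n) for n in (2, 3)]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
```

Attributes stored on the instance may need to be picklable, for the same reason that arguments to a target function do.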

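Handling Exceptions in Child Processes

By default, the traceback shown in the parent process is of limited use for debugging: it ends inside the pool machinery, and the code that actually raised the exception in the child appears only in the quoted RemoteTraceback.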


In [3]:
from multiprocessing import Pool
 
def go(x):
    ret = 0.
    for i in range(x+1):
        ret += 1./(5-i)
    return ret
 
def main():
    pool = Pool(processes=4)  
    print(pool.map(go, range(10)))
 
if __name__ == "__main__":
    main()


---------------------------------------------------------------------------
RemoteTraceback                           Traceback (most recent call last)
RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/scott/anaconda3/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/home/scott/anaconda3/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "<ipython-input-3-c8c98990b81a>", line 6, in go
    ret += 1./(5-i)
ZeroDivisionError: float division by zero
"""

The above exception was the direct cause of the following exception:

ZeroDivisionError                         Traceback (most recent call last)
<ipython-input-3-c8c98990b81a> in <module>()
     12 
     13 if __name__ == "__main__":
---> 14     main()

<ipython-input-3-c8c98990b81a> in main()
      9 def main():
     10     pool = Pool(processes=4)
---> 11     print(pool.map(go, range(10)))
     12 
     13 if __name__ == "__main__":

/home/scott/anaconda3/lib/python3.6/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    258         in a list that is returned.
    259         '''
--> 260         return self._map_async(func, iterable, mapstar, chunksize).get()
    261 
    262     def starmap(self, func, iterable, chunksize=None):

/home/scott/anaconda3/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
    606             return self._value
    607         else:
--> 608             raise self._value
    609 
    610     def _set(self, i, obj):

ZeroDivisionError: float division by zero

Use the traceback module to print the full traceback inside the child before the exception is sent back to the parent.


In [6]:
import functools
import traceback
import sys
 
def get_traceback(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception as ex:
            ret = '#' * 60
            ret += "\nException caught:"
            ret += "\n"+'-'*60
            ret += "\n" + traceback.format_exc()
            ret += "\n" + '-' * 60
            ret += "\n"+ "#" * 60
            print(ret, file=sys.stderr)  # send the report to stderr
            sys.stderr.flush()
            raise ex

    return wrapper

from multiprocessing import Pool

@get_traceback
def go(x):
    ret = 0.
    for i in range(x+1):
        ret += 1./(5-i)
    return ret

def main():
    pool = Pool(processes=4)
    print(pool.map(go, range(10)))

if __name__ == "__main__":
    main()


############################################################
Exception caught:
------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-6-e59c8bc0d167>", line 9, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-6-e59c8bc0d167>", line 29, in go
    ret += 1./(5-i)
ZeroDivisionError: float division by zero

------------------------------------------------------------
############################################################
############################################################
Exception caught:
------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-6-e59c8bc0d167>", line 9, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-6-e59c8bc0d167>", line 29, in go
    ret += 1./(5-i)
ZeroDivisionError: float division by zero

------------------------------------------------------------
############################################################
############################################################
Exception caught:
------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-6-e59c8bc0d167>", line 9, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-6-e59c8bc0d167>", line 29, in go
    ret += 1./(5-i)
ZeroDivisionError: float division by zero

------------------------------------------------------------
############################################################
############################################################
Exception caught:
------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-6-e59c8bc0d167>", line 9, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-6-e59c8bc0d167>", line 29, in go
    ret += 1./(5-i)
ZeroDivisionError: float division by zero

------------------------------------------------------------
############################################################
############################################################
Exception caught:
------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-6-e59c8bc0d167>", line 9, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-6-e59c8bc0d167>", line 29, in go
    ret += 1./(5-i)
ZeroDivisionError: float division by zero

------------------------------------------------------------
############################################################

---------------------------------------------------------------------------
RemoteTraceback                           Traceback (most recent call last)
RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/scott/anaconda3/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/home/scott/anaconda3/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "<ipython-input-6-e59c8bc0d167>", line 19, in wrapper
    raise ex
  File "<ipython-input-6-e59c8bc0d167>", line 9, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-6-e59c8bc0d167>", line 29, in go
    ret += 1./(5-i)
ZeroDivisionError: float division by zero
"""

The above exception was the direct cause of the following exception:

ZeroDivisionError                         Traceback (most recent call last)
<ipython-input-6-e59c8bc0d167> in <module>()
     35 
     36 if __name__ == "__main__":
---> 37     main()

<ipython-input-6-e59c8bc0d167> in main()
     32 def main():
     33     pool = Pool(processes=4)
---> 34     print(pool.map(go, range(10)))
     35 
     36 if __name__ == "__main__":

/home/scott/anaconda3/lib/python3.6/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    258         in a list that is returned.
    259         '''
--> 260         return self._map_async(func, iterable, mapstar, chunksize).get()
    261 
    262     def starmap(self, func, iterable, chunksize=None):

/home/scott/anaconda3/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
    606             return self._value
    607         else:
--> 608             raise self._value
    609 
    610     def _set(self, i, obj):

ZeroDivisionError: float division by zero
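
An alternative to printing inside the child is to send the formatted traceback back to the parent as data, so that one failing input does not abort the whole map() call. This is a sketch of a different technique from the one shown above, not a replacement for it. The decorator name capture_errors() and the ('ok', ...) / ('error', ...) tuple convention are assumptions made for the example.

```python
import functools
import traceback
from multiprocessing import Pool


def capture_errors(f):
    """Return ('ok', result) or ('error', traceback text) instead of raising."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return ('ok', f(*args, **kwargs))
        except Exception:
            return ('error', traceback.format_exc())
    return wrapper


@capture_errors
def go(x):
    ret = 0.
    for i in range(x + 1):
        ret += 1. / (5 - i)
    return ret


if __name__ == '__main__':
    with Pool(processes=2) as pool:
        outcomes = pool.map(go, range(10))
    # Report failures in the parent, next to the input that caused them.
    for x, (status, detail) in enumerate(outcomes):
        if status == 'error':
            print('go({}) failed:\n{}'.format(x, detail))
```

The trade-off is that callers must check the status of every result, because errors no longer propagate as exceptions.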
