A critical section is a region of code that must not be executed by more than one thread (or process) at a time. For example, incrementing a variable is not an atomic operation, so it should be performed under mutual exclusion.
In CPython, individual operations on the built-in data structures (such as lists and dictionaries) are thread-safe because of the Global Interpreter Lock. However, user-defined data structures, and compound read-modify-write updates of simple types such as integers and floats, should not be accessed concurrently without synchronization.
In [21]:
# Two threads that have a critical section executed in parallel without mutual exclusion.
# This code does not work!  The unsynchronized read-modify-write of `counter`
# loses updates, so the printed final value is unpredictable.
import threading
import time

counter = 10


def task_1():
    """Increment the shared counter 10**6 times with no locking."""
    global counter
    for _ in range(10**6):
        counter += 1


def task_2():
    """Decrement the shared counter 10**6 + 1 times with no locking."""
    global counter
    for _ in range(10**6 + 1):
        counter -= 1


thread_1 = threading.Thread(target=task_1)
thread_2 = threading.Thread(target=task_2)
for t in (thread_1, thread_2):
    t.start()
print("(Both threads started)")
for t in (thread_1, thread_2):
    t.join()
print("\nBoth threads finished")
print('counter =', counter)
The same example, this time using mutual exclusion (a lock):
In [22]:
# Two threads that have a critical section executed sequentially.
import threading
import time

lock = threading.Lock()
counter = 10


def task_1():
    """Add one to the shared counter 10**6 times, one locked step at a time."""
    global counter
    for _ in range(10**6):
        with lock:
            counter += 1


def task_2():
    """Subtract one from the shared counter 10**6 + 1 times under the lock."""
    global counter
    for _ in range(10**6 + 1):
        with lock:
            counter -= 1


thread_1 = threading.Thread(target=task_1)
thread_2 = threading.Thread(target=task_2)
now = time.perf_counter()  # Real time (not only user time)
thread_1.start()
thread_2.start()
print("Both threads started")
thread_1.join()
thread_2.join()
print("Both threads finished")
elapsed = time.perf_counter() - now
print(f"elapsed {elapsed:0.2f} seconds")
print('counter =', counter)
Notice that both tasks are CPU-bound. This means that using threading provides no wall-time advantage over an iterative implementation of both tasks.
In [23]:
# Two processes that have a critical section executed sequentially.
import multiprocessing
import time
import ctypes


def task_1(lock, counter):
    """Increment the shared counter 10000 times, holding the lock per step."""
    for _ in range(10000):
        with lock:
            counter.value += 1


def task_2(lock, counter):
    """Decrement the shared counter 10001 times, holding the lock per step."""
    for _ in range(10001):
        with lock:
            counter.value -= 1


# The "spawn" start method (the default on Windows and macOS) re-imports this
# module inside each child process.  Without the __main__ guard the driver
# code below would run again in every child, raising RuntimeError (or
# spawning processes recursively), so the guard is required for this script
# to be portable.
if __name__ == "__main__":
    lock = multiprocessing.Lock()
    manager = multiprocessing.Manager()
    counter = manager.Value(ctypes.c_int, 10)
    process_1 = multiprocessing.Process(target=task_1, args=(lock, counter))
    process_2 = multiprocessing.Process(target=task_2, args=(lock, counter))
    now = time.perf_counter()
    process_1.start()
    process_2.start()
    print("Both tasks started")
    process_1.join()
    process_2.join()
    print("Both tasks finished")
    elapsed = time.perf_counter() - now
    print(f"elapsed {elapsed:0.2f} seconds")
    print('counter =', counter.value)
Unlike threading, multiprocessing is suitable for reducing the running time of CPU-bound problems.
In [30]:
import asyncio

counter = 10


async def task_1():
    """Print 'o' and increment the counter ten times, calling task_2 each time."""
    global counter
    for _ in range(10):
        print("o", end='', flush=True)
        counter += 1
        await task_2()


async def task_2():
    """Print 'O', decrement the counter, and call task_1 again (mutual recursion)."""
    global counter
    print("O", end='', flush=True)
    counter -= 1
    await task_1()


# NOTE(review): the coroutines are defined but never awaited in this cell, so
# the counter is printed unchanged; actually awaiting task_1() would recurse
# without bound — presumably intentional for the lesson, but worth confirming.
print('\ncounter =', counter)
In [34]:
import asyncio
import time

counter = 10


async def task_1():
    """Increment the counter 10**6 times, awaiting task_2 after each step."""
    global counter
    for _ in range(10**6):
        counter += 1
        await task_2()  # each increment is immediately undone, so the net is 0


async def task_2():
    """Decrement the shared counter once."""
    global counter
    counter -= 1


async def _main():
    """Drive task_1 and report the elapsed wall-clock time."""
    now = time.perf_counter()
    await task_1()
    elapsed = time.perf_counter() - now
    print(f"\nelapsed {elapsed:0.2f} seconds")
    print('counter =', counter)


# Top-level `await` is only valid in a notebook/REPL; as a plain Python module
# it is a SyntaxError, so the driver must be started with asyncio.run().
asyncio.run(_main())
Coroutines are faster than threads, but not faster than the one-loop version of the task.
In [37]:
import time

counter = 10


def task():
    """Run 10**6 increment/decrement pairs on the global counter (net change 0)."""
    global counter
    for _ in range(10**6):
        counter += 1
        counter -= 1


now = time.perf_counter()
task()
elapsed = time.perf_counter() - now
print(f"\nelapsed {elapsed:0.2f} seconds")
print('counter =', counter)