Coroutine

Coroutines are a more generalized form of subroutines. Subroutines are entered at one point and exited at another point. Coroutines can be entered, exited, and resumed at many different points. They can be implemented with the async def statement.

-- Python Glossary

Python has had first-class support for coroutines and asynchronous programming since version 3.5, when the async and await syntax was added to the language.

A coroutine allows the execution of a code block to be suspended (for example, to let another coroutine run or to wait for an external resource) and later carried on from where it left off when control re-enters. In other words, the coroutine itself decides where it suspends. Because multiple coroutines can be created and be in progress at the same time, this characteristic enables cooperative multitasking: coroutines yield control voluntarily, as opposed to the preemptive multitasking used in multithreading.

In contrast, a subroutine starts when it is invoked and exits only when it has finished, so a subroutine returns exactly once, while a coroutine may hand control back multiple times.

In short, a subroutine is a special case of a coroutine.
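The cooperative hand-off described above can be sketched with two coroutines that take turns. This is a minimal illustration, not part of the original notes: the names worker and main are made up for the example, and asyncio.sleep(0) is used only as a convenient way to suspend and give the event loop a chance to resume the other coroutine.

```python
import asyncio


async def worker(name: str, steps: int, log: list[str]) -> None:
    """Hypothetical worker that records each step, then yields control."""
    for i in range(steps):
        log.append(f"{name}:{i}")
        # Suspend here and hand control back to the event loop,
        # which can then resume the other worker.
        await asyncio.sleep(0)


async def main() -> list[str]:
    log: list[str] = []
    # gather() wraps both coroutines in Tasks and runs them concurrently.
    await asyncio.gather(worker("a", 3, log), worker("b", 3, log))
    return log


if __name__ == "__main__":
    # The entries of the two workers alternate instead of running back to back.
    print(asyncio.run(main()))
```

Neither worker is ever interrupted in the middle of a step; each one only gives up control at its own await.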

Functions that return a coroutine object are called coroutine functions. They are defined with the async def statement and may contain the await, async for, and async with keywords. (An async def body that contains yield defines an asynchronous generator rather than a coroutine function.)

Check PEP 492 for details.
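To make the distinction between a coroutine function and a coroutine object concrete, here is a small sketch. The function names answer and describe are hypothetical; inspect.iscoroutinefunction, inspect.iscoroutine, and asyncio.run are standard library calls.

```python
import asyncio
import inspect


async def answer() -> int:
    """A coroutine function: calling it returns a coroutine object."""
    return 42


def describe() -> tuple[bool, bool, int]:
    is_coro_function = inspect.iscoroutinefunction(answer)
    coro = answer()  # nothing runs yet; this only builds the coroutine object
    is_coro_object = inspect.iscoroutine(coro)
    result = asyncio.run(coro)  # the body runs once an event loop drives it
    return is_coro_function, is_coro_object, result


if __name__ == "__main__":
    print(describe())  # (True, True, 42)
```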

Python has had two implementations of coroutines:

  1. Native coroutines: functions declared with the async def syntax
  2. Generator-based coroutines: functions decorated with the @asyncio.coroutine decorator; deprecated since Python 3.8 and removed in Python 3.11

Awaitables

An awaitable is an object that can be used in an await expression. There are three main types of awaitable objects: coroutines, Tasks, and Futures.

Note that different asyncio APIs might accept different kinds of awaitable.

  • Coroutine

    As mentioned in the previous section, a coroutine is the object returned by calling a function declared with the async def syntax.

  • Task

    The asyncio library provides an object named Task: a Future-like object that runs a Python coroutine. It is not thread-safe.

    Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task waits for the completion of a Future, the event loop runs other Tasks, callbacks, or performs IO operations.

    We can think of a Task as a wrapper around a coroutine, which is why a coroutine is passed to asyncio.create_task() (or loop.create_task()) to create one.

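    import asyncio

    async def foo():
        print('foo')
        await asyncio.sleep(1)
        print('boo')

    # Create the loop explicitly: calling get_event_loop() without a
    # running loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        task = loop.create_task(foo())
        loop.run_until_complete(task)
    finally:
        loop.close()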

    Tasks are used to schedule coroutines concurrently: a coroutine wrapped in a Task with asyncio.create_task() is automatically scheduled to run on the event loop. Once scheduled, it runs even if it is never awaited, provided the event loop gets a chance to run it before shutting down. A Task that is cancelled before it starts never executes its body.

    import asyncio

    async def foo():
        print('inside foo')
        try:
            return 'foo'
        except asyncio.CancelledError:
            print('cancelled')
            raise

    async def no_await_main():
        # The task is scheduled but never awaited.
        task = asyncio.create_task(foo())

    async def awaited_main():
        task = asyncio.create_task(foo())
        result = await task
        print(result)

    async def cancelled_main():
        task = asyncio.create_task(foo())
        task.cancel()  # cancelled before the task ever gets a chance to execute
        try:
            await task
        except asyncio.CancelledError:
            print('cancelled_main(): task is cancelled now')

    asyncio.run(no_await_main())   # prints 'inside foo'
    asyncio.run(awaited_main())    # prints 'inside foo' and 'foo'
    asyncio.run(cancelled_main())  # prints only the message from cancelled_main(); foo() never runs
  • Future

    An asyncio.Future represents the eventual result of an asynchronous operation. It is not thread-safe.

Tasks are used to run coroutines in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the completion of the Future. When the Future is done, the execution of the wrapped coroutine resumes.

A Future is a lower-level awaitable than the others (Task inherits from Future), so most of the time developers do not need to create Future objects directly.
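The suspend-until-done behaviour described above can be sketched with a bare Future. This is an illustrative assumption rather than code from the original notes: a timer callback stands in for whatever low-level code would normally complete the Future, and the 0.1 second delay is arbitrary.

```python
import asyncio


async def main() -> str:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Some other piece of code completes the Future later; here a timer
    # callback plays that role.
    loop.call_later(0.1, fut.set_result, "done")
    # Awaiting suspends main() until set_result() has been called.
    return await fut


if __name__ == "__main__":
    print(asyncio.run(main()))                       # done
    print(issubclass(asyncio.Task, asyncio.Future))  # True
```

In application code, this role is usually filled by Tasks or by library functions that return Futures, which is why creating one by hand is rare.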

❗ concurrent.futures.Future vs. asyncio.Future
The `asyncio.Future` object was designed to mimic `concurrent.futures.Future`. Key differences include:
  1. Unlike asyncio Futures, concurrent.futures.Future instances cannot be awaited.

  2. asyncio.Future.result() and asyncio.Future.exception() do not accept the timeout argument.

  3. asyncio.Future.result() and asyncio.Future.exception() raise an InvalidStateError exception when the Future is not done.

  4. Callbacks registered with asyncio.Future.add_done_callback() are not called immediately. They are scheduled with loop.call_soon() instead.

  5. asyncio Future is not compatible with the concurrent.futures.wait() and concurrent.futures.as_completed() functions.

  6. asyncio.Future.cancel() accepts an optional msg argument, but concurrent.futures.Future.cancel() does not.
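Two of the differences listed above (items 1 and 3) can be observed directly. The sketch below is an illustration under assumptions: blocking_square and main are hypothetical names, and asyncio.wrap_future() is used as one way to bridge a concurrent.futures.Future into awaitable form.

```python
import asyncio
import concurrent.futures


def blocking_square(x: int) -> int:
    """Hypothetical blocking function that runs in a worker thread."""
    return x * x


async def main() -> tuple[bool, int]:
    loop = asyncio.get_running_loop()

    # Difference 3: result() on a pending asyncio Future raises
    # InvalidStateError instead of blocking.
    pending = loop.create_future()
    try:
        pending.result()
        raised = False
    except asyncio.InvalidStateError:
        raised = True
    pending.cancel()  # tidy up the unused Future

    # Difference 1: a concurrent.futures.Future cannot be awaited directly,
    # so it is wrapped in an asyncio Future first.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        cf_future = pool.submit(blocking_square, 7)
        value = await asyncio.wrap_future(cf_future)
    return raised, value


if __name__ == "__main__":
    print(asyncio.run(main()))  # (True, 49)
```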

asyncio.gather

Event loop

As the name implies, an event loop is essentially a loop that keeps running until it is explicitly stopped. It is responsible for running asynchronous tasks and callbacks, performing network I/O operations, and running subprocesses.
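The "runs until explicitly stopped" behaviour can be seen by driving a loop by hand. This is a minimal sketch with a hypothetical helper name (run_callbacks) and an arbitrary 0.05 second delay; application code would normally use asyncio.run() instead of managing the loop itself.

```python
import asyncio


def run_callbacks() -> list[str]:
    """Schedule two callbacks, run the loop, and stop it explicitly."""
    events: list[str] = []
    loop = asyncio.new_event_loop()
    try:
        # Callbacks scheduled with call_soon() run in the order they were added.
        loop.call_soon(events.append, "first")
        loop.call_soon(events.append, "second")
        # Without this stop() call, run_forever() would never return.
        loop.call_later(0.05, loop.stop)
        loop.run_forever()
    finally:
        loop.close()
    return events


if __name__ == "__main__":
    print(run_callbacks())  # ['first', 'second']
```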

Examples

CLI Spinner

  • Async implementation

    # https://github.com/fluentpython/example-code-2e/blob/master/19-concurrency/spinner_async.py

    # credits: Example by Luciano Ramalho inspired by
    # Michele Simionato's multiprocessing example in the python-list:
    # https://mail.python.org/pipermail/python-list/2009-February/675659.html

    # tag::SPINNER_ASYNC_TOP[]
    import asyncio
    import itertools

    async def spin(msg: str) -> None:  # <1>
        for char in itertools.cycle(r'\|/-'):
            status = f'\r{char} {msg}'
            print(status, flush=True, end='')
            try:
                await asyncio.sleep(.1)  # <2>
            except asyncio.CancelledError:  # <3>
                break
        blanks = ' ' * len(status)
        print(f'\r{blanks}\r', end='')

    async def slow() -> int:
        await asyncio.sleep(3)  # <4>
        return 42
    # end::SPINNER_ASYNC_TOP[]

    # tag::SPINNER_ASYNC_START[]
    def main() -> None:  # <1>
        result = asyncio.run(supervisor())  # <2>
        print(f'Answer: {result}')

    async def supervisor() -> int:  # <3>
        spinner = asyncio.create_task(spin('thinking!'))  # <4>
        print(f'spinner object: {spinner}')  # <5>
        result = await slow()  # <6>
        spinner.cancel()  # <7>
        return result

    if __name__ == '__main__':
        main()
    # end::SPINNER_ASYNC_START[]
  • Process implementation

    # https://github.com/fluentpython/example-code-2e/blob/master/19-concurrency/spinner_proc.py

    # credits: Adapted from Michele Simionato's
    # multiprocessing example in the python-list:
    # https://mail.python.org/pipermail/python-list/2009-February/675659.html

    # tag::SPINNER_PROC_IMPORTS[]
    import itertools
    import time
    from multiprocessing import Process, Event  # <1>
    from multiprocessing import synchronize  # <2>

    def spin(msg: str, done: synchronize.Event) -> None:  # <3>
    # end::SPINNER_PROC_IMPORTS[]
        for char in itertools.cycle(r'\|/-'):
            status = f'\r{char} {msg}'
            print(status, end='', flush=True)
            if done.wait(.1):
                break
        blanks = ' ' * len(status)
        print(f'\r{blanks}\r', end='')

    def slow() -> int:
        time.sleep(3)
        return 42

    # tag::SPINNER_PROC_SUPER[]
    def supervisor() -> int:
        done = Event()
        spinner = Process(target=spin,  # <4>
                          args=('thinking!', done))
        print(f'spinner object: {spinner}')  # <5>
        spinner.start()
        result = slow()
        done.set()
        spinner.join()
        return result
    # end::SPINNER_PROC_SUPER[]

    def main() -> None:
        result = supervisor()
        print(f'Answer: {result}')


    if __name__ == '__main__':
        main()
  • Thread implementation

    # https://github.com/fluentpython/example-code-2e/blob/master/19-concurrency/spinner_thread.py

    # credits: Adapted from Michele Simionato's
    # multiprocessing example in the python-list:
    # https://mail.python.org/pipermail/python-list/2009-February/675659.html

    # tag::SPINNER_THREAD_TOP[]
    import itertools
    import time
    from threading import Thread, Event

    def spin(msg: str, done: Event) -> None:  # <1>
        for char in itertools.cycle(r'\|/-'):  # <2>
            status = f'\r{char} {msg}'  # <3>
            print(status, end='', flush=True)
            if done.wait(.1):  # <4>
                break  # <5>
        blanks = ' ' * len(status)
        print(f'\r{blanks}\r', end='')  # <6>

    def slow() -> int:
        time.sleep(3)  # <7>
        return 42
    # end::SPINNER_THREAD_TOP[]

    # tag::SPINNER_THREAD_REST[]
    def supervisor() -> int:  # <1>
        done = Event()  # <2>
        spinner = Thread(target=spin, args=('thinking!', done))  # <3>
        print(f'spinner object: {spinner}')  # <4>
        spinner.start()  # <5>
        result = slow()  # <6>
        done.set()  # <7>
        spinner.join()  # <8>
        return result

    def main() -> None:
        result = supervisor()  # <9>
        print(f'Answer: {result}')

    if __name__ == '__main__':
        main()
    # end::SPINNER_THREAD_REST[]

Deep Dive

Using asyncio.sleep(0)

References