4  Asynchronous Programming with Python

One programming model to rule them all


François-David Collin



Paul-ValΓ©ry Montpellier 3 University

Ghislain Durif



5 Asynchronous, Basics

5.1 What is Asynchronous Programming?

  • Asynchronous programming is a programming paradigm that allows the program to continue executing other tasks before the current task is finished.
  • It is a way to achieve concurrency in a program.

\Rightarrow it is an abstraction over concurrency, it does not necessarily mean that the program is executed in parallel.

5.2 I/O Bound vs. CPU Bound

import requests
1response = requests.get('https://www.example.com')
items = response.headers.items()
2headers = [f'{key}: {header}' for key, header in items]
3formatted_headers = '\n'.join(headers)
with open('headers.txt', 'w') as file:
4    file.write(formatted_headers)
I/O-bound web request
CPU-bound response processing
CPU-bound string concatenation
I/O-bound write to disk

6 Concurrency, parallelism and multitasking

We will use extensively the bakery metaphor.

6.1 Concurrency vs. Parallelism

One baker and two cakes to prepare.

  • Can preheat the oven while preparing the first cake.
  • Can start the second cake while the first one is in the oven.

\Rightarrow Switching between tasks is concurrency (or concurrent behavior).

Two bakers and two cakes to prepare.

  • Can prepare both cakes at the same time.

\Rightarrow Doing multiple tasks in parallel is parallelism (or parallel behavior).

With concurrency, we have multiple tasks happening at the same time, but only one we’re actively doing at a given point in time. With parallelism, we have multiple tasks happening and are actively doing more than one simultaneously.

From [1]

With concurrency, we switch between running two applications. With parallelism, we actively run two applications simultaneously.

From [1]

  • Concurrency is about multiple independent tasks that can happen.
  • Parallelism is concurrency AND simultaneous execution.

While parallelism implies concurrency, concurrency does not always imply parallelism.

\Rightarrow Concurrency is a broader concept than parallelism.

6.2 Multitasking

6.2.1 Preemptive multitasking

  • The operating system decides when to switch between tasks.
  • The tasks are not aware of each other.

6.2.2 Cooperative multitasking

  • In this model we have to explicitly to decide when to switch between tasks.
  • The tasks are aware of each other.

6.3 Benefits of cooperative multitasking

  • Less overhead than preemptive multitasking.
  • Granular/optimal control over when to switch between tasks.

7 Processes, threads, multithreading, and multiprocessing

7.1 Multi-processing vs Multi-threading

                   Main Process
                 β”‚             β”‚
                 β”‚   CPU       β”‚
                 β”‚             β”‚
                 β”‚   Memory    β”‚
                   β”‚    β”‚    β”‚
                   β”‚    β”‚    β”‚
       β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚    └───────────┐
       β”‚                β”‚                β”‚
β”Œβ”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”
β”‚             β”‚  β”‚             β”‚  β”‚             β”‚
β”‚   CPU       β”‚  β”‚   CPU       β”‚  β”‚   CPU       β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€  β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€  β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚             β”‚  β”‚             β”‚  β”‚             β”‚
β”‚   Memory    β”‚  β”‚   Memory    β”‚  β”‚   Memory    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

   Process 1        Process 1        Process 1
β”‚            MAIN PROCESS              β”‚
β”‚                                      β”‚
β”‚                                      β”‚
β”‚     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                     β”‚
β”‚     β”‚          β”‚                     β”‚
β”‚     β”‚   CPU    β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”‚
β”‚     β”‚          β”‚   β”‚           β”‚     β”‚
β”‚     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   Memory  β”‚     β”‚
β”‚                    β”‚           β”‚     β”‚
β”‚                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β”‚
β”‚                                      β”‚
β”‚                                      β”‚
β”‚                                      β”‚
β”‚      β”Œβ”€β”         β”Œβ”€β”         β”Œβ”€β”     β”‚
β”‚      β”‚β”Όβ”‚         β”‚β”Όβ”‚         β”‚β”Όβ”‚     β”‚
β”‚      β”‚β”΄β”‚         β”‚β”΄β”‚         β”‚β”΄β”‚     β”‚
β”‚      β–Όβ–Όβ–Ό         β–Όβ–Όβ–Ό         β–Όβ–Όβ–Ό     β”‚
β”‚   Thread 1    Thread 2    Thread 3   β”‚
β”‚      β”Œβ”€β”         β”Œβ”€β”         β”Œβ”€β”     β”‚
β”‚      β”‚β”Όβ”‚         β”‚β”Όβ”‚         β”‚β”Όβ”‚     β”‚
β”‚      β”‚β”΄β”‚         β”‚β”΄β”‚         β”‚β”΄β”‚     β”‚
β”‚      β–Όβ–Όβ–Ό         β–Όβ–Όβ–Ό         β–Όβ–Όβ–Ό     β”‚
β”‚                                      β”‚

7.2 Processes and threads

import os
import threading
print(f'Python process running with process id: {os.getpid()}')
total_threads = threading.active_count()
thread_name = threading.current_thread().name
print(f'Python is currently running {total_threads} thread(s)')
print(f'The current thread is {thread_name}')
Python process running with process id: 132369
Python is currently running 8 thread(s)
The current thread is MainThread

7.3 Creating processes

import multiprocessing
import os
def hello_from_process():
    print(f'Hello from child process {os.getpid()}!')
if __name__ == '__main__':
    hello_process = multiprocessing.Process(target=hello_from_process)
    print(f'Hello from parent process {os.getpid()}')
Hello from child process 132419!
Hello from parent process 132369

7.4 Creating threads

import threading
def hello_from_thread():
    print(f'Hello from thread {threading.current_thread()}!')
hello_thread = threading.Thread(target=hello_from_thread)
total_threads = threading.active_count()
thread_name = threading.current_thread().name
print(f'Python is currently running {total_threads} thread(s)')
print(f'The current thread is {thread_name}')
Hello from thread <Thread(Thread-6 (hello_from_thread), started 140617705440832)>!
Python is currently running 8 thread(s)
The current thread is MainThread

8 And all hell broke loose: the GIL

8.1 What about Python?

  • Designed for sequential and single-core architecture from the beginning
  • Everything is interpreted
  • All dynamic (no static types)

8.2 The GIL

Aka Global Interpreter Lock

. . .

  • The GIL allows thread usage, you can create threads and launch them: YES!

. . .

  • but…

. . .

  • Only ONE thread can actually execute code at python level..

8.3 Multi-threaded != Parallel execution

Multi-threading doesn’t guarantee parallel executien…

\Longrightarrow Python seems to have started off with the wrong foot by a long way…

8.4 High performance Python 😬

But wait!

  1. Actually we can run (real) parallel programs with the multiprocessing package.

    \Rightarrow But this is an β€œOS level” multiprocessing, with associated huge overhead (relatively)

  2. Python actually releases the GIL when executing everything that is not Python code (e.g. C/C++ extensions and libraries)

    \Rightarrow It means we can parallelize our code by using I/O bound and CPU bound libraries that release the GIL (this is the case for most of them)

9 Single-threaded asynchronous programming with asyncio

9.1 Socket

Writing bytes to a socket and reading bytes from a socket

From [1]

  • This a mailbox metaphor
  • By default, the socket is blocking, i.e. the program will wait until the socket is ready to be read or written.
  • We can make the socket non-blocking, i.e. the program will not wait for the socket to be ready to be read or written. \Rightarrow Later on, the OS will tell us we received byte and we deal with it.

  • Making a non-blocking I/O request returns immediately
  • tells the O/S to watch sockets for data \Rightarrow This allows execute_other_code() to run right away instead of waiting for the I/O requests to finish
  • Later, we can be alerted when I/O is complete and process the response.

From [1]

9.2 Event Loop

from collections import deque
messages = deque()
while True:
    if messages:
        message = messages.pop()
  • The event loop is a loop that runs forever.
  • It checks if there are any messages to process.
  • If there are, it processes them.
  • If there are not, it waits for messages to arrive.

\Rightarrow In asyncio, the event loop is queue of tasks instead of messages, Tasks are wrapped coroutines.

def make_request():
task_one = make_request()
task_two = make_request()
task_three = make_request()

10 asyncio Coroutines

To define a coroutine, we use the async def syntax.

async def my_coroutine() -> None
    print('Hello world!')

10.1 What is it?

async def coroutine_add_one(number: int) -> int:
    return number + 1
def add_one(number: int) -> int:
    return number + 1
1function_result = add_one(1)
2coroutine_result = coroutine_add_one(1)
print(f'Function result is {function_result}\n\
    and the type is {type(function_result)}')
print(f'Coroutine result is {coroutine_result}\n\
    and the type is {type(coroutine_result)}')
function call, is executed immediately.
coroutine call, is not executed at all, but returns a coroutine object.
Function result is 2
    and the type is <class 'int'>
Coroutine result is <coroutine object coroutine_add_one at 0x7fe3fdddb640>
    and the type is <class 'coroutine'>

From [1]

10.2 How to execute a coroutine?

You need an event loop.

import asyncio
async def coroutine_add_one(number: int) -> int:
    return number + 1
1result = asyncio.run(coroutine_add_one(1))

This launches the event loop, executes the coroutine, and returns the result.

This code will not work in a Jupyter notebook, because the event loop is already running (by Jupyter itself). So you just have to replace the line 4 by:

result = await coroutine_add_one(1)

10.3 await keyword

import asyncio
async def add_one(number: int) -> int:
    return number + 1
async def main() -> None:
1    one_plus_one = await add_one(1)
2    two_plus_one = await add_one(2)
3await main()
Pause, and wait for the result of add_one(1).
Pause, and wait for the result of add_one(2).
Pause, and wait for the result of main(). (outside of a Jupyter notebook, you have to launch the event loop somewhere, like asyncio.run(main()) instead of await main())

From [1]

10.4 Simulating the real thing with asyncio.sleep

import asyncio
async def hello_world_message() -> str:
1    await asyncio.sleep(1)
    return 'Hello World!'
async def main() -> None:
2    message = await hello_world_message()
await main()
Pause hello_world_message for 1 second.
Pause main until hello_world_message is finished.
Hello World!

10.5 Utility function delay(seconds)

import asyncio
1async def delay(delay_seconds: int) -> int:
2    print(f'sleeping for {delay_seconds} second(s)')
    await asyncio.sleep(delay_seconds)
    print(f'finished sleeping for {delay_seconds} second(s)')
3    return delay_seconds
Takes an integer of the duration in seconds that we’d like the function to sleep.
Prints when sleep begins and ends.
Returns that integer to the caller once it has finished sleeping.

10.6 Running two coroutines

import asyncio
async def add_one(number: int) -> int:
    return number + 1
async def hello_world_message() -> str:
    await delay(1)
    return 'Hello World!'
async def main() -> None:
1    message = await hello_world_message()
2    one_plus_one = await add_one(1)
await main()
Pause main until hello_world_message is finished.
Pause main until add_one is finished.
sleeping for 1 second(s)
finished sleeping for 1 second(s)
Hello World!

From [1]

10.7 What to do next?

Moving away from sequential execution and run add_one and hello_world_message concurrently.

For that we need…

11 Tasks

So far we just learned how to create coroutines and put then in the event loop.

Tasks are a way to schedule coroutines concurrently.

\Rightarrow Tasks are wrapped coroutines which are scheduled to run in the event loop as soon as possible.

11.1 Creating tasks

import asyncio

async def main():
    sleep_for_three = asyncio.create_task(delay(3))
    result = await sleep_for_three
await main()
<class '_asyncio.Task'>
sleeping for 3 second(s)
finished sleeping for 3 second(s)
  • the coroutine is scheduled to run in the event loop as soon as possible.
  • before, it was just run at the await statement (pausing the caller).

11.2 Running tasks concurrently

import asyncio
async def main():
    sleep_for_three = \
    sleep_again = \
    sleep_once_more = \
    await sleep_for_three
    await sleep_again
    await sleep_once_more

await main()
sleeping for 3 second(s)
sleeping for 3 second(s)
sleeping for 3 second(s)
finished sleeping for 3 second(s)
finished sleeping for 3 second(s)
finished sleeping for 3 second(s)

From [1]

import asyncio
async def hello_every_second():
    for i in range(2):
        await asyncio.sleep(1)
        print("I'm running other code while I'm waiting!")
async def main():
    first_delay = asyncio.create_task(delay(3))
    second_delay = asyncio.create_task(delay(3))
    await hello_every_second()
    await first_delay
    await second_delay

await main()
sleeping for 3 second(s)
sleeping for 3 second(s)
I'm running other code while I'm waiting!
I'm running other code while I'm waiting!
finished sleeping for 3 second(s)
finished sleeping for 3 second(s)

From [1]

11.3 Canceling tasks

import asyncio
from asyncio import CancelledError

async def main():
    long_task = asyncio.create_task(delay(10))
    seconds_elapsed = 0
    while not long_task.done():
        print('Task not finished, checking again in a second.')
        await asyncio.sleep(1)
        seconds_elapsed = seconds_elapsed + 1
        if seconds_elapsed == 5:
        await long_task
    except CancelledError:
        print('Our task was cancelled')
await main()
Task not finished, checking again in a second.
sleeping for 10 second(s)
Task not finished, checking again in a second.
Task not finished, checking again in a second.
Task not finished, checking again in a second.
Task not finished, checking again in a second.
Task not finished, checking again in a second.
Our task was cancelled

11.4 Setting a timeout

import asyncio

async def main():
    delay_task = asyncio.create_task(delay(2))
        result = await asyncio.wait_for(delay_task, timeout=1)
    except asyncio.exceptions.TimeoutError:
        print('Got a timeout!')
        print(f'Was the task cancelled? {delay_task.cancelled()}')
await main()
sleeping for 2 second(s)
Got a timeout!
Was the task cancelled? True

12 Tasks, coroutines, futures, and awaitables

12.1 Introducing futures

from asyncio import Future
my_future = Future()
print(f'Is my_future done? {my_future.done()}')
print(f'Is my_future done? {my_future.done()}')
print(f'What is the result of my_future? {my_future.result()}')
Is my_future done? False
Is my_future done? True
What is the result of my_future? 42

12.2 Awaiting futures

from asyncio import Future
import asyncio
def make_request() -> Future:
    future = Future()
1    asyncio.create_task(set_future_value(future))
    return future
async def set_future_value(future) -> None:
2    await asyncio.sleep(1)
async def main():
    future = make_request()
    print(f'Is the future done? {future.done()}')
3    value = await future
    print(f'Is the future done? {future.done()}')
await main()
Create a task to asynchronously set the value of the future.
Wait 1 second before setting the value of the future.
Pause main until the future’s value is set.
Is the future done? False
Is the future done? True

12.3 Comparing tasks, coroutines, futures, and awaitables

Objects that can be awaited in an async function, including coroutines, tasks, and futures.
Special functions that can be paused and resumed later, defined using async def, and can be awaited to allow other coroutines to run.
Represent the result of an asynchronous operation, manage its state, and can be awaited to get the result.
Schedule and run coroutines concurrently, and can be used to cancel or check their status.

13 Benchmarking

13.1 With a decorator

import functools
import time
from typing import Callable, Any
def async_timed():
    def wrapper(func: Callable) -> Callable:
        async def wrapped(*args, **kwargs) -> Any:
            print(f'starting {func} with args {args} {kwargs}')
            start = time.time()
                return await func(*args, **kwargs)
                end = time.time()
                total = end - start
                print(f'finished {func} in {total:.4f} second(s)')
        return wrapped
    return wrapper

Official Python documentation for decorators

  • add functionality to an existing function
  • without modifying the function itself
  • it intercepts the function call and runs β€œdecorated” code before and after it

13.2 Using it

import asyncio
async def delay(delay_seconds: int) -> int:
    print(f'sleeping for {delay_seconds} second(s)')
    await asyncio.sleep(delay_seconds)
    print(f'finished sleeping for {delay_seconds} second(s)')
    return delay_seconds
async def main():
    task_one = asyncio.create_task(delay(2))
    task_two = asyncio.create_task(delay(3))
    await task_one
    await task_two

await main()
starting <function main at 0x7fe3fdc84d60> with args () {}
starting <function delay at 0x7fe3fdc86c00> with args (2,) {}
sleeping for 2 second(s)
starting <function delay at 0x7fe3fdc86c00> with args (3,) {}
sleeping for 3 second(s)
finished sleeping for 2 second(s)
finished <function delay at 0x7fe3fdc86c00> in 2.0032 second(s)
finished sleeping for 3 second(s)
finished <function delay at 0x7fe3fdc86c00> in 3.0017 second(s)
finished <function main at 0x7fe3fdc84d60> in 3.0018 second(s)

13.3 asyncio.gather

asyncio.gather() runs multiple asynchronous operations, wraps a coroutine as a task, and returns a list of results in the same order of awaitables.

import asyncio

async def call_api(message, result, delay=3):
    await asyncio.sleep(delay)
    return result

async def main():
    return await asyncio.gather(
        call_api('Calling API 1 ...', 1),
        call_api('Calling API 2 ...', 2)

await main()
Calling API 1 ...
Calling API 2 ...
[1, 2]

asyncio.gather takes a tuple of awaitables, not a list of awaitables, but returns a list of results in the same order of awaitables.

If you want to pass a list, use the * operator to unpack it as a tuple.

14 Pitfalls of asynchronous programming

14.1 Running CPU-bound code

import asyncio

async def cpu_bound_work() -> int:
    counter = 0
    for i in range(100000000):
        counter = counter + 1
    return counter
async def main():
    task_one = asyncio.create_task(cpu_bound_work())
    task_two = asyncio.create_task(cpu_bound_work())
    await task_one
    await task_two
await main()
starting <function main at 0x7fe3fdc87c40> with args () {}
starting <function cpu_bound_work at 0x7fe3fdc87a60> with args () {}
finished <function cpu_bound_work at 0x7fe3fdc87a60> in 3.7527 second(s)
starting <function cpu_bound_work at 0x7fe3fdc87a60> with args () {}
finished <function cpu_bound_work at 0x7fe3fdc87a60> in 4.1736 second(s)
finished <function main at 0x7fe3fdc87c40> in 7.9269 second(s)

14.2 Running blocking APIs

import asyncio
import requests
async def get_example_status() -> int:
    return requests.get('http://www.example.com').status_code
async def main():
    task_1 = asyncio.create_task(get_example_status())
    task_2 = asyncio.create_task(get_example_status())
    task_3 = asyncio.create_task(get_example_status())
    await task_1
    await task_2
    await task_3
await main()
starting <function main at 0x7fe4559e0ae0> with args () {}
starting <function get_example_status at 0x7fe3fdc86e80> with args () {}
finished <function get_example_status at 0x7fe3fdc86e80> in 0.3771 second(s)
starting <function get_example_status at 0x7fe3fdc86e80> with args () {}
finished <function get_example_status at 0x7fe3fdc86e80> in 0.4965 second(s)
starting <function get_example_status at 0x7fe3fdc86e80> with args () {}
finished <function get_example_status at 0x7fe3fdc86e80> in 0.4975 second(s)
finished <function main at 0x7fe4559e0ae0> in 1.3716 second(s)

15 Asynchronous threading

15.1 Example of blocking code

import requests
def get_status_code(url: str) -> int:
    response = requests.get(url)
    return response.status_code
url = 'https://www.example.com'

15.2 Thread Pool

import time
import requests
from concurrent.futures import ThreadPoolExecutor
def get_status_code(url: str) -> int:
    response = requests.get(url)
    return response.status_code
start = time.time()
with ThreadPoolExecutor() as pool:
    urls = ['https://www.example.com' for _ in range(10)]
    results = pool.map(get_status_code, urls)
    for result in results:
        # print(result)

end = time.time()
print(f'finished requests in {end - start:.4f} second(s)')
finished requests in 0.7330 second(s)

15.3 Compare with sequential code

start = time.time()
urls = ['https://www.example.com' for _ in range(10)]
for url in urls:
    result = get_status_code(url)
    # print(result)
end = time.time()
print(f'finished requests in {end - start:.4f} second(s)')
finished requests in 7.8103 second(s)

15.4 Thread pool with asyncio

import functools
import requests
import asyncio
from concurrent.futures import ThreadPoolExecutor
def get_status_code(url: str) -> int:
    response = requests.get(url)
    return response.status_code
async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        urls = ['https://www.example.com' for _ in range(10)]
        tasks = [loop.run_in_executor(pool, functools.partial(get_status_code, url)) for url in urls]
        results = await asyncio.gather(*tasks)
await main()
starting <function main at 0x7fe455a29580> with args () {}
[200, 200, 200, 200, 200, 200, 200, 200, 200, 200]
finished <function main at 0x7fe455a29580> in 0.8459 second(s)

15.5 Multithreading with numpy

Let’s define a big matrix on which we will compute the mean of each row.

import numpy as np

data_points = 400000000
rows = 50
columns = int(data_points / rows)
matrix = np.arange(data_points).reshape(rows, columns)

Now process the matrix sequentially.

s = time.time()
res_seq = np.mean(matrix, axis=1)
e = time.time()
print(e - s)

And then the same with multithreading (we check that the results are exactly the same).

import functools
from concurrent.futures.thread import ThreadPoolExecutor
import asyncio
def mean_for_row(arr, row):
    return np.mean(arr[row])
async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        tasks = []
        for i in range(rows):
            mean = functools.partial(mean_for_row, matrix, i)
            tasks.append(loop.run_in_executor(pool, mean))
        return await asyncio.gather(*tasks)

res_threads = np.array(await main())
np.testing.assert_array_equal(res_seq, res_threads)
starting <function main at 0x7fe455a2b060> with args () {}
finished <function main at 0x7fe455a2b060> in 0.2386 second(s)

16 Conclusion

  • Everything is awaitable (coroutines, futures, tasks), i.e. can be simply run with await.

  • a task is a coroutine wrapped in a future, and scheduled to run in the event loop.

  • asyncio is a single-threaded asynchronous programming library, providing a simple way to write concurrent code for I/O bound tasks.

    \Rightarrow We’ll see later that this programming model can be used for parallelism as well, and very easily.

17 References