A week ago, I was working on a project that involved calling a REST API endpoint 32 million times to retrieve a certain type of document. The input to the API was a presigned URL that was valid for only a few days, so I did not have the luxury of doing things sequentially. A rough calculation of the time a simple for loop would take made me realize that the task was a nice little use case for parallelizing. That's when I started looking at asyncio. In my first go at the task, I went with the standard approach of using multithreading functions in Python. However, there was always an itch to see if I could get better performance using asyncio together with multithreading. The book "Python Concurrency with asyncio" by Matthew Fowler helped me understand the basics of concurrent and parallel computing with asyncio. Subsequently, I went back and performed the task of pinging an API 32 million times to retrieve 32 million JSON documents using asyncio and multithreading. In this post, I will summarize a few chapters that I found useful in getting my work done.

Getting to know asyncio

asyncio was introduced in Python 3.4 as an additional way to handle highly concurrent workloads outside of multithreading and multiprocessing. The chapter introduces the concepts of concurrency and parallelism using the example of baking a cake. Concurrency means that the subtasks involved in a larger task are not done sequentially; instead, we make progress by switching between them. Parallelism means that the subtasks are not only handled concurrently but are also executed at the same time.

The difference between concurrency and parallelism is illustrated via the cake-baking situation

There are two types of multitasking: preemptive multitasking and cooperative multitasking. In the preemptive kind, the OS decides how to switch between the pieces of work currently being executed, via a process called time slicing; when the OS switches between work, we call it preempting. In cooperative multitasking, we explicitly code points in our application where we let other tasks run, so the tasks in our application operate in a cooperative mode.

asyncio uses cooperative multitasking to achieve concurrency. When our application reaches a point where it could wait a while for a result to come back, we explicitly mark this in code. This allows other code to run while we wait for the result to come back in the background.

The chapter gives a good primer on multiprocessing and multithreading by walking through a few code snippets.

A process is an application run that has a memory space that other applications cannot access

Threads can be thought of as lighter-weight processes. They do not have their own memory as a process does; instead, they share the memory of the process that created them. Threads are associated with the process that created them. A process will always have at least one thread associated with it, known as the main thread. A process can also create other threads, called worker threads, which can perform other work concurrently alongside the main thread.

A good visual highlights the difference between a multithreaded program and a multiprocessing program

The GIL is the most infamous topic in the Python community. It prevents one Python process from executing more than one Python bytecode instruction at any given time. This means that even if we have multiple threads on multiple cores, a Python process can have only one thread running Python code at once.

Why does GIL exist ?

The answer lies in how memory is managed in CPython, where memory is managed primarily by a process known as reference counting. Reference counting works by keeping track of who currently needs access to a particular Python object: a reference count is an integer recording how many places reference that object. When the reference count reaches zero, no one is referencing the object and it can be deleted from memory.
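As a quick illustration of the idea (this snippet is mine, not the book's), sys.getrefcount shows the count going up and down as references are created and dropped:

import sys

numbers = [1, 2, 3]
print(sys.getrefcount(numbers))  # typically 2: the `numbers` name plus the temporary argument reference

alias = numbers                  # a second reference to the same list object
print(sys.getrefcount(numbers))  # typically 3

del alias                        # drop one reference
print(sys.getrefcount(numbers))  # back to 2; once it hits zero, CPython frees the object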

The conflict with threads arises because the reference counting implementation in CPython is not thread safe. If two or more threads modify a shared variable, that variable may end up in an unexpected state.

Is the GIL ever released?

The GIL is released when an I/O operation happens. This lets us employ threads to do concurrent work when it comes to I/O, but not for CPU-bound Python code itself.

Why is GIL released only for I/O operations but not CPU-bound operations ?

The answer lies in the system calls that are made in the background. In the case of I/O, the low-level system calls are outside of the Python runtime. This allows the GIL to be released because it is not interacting with Python objects directly; the GIL is only reacquired when the data received is translated back into a Python object. At the OS level, the I/O operations execute concurrently. This model gives us concurrency but not parallelism: in Python, because of the GIL, the best we can do is concurrency of our I/O operations.
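A small sketch of my own to make this concrete: because time.sleep releases the GIL (much like a blocking socket read would), five one-second waits spread across threads finish in roughly one second rather than five.

import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(seconds):
    # stands in for a blocking I/O call; the GIL is released while sleeping
    time.sleep(seconds)
    return seconds

start = time.time()
with ThreadPoolExecutor(max_workers=5) as pool:
    results = list(pool.map(fake_io, [1, 1, 1, 1, 1]))
print(f"finished {len(results)} waits in {time.time() - start:.2f} seconds")  # ~1s, not ~5s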

asyncio exploits the fact that I/O operations release the GIL to give us concurrency, even with only one thread. When we utilize asyncio, we create objects called coroutines. A coroutine can be thought of as executing a lightweight thread. Much like we can have multiple threads running at the same time, each with its own concurrent I/O operation, we can have many coroutines running alongside one another. While we are waiting for our I/O-bound coroutines to finish, we can still execute other Python code, thus giving us concurrency. It is important to note that asyncio does not circumvent the GIL: if we have a CPU-bound task, we still need to use multiple processes to run it in parallel.

asyncio uses the event notification mechanism of the underlying OS to achieve concurrency. The underlying OS uses non-blocking I/O mode for all the I/O operations. When we hit an I/O operation, we hand it over to our OS event notification system to keep track of it for us. Once we have done this hand-off, our Python thread is free to keep running other Python code or add more non-blocking sockets for the OS to keep track of for us. When our I/O operation finishes, we “wake up” the task that was waiting for the result and then proceed to run any other Python code that came after that I/O operation

The way asyncio keeps track of the tasks waiting for I/O notifications is via the event loop.

An event loop is at the heart of every asyncio application. It keeps a queue of tasks that are nothing but wrappers around coroutines. A coroutine can pause execution when it hits an I/O-bound operation and will let the event loop run other tasks that are not waiting for an I/O operation to complete.

The chapter has a good visual to illustrate the event loop.

All the basics of concurrency and multiprocessing are very well explained in the first chapter using a combination of visuals and code snippets.

asyncio basics

The coroutine construct is the key element of the asyncio framework. A coroutine is like a regular Python function, but with the superpower that it can pause its execution when it encounters an operation that could take a while to complete. When that long-running operation is complete, we can "wake up" the paused coroutine and finish executing any remaining code in it. While a paused coroutine is waiting for the operation to finish, we can run other code. This running of other code while waiting is what gives our application concurrency.

What can one do after defining a coroutine ? One can use the asyncio.run function, which creates an event loop, runs the coroutine on it, and shuts the loop down once the coroutine finishes. await is used to suspend the surrounding coroutine until the awaited coroutine is done executing, at which point control comes back with its result. Using async functions along with await and asyncio.run, one cannot really run a block of code concurrently unless one creates a task.

Once you create a task, you usually must use the await keyword on it at some point in the application. If you do not, the task would be scheduled to run, but it would almost immediately be stopped and cleaned up when asyncio.run shuts down the event loop.
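A minimal sketch of these pieces working together (fetch_data and the delays are made up for illustration): awaiting coroutines one after another runs them sequentially, while wrapping them in tasks lets them run concurrently.

import asyncio

async def fetch_data(delay):
    await asyncio.sleep(delay)   # stands in for a slow I/O call
    return delay

async def main():
    # create_task schedules both coroutines on the event loop right away,
    # so the two awaits below take ~2 seconds in total instead of ~3
    task_one = asyncio.create_task(fetch_data(2))
    task_two = asyncio.create_task(fetch_data(1))
    print(await task_one, await task_two)

asyncio.run(main())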

There are also other facilities in the asyncio module, such as task cancellation, wait_for and shield, that the programmer can use so that the application never waits forever for a certain task to get done.
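For example, wait_for wraps an awaitable with a timeout and cancels it if the timeout expires; a rough sketch:

import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def main():
    try:
        # cancel slow_operation if it does not finish within 2 seconds
        # (wrapping it in asyncio.shield would protect it from that cancellation)
        result = await asyncio.wait_for(slow_operation(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("operation timed out")

asyncio.run(main())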

In order to understand the difference between a task and a coroutine, as both seem to respond to the await keyword, the chapter introduces futures and the awaitable abstraction.

A future is a Python object that contains a single value that you expect to get at some point in the future but may not yet have. A task can be thought of as a combination of both a coroutine and a future. When we create a task, we are creating an empty future and running the coroutine. When the coroutine has completed with either an exception or a result, we set the result or exception of the future. The common thread between coroutines, tasks and futures is that they are all implementations of the Awaitable abstract class.

There are two common pitfalls when using coroutines and tasks. The first one involves running a CPU-bound function inside the asyncio framework: since a CPU-bound function does not release the GIL, the asyncio library will not give any performance gains. The second pitfall is using blocking I/O libraries such as requests. These libraries are blocking by default and hence one cannot take advantage of the asyncio model; it is better to use libraries that implement non-blocking sockets.

A first asyncio application

This chapter introduces a basic example of socket communication between a client and a server. By default, socket communication is blocking, and hence a client has to wait until the server finishes processing other requests that came in earlier. One can of course make the sockets non-blocking so that parallel requests can be processed by the server, but polling them ourselves in a loop is generally a CPU-heavy operation. Instead, it is better to use the event notification APIs built into the operating system.

Operating systems have efficient built-in APIs that let us watch sockets for incoming data and other events. While the actual API is dependent on the operating system, all of these I/O notification systems operate on similar concepts: we give them a list of sockets we want to monitor for events, and instead of constantly checking each socket to see if it has data, the operating system tells us explicitly when sockets have data. Because this is implemented at the hardware level, very little CPU is used during the monitoring, allowing for efficient resource usage. These notification systems are the core of how asyncio achieves concurrency.

Concurrent web requests

This chapter introduces the aiohttp library, which uses non-blocking sockets to make web requests and returns coroutines for those requests, which we can then await for a result.

One cannot take an existing Python library, use it with asyncio, and hope to get performance gains. Most libraries do not work well with asyncio because they use blocking sockets. For example, the requests library will block the thread it runs in, and since asyncio is single threaded, the entire event loop will halt until that request finishes.

The first advantage that aiohttp offers is connection pooling, i.e. creating a reusable pool of connections that can be used for performing requests. The chapter talks about all the important functions in the aiohttp library.

For a while I did not understand the reason for using gather, but reading this chapter made the purpose of the function very clear. The function takes in a sequence of awaitables and lets us run them concurrently, all in one line of code. gather will automatically wrap coroutines into tasks and put them on the event loop.
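A sketch of the pattern, assuming aiohttp is installed and using a placeholder URL list:

import asyncio
import aiohttp

# URLS is a placeholder; substitute your own endpoints
URLS = ["https://example.com"] * 5

async def fetch_status(session, url):
    async with session.get(url) as response:
        return response.status

async def main():
    async with aiohttp.ClientSession() as session:
        # gather wraps each coroutine in a task and runs them concurrently
        statuses = await asyncio.gather(*[fetch_status(session, url) for url in URLS])
        print(statuses)

asyncio.run(main())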

The gather function has a few drawbacks. The first is that it isn't easy to cancel our tasks if one of them throws an exception. The second is that we must wait for all our coroutines to finish before we can process our results.

The chapter walks the reader through the wait function, which gives more fine-grained control over asynchronous requests.
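A small sketch of wait: unlike gather, it hands back done and pending sets, and return_when lets us react as soon as the first task completes (the delays here are arbitrary).

import asyncio

async def fetch(delay):
    await asyncio.sleep(delay)
    return delay

async def main():
    tasks = [asyncio.create_task(fetch(d)) for d in (3, 1, 2)]
    # return as soon as any one task finishes, instead of waiting for all of them
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"{len(done)} task(s) done, {len(pending)} still pending")
    for task in pending:
        task.cancel()

asyncio.run(main())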

How much you get out of the chapter depends entirely on what you want to accomplish, and on the idiosyncrasies of the server that you are pinging. In my case, I had to download 32 million documents from 32 million presigned URLs, each of which expired after a certain time. There was no choice for me other than to use parallel processing, as going the simple route of a sequential download would have made the task infeasible. However, the server had a peculiarity: it limited the API calls per second and throttled the requests. Hence I had to use other hacks to make sure that the aiohttp tasks did not cross a specific threshold when firing requests.

Handling CPU bound work

asyncio has an API for interoperating with Python's multiprocessing library. This lets us use the async and await syntax as well as asyncio APIs with multiple processes. Thus we can get the benefit of the asyncio library even with CPU-bound code.

Basic idea behind multiprocessing library

This library lets us spawn separate subprocesses to handle our work. Each subprocess will have its own Python interpreter and be subject to the GIL, but instead of one interpreter we will have several, each with its own GIL. Assuming we run on a machine with multiple CPU cores, this means that we can parallelize any CPU-bound workload effectively. Even if we have more processes than cores, our OS will use preemptive multitasking to allow our multiple tasks to run concurrently. This setup is both concurrent and parallel.

One can use the Process class to spawn several processes and get things done concurrently. However, it is better to use process pools, i.e. collections of Python processes that we can use to run functions in parallel. The Pool class from the multiprocessing library gives the ability to run tasks in parallel. Using process pools is good for simple use cases, but Python offers an abstraction on top of multiprocessing pools via the concurrent.futures module. This module contains executors for both processes and threads that can be used on their own but also interoperate with asyncio.
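A minimal sketch of a process pool executor running a CPU-bound function in parallel (count_to is just a stand-in for real CPU-bound work):

from concurrent.futures import ProcessPoolExecutor

def count_to(n):
    # a CPU-bound loop that would be serialized by the GIL in a single process
    counter = 0
    for _ in range(n):
        counter += 1
    return counter

if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        # each call runs in its own interpreter, each with its own GIL
        results = list(pool.map(count_to, [10_000_000, 10_000_000, 10_000_000]))
        print(results)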

When we submit a task to a process pool, it may not run immediately because the processes in the pool may be busy with other tasks. How does the process pool handle this ? In the background, a process pool executor keeps a queue of tasks. When we submit a task to the pool, its arguments are pickled and put on the task queue. Then each worker process asks for a task from the queue when it is ready for work. When a worker process pulls a task off the queue, it unpickles the arguments and begins to execute the task.

Using the event loop

The use of executors makes the whole process far more pleasant, as it abstracts over threads and processes. However, there is one limitation: with the executor's map, results come back in the deterministic order of the target function arguments rather than in order of completion. The main takeaway is that one can plug process pool executors into the event loop that asyncio relies on. The regular code involving process pool executors is easy to tweak so that it works with the event loop.
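The tweak amounts to submitting the pool calls through loop.run_in_executor so they become awaitables; a rough sketch, reusing the same stand-in CPU-bound function:

import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial

def count_to(n):
    counter = 0
    for _ in range(n):
        counter += 1
    return counter

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # run_in_executor returns awaitables, so asyncio.gather can collect the
        # results instead of iterating over pool.map in its fixed order
        calls = [partial(count_to, n) for n in (1_000_000, 2_000_000, 3_000_000)]
        futures = [loop.run_in_executor(pool, call) for call in calls]
        results = await asyncio.gather(*futures)
        print(results)

if __name__ == "__main__":
    asyncio.run(main())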

The chapter gives a quick primer on MapReduce and then shows an example of using asyncio on a MapReduce workload to improve its performance.

Shared locks

The section on shared locks is very interesting. I learned about the two kinds of shared data supported by the multiprocessing module: values and arrays. These shared data types can be used so that processes can access global state, and to prevent race conditions one can use the lock features available on these shared data objects. Sharing data between process pools is a bit more nuanced, but the code given in the chapter is easy to understand.
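A small sketch of the shared-value idea with plain processes (not the book's exact example): four processes increment one shared integer, and the lock attached to the shared value prevents lost updates.

from multiprocessing import Process, Value

def increment(shared_counter):
    for _ in range(100):
        # get_lock() returns the lock guarding the shared value,
        # so two processes cannot perform a lost update
        with shared_counter.get_lock():
            shared_counter.value += 1

if __name__ == "__main__":
    counter = Value("i", 0)  # "i" means a shared signed integer, starting at 0
    workers = [Process(target=increment, args=(counter,)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)  # reliably 400 with the lock in place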

Handling blocking work with threads

The organization of the chapter is similar to that of the previous chapter; however, the focus is on threads. If one wants to use asyncio with legacy code that is blocking, then this chapter is worth going over.

I found the discussion around race conditions interesting. In the case of threads, since they share the memory of the process that spawned them, we do not have to go through the route of using shared data type objects. However, to implement the lock functionality, there is a Lock class within the threading module that one can use. To illustrate this point, the author uses a URL download reporter example. The following is a slight modification of that code that uses the Lock functionality.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
import functools
from functools import partial
import time

counter_lock = Lock()
counter = 0

def io_task(d):
    global counter
    time.sleep(d)
    with counter_lock:
        counter = counter + 1
    return counter

async def report_task(n, increments):
    global counter
    print(counter)
    while counter < n:
        print(f"tasks processed {counter*100/n} %")
        await asyncio.sleep(1)

async def main():
    loop = asyncio.get_running_loop()
    N = 10
    with ThreadPoolExecutor() as p:
        calls = [partial(io_task, i) for i in range(N)]
        call_coros = [loop.run_in_executor(p, call) for call in calls]
        reporter = asyncio.create_task(report_task(N, 1))
        results = await asyncio.gather(*call_coros)
        await reporter
        print(results)

if __name__ == "__main__":
    asyncio.run(main())

After understanding the use of locks, I realized that I could have used this sort of code in my “32 million API pings” task to keep track of the progress instead of keeping track of files downloaded.

Synchronization

This chapter proved to be priceless for my task, as it gave me a good understanding of Semaphore, which I could then use to set a limit on the number of requests fired at the API.

asyncio synchronization primitives can help us prevent bugs unique to a single-threaded concurrency model.

Firstly, can there be single-threaded concurrency bugs ? The chapter gives a nice example to illustrate the point that concurrency issues can happen in a single-threaded process too.

A simple example proves the point that concurrency issues can happen with asyncio library too

import asyncio

class MockSocket:
    def __init__(self):
        self.socket_closed = False

    async def send(self, msg):
        if self.socket_closed:
            raise Exception("Socket is closed")
        print(f"Sending: {msg}")
        await asyncio.sleep(1)
        print(f"Sent: {msg}")

    def close(self):
        self.socket_closed = True

user_names_to_sockets = {
    "John": MockSocket(),
    "Terry": MockSocket(),
    "Eric": MockSocket(),
}

async def user_disconnect(username):
    print(f"{username} disconnected")
    socket = user_names_to_sockets.pop(username)
    socket.close()

async def message_all_users():
    print("creating message tasks")
    messages = [
        socket_f.send(f"Hello {user}")
        for user, socket_f in user_names_to_sockets.items()
    ]
    await asyncio.gather(*messages)

async def main():
    await asyncio.gather(message_all_users(), user_disconnect("Eric"))

asyncio.run(main())

The above code throws an exception: when message_all_users pauses at an await, another coroutine runs and modifies a variable that is still in use.

These are the types of bugs you tend to see in a single-threaded concurrency model: you hit a suspension point with await, and another coroutine runs and modifies some shared state, changing it for the first coroutine in an undesired way once it resumes. The key difference between multithreaded and single-threaded concurrency bugs is that in a multithreaded application, race conditions are possible anywhere you modify mutable state, whereas in a single-threaded concurrency model they can only happen when mutable state is modified across an await point.

How can one solve the above problem ? Via asyncio locks, which are awaitable objects that suspend coroutine execution while they are blocked. This means that when a coroutine is blocked waiting to acquire a lock, other code can run. This is very useful in situations where you want a set of coroutines not to end up in a race condition.
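A minimal sketch of an asyncio.Lock protecting a read-modify-write that spans an await point (the sleep stands in for real I/O):

import asyncio

shared_counter = 0

async def increment(lock):
    global shared_counter
    async with lock:
        # the read-modify-write below spans an await point, so without the
        # lock another coroutine could interleave and clobber our update
        current = shared_counter
        await asyncio.sleep(0.01)   # pretend to do some I/O here
        shared_counter = current + 1

async def main():
    lock = asyncio.Lock()  # create the lock inside the running event loop
    await asyncio.gather(*[increment(lock) for _ in range(100)])
    print(shared_counter)  # 100 with the lock; typically 1 without it

asyncio.run(main())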

Semaphores

I found this functionality extremely useful in my task involving the download of 32 million documents. There was a rate limit applied by the API, and hence my plain and simple asyncio program was getting HTTP 429 response codes. Semaphore came to my rescue.

A semaphore acts much like a lock in that we can acquire it and release it, with the major difference being that we can acquire it multiple times, up to a limit we specify. Internally, a semaphore keeps track of this count: each time we acquire the semaphore we decrement the count, and each time we release it we increment it. If the count reaches zero, any further attempts to acquire the semaphore will block until someone else calls release and increments the count. A semaphore is useful whenever you are hitting an API that limits concurrent access to an endpoint.
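A sketch of how a semaphore can be combined with aiohttp (the URL list is a placeholder; in my task each entry was a presigned URL): at most 10 requests are in flight at any moment.

import asyncio
import aiohttp

URLS = ["https://example.com"] * 100   # placeholder list of URLs

async def fetch(session, sem, url):
    # at most 10 coroutines can be past this point at any moment
    async with sem:
        async with session.get(url) as response:
            return response.status

async def main():
    sem = asyncio.Semaphore(10)
    async with aiohttp.ClientSession() as session:
        statuses = await asyncio.gather(*[fetch(session, sem, u) for u in URLS])
        print(statuses.count(200), "successful responses")

asyncio.run(main())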

Unfortunately, using a semaphore did not solve my problem completely, as the API that I was hitting also had a per-second limit on the number of calls. Using ONLY a semaphore can make the code bursty, meaning that it has the potential to fire, say, 10 requests at the same moment, creating a potential spike in traffic. This may not be desirable if we are concerned about spikes of load on the API we are calling. Hence there was a hack that I found online that helped me fire API calls at a fixed number per second using a 'leaky bucket' algorithm. Here is the link to the article and here is the code that was helpful in my task.
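The snippet below is not the article's code, just my rough sketch of the idea: a tiny PerSecondLimiter (a made-up name) recycles a single permit every 1/rate seconds, so request starts are spread evenly across the second instead of bursting.

import asyncio

class PerSecondLimiter:
    """Rough leaky-bucket style limiter: at most `rate` request starts per second,
    spaced evenly rather than fired in a burst."""

    def __init__(self, rate):
        self._sem = asyncio.Semaphore(1)
        self._interval = 1.0 / rate

    async def __aenter__(self):
        await self._sem.acquire()
        # hand the permit back 1/rate seconds after acquiring it, independent
        # of how long the request body takes, so starts stay evenly spaced
        asyncio.get_running_loop().call_later(self._interval, self._sem.release)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False

async def worker(limiter, i):
    async with limiter:
        print(f"request {i} fired at t={asyncio.get_running_loop().time():.2f}")

async def main():
    limiter = PerSecondLimiter(rate=5)   # roughly 5 request starts per second
    await asyncio.gather(*[worker(limiter, i) for i in range(15)])

asyncio.run(main())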

BoundedSemaphore

There is a variant of Semaphore called BoundedSemaphore that can be used to get more fine-grained control over the way the lock is acquired and released. With a plain Semaphore, it is valid to call release more times than we call acquire. If we always use the async with block, this situation can never happen. However, if there are places in the app where the lock is released outside the async with block for whatever reason, one can use BoundedSemaphore to restrict the number of releases one can call on it.
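A quick sketch of the difference: a plain Semaphore lets an extra release slip through silently, while BoundedSemaphore raises a ValueError on the spot.

import asyncio

async def main():
    semaphore = asyncio.Semaphore(2)
    semaphore.release()          # plain Semaphore happily goes above its initial limit
    print("no complaint from Semaphore")

    bounded = asyncio.BoundedSemaphore(2)
    try:
        bounded.release()        # releasing more than we acquired is flagged immediately
    except ValueError as error:
        print(f"BoundedSemaphore raised: {error}")

asyncio.run(main())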

Takeaway

Thanks to this book, I was able to answer the following questions relating to parallel processing:

  • How is asyncio different from the regular multiprocessing and concurrent.futures module ?
  • Is there a speed up in I/O bound tasks when one uses asyncio as compared to concurrent.futures on a 32 core machine ?
  • How do I make sure that I do not exceed the API rate limit while doing millions of request ?
  • How do I make sure that I do not run out of memory because asyncio can spawn literally thousands of requests at the same time ?
  • How do I use aiohttp to improve I/O bound tasks that involve pinging an external REST endpoint ?
  • How do I use aiofiles so that I can turn blocking code that serializes a JSON response into non-blocking code ?
  • Why is it better to use a ClientSession in aiohttp ?
  • What is the performance benefit of using aiomultiprocess over asyncio ?
  • What is the performance benefit of using asyncio over ThreadPoolExecutor ?
  • What is a blocking code ?
  • What is a non blocking code ?
  • What’s the use of asyncio.sleep function ?
  • What is a coroutine ?
  • What is a task ?
  • How is coroutine different from task ?
  • What is awaitable ?
  • What is the difference between process and thread ?
  • If you are using a ThreadPoolExecutor on a multicore CPU, will the threads run on a single core or multiple cores ?
  • If you are using a ThreadPoolExecutor on a multicore CPU, will the threads be concurrently processed ?
  • What does concurrent and parallel operation mean ?
  • What is the default max_workers in a ProcessPoolExecutor?
  • What is the default max_workers in a ThreadPoolExecutor?
  • What is Semaphore and how can one use it in task that involves asyncio library?
  • What is BoundedSemaphore and how can one use it in task that involves asyncio library?
  • Why are thread pool executors and process pool executors better to work with than threads and processes directly?
  • Can different processes access shared memory without creating a race condition ?
  • How can one use asyncio with blocking libraries such as requests ?
  • What is the limitation with GIL ?
  • GIL is not much of an issue in I/O bound task - Why ?
  • How does one access the event loop in a program ?
  • What is the use of the await keyword in the asyncio framework ? Where should one use it ?
  • What is the connection between generators and coroutines ?
  • What are similarities and dissimilarities between generators and coroutines
  • Python is multithreaded but is not simultaneously multithreaded. What does this mean ?
  • Multi-threaded does not strictly mean single-core. What does this statement mean ?
  • How does one avoid locks in a situation that involves thread pool executor and async code?

I found this book very useful for successfully accomplishing my task of making 32 million REST API calls. Clearly, without the knowledge from this book, it would have taken forever to get the task done. The author has taken pains to explain various aspects of the library in a pretty elaborate manner, making this a good beginner book for understanding the asyncio library. I think this is one of the best Python books that I have read this year.