Exploring Python’s asyncio - a basic tutorial

30 Dec 2024 - tsp
Last update 30 Dec 2024
Reading time 34 mins

Introduction to asyncio

Python’s asyncio is a library designed to write concurrent code using the async/await syntax. It is particularly useful for I/O-bound and high-level structured network code, enabling efficient multitasking within a single-threaded program.

This blog post serves as a quick overview, a tutorial and a repository of the most important concepts and patterns to use with asyncio.

Synchronous vs. Asynchronous Programming

Synchronous Programming: In synchronous programming, tasks are executed sequentially. Each operation blocks the program until it is complete. For example:

def fetch_data():
    data = read_file("data.txt")  # Blocks until the file is read
    process(data)                 # Processes the data

Asynchronous Programming: In asynchronous programming, tasks can be paused and resumed, allowing other tasks to run during idle times. Example:

async def fetch_data():
    data = await read_file("data.txt")  # Non-blocking read
    process(data)                        # Processes the data

This approach improves efficiency for programs involving I/O by allowing other tasks to run during idle times. For example, while waiting for a file to be read, a web scraping program can send multiple HTTP requests and process them in the order they finish:

import asyncio

async def fetch_url(url, delay):
    await asyncio.sleep(delay)  # Simulates network request delay
    return f"Content from {url} after {delay} seconds"

async def main():
    urls = [
        ("http://example.com/1", 3),
        ("http://example.com/2", 2),
        ("http://example.com/3", 1)
    ]

    tasks = [asyncio.create_task(fetch_url(url, delay)) for url, delay in urls]

    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)  # Process results as they complete

asyncio.run(main())

In this example, asyncio.as_completed ensures that results are processed in the order they are ready, not the order they were initiated. This demonstrates how asyncio improves responsiveness and resource utilization.

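The core components of asyncio are the event loop, coroutines, tasks and futures.

The key syntax is composed of async def, which declares a coroutine, and await, which suspends it until the awaited operation has finished.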

In the following sections we are going to explore how to utilize asyncio and all of its basic components. Before diving in, a word of caution: the cooperative multitasking that asyncio provides does not utilize multiple cores of a machine, and any code that takes the global interpreter lock (GIL) is still serialized by it. Even when combined with Python’s threading module, only one thread executes Python bytecode at any given moment; the threads are real operating system threads, but the GIL prevents them from running Python code in parallel. The only way to get true parallelism for Python code is multiprocessing, i.e. launching multiple processes, which is also covered in this article. Any other usage of asyncio or threading is a programming pattern, or a way to overlap I/O operations with computation.

How asyncio Works Internally

Understanding the internal mechanics of asyncio is crucial for writing efficient and reliable asynchronous programs. Let’s first delve into the key components and their interactions, as well as the inner workings that are usually not visible to the programmer.

The Role of the Event Loop

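At the core of asyncio is the event loop, a mechanism that continuously monitors and dispatches events: it runs tasks and callbacks that are ready, waits for I/O readiness and timers, and resumes the tasks that were waiting on them.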

The event loop ensures that only one operation is active at any given time, but it can switch between tasks whenever they are idle, such as during I/O waits. Internally, the event loop is essentially an endless loop that continuously fetches and executes tasks from a queue.
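To make the "endless loop over a queue" picture concrete, here is a toy round-robin scheduler built on plain generators. It is only a sketch of the idea and not how asyncio is actually implemented: the names toy_loop and worker are made up for this illustration, and the real event loop additionally handles I/O readiness, timers and callbacks.

```python
from collections import deque

def worker(name, steps, log):
    # Each bare `yield` plays the role of an `await`: it hands control back.
    for i in range(steps):
        log.append(f"{name} step {i}")
        yield

def toy_loop(generators):
    ready = deque(generators)      # the "ready queue"
    while ready:                   # ends once no task is left
        task = ready.popleft()
        try:
            next(task)             # run the task until its next yield
        except StopIteration:
            continue               # task finished, drop it
        ready.append(task)         # not finished: schedule it again

log = []
toy_loop([worker("A", 2, log), worker("B", 3, log)])
print(log)
```

The log shows the two workers interleaving step by step, although nothing here ever runs in parallel.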

Although Python’s asyncio typically runs a single event loop per thread, it is possible to run multiple threads, each with its own event loop. However, this requires careful management to avoid conflicts between threads. asyncio does not ship a ready-made pool of threads that each run an event loop, but you can manually create threads with individual event loops for specialized concurrency scenarios.

Scheduling and Managing Tasks

Scheduling tasks is accomplished through three primary mechanisms:

Cooperative Multitasking in Python

asyncio employs cooperative multitasking, much like early multitasking environments such as Windows 3.x: tasks voluntarily yield control back to the event loop using await. Unlike the preemptive multitasking that modern operating systems apply to threads, asyncio never interrupts a task; instead, tasks must explicitly indicate when they can pause.

Example:

import asyncio

async def task1():
    print("Task 1: Starting")
    await asyncio.sleep(2)
    print("Task 1: Done")

async def task2():
    print("Task 2: Starting")
    await asyncio.sleep(1)
    print("Task 2: Done")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())

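In this example, both tasks start immediately and yield control to the event loop at their await. task2 completes first because its sleep is shorter, and the whole program takes about two seconds rather than three.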
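The flip side of cooperative multitasking is that a coroutine which never awaits starves everything else. The following sketch runs a small ticker next to a polite asyncio.sleep and next to a blocking time.sleep. All helper names (ticker, polite, rude, count_ticks) and the delays are made up for this illustration.

```python
import asyncio
import time

async def ticker(ticks):
    # Records a timestamp roughly every 10 ms, but only if it gets to run.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)

async def polite():
    await asyncio.sleep(0.2)   # yields to the event loop while waiting

async def rude():
    time.sleep(0.2)            # blocks the whole thread, nothing else can run

async def count_ticks(coro_func):
    ticks = []
    t = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)     # give the ticker a chance to start
    await coro_func()
    t.cancel()
    return len(ticks)

async def main():
    return await count_ticks(polite), await count_ticks(rude)

polite_ticks, rude_ticks = asyncio.run(main())
print(f"ticks next to polite(): {polite_ticks}, next to rude(): {rude_ticks}")
```

Next to polite() the ticker fires many times; next to rude() it only gets its initial tick, because time.sleep never hands control back to the event loop.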

Execution Flow: From await to the Event Loop

This process ensures that no time is wasted waiting for I/O operations, enabling other tasks to make progress in the meantime.

Using asyncio

In this section, we explore how to effectively use asyncio by creating event loops, running coroutines, and working with tasks. Additionally, we’ll cover essential APIs such as sleep, gather, and run_in_executor.

Creating and Running Event Loops

The event loop is the backbone of asyncio. You typically use asyncio.run() to automatically create and run an event loop for a coroutine. For more advanced use cases, you can explicitly create and manage an event loop using asyncio.new_event_loop(). Older code often calls asyncio.get_event_loop() for this purpose, but relying on it to create a loop when none is running is deprecated in recent Python versions.

The following example demonstrates how to manually create and run an event loop:

import asyncio

async def say_hello():
    print("Hello, world!")

# Create an event loop
loop = asyncio.new_event_loop()
try:
    # Run the coroutine in the event loop
    loop.run_until_complete(say_hello())
finally:
    # Close the loop when done
    loop.close()

Using asyncio.run() simplifies the process for most cases:

import asyncio

async def say_hello():
    print("Hello, world!")

asyncio.run(say_hello())

Explanation of run_until_complete and Alternatives

run_until_complete is a method of the event loop that runs a given coroutine until it completes. It is typically used when you want to execute a specific coroutine and then stop the event loop immediately after it finishes.

Example:

import asyncio

async def say_hello():
    print("Hello, world!")
    await asyncio.sleep(1)
    print("Goodbye, world!")

loop = asyncio.new_event_loop()
loop.run_until_complete(say_hello())
loop.close()

In this example, the event loop runs only the say_hello coroutine, and once it completes, the loop stops.

Alternatives

run_forever: The run_forever method keeps the event loop running indefinitely. It is useful when the application has ongoing tasks, such as a server or a long-running daemon.

Example:

import asyncio

async def periodic_task():
    while True:
        print("Task running...")
        await asyncio.sleep(2)

loop = asyncio.new_event_loop()
loop.create_task(periodic_task())
try:
    loop.run_forever()
finally:
    loop.close()

Here, the loop continues running until it is explicitly stopped (e.g., by calling loop.stop() or terminating the program).
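As a rough sketch of how such a loop can be stopped from the inside, the following variant schedules loop.stop() with call_later and then cleans up the still-pending task. The 0.2 second limit and the counter list are arbitrary choices for this illustration.

```python
import asyncio

async def periodic_task(counter):
    while True:
        counter.append(1)
        await asyncio.sleep(0.05)

loop = asyncio.new_event_loop()
counter = []
task = loop.create_task(periodic_task(counter))
loop.call_later(0.2, loop.stop)       # ask the loop to stop after about 0.2 s
try:
    loop.run_forever()                # returns once loop.stop() has run
    task.cancel()                     # the periodic task is still pending
    # Let the loop process the cancellation before closing it
    loop.run_until_complete(asyncio.gather(task, return_exceptions=True))
finally:
    loop.close()
print(f"Task ran {len(counter)} times")
```

Cancelling the task and running the loop once more before closing avoids warnings about pending tasks being destroyed.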

asyncio.run: The asyncio.run function simplifies running a single coroutine by automatically creating and closing the event loop for you. It is equivalent to creating an event loop, running run_until_complete, and closing the loop.

Example:

import asyncio

async def say_hello():
    print("Hello, world!")

asyncio.run(say_hello())

Use run_until_complete for targeted coroutine execution, run_forever for persistent applications, and asyncio.run for general-purpose, single-entry-point asynchronous scripts.

Working with Tasks

As we have seen previously, tasks enable coroutines to run concurrently. When you create a task using asyncio.create_task() or loop.create_task(), it is enqueued onto the event loop’s ready queue. Tasks in the ready queue are executed in the order they are dequeued by the event loop, subject to I/O readiness and other coroutine states.

Example:

import asyncio

async def fetch_data(n):
    print(f"Task {n} starting")
    await asyncio.sleep(n)
    print(f"Task {n} done")

async def main():
    task1 = asyncio.create_task(fetch_data(2))
    task2 = asyncio.create_task(fetch_data(1))

    # Wait for both tasks to complete
    await task1
    await task2

asyncio.run(main())

Key asyncio APIs

asyncio.sleep()

Pauses a coroutine for a specified time.

import asyncio

async def example():
    print("Waiting...")
    await asyncio.sleep(2)
    print("Done waiting")

asyncio.run(example())

asyncio.gather()

Runs multiple awaitables concurrently and aggregates their results in the order in which the awaitables were passed. Coroutines passed to gather are wrapped in tasks and scheduled at the time gather is called. Tasks that already exist, for example created with asyncio.create_task, can be passed as well; since gather takes its awaitables as separate positional arguments, a list of tasks has to be unpacked with *tasks. This distinction allows flexibility: you can prepare a list of tasks beforehand or directly pass coroutines to gather for immediate scheduling.

import asyncio

async def task(n):
    await asyncio.sleep(n)
    return f"Task {n} complete"

async def main():
    results = await asyncio.gather(task(1), task(2), task(3))
    print(results)

asyncio.run(main())
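The variant with tasks prepared beforehand could look like the following sketch. The delays are scaled down and the order (3, 1, 2) is chosen only to show that the results come back in argument order, not in completion order.

```python
import asyncio

async def task(n):
    await asyncio.sleep(n / 100)
    return f"Task {n} complete"

async def main():
    # Tasks start running as soon as they are created ...
    tasks = [asyncio.create_task(task(n)) for n in (3, 1, 2)]
    # ... and gather only collects them; note the * that unpacks the list.
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
print(results)
```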

loop.run_in_executor()

Offloads blocking code to a separate thread or process. It is a method of the event loop rather than a module-level function, so you first obtain the loop with asyncio.get_running_loop(), which only works inside a coroutine or callback running on that loop. The first argument to run_in_executor specifies the executor to use. If set to None, it defaults to the loop’s default thread pool executor. The offloaded code runs outside the event loop’s thread, so it is only safe if the provided function itself is thread-safe. Each thread can have its own event loop, but managing multiple loops across threads requires careful synchronization and design.

import asyncio
import time

# Simulating a blocking operation
def blocking_task():
    time.sleep(3)
    return "Blocking task complete"

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_task)
    print(result)

asyncio.run(main())
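To see that the event loop really stays responsive while the blocking call runs in the executor, one can let a second task run alongside it. This is only a sketch: the heartbeat coroutine and the shortened delays are assumptions made for the illustration.

```python
import asyncio
import time

def blocking_task():
    time.sleep(0.3)                      # stands in for a blocking call
    return "Blocking task complete"

async def heartbeat(beats):
    while True:
        beats.append(time.monotonic())
        await asyncio.sleep(0.05)

async def main():
    beats = []
    hb = asyncio.create_task(heartbeat(beats))
    loop = asyncio.get_running_loop()
    # The blocking call runs in the default thread pool, the loop keeps going
    result = await loop.run_in_executor(None, blocking_task)
    hb.cancel()
    return result, len(beats)

result, beat_count = asyncio.run(main())
print(result, f"({beat_count} heartbeats in the meantime)")
```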

Executors in Python’s asyncio framework provide a mechanism for running blocking or computationally intensive code in a non-blocking way. Executors can be either thread pools (ThreadPoolExecutor) or process pools (ProcessPoolExecutor). When the first argument of run_in_executor is not set to None, you can pass a specific executor instance to control where the code is executed. For example, you can create a ThreadPoolExecutor or ProcessPoolExecutor and pass it as the first argument. This gives finer control over resource usage and isolation:

import asyncio
from concurrent.futures import ThreadPoolExecutor

# Blocking function
def blocking_task():
    return "Task completed in executor"

async def main():
    loop = asyncio.get_running_loop()

    # Create a thread pool executor; the with block shuts it down afterwards
    with ThreadPoolExecutor(max_workers=3) as thread_executor:
        # Use the thread executor
        result = await loop.run_in_executor(thread_executor, blocking_task)
    print(result)

asyncio.run(main())

This allows you to control thread or process-level concurrency depending on your application needs.

Error Handling in Coroutines

Errors within coroutines need to be handled carefully to avoid crashing the event loop. Use try and except blocks:

import asyncio

async def faulty_task():
    raise ValueError("An error occurred")

async def main():
    try:
        await faulty_task()
    except ValueError as e:
        print(f"Caught an error: {e}")

asyncio.run(main())
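When several coroutines run concurrently, asyncio.gather with return_exceptions=True is one way to keep a single failure from hiding the other results. The sketch below uses a made-up healthy_task next to the faulty one.

```python
import asyncio

async def faulty_task():
    raise ValueError("An error occurred")

async def healthy_task():
    await asyncio.sleep(0.01)
    return "ok"

async def main():
    # With return_exceptions=True, exceptions are returned as results
    # instead of being raised at the await.
    return await asyncio.gather(faulty_task(), healthy_task(), return_exceptions=True)

results = asyncio.run(main())
for r in results:
    if isinstance(r, Exception):
        print(f"Failed: {r!r}")
    else:
        print(f"Succeeded: {r}")
```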

Proper error handling ensures that a single failing coroutine does not halt other operations. This section has provided foundational techniques for using asyncio effectively.

Combining asyncio with Threading

Overview of Threading and the Global Interpreter Lock (GIL)

Python’s threading model includes the Global Interpreter Lock (GIL), which ensures that only one thread executes Python bytecode at a time. This simplifies memory management but limits the performance of multi-threaded CPU-bound tasks. However, threads can still perform I/O-bound tasks concurrently, as the GIL is released during blocking I/O operations and by many native extensions while they work outside the interpreter.

Calling Routines in Separate Threads Using asyncio.to_thread and Traditional Threads

The asyncio.to_thread function allows you to run blocking functions in separate threads seamlessly:

import asyncio
import time

def blocking_task():
    time.sleep(3)
    return "Blocking task complete"

async def main():
    result = await asyncio.to_thread(blocking_task)
    print(result)

asyncio.run(main())

For more control, you can use the threading module to create and manage threads manually. Threads can execute synchronous functions alongside asyncio tasks, but you must carefully synchronize shared data, and you should not call thread.join() directly inside a coroutine, because it blocks the event loop until the thread has finished:

import threading
import asyncio

shared_data = []

def thread_function():
    shared_data.append("Data from thread")

async def main():
    thread = threading.Thread(target=thread_function)
    thread.start()
    # join() blocks, so wait for it in a worker thread instead
    await asyncio.to_thread(thread.join)
    print(shared_data)

asyncio.run(main())

Launching asyncio queues in separate threads

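You can launch event loops in threads created with the threading module, enabling separate asyncio queues to run independently. To achieve this, each thread must explicitly create and run its own event loop using asyncio.new_event_loop(). Keep in mind that asyncio.Queue is not thread-safe: it must only be touched from the thread that runs its event loop, so other threads hand over data with loop.call_soon_threadsafe() rather than calling put() themselves.

Example:

import asyncio
import threading

# Worker function for the thread: owns its own event loop and queue
def thread_worker(handover, ready):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    queue = asyncio.Queue()

    async def process_queue():
        while True:
            data = await queue.get()
            if data is None:
                break
            print(f"Processed in thread: {data}")
            queue.task_done()

    # Publish loop and queue to the main thread, then start processing
    handover["loop"], handover["queue"] = loop, queue
    ready.set()
    try:
        loop.run_until_complete(process_queue())
    finally:
        loop.close()

async def main():
    handover, ready = {}, threading.Event()

    # Start a thread with its own event loop
    thread = threading.Thread(target=thread_worker, args=(handover, ready))
    thread.start()
    await asyncio.to_thread(ready.wait)
    loop, queue = handover["loop"], handover["queue"]

    # Add tasks to the queue; the calls are executed on the worker's loop
    for i in range(5):
        loop.call_soon_threadsafe(queue.put_nowait, f"Task {i}")

    # Signal the thread to exit
    loop.call_soon_threadsafe(queue.put_nowait, None)

    # Wait for the thread to finish without blocking this loop
    await asyncio.to_thread(thread.join)

asyncio.run(main())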

Passing Tasks Between Threads

To pass tasks between threads, you can use asyncio.run_coroutine_threadsafe() to schedule a coroutine on another thread’s event loop. This is useful for transferring data or triggering actions from one thread to another.

Example:

import asyncio
import threading

# Worker function for the thread
def thread_worker(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def task_to_run(data):
    print(f"Task received in other thread: {data}")

async def main():
    # Create an event loop for the second thread
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=thread_worker, args=(loop,))
    thread.start()

    # Pass a task to the second thread
    asyncio.run_coroutine_threadsafe(task_to_run("Hello from main"), loop)

    # Give some time for the task to execute
    await asyncio.sleep(1)

    # Stop the thread's event loop
    loop.call_soon_threadsafe(loop.stop)
    thread.join()

asyncio.run(main())

In this example, a coroutine is passed to a thread’s event loop using asyncio.run_coroutine_threadsafe, demonstrating how tasks can be shared between threads while maintaining asyncio compatibility.
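run_coroutine_threadsafe returns a concurrent.futures.Future, so the submitting side can also collect a result instead of sleeping for a fixed time. The following sketch assumes a coroutine that returns a string; the thread name "worker" is an arbitrary choice.

```python
import asyncio
import threading

def thread_worker(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def task_to_run(data):
    await asyncio.sleep(0.01)
    return f"Handled in {threading.current_thread().name}: {data}"

async def main():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=thread_worker, args=(loop,), name="worker")
    thread.start()
    try:
        # Returns a concurrent.futures.Future, not an asyncio future
        cf_future = asyncio.run_coroutine_threadsafe(task_to_run("Hello from main"), loop)
        # wrap_future makes it awaitable without blocking this loop
        return await asyncio.wrap_future(cf_future)
    finally:
        loop.call_soon_threadsafe(loop.stop)
        await asyncio.to_thread(thread.join)
        loop.close()

reply = asyncio.run(main())
print(reply)
```

Awaiting the wrapped future replaces the fixed one-second sleep, so the caller continues as soon as the other thread has produced its result.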

The same pattern scales to several worker threads: start one event loop per thread and submit coroutines from any thread to whichever loop should run them.

Managing Thread Synchronization with asyncio

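When accessing shared resources between threads and asyncio tasks, synchronization primitives are crucial. Standard threading primitives like Lock or Condition protect data that is shared between threads, while asyncio’s native primitives like asyncio.Lock only coordinate coroutines running on the same event loop and are not thread-safe. Data touched by both a thread and a coroutine therefore has to be guarded by the same threading lock on both sides.

Example using threading.Lock:

import asyncio
import threading

shared_data = []
lock = threading.Lock()

def thread_function():
    with lock:
        shared_data.append("Data from thread")

async def async_function():
    # Same lock as the thread; hold it only briefly and never across an
    # await, because acquiring it blocks the event loop
    with lock:
        shared_data.append("Data from async task")

async def main():
    thread = threading.Thread(target=thread_function)
    thread.start()
    await async_function()
    await asyncio.to_thread(thread.join)
    print(shared_data)

asyncio.run(main())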

Example using asyncio.Lock:

import asyncio

shared_data = []

async def async_function_one(lock):
    async with lock:
        shared_data.append("Data from async function one")
        await asyncio.sleep(1)  # Simulate some asynchronous work
        print("Async function one completed")

async def async_function_two(lock):
    async with lock:
        shared_data.append("Data from async function two")
        await asyncio.sleep(1)  # Simulate some asynchronous work
        print("Async function two completed")

async def main():
    # Create the lock inside the running loop; older Python versions bind
    # a module-level asyncio.Lock to a different loop than asyncio.run uses
    lock = asyncio.Lock()
    await asyncio.gather(async_function_one(lock), async_function_two(lock))
    print(shared_data)

asyncio.run(main())

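Multiprocessing with spawn and fork

When creating processes, Python’s multiprocessing module supports, among others, two start methods:

fork: The child process starts as a copy of the parent, including its memory and open file descriptors.

spawn: The child process starts a fresh Python interpreter and re-imports the main module, so nothing is inherited implicitly.

asyncio works independently in each process. This means you must initialize event loops separately in each process when using spawn or fork. Notably, fork is not available on Windows systems, as it is specific to Unix-like operating systems. On Windows, only spawn can be used, which does not inherit the state of the parent process and requires more explicit initialization in child processes. Because spawn re-imports the main module, the code that starts processes has to be protected by an if __name__ == "__main__": guard.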

Example:

import asyncio
from multiprocessing import Process

async def async_task():
    print("Async task running")
    await asyncio.sleep(1)
    print("Async task done")

def worker():
    asyncio.run(async_task())

if __name__ == "__main__":
    process = Process(target=worker)
    process.start()
    process.join()

Using ThreadPoolExecutor and ProcessPoolExecutor

ThreadPoolExecutor

The ThreadPoolExecutor is ideal for offloading I/O-bound tasks. It creates a pool of threads and schedules blocking operations to run concurrently:

from concurrent.futures import ThreadPoolExecutor
import asyncio

def blocking_task():
    return "Task in thread pool"

async def main():
    executor = ThreadPoolExecutor(max_workers=4)
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, blocking_task)
    print(result)

asyncio.run(main())

ProcessPoolExecutor

For CPU-bound tasks, the ProcessPoolExecutor spawns separate processes. This provides true parallelism by bypassing the GIL:

from concurrent.futures import ProcessPoolExecutor
import asyncio

def cpu_bound_task():
    return sum(range(10**6))

async def main():
    executor = ProcessPoolExecutor(max_workers=2)
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, cpu_bound_task)
    print(result)

# The guard is required when worker processes are started with spawn
if __name__ == "__main__":
    asyncio.run(main())

Process Isolation in ProcessPoolExecutor

Processes created by ProcessPoolExecutor have their own memory space, preventing unintended interactions between tasks. This isolation is crucial for ensuring safe parallel execution of CPU-intensive workloads but incurs higher overhead compared to ThreadPoolExecutor. Use it when tasks require heavy computation or need strict memory separation.

Communication between asyncio tasks when using ProcessPoolExecutor

When using ProcessPoolExecutor, communication between coroutines and the processes managed by the executor requires explicit mechanisms because each process operates in its own isolated memory space. Here are the key methods to pass arguments into and retrieve results from tasks executed in a ProcessPoolExecutor:

Passing Arguments to Processes

Arguments can be passed directly to the function executed in the ProcessPoolExecutor. For example:

from concurrent.futures import ProcessPoolExecutor
import asyncio

def process_task(data):
    return f"Processed {data}"

async def main():
    loop = asyncio.get_running_loop()
    executor = ProcessPoolExecutor()

    result = await loop.run_in_executor(executor, process_task, "Task data")
    print(result)

# The guard is required when worker processes are started with spawn
if __name__ == "__main__":
    asyncio.run(main())

In this example, “Task data” is passed to the process_task function in the process pool.

Retrieving Results from Processes

The result of the function is returned to the calling coroutine as demonstrated in the example above. The await keyword ensures the calling coroutine waits until the process completes execution and the result is available.

Using Shared Data Structures

To enable communication beyond simple arguments and return values, you can use inter-process communication (IPC) mechanisms like multiprocessing.Queue or multiprocessing.Manager.

Example using multiprocessing.Queue:

from multiprocessing import Queue
from concurrent.futures import ProcessPoolExecutor
import asyncio

queue = Queue()

def process_task():
    queue.put("Data from process")

async def main():
    loop = asyncio.get_running_loop()
    executor = ProcessPoolExecutor()

    # Run the task; the worker inherits the module-level queue (fork only)
    await loop.run_in_executor(executor, process_task)

    # Retrieve data from the queue. queue.empty() is unreliable here because
    # the worker may still be flushing its data, so block with a timeout.
    print(queue.get(timeout=5))

if __name__ == "__main__":
    asyncio.run(main())

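Note that we created the queue outside of async def main. One could get the idea of passing the queue as a third argument to run_in_executor and accepting it as an argument in process_task(queue). This does not work: ProcessPoolExecutor pickles the arguments of every call, and a multiprocessing.Queue refuses to be pickled outside of process creation (RuntimeError: Queue objects should only be shared between processes through inheritance). The module-level queue reaches the workers by inheritance instead, which relies on the fork start method; under spawn, each worker re-imports the module and ends up with its own, unrelated queue.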
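If the queue does have to be passed as an argument, for example because the program must also run under spawn, a queue created by multiprocessing.Manager is one option: the manager hands out proxy objects that can be pickled. The following is a sketch under that assumption; the item count of 3 and the single worker are arbitrary.

```python
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def process_task(queue, n):
    # Runs in a worker process; `queue` is a proxy to the manager's queue.
    for i in range(n):
        queue.put(f"Data {i} from process")

async def main():
    loop = asyncio.get_running_loop()
    with multiprocessing.Manager() as manager, ProcessPoolExecutor(max_workers=1) as executor:
        queue = manager.Queue()          # proxy object, can be passed as an argument
        await loop.run_in_executor(executor, process_task, queue, 3)
        # The worker has finished, so all three items are already queued.
        return [queue.get() for _ in range(3)]

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The price is an additional manager process and a round trip to it for every put and get, so this is slower than a plain multiprocessing.Queue.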

Using Serialization for Complex Data

Since processes do not share memory, data passed between them must be serialized (e.g., using pickle). Python’s concurrent.futures handles serialization automatically when passing arguments and receiving results from functions executed in a ProcessPoolExecutor.
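A quick way to find out whether something can cross the process boundary is to try pickling it by hand. The helper try_pickle below is made up for this illustration; it only reports whether pickle.dumps succeeds.

```python
import pickle
import threading

# Plain data made of built-in types survives the round trip unchanged.
payload = {"samples": [1.5, 2.5], "label": "run-1"}
assert pickle.loads(pickle.dumps(payload)) == payload

def try_pickle(obj):
    """Return None on success, otherwise the name of the exception raised."""
    try:
        pickle.dumps(obj)
        return None
    except Exception as exc:      # PicklingError, AttributeError or TypeError
        return type(exc).__name__

print(try_pickle(payload))              # None: fine to send to a worker
print(try_pickle(lambda x: x * 2))      # fails: lambdas cannot be pickled
print(try_pickle(threading.Lock()))     # fails: locks are tied to this process
```

In practice this means that functions submitted to a ProcessPoolExecutor should be defined at module level, and that arguments and results should consist of plain data.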

By using these methods, you can effectively communicate between coroutines and processes while leveraging the isolation and parallelism provided by ProcessPoolExecutor. Make sure to carefully manage serialization overhead and IPC mechanisms for performance-critical applications.

Using asyncio for I/O Operations

Network I/O: asyncio and aiohttp

asyncio provides an excellent foundation for handling network I/O operations, such as making HTTP requests. The aiohttp library builds on this by offering an asynchronous HTTP client and server.

Example of using aiohttp for HTTP requests:

import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "http://example.com",
        "http://example.org",
        "http://example.net",
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    for result in results:
        print(result)

asyncio.run(main())

This example demonstrates how aiohttp integrates with asyncio to handle multiple HTTP requests concurrently.

Example of using aiohttp to build an HTTP server:

import asyncio
from aiohttp import web

async def handle(request):
    name = request.rel_url.query.get("name", "world")
    return web.Response(text=f"Hello, {name}!")

async def init_app():
    app = web.Application()
    app.router.add_get("/", handle)
    return app

async def main():
    app = await init_app()
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "localhost", 8080)
    await site.start()
    print("Server running on http://localhost:8080")

    # Keep the server running
    await asyncio.Event().wait()

asyncio.run(main())

This server responds to HTTP GET requests on /, returning a personalized greeting if a name query parameter is provided.

File I/O: Async File Readers and Writers

While Python’s standard open function and the file operations that follow it block the event loop, you can use aiofiles to perform asynchronous file operations. This is an optional library that can be installed from PyPI:

pip install aiofiles

Example of reading and writing to a file asynchronously:

import aiofiles
import asyncio

async def read_file(filename):
    async with aiofiles.open(filename, mode='r') as f:
        contents = await f.read()
        print(contents)

async def write_file(filename, content):
    async with aiofiles.open(filename, mode='w') as f:
        await f.write(content)

async def main():
    await write_file("example.txt", "Hello, asyncio file I/O!")
    await read_file("example.txt")

asyncio.run(main())

The aiofiles library ensures that file I/O does not block the event loop, enabling other tasks to run concurrently.
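If an extra dependency is not wanted, a similar effect can be had with the standard library alone by pushing the blocking calls into a worker thread with asyncio.to_thread. This is a sketch of that alternative and not a replacement for the aiofiles API; the temporary directory is only used to keep the example self-contained.

```python
import asyncio
import os
import tempfile
from pathlib import Path

async def write_file(path, content):
    # Path.write_text blocks, so it runs in a worker thread
    await asyncio.to_thread(Path(path).write_text, content, encoding="utf-8")

async def read_file(path):
    return await asyncio.to_thread(Path(path).read_text, encoding="utf-8")

async def main():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "example.txt")
        await write_file(path, "Hello, asyncio file I/O!")
        return await read_file(path)

contents = asyncio.run(main())
print(contents)
```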

Serial Communication with asyncio

asyncio can also be used to handle serial communication through libraries like pyserial-asyncio, which allows efficient handling of serial data streams. This is an optional add-on library that can either be installed from PyPI:

pip install pyserial-asyncio

or, preferably, via your platform’s package management. On FreeBSD for Python 3.11 this would be done using

pkg install py311-pyserial-asyncio

Example of reading data from a serial port:

import asyncio
import serial_asyncio

class SerialReader(asyncio.Protocol):
    def __init__(self):
        self.buffer = b''
        self.expected_length = None
        self.terminator = b'\n'

    def data_received(self, data):
        self.buffer += data
        while self.buffer:
            tag = self.buffer[0]

            if tag == 1:  # Fixed-length message
                if len(self.buffer) < 2:
                    break
                self.expected_length = self.buffer[1]
                if len(self.buffer) >= 2 + self.expected_length:
                    message = self.buffer[2:2 + self.expected_length]
                    self.buffer = self.buffer[2 + self.expected_length:]
                    print(f"Tag 1: Received message: {message.decode()}")
                else:
                    break

            elif tag == 2:  # Terminated message
                terminator_index = self.buffer.find(self.terminator)
                if terminator_index != -1:
                    message = self.buffer[1:terminator_index]
                    self.buffer = self.buffer[terminator_index + 1:]
                    print(f"Tag 2: Received message: {message.decode()}")
                else:
                    break

            else:
                print(f"Unknown tag: {tag}")
                self.buffer = self.buffer[1:]  # Discard the unknown tag

async def main():
    loop = asyncio.get_running_loop()
    await serial_asyncio.create_serial_connection(
        loop, SerialReader, '/dev/ttyU0', baudrate=9600
    )

    # Run forever to keep the serial connection active
    await asyncio.Event().wait()

asyncio.run(main())

In this example, serial_asyncio creates a non-blocking serial connection, and the SerialReader protocol processes incoming data. The protocol distinguishes between two types of messages based on the tag: fixed-length messages and newline-terminated messages. This setup ensures the event loop remains responsive while handling varied serial data formats asynchronously.
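Because the framing logic is independent of the serial port, it can be pulled out into a pure function and exercised without any hardware. The function name extract_messages and the sample bytes are assumptions made for this sketch; the message format is the one used by SerialReader above.

```python
def extract_messages(buffer, terminator=b"\n"):
    """Split complete messages off the front of buffer.

    Returns (messages, remaining_bytes); incomplete data stays in the remainder.
    """
    messages = []
    while buffer:
        tag = buffer[0]
        if tag == 1:                                   # tag, length, payload
            if len(buffer) < 2 or len(buffer) < 2 + buffer[1]:
                break                                  # wait for more bytes
            length = buffer[1]
            messages.append((1, buffer[2:2 + length]))
            buffer = buffer[2 + length:]
        elif tag == 2:                                 # tag, payload, terminator
            end = buffer.find(terminator)
            if end == -1:
                break                                  # terminator not seen yet
            messages.append((2, buffer[1:end]))
            buffer = buffer[end + len(terminator):]
        else:
            buffer = buffer[1:]                        # skip unknown byte
    return messages, buffer

# One complete message of each kind, followed by a truncated tag 1 message
msgs, rest = extract_messages(b"\x01\x03abc\x02hello\n\x01\x05ab")
print(msgs, rest)
```

data_received then only has to append the new bytes, call the function and keep the returned remainder as its buffer.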

Reconnecting a Serial Resource with serial_asyncio

To handle reconnections when a serial resource is physically disconnected, you can monitor the connection status and attempt to reconnect upon disconnection. Here’s how it can be achieved:

import asyncio
import serial_asyncio

class SerialReconnector(asyncio.Protocol):
    def __init__(self, port, baudrate):
        self.port = port
        self.baudrate = baudrate
        self.buffer = b''

    def connection_made(self, transport):
        self.transport = transport
        print(f"Connected to {self.port}")

    def data_received(self, data):
        self.buffer += data
        print(f"Received: {data.decode()}")

    def connection_lost(self, exc):
        print("Connection lost, attempting to reconnect...")
        # Keep a reference so the task is not garbage-collected while running
        self.reconnect_task = asyncio.create_task(self.reconnect())

    async def reconnect(self):
        while True:
            try:
                await serial_asyncio.create_serial_connection(
                    asyncio.get_running_loop(), lambda: self, self.port, baudrate=self.baudrate
                )
                print("Reconnected successfully")
                break
            except Exception as e:
                print(f"Reconnection failed: {e}, retrying in 5 seconds...")
                await asyncio.sleep(5)

async def main():
    port = '/dev/ttyUSB0'
    baudrate = 9600
    loop = asyncio.get_running_loop()

    protocol = SerialReconnector(port, baudrate)
    await serial_asyncio.create_serial_connection(loop, lambda: protocol, port, baudrate=baudrate)

    # Run indefinitely
    await asyncio.Event().wait()

asyncio.run(main())

This solution ensures continuous attempts to reconnect the serial resource and restores communication once it is reconnected.

asyncio and TCP/UDP client and server connections

asyncio provides robust support for implementing TCP and UDP clients and servers. Below, we demonstrate examples for both protocols, including handling reconnects for TCP and sending/receiving UDP packets.

TCP Client with Reconnects

A TCP client can be created using asyncio.open_connection. Here’s an example with reconnect logic:

import asyncio

async def tcp_client(host, port):
    while True:
        try:
            reader, writer = await asyncio.open_connection(host, port)
            print(f"Connected to {host}:{port}")

            writer.write(b"Hello, Server!\n")
            await writer.drain()

            data = await reader.readline()
            print(f"Received: {data.decode().strip()}")

            writer.close()
            await writer.wait_closed()
            break
        except (OSError, asyncio.TimeoutError):
            # OSError also covers ConnectionRefusedError and unreachable hosts
            print("Connection failed, retrying in 5 seconds...")
            await asyncio.sleep(5)

asyncio.run(tcp_client("127.0.0.1", 8888))

This client keeps retrying every five seconds until a connection attempt succeeds, then exchanges one message with the server and exits.

TCP Server

A basic TCP server can be implemented using asyncio.start_server:

import asyncio

async def handle_client(reader, writer):
    client_address = writer.get_extra_info('peername')
    print(f"Connection from {client_address}")

    while True:
        data = await reader.readline()
        if not data:
            break
        print(f"Received: {data.decode().strip()}")
        writer.write(data)
        await writer.drain()

    print(f"Connection closed: {client_address}")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle_client, "127.0.0.1", 8888)
    print("Server running on 127.0.0.1:8888")

    async with server:
        await server.serve_forever()

asyncio.run(main())

This server echoes back messages it receives from clients.
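To try the echo logic without starting two programs, server and client can also share one event loop. The sketch below binds the server to port 0, which lets the operating system pick a free port; apart from that choice it uses the same calls as the examples above.

```python
import asyncio

async def handle_client(reader, writer):
    # Echo every line back until the client closes the connection
    while data := await reader.readline():
        writer.write(data)
        await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    # Port 0 lets the OS pick a free port
    server = await asyncio.start_server(handle_client, "127.0.0.1", 0)
    host, port = server.sockets[0].getsockname()[:2]
    async with server:
        reader, writer = await asyncio.open_connection(host, port)
        writer.write(b"Hello, Server!\n")
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply

reply = asyncio.run(main())
print(reply)
```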

UDP Communication

For UDP, asyncio provides DatagramProtocol for asynchronous packet transmission and reception. Here’s an example of a UDP client and server:

UDP Server:

import asyncio

class UDPServerProtocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        print(f"Received {data.decode()} from {addr}")

        # Echo the data back
        self.transport.sendto(data, addr)

async def main():
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: UDPServerProtocol(), local_addr=("127.0.0.1", 9999)
    )

    print("UDP server running on 127.0.0.1:9999")

    try:
        await asyncio.Event().wait()  # Keep server running
    finally:
        transport.close()

asyncio.run(main())

UDP Client:

import asyncio

async def udp_client():
    loop = asyncio.get_running_loop()

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: asyncio.DatagramProtocol(),
        remote_addr=("127.0.0.1", 9999),
    )

    message = b"Hello, UDP Server!"
    transport.sendto(message)
    print(f"Sent: {message.decode()}")

    await asyncio.sleep(1)  # Give time for server response (if needed)
    transport.close()

asyncio.run(udp_client())

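The UDP client sends a message to the server, which echoes it back. Note that the client above uses the base asyncio.DatagramProtocol, whose datagram_received callback does nothing, so the echoed datagram arrives but is silently discarded. To see the reply, the client needs its own protocol class that implements datagram_received. Neither the server nor the client blocks the event loop while waiting for packets.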
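A client that actually reads the echoed datagram needs a protocol of its own. The following is a minimal sketch under a few assumptions: the class name EchoClientProtocol, the helper udp_request and the two second timeout are hypothetical and chosen only for illustration. The reply is handed from the protocol callback to the coroutine through a future.

```python
import asyncio

class EchoClientProtocol(asyncio.DatagramProtocol):
    """Hypothetical client protocol: sends one message, captures one reply."""

    def __init__(self, message, reply):
        self.message = message
        self.reply = reply  # Future that receives the first datagram

    def connection_made(self, transport):
        transport.sendto(self.message)

    def datagram_received(self, data, addr):
        if not self.reply.done():
            self.reply.set_result(data)

    def error_received(self, exc):
        # For example an ICMP "port unreachable" when no server is listening
        if not self.reply.done():
            self.reply.set_exception(exc)

async def udp_request(message, host="127.0.0.1", port=9999, timeout=2.0):
    loop = asyncio.get_running_loop()
    reply = loop.create_future()
    transport, _ = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, reply),
        remote_addr=(host, port),
    )
    try:
        # UDP gives no delivery guarantee, so never wait without a timeout
        return await asyncio.wait_for(reply, timeout)
    finally:
        transport.close()
```

With the UDP server from above running, asyncio.run(udp_request(b"Hello, UDP Server!")) returns the echoed bytes, or raises a timeout error if no reply arrives in time.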

Integrating asyncio with FreeSimpleGUI

FreeSimpleGUI is a user-friendly GUI framework designed for rapid development of Python utilities. It offers straightforward integration with libraries like matplotlib and supports 2D/3D drawing canvases along with intuitive mouse input handling. It is an actively maintained, free fork of PySimpleGUI, which is no longer open source. As a third-party library, it can be installed from PyPI using pip:

pip install FreeSimpleGUI

Challenges of Updating GUIs Asynchronously

When integrating asyncio with FreeSimpleGUI, several challenges arise:

Avoiding Conflicts Between Event Loops

To avoid conflicts, the asyncio event loop and FreeSimpleGUI’s event loop must cooperate. The simplest approach is to poll the window for events without blocking and to yield execution back to asyncio between polls. Its drawback is that it performs busy waiting.

Pattern 1: Busy waiting yielding control back to asyncio

import asyncio
import FreeSimpleGUI as sg

async def long_running_task(window):
    for i in range(5):
        await asyncio.sleep(1)
        window.write_event_value("TASK_UPDATE", f"Step {i + 1} complete")
    window.write_event_value("TASK_DONE", "Task finished")

async def get_window_event(window):
    # Poll the GUI without blocking: timeout=0 makes read() return at once
    while True:
        event, values = window.read(timeout=0)
        if event != sg.TIMEOUT_KEY:
            # A real event, or None if the window has been closed
            return event, values
        # Nothing pending: let other asyncio tasks run before polling again
        await asyncio.sleep(0)

async def main():
    layout = [[sg.Text("Asyncio Integration Example")],
              [sg.Output(size=(40, 10))],
              [sg.Button("Start Task"), sg.Button("Exit")]]

    window = sg.Window("Asyncio and FreeSimpleGUI", layout, finalize=True)

    while True:
        event, values = await get_window_event(window)

        if event in (sg.WINDOW_CLOSED, "Exit"):
            break

        if event == "Start Task":
            asyncio.create_task(long_running_task(window))

        if event == "TASK_UPDATE":
            print(values[event])

        if event == "TASK_DONE":
            print(values[event])

    window.close()

asyncio.run(main())

As one can see, in this pattern the window.read method is wrapped in a coroutine that polls the GUI's event loop without blocking (hence timeout=0). While no event is pending, read returns the timeout key and the coroutine yields control to the asyncio framework by awaiting asyncio.sleep(0) before polling again. Any other event, including the None that signals a closed window, is returned to the main loop, which handles it as in an ordinary FreeSimpleGUI program.
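The polling idea can be tried without any GUI. The following is a sketch only: the deque stands in for the window's event queue, and next_event, poll and idle_delay are hypothetical names that do not exist in FreeSimpleGUI. It also shows one possible trade-off: sleeping for a small positive delay between polls instead of 0 avoids spinning the CPU, at the price of slightly higher latency.

```python
import asyncio
from collections import deque

async def next_event(poll, idle_delay=0.01):
    """Poll a non-blocking source until it returns something other than None."""
    while True:
        item = poll()
        if item is not None:
            return item
        # Hand control back to the event loop so other tasks can run.
        # A small positive delay keeps the loop from spinning at full speed.
        await asyncio.sleep(idle_delay)

async def demo():
    pending = deque()  # Stands in for the GUI's event queue

    def poll():
        return pending.popleft() if pending else None

    async def producer():
        # Another task that runs while the poller is waiting
        await asyncio.sleep(0.05)
        pending.append("clicked")

    task = asyncio.create_task(producer())
    event = await next_event(poll)
    await task
    return event
```

The producer task only gets a chance to run because next_event awaits between polls. A plain loop without an await would starve it.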

Pattern 2: Running asyncio tasks in a background thread

A much better approach for making a GUI event loop and asyncio cooperate is to run the asyncio tasks in a background thread. This way the foreground thread can run the synchronous, blocking GUI loop and the asyncio framework does not interfere.

import asyncio
import threading
import FreeSimpleGUI as sg

# This is our background thread. It just sets the supplied
# event loop and then runs forever. It will get terminated
# by our foreground GUI thread

def start_asyncio_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

# This is the async task that will later be used to
# produce output that will be sent back to the
# GUI via the event queue.

async def async_task(window):
    for i in range(10):
        await asyncio.sleep(1)  # Simulate async work
        window.write_event_value("TASK_UPDATE", f"Async task iteration {i+1}")
    window.write_event_value("TASK_UPDATE", "Async task finished")

# Create our GUI as usual.
layout = [
    [sg.Text("FreeSimpleGUI with asyncio")],
    [sg.Output(size=(40,10))],
    [sg.Button("Start Async Task"), sg.Button("Exit")]
]

window = sg.Window("Asyncio + FreeSimpleGUI Example", layout)

# Start the asyncio event loop in a background thread. We create the
# loop here so that the GUI loop below can hand coroutines to it
# via run_coroutine_threadsafe.

loop = asyncio.new_event_loop()
thread = threading.Thread(target=start_asyncio_loop, args=(loop,), daemon=True)
thread.start()

# Main event loop for the GUI
while True:
    event, values = window.read()
    if event == sg.WINDOW_CLOSED or event == "Exit":
        break
    if event == "Start Async Task":
        # Schedule the async task
        asyncio.run_coroutine_threadsafe(async_task(window), loop)
    if event == "TASK_UPDATE":
        print(values[event])

# Clean up
loop.call_soon_threadsafe(loop.stop)  # Stop the asyncio loop
thread.join()
window.close()
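The threading part of this pattern can be exercised without a GUI. In the following sketch, which rests on assumptions, a queue.Queue stands in for the window's event queue and the helper names start_background_loop, worker and run_demo are hypothetical. The calls asyncio.run_coroutine_threadsafe and loop.call_soon_threadsafe are the same ones used above.

```python
import asyncio
import queue
import threading

def start_background_loop():
    """Start an asyncio event loop in a daemon thread and return both."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread

async def worker(results, steps):
    for i in range(steps):
        await asyncio.sleep(0.01)     # Simulate async work
        results.put(f"step {i + 1}")  # queue.Queue is thread-safe
    return steps

def run_demo(steps=3):
    results = queue.Queue()
    loop, thread = start_background_loop()
    try:
        # Submit the coroutine from the foreground thread
        future = asyncio.run_coroutine_threadsafe(worker(results, steps), loop)
        # Blocking here is fine: only the foreground thread waits
        count = future.result(timeout=5)
    finally:
        loop.call_soon_threadsafe(loop.stop)  # Stop the loop from outside
        thread.join()
        loop.close()
    return [results.get_nowait() for _ in range(count)]
```

run_coroutine_threadsafe returns a concurrent.futures.Future, so the foreground thread can also poll future.done() between GUI events instead of blocking on result().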

Best Practices and Common Pitfalls

Writing Efficient Coroutines

Efficient coroutines are the cornerstone of asyncio programming. To write them effectively:
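One guideline that usually applies is to keep blocking calls out of coroutines. The following sketch assumes Python 3.9 or newer, where asyncio.to_thread is available; blocking_io and heartbeat are hypothetical names used only for illustration. The blocking function runs in a worker thread while the event loop keeps serving other coroutines.

```python
import asyncio
import time

def blocking_io(duration):
    # Stands in for any blocking call (file access, legacy library, ...)
    time.sleep(duration)
    return "io done"

async def heartbeat(ticks, interval=0.01):
    # A coroutine that needs the event loop to stay responsive
    count = 0
    for _ in range(ticks):
        await asyncio.sleep(interval)
        count += 1
    return count

async def main():
    # to_thread moves the blocking call off the event loop thread,
    # so the heartbeat keeps ticking while the I/O is in progress
    result, beats = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.1),
        heartbeat(5),
    )
    return result, beats
```

Calling blocking_io directly inside a coroutine would instead freeze every other task for the full duration of the call.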

Avoiding Deadlocks and Race Conditions

Deadlocks and race conditions can arise when tasks contend for resources. To avoid them:
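A typical race in asyncio code is a read-modify-write sequence that contains an await, because other tasks may run at that suspension point. The following minimal sketch uses hypothetical names (deposit, account) to show how asyncio.Lock serializes such a critical section.

```python
import asyncio

async def deposit(account, lock, amount):
    async with lock:
        balance = account["balance"]
        # Suspension point in the middle of the update: without the lock,
        # other tasks could read the same stale balance here
        await asyncio.sleep(0)
        account["balance"] = balance + amount

async def main(n=100):
    account = {"balance": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(deposit(account, lock, 1) for _ in range(n)))
    return account["balance"]
```

Without the lock, the tasks would overwrite each other's results and deposits would be lost. Using async with also guarantees that the lock is released when the body raises, which removes one common source of deadlocks.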

Debugging asyncio Applications

Debugging asyncio programs requires specialized techniques due to their concurrent nature:
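One built-in aid is asyncio's debug mode, enabled with asyncio.run(..., debug=True) or the PYTHONASYNCIODEBUG environment variable. Among other checks, it logs a warning on the asyncio logger when a single step blocks the event loop for longer than loop.slow_callback_duration (0.1 seconds by default). The sketch below captures these warnings; ListHandler, sloppy and run_with_debug are hypothetical names for illustration.

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Collects formatted log messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

async def sloppy():
    time.sleep(0.2)  # Deliberately blocks the event loop

def run_with_debug():
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        # Debug mode reports slow steps on the "asyncio" logger
        asyncio.run(sloppy(), debug=True)
    finally:
        logger.removeHandler(handler)
    return handler.messages
```

The returned list contains a message stating how long the offending task step took, which points directly at the coroutine that blocked the loop.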

Conclusion

We have explored the foundational aspects of asyncio, focusing on how to manage the event loop and execute coroutines and tasks effectively. This approach enabled us to achieve efficient non-blocking I/O and cooperative multitasking, unlocking the full potential of concurrent Python programming, particularly when combined with multiprocessing.

Additionally, we examined various patterns tailored to specific I/O scenarios, including file operations, HTTP communication, TCP and UDP networking, and serial port interactions. Each of these demonstrated how asyncio’s versatility simplifies handling diverse I/O needs.

Finally, we delved into best practices and common pitfalls. These included writing efficient coroutines, avoiding blocking operations, and properly utilizing synchronization primitives to manage shared resources without risking deadlocks. Together, these insights form a solid foundation for mastering asynchronous programming in Python.

Closing Thoughts

asyncio is more than just a library; it’s a paradigm shift for Python programming, empowering developers to write applications that are both scalable and responsive. By mastering its tools and techniques, you can bridge the gap between simplicity and performance, crafting solutions that handle complex workloads with ease.

Remember, the key to mastering asyncio lies in understanding its core principles, practicing its usage, and experimenting with its powerful capabilities. Whether you’re building networked applications, integrating asynchronous I/O, or tackling real-time data processing, asyncio provides the framework to achieve it with elegance and efficiency.

Dive in, experiment, and let asyncio transform the way you approach programming challenges.

Final Tips for Mastering asyncio

By embracing asyncio’s potential and adhering to these guidelines, you can create responsive and scalable Python applications.


Dipl.-Ing. Thomas Spielauer, Wien (webcomplains389t48957@tspi.at)
