30 Dec 2024 - tsp
Last update 30 Dec 2024
34 mins
Python’s asyncio is a library for writing concurrent code using the async/await syntax. It is particularly useful for I/O-bound and high-level structured network code, enabling efficient multitasking within a single-threaded program.
The following blog post serves as a quick overview, tutorial and a repository of the most important concepts and patterns to use with asyncio.
Synchronous Programming: In synchronous programming, tasks are executed sequentially. Each operation blocks the program until it is complete. For example:
def fetch_data():
    data = read_file("data.txt")  # Blocks until the file is read
    process(data)                 # Processes the data
Asynchronous Programming: In asynchronous programming, tasks can be paused and resumed, allowing other tasks to run during idle times. Example:
async def fetch_data():
    data = await read_file("data.txt")  # Non-blocking read
    process(data)                       # Processes the data
This approach improves efficiency for programs involving I/O by allowing other tasks to run during idle times. For example, while waiting for a file to be read, a web scraping program can send multiple HTTP requests and process them in the order they finish:
import asyncio

async def fetch_url(url, delay):
    await asyncio.sleep(delay)  # Simulates network request delay
    return f"Content from {url} after {delay} seconds"

async def main():
    urls = [
        ("http://example.com/1", 3),
        ("http://example.com/2", 2),
        ("http://example.com/3", 1)
    ]
    tasks = [asyncio.create_task(fetch_url(url, delay)) for url, delay in urls]
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)  # Process results as they complete

asyncio.run(main())
In this example, asyncio.as_completed
ensures that results are processed in
the order they are ready, not the order they were initiated. This demonstrates how
asyncio improves responsiveness and resource utilization.
The core components of asyncio
are:
The key syntax is composed of:
async def: Defines a coroutine.
await: Pauses the coroutine, yielding control back to the event loop.
In the following sections we are going to explore how to use asyncio and all its basic components. Before diving into the matter, just a word of caution:
The cooperative multitasking that asyncio provides does not use multiple cores of a machine, and code running in the event loop is still subject to the Global Interpreter Lock (GIL). Combining asyncio with Python’s threading module does not change this for Python code: the threads are real operating system threads, but in a standard CPython build only one of them executes Python bytecode at a time. The only way to run Python code truly in parallel is multiprocessing, that is, launching multiple processes, which is also covered in this blog article. Any other usage of asyncio or threading is more of a programming pattern, or a way of using overlapped I/O operations to interleave computation with I/O.
How asyncio Works Internally
Understanding the internal mechanics of asyncio is crucial for writing efficient and reliable asynchronous programs. Let’s first delve into the key components and their interactions, as well as the inner workings that are usually not visible to the programmer.
At the core of asyncio is the event loop, a mechanism that continuously monitors and dispatches events. The event loop:
The event loop ensures that only one operation is active at any given time, but it can switch between tasks whenever they are idle, such as during I/O waits. Internally, the event loop is essentially an endless loop that continuously fetches and executes tasks from a queue.
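To make the idea of an endless loop working through a queue more tangible, here is a deliberately simplified sketch of a cooperative scheduler built on plain generators. It is not how asyncio is actually implemented; the names toy_event_loop and worker are made up for this illustration, and there is no I/O handling at all.

```python
from collections import deque

def worker(name, steps, log):
    # A generator stands in for a coroutine: every `yield` hands
    # control back to the scheduler, much like `await` does.
    for i in range(steps):
        log.append(f"{name} step {i}")
        yield

def toy_event_loop(tasks):
    # The "ready queue": tasks waiting for their next turn.
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)        # run the task until its next yield
        except StopIteration:
            continue          # the task has finished, drop it
        ready.append(task)    # not finished yet: schedule it again

log = []
toy_event_loop([worker("A", 2, log), worker("B", 3, log)])
print(log)
```

The two workers take turns, and only one of them is ever active at a time, which is the property described above.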
Although Python’s asyncio
typically runs a single event loop per thread,
it is possible to run multiple threads, each with its own event loop.
However, this requires careful management to avoid conflicts between threads.
While Python does not natively support a thread pool of event loops similar
to kqueue, you can manually create threads with individual event loops for
specialized concurrency scenarios.
Scheduling tasks is accomplished through three primary mechanisms:
Tasks: They are created with asyncio.create_task() and allow coroutines to run concurrently.
asyncio employs cooperative multitasking, like early multitasking operating systems such as Windows 3.x, where tasks voluntarily yield control back to the event loop using await. Unlike the preemptive multitasking that modern operating systems apply to threads, asyncio does not interrupt tasks; instead, tasks must explicitly indicate when they can pause.
Example:
import asyncio

async def task1():
    print("Task 1: Starting")
    await asyncio.sleep(2)
    print("Task 1: Done")

async def task2():
    print("Task 2: Starting")
    await asyncio.sleep(1)
    print("Task 2: Done")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())
In this example, task2 completes first because its sleep is shorter: while task1 is suspended in await asyncio.sleep(2), the event loop is free to resume task2 after one second.
From await to the Event Loop
A coroutine can only suspend at an await. A normal function cannot “pause” in this way, as it does not interact with the event loop.
When a coroutine reaches an await on an operation that is not yet complete, it pauses its execution and hands control back to the event loop.
This process ensures that no time is wasted waiting for I/O operations, enabling other tasks to make progress in the meantime.
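The flip side is that a coroutine which never reaches an await keeps the event loop to itself. The following sketch compares a coroutine that waits with await asyncio.sleep() against one that calls the blocking time.sleep(). The helper names ticker, polite, rude and ticks_during are invented for this illustration.

```python
import asyncio
import time

async def ticker(log):
    # Records a tick every 50 ms, but only if the loop gets to run it.
    for _ in range(4):
        log.append("tick")
        await asyncio.sleep(0.05)

async def polite():
    await asyncio.sleep(0.2)   # yields to the event loop while waiting

async def rude():
    time.sleep(0.2)            # blocks the whole event loop

async def ticks_during(coro_func):
    log = []
    ticker_task = asyncio.create_task(ticker(log))
    await asyncio.sleep(0)     # let the ticker record its first tick
    await coro_func()
    seen = len(log)            # ticks recorded while coro_func was running
    await ticker_task
    return seen

async def main():
    return await ticks_during(polite), await ticks_during(rude)

polite_ticks, rude_ticks = asyncio.run(main())
print(f"ticks during polite(): {polite_ticks}, during rude(): {rude_ticks}")
```

While rude() is running, the ticker cannot advance beyond its first tick, because the event loop never regains control.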
Using asyncio
In this section, we explore how to effectively use asyncio by creating event loops, running coroutines, and working with tasks. Additionally, we’ll cover essential APIs such as sleep, gather, and run_in_executor.
The event loop is the backbone of asyncio. You typically use asyncio.run() to automatically create and run an event loop for a coroutine. For more advanced use cases, you can explicitly create and manage an event loop using asyncio.new_event_loop(). Relying on asyncio.get_event_loop() to create a loop implicitly is deprecated in recent Python versions.
The following example demonstrates how to manually create and run an event loop:
import asyncio

async def say_hello():
    print("Hello, world!")

# Create an event loop
loop = asyncio.new_event_loop()
try:
    # Run the coroutine in the event loop
    loop.run_until_complete(say_hello())
finally:
    # Close the loop when done
    loop.close()
Using asyncio.run() simplifies the process for most cases:
import asyncio

async def say_hello():
    print("Hello, world!")

asyncio.run(say_hello())
run_until_complete and Alternatives
run_until_complete is a method of the event loop that runs a given coroutine until it completes. It is typically used when you want to execute a specific coroutine and then stop the event loop immediately after it finishes.
Example:
import asyncio

async def say_hello():
    print("Hello, world!")
    await asyncio.sleep(1)
    print("Goodbye, world!")

# new_event_loop() avoids the deprecated implicit loop creation
loop = asyncio.new_event_loop()
loop.run_until_complete(say_hello())
loop.close()
In this example, the event loop runs only the say_hello coroutine, and once it completes, the loop stops.
run_forever: The run_forever method keeps the event loop running indefinitely. It is useful when the application has ongoing tasks, such as a server or a long-running daemon.
Example:
import asyncio

async def periodic_task():
    while True:
        print("Task running...")
        await asyncio.sleep(2)

# new_event_loop() avoids the deprecated implicit loop creation
loop = asyncio.new_event_loop()
loop.create_task(periodic_task())
try:
    loop.run_forever()
finally:
    loop.close()
Here, the loop continues running until it is explicitly stopped (e.g., by calling loop.stop() or terminating the program).
asyncio.run: The asyncio.run function simplifies running a single coroutine by automatically creating and closing the event loop for you. It is roughly equivalent to creating an event loop, calling run_until_complete, and closing the loop.
Example:
import asyncio

async def say_hello():
    print("Hello, world!")

asyncio.run(say_hello())
Use run_until_complete for targeted coroutine execution, run_forever for persistent applications, and asyncio.run for general-purpose, single-entry-point asynchronous scripts.
As we have seen previously, tasks enable coroutines to run concurrently. When you
create a task using asyncio.create_task()
or loop.create_task()
, it is
enqueued onto the event loop’s ready queue. Tasks in the ready queue are executed
in the order they are dequeued by the event loop, subject to I/O readiness and
other coroutine states.
Example:
import asyncio

async def fetch_data(n):
    print(f"Task {n} starting")
    await asyncio.sleep(n)
    print(f"Task {n} done")

async def main():
    task1 = asyncio.create_task(fetch_data(2))
    task2 = asyncio.create_task(fetch_data(1))
    # Wait for both tasks to complete
    await task1
    await task2

asyncio.run(main())
asyncio.sleep()
Pauses a coroutine for a specified time.
import asyncio

async def example():
    print("Waiting...")
    await asyncio.sleep(2)
    print("Done waiting")

asyncio.run(example())
asyncio.gather()
Runs multiple awaitables concurrently and aggregates their results in the order in which the awaitables were passed. Coroutines passed to gather are wrapped in tasks and scheduled when gather is called. gather takes its awaitables as separate positional arguments, so a list of tasks created beforehand with asyncio.create_task or similar methods has to be unpacked with *. This gives you flexibility: you can prepare a list of tasks beforehand or pass coroutines directly to gather for immediate scheduling.
import asyncio

async def task(n):
    await asyncio.sleep(n)
    return f"Task {n} complete"

async def main():
    results = await asyncio.gather(task(1), task(2), task(3))
    print(results)

asyncio.run(main())
loop.run_in_executor()
Offloads blocking code to a separate thread or process. It is a method of the event loop rather than a module-level asyncio function. The first argument to run_in_executor specifies the executor to use. If set to None, it defaults to the loop’s default thread pool executor. Offloading is safe only as long as the provided function itself is thread-safe.
You can access the event loop of the current thread using asyncio.get_running_loop(), but only from coroutines or callbacks running in that thread. Each thread can have its own event loop, but managing multiple loops across threads requires careful synchronization and design.
import asyncio
import time

# Simulating a blocking operation
def blocking_task():
    time.sleep(3)
    return "Blocking task complete"

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_task)
    print(result)

asyncio.run(main())
Executors in Python’s asyncio framework provide a mechanism for running
blocking or computationally intensive code in a non-blocking way. Executors
can be either thread pools (ThreadPoolExecutor
) or process
pools (ProcessPoolExecutor
). When the first argument of run_in_executor
is not set to None
, you can pass a specific executor instance to
control where the code is executed. For example, you can create a ThreadPoolExecutor
or ProcessPoolExecutor
and pass it as the first argument. This gives finer control
over resource usage and isolation:
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Blocking function
def blocking_task():
    return "Task completed in executor"

async def main():
    loop = asyncio.get_running_loop()
    # Create a thread pool executor; the with block shuts it down afterwards
    with ThreadPoolExecutor(max_workers=3) as thread_executor:
        # Use the thread executor
        result = await loop.run_in_executor(thread_executor, blocking_task)
    print(result)

asyncio.run(main())
This allows you to control thread or process-level concurrency depending on your application needs.
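One detail that is easy to trip over: run_in_executor forwards positional arguments only. A common way around this is functools.partial, as in the following sketch. The render function and its keyword options are hypothetical and exist only to have something with keyword arguments to call.

```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

def render(text, *, repeat=1, sep=" "):
    # Hypothetical blocking function with keyword-only options.
    return sep.join([text] * repeat)

async def main():
    loop = asyncio.get_running_loop()
    # The with block shuts the pool down once the work is finished.
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Bind the keyword arguments first, then hand over the callable.
        call = functools.partial(render, "ping", repeat=3, sep="-")
        return await loop.run_in_executor(executor, call)

result = asyncio.run(main())
print(result)
```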
An exception raised inside a coroutine propagates to whoever awaits it. If nobody handles it, it can terminate the program or go unnoticed in a task that is never awaited. Use try and except blocks:
import asyncio

async def faulty_task():
    raise ValueError("An error occurred")

async def main():
    try:
        await faulty_task()
    except ValueError as e:
        print(f"Caught an error: {e}")

asyncio.run(main())
Proper error handling ensures that a single failing coroutine does not halt other operations. This section has provided foundational techniques for using asyncio effectively.
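When several coroutines run under asyncio.gather, one way to keep a single failure from discarding the other results is the return_exceptions=True parameter. The sketch below assumes a made-up job coroutine in which the second job always fails.

```python
import asyncio

async def job(n):
    await asyncio.sleep(0.01 * n)
    if n == 2:
        raise ValueError(f"job {n} failed")
    return f"job {n} ok"

async def main():
    # With return_exceptions=True, exceptions are delivered as results
    # instead of being raised out of gather().
    return await asyncio.gather(job(1), job(2), job(3), return_exceptions=True)

results = asyncio.run(main())
for result in results:
    if isinstance(result, Exception):
        print(f"failed: {result!r}")
    else:
        print(result)
```

The caller then decides per result whether to retry, log or ignore the failure.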
asyncio with Threading
Python’s threading model includes the Global Interpreter Lock (GIL), which ensures that only one thread executes Python bytecode at a time. This simplifies memory management but limits the performance of multi-threaded CPU-bound tasks. However, threads can still perform I/O-bound tasks concurrently, as the GIL is released during blocking I/O operations and by native extensions that release it explicitly.
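A small timing experiment illustrates the last point. In this sketch time.sleep stands in for a blocking I/O call (it releases the GIL while waiting). The function names fake_io and run_threads are made up for the example.

```python
import threading
import time

def fake_io(delay):
    # time.sleep releases the GIL, just like a blocking read would.
    time.sleep(delay)

def run_threads(count, delay):
    threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(count)]
    start = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.perf_counter() - start

elapsed = run_threads(4, 0.2)
print(f"4 waits of 0.2 s took {elapsed:.2f} s in total")
```

Four sequential waits would need about 0.8 seconds, while the threaded version should finish in roughly the time of a single wait.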
asyncio.to_thread and Traditional Threads
The asyncio.to_thread function allows you to run blocking functions in separate threads seamlessly:
import asyncio
import time

def blocking_task():
    time.sleep(3)
    return "Blocking task complete"

async def main():
    result = await asyncio.to_thread(blocking_task)
    print(result)

asyncio.run(main())
For more control, you can use the threading
module to create and
manage threads manually. Threads can execute synchronous functions
alongside asyncio
tasks, but you must carefully synchronize
shared data:
import threading
import asyncio

shared_data = []

def thread_function():
    shared_data.append("Data from thread")

async def main():
    thread = threading.Thread(target=thread_function)
    thread.start()
    # join() blocks the event loop; acceptable here only because
    # the thread finishes almost immediately
    thread.join()
    print(shared_data)

asyncio.run(main())
You can launch event loops in threads created with the threading module, enabling separate asyncio queues to run independently. To achieve this, each thread must explicitly create and run its own event loop using asyncio.new_event_loop(). Note that asyncio primitives such as asyncio.Queue are not thread-safe: every operation on them has to be executed on the loop that uses them, for instance by submitting it with asyncio.run_coroutine_threadsafe() or loop.call_soon_threadsafe().
Example:
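import asyncio
import threading

# Worker function for the thread
def thread_worker(loop, queue):
    asyncio.set_event_loop(loop)

    async def process_queue():
        while True:
            data = await queue.get()
            if data is None:
                break
            print(f"Processed in thread: {data}")
            queue.task_done()

    loop.run_until_complete(process_queue())
    loop.close()

async def main():
    # Event loop and queue that belong to the worker thread
    # (creating the queue outside its loop requires Python 3.10+)
    loop = asyncio.new_event_loop()
    queue = asyncio.Queue()
    # Start a thread with its own event loop
    thread = threading.Thread(target=thread_worker, args=(loop, queue))
    thread.start()
    # asyncio.Queue is not thread-safe, so every put() is executed
    # on the worker's loop via run_coroutine_threadsafe()
    for i in range(5):
        asyncio.run_coroutine_threadsafe(queue.put(f"Task {i}"), loop)
    # Signal the thread to exit
    asyncio.run_coroutine_threadsafe(queue.put(None), loop)
    # Wait for the thread to finish without blocking this loop
    await asyncio.to_thread(thread.join)

asyncio.run(main())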
To pass tasks between threads, you can use asyncio.run_coroutine_threadsafe()
to schedule a coroutine on another thread’s event loop. This is useful
for transferring data or triggering actions from one thread to another.
Example:
import asyncio
import threading

# Worker function for the thread
def thread_worker(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def task_to_run(data):
    print(f"Task received in other thread: {data}")

async def main():
    # Create an event loop for the second thread
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=thread_worker, args=(loop,))
    thread.start()
    # Pass a task to the second thread
    asyncio.run_coroutine_threadsafe(task_to_run("Hello from main"), loop)
    # Give some time for the task to execute
    await asyncio.sleep(1)
    # Stop the thread's event loop
    loop.call_soon_threadsafe(loop.stop)
    thread.join()

asyncio.run(main())
In this example, a coroutine is passed to a thread’s event loop
using asyncio.run_coroutine_threadsafe
, demonstrating how tasks can
be shared between threads while maintaining asyncio
compatibility.
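asyncio.run_coroutine_threadsafe also returns a concurrent.futures.Future, which can be used to fetch the coroutine's return value instead of waiting for a fixed time. The following sketch submits work from plain synchronous code. The helpers start_background_loop, stop_background_loop and compute are hypothetical names chosen for this example.

```python
import asyncio
import threading

async def compute(x):
    await asyncio.sleep(0.05)
    return x * 2

def start_background_loop():
    # run_forever() registers the loop as running in the new thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread

def stop_background_loop(loop, thread):
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()

loop, thread = start_background_loop()
# The returned object is a concurrent.futures.Future, not an asyncio one.
future = asyncio.run_coroutine_threadsafe(compute(21), loop)
result = future.result(timeout=5)   # blocks the calling thread until done
stop_background_loop(loop, thread)
print(result)
```

Inside a coroutine, blocking on future.result() would stall the calling loop. There, the future can be wrapped with asyncio.wrap_future() and awaited instead.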
You can also launch your own event loops in several threads and pass tasks from one thread into the loop of another.
When accessing shared resources between threads and asyncio tasks, synchronization primitives are crucial. Data that is shared with other threads must be protected by standard threading primitives like Lock or Condition. asyncio’s native primitives like asyncio.Lock are not thread-safe and only coordinate coroutines running on the same event loop.
Example using threading.Lock:
import asyncio
import threading

shared_data = []
lock = threading.Lock()

def thread_function():
    with lock:
        shared_data.append("Data from thread")

async def async_function():
    # Use the same threading.Lock as the thread; a fresh asyncio.Lock()
    # would protect nothing. Hold it only briefly and never across an await.
    with lock:
        shared_data.append("Data from async task")

async def main():
    thread = threading.Thread(target=thread_function)
    thread.start()
    await async_function()
    thread.join()
    print(shared_data)

asyncio.run(main())
Example using asyncio.Lock
:
import asyncio

shared_data = []

async def async_function_one(lock):
    async with lock:
        shared_data.append("Data from async function one")
        await asyncio.sleep(1)  # Simulate some asynchronous work
        print("Async function one completed")

async def async_function_two(lock):
    async with lock:
        shared_data.append("Data from async function two")
        await asyncio.sleep(1)  # Simulate some asynchronous work
        print("Async function two completed")

async def main():
    # Create the lock inside the running loop; before Python 3.10 a
    # module-level lock would be bound to a different loop
    lock = asyncio.Lock()
    await asyncio.gather(async_function_one(lock), async_function_two(lock))
    print(shared_data)

asyncio.run(main())
When creating processes, Python supports several start methods. The two most relevant here are:
Spawn: Starts a fresh Python interpreter that does not inherit resources from the parent process. Each process runs its own event loop.
Fork: Creates a copy of the parent process, inheriting its resources and state.
A third method, forkserver, is available on Unix systems.
asyncio works independently in each process. This means you must initialize event loops separately in each process when using spawn or fork.
Notably, fork is not available on Windows systems, as it is specific to Unix-based operating systems. On Windows, only spawn can be used, which does not inherit the state of the parent process, requiring more explicit initialization in child processes.
Example:
import asyncio
from multiprocessing import Process

async def async_task():
    print("Async task running")
    await asyncio.sleep(1)
    print("Async task done")

def worker():
    asyncio.run(async_task())

if __name__ == "__main__":
    process = Process(target=worker)
    process.start()
    process.join()
ThreadPoolExecutor and ProcessPoolExecutor
The ThreadPoolExecutor is ideal for offloading I/O-bound tasks. It creates a pool of threads and schedules blocking operations to run concurrently:
from concurrent.futures import ThreadPoolExecutor
import asyncio

def blocking_task():
    return "Task in thread pool"

async def main():
    executor = ThreadPoolExecutor(max_workers=4)
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, blocking_task)
    print(result)

asyncio.run(main())
For CPU-bound tasks, the ProcessPoolExecutor
spawns separate
processes. This provides true parallelism by bypassing the GIL:
from concurrent.futures import ProcessPoolExecutor
import asyncio

def cpu_bound_task():
    return sum(range(10**6))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=2) as executor:
        result = await loop.run_in_executor(executor, cpu_bound_task)
    print(result)

# The guard is required when worker processes are started with spawn,
# because each worker re-imports this module
if __name__ == "__main__":
    asyncio.run(main())
ProcessPoolExecutor
Processes created by ProcessPoolExecutor
have their own
memory space, preventing unintended interactions between tasks.
This isolation is crucial for ensuring safe parallel execution of CPU-intensive
workloads but incurs higher overhead compared to ThreadPoolExecutor
. Use
it when tasks require heavy computation or need strict memory separation.
When using ProcessPoolExecutor
, communication between coroutines and the
processes managed by the executor requires explicit mechanisms because each process
operates in its own isolated memory space. Here are the key methods to pass arguments
into and retrieve results from tasks executed in a ProcessPoolExecutor
:
Arguments can be passed directly to the function executed in
the ProcessPoolExecutor
. For example:
from concurrent.futures import ProcessPoolExecutor
import asyncio

def process_task(data):
    return f"Processed {data}"

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        result = await loop.run_in_executor(executor, process_task, "Task data")
    print(result)

# The guard is required when worker processes are started with spawn
if __name__ == "__main__":
    asyncio.run(main())
In this example, “Task data” is passed to the process_task function in the process pool.
The result of the function is returned to the calling coroutine as demonstrated
in the example above. The await
keyword ensures the calling coroutine waits
until the process completes execution and the result is available.
To enable communication beyond simple arguments and return values, you can use
inter-process communication (IPC) mechanisms like multiprocessing.Queue
or multiprocessing.Manager
.
Example using multiprocessing.Queue
:
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

# The workers inherit the module-level queue, which only works with
# the fork start method (not available on Windows)
ctx = multiprocessing.get_context("fork")
queue = ctx.Queue()

def process_task():
    queue.put("Data from process")

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(mp_context=ctx) as executor:
        # Run the task; the worker uses the inherited queue
        await loop.run_in_executor(executor, process_task)
    # Retrieve data from the queue
    print(queue.get(timeout=5))

if __name__ == "__main__":
    asyncio.run(main())
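Note that we created the queue outside of async def main. One could get the idea to pass the queue as a third argument to run_in_executor and accept it as an argument in a process_task(queue) function. This does not work: the arguments of a pool task are pickled, and a multiprocessing.Queue refuses to be pickled in that situation. It can only be shared with child processes through inheritance at the time they are created. Relying on a module-level queue in turn requires the fork start method, because a spawned worker re-imports the module and would create a new, unrelated queue.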
Since processes do not share memory, data passed between them must be
serialized (e.g., using pickle
). Python’s concurrent.futures
handles serialization automatically when passing arguments and receiving
results from functions executed in a ProcessPoolExecutor
.
By using these methods, you can effectively communicate between coroutines
and processes while leveraging the isolation and parallelism provided
by ProcessPoolExecutor
. Make sure to carefully manage serialization
overhead and IPC mechanisms for performance-critical applications.
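A consequence of serialization that is easy to overlook: the worker receives a copy of each argument, so changes made on one side never show up on the other. The sketch below reproduces only the pickle round trip in a single process. The settings dictionary is an arbitrary example value.

```python
import pickle

settings = {"threshold": 3, "tags": ["a", "b"]}

# This round trip is what arguments go through on their way to a worker.
payload = pickle.dumps(settings)
copy = pickle.loads(payload)

# Mutating the copy leaves the original untouched.
copy["tags"].append("c")
print(settings["tags"], copy["tags"])
print(f"serialized size: {len(payload)} bytes")
```

The payload size also gives a rough idea of the overhead per call, which matters when large objects are passed frequently.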
asyncio for I/O Operations
asyncio and aiohttp
asyncio provides an excellent foundation for handling network I/O operations, such as making HTTP requests. The aiohttp library builds on this by offering an asynchronous HTTP client and server.
Example of using aiohttp for HTTP requests:
import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "http://example.com",
        "http://example.org",
        "http://example.net",
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)

asyncio.run(main())
This example demonstrates how aiohttp integrates with asyncio to handle multiple HTTP requests concurrently.
Example of using aiohttp
to build an HTTP server:
import asyncio
from aiohttp import web

async def handle(request):
    name = request.rel_url.query.get("name", "world")
    return web.Response(text=f"Hello, {name}!")

async def init_app():
    app = web.Application()
    app.router.add_get("/", handle)
    return app

async def main():
    app = await init_app()
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "localhost", 8080)
    await site.start()
    print("Server running on http://localhost:8080")
    # Keep the server running
    await asyncio.Event().wait()

asyncio.run(main())
This server responds to HTTP GET requests on /, returning a personalized greeting if a name query parameter is provided.
While Python’s standard open function blocks the event loop, you can use aiofiles to perform asynchronous file operations. This is an optional library that can be installed from PyPI:
pip install aiofiles
Example of reading and writing to a file asynchronously:
import aiofiles
import asyncio

async def read_file(filename):
    async with aiofiles.open(filename, mode='r') as f:
        contents = await f.read()
        print(contents)

async def write_file(filename, content):
    async with aiofiles.open(filename, mode='w') as f:
        await f.write(content)

async def main():
    await write_file("example.txt", "Hello, asyncio file I/O!")
    await read_file("example.txt")

asyncio.run(main())
The aiofiles library ensures that file I/O does not block the event loop by delegating the blocking calls to a thread pool, enabling other tasks to run concurrently.
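If installing an extra library is not an option, a similar effect can be approximated with the standard library alone by pushing the blocking call into a worker thread. This is only a sketch, assuming Python 3.9 or newer for asyncio.to_thread. The helper names read_text_blocking and read_text are made up, and a temporary file is used so the example cleans up after itself.

```python
import asyncio
import tempfile
from pathlib import Path

def read_text_blocking(path):
    # Ordinary blocking file access, executed in a worker thread.
    return Path(path).read_text(encoding="utf-8")

async def read_text(path):
    # asyncio.to_thread runs the call in the default thread pool,
    # so the event loop stays free while the file is read.
    return await asyncio.to_thread(read_text_blocking, path)

async def main():
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "example.txt"
        path.write_text("Hello from a worker thread", encoding="utf-8")
        return await read_text(path)

content = asyncio.run(main())
print(content)
```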
Serial Communication with asyncio
asyncio can also be used to handle serial communication through libraries like pyserial-asyncio. This allows for efficient handling of serial data streams. This is an optional add-on library that can either be installed from PyPI:
pip install pyserial-asyncio
or, which is preferred, via the package manager of your platform. On FreeBSD for Python 3.11 this would be done using
pkg install py311-pyserial-asyncio
Example of reading data from a serial port:
import asyncio
import serial_asyncio

class SerialReader(asyncio.Protocol):
    def __init__(self):
        self.buffer = b''
        self.expected_length = None
        self.terminator = b'\n'

    def data_received(self, data):
        self.buffer += data
        while self.buffer:
            tag = self.buffer[0]
            if tag == 1:  # Fixed-length message
                if len(self.buffer) < 2:
                    break
                self.expected_length = self.buffer[1]
                if len(self.buffer) >= 2 + self.expected_length:
                    message = self.buffer[2:2 + self.expected_length]
                    self.buffer = self.buffer[2 + self.expected_length:]
                    print(f"Tag 1: Received message: {message.decode()}")
                else:
                    break
            elif tag == 2:  # Terminated message
                terminator_index = self.buffer.find(self.terminator)
                if terminator_index != -1:
                    message = self.buffer[1:terminator_index]
                    self.buffer = self.buffer[terminator_index + 1:]
                    print(f"Tag 2: Received message: {message.decode()}")
                else:
                    break
            else:
                print(f"Unknown tag: {tag}")
                self.buffer = self.buffer[1:]  # Discard the unknown tag

async def main():
    loop = asyncio.get_running_loop()
    await serial_asyncio.create_serial_connection(
        loop, SerialReader, '/dev/ttyU0', baudrate=9600
    )
    # Run forever to keep the serial connection active
    await asyncio.Event().wait()

asyncio.run(main())
In this example, serial_asyncio
creates a non-blocking serial
connection, and the SerialReader
protocol processes incoming data.
The protocol distinguishes between two types of messages based on the tag:
fixed-length messages and newline-terminated messages. This setup ensures
the event loop remains responsive while handling varied serial data
formats asynchronously.
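Framing logic like this is easier to verify when it is separated from the transport. The sketch below restates the same two message formats (tag 1 followed by a length byte and payload, tag 2 followed by a newline-terminated payload) as a pure function that can be exercised without any serial hardware. The function name parse_frames is an invention for this example.

```python
def parse_frames(buffer):
    """Split buffer into complete messages; return (messages, remaining bytes)."""
    messages = []
    while buffer:
        tag = buffer[0]
        if tag == 1:  # fixed length: tag, length byte, payload
            if len(buffer) < 2 or len(buffer) < 2 + buffer[1]:
                break  # incomplete, wait for more data
            length = buffer[1]
            messages.append(buffer[2:2 + length])
            buffer = buffer[2 + length:]
        elif tag == 2:  # terminated: tag, payload, newline
            end = buffer.find(b"\n")
            if end == -1:
                break  # terminator has not arrived yet
            messages.append(buffer[1:end])
            buffer = buffer[end + 1:]
        else:
            buffer = buffer[1:]  # skip an unknown tag byte
    return messages, buffer

# Two complete messages followed by a partial fixed-length one.
stream = b"\x01\x03abc" + b"\x02hello\n" + b"\x01\x05par"
messages, rest = parse_frames(stream)
print(messages, rest)
```

A protocol class could call such a function from data_received and keep the returned remainder as its new buffer.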
serial_asyncio
To handle reconnections when a serial resource is physically disconnected, you can monitor the connection status and attempt to reconnect upon disconnection. Here’s how it can be achieved:
import asyncio
import serial_asyncio

class SerialReconnector(asyncio.Protocol):
    def __init__(self, port, baudrate):
        self.port = port
        self.baudrate = baudrate
        self.buffer = b''

    def connection_made(self, transport):
        self.transport = transport
        print(f"Connected to {self.port}")

    def data_received(self, data):
        self.buffer += data
        print(f"Received: {data.decode()}")

    def connection_lost(self, exc):
        print("Connection lost, attempting to reconnect...")
        asyncio.create_task(self.reconnect())

    async def reconnect(self):
        while True:
            try:
                await serial_asyncio.create_serial_connection(
                    asyncio.get_running_loop(), lambda: self, self.port, baudrate=self.baudrate
                )
                print("Reconnected successfully")
                break
            except Exception as e:
                print(f"Reconnection failed: {e}, retrying in 5 seconds...")
                await asyncio.sleep(5)

async def main():
    port = '/dev/ttyUSB0'
    baudrate = 9600
    loop = asyncio.get_running_loop()
    protocol = SerialReconnector(port, baudrate)
    await serial_asyncio.create_serial_connection(loop, lambda: protocol, port, baudrate=baudrate)
    # Run indefinitely
    await asyncio.Event().wait()

asyncio.run(main())
This solution ensures continuous attempts to reconnect the serial resource and restores communication once it is reconnected.
asyncio and TCP/UDP client and server connections
asyncio provides robust support for implementing TCP and UDP clients and servers. Below, we demonstrate examples for both protocols, including handling reconnects for TCP and sending/receiving UDP packets.
A TCP client can be created using asyncio.open_connection. Here’s an example with reconnect logic:
import asyncio

async def tcp_client(host, port):
    while True:
        try:
            reader, writer = await asyncio.open_connection(host, port)
            print(f"Connected to {host}:{port}")
            writer.write(b"Hello, Server!\n")
            await writer.drain()
            data = await reader.readline()
            print(f"Received: {data.decode().strip()}")
            writer.close()
            await writer.wait_closed()
            break
        except (ConnectionRefusedError, asyncio.TimeoutError):
            print("Connection failed, retrying in 5 seconds...")
            await asyncio.sleep(5)

asyncio.run(tcp_client("127.0.0.1", 8888))
This client continuously attempts to reconnect if the connection fails.
A basic TCP server can be implemented using asyncio.start_server
:
import asyncio

async def handle_client(reader, writer):
    client_address = writer.get_extra_info('peername')
    print(f"Connection from {client_address}")
    while True:
        data = await reader.readline()
        if not data:
            break
        print(f"Received: {data.decode().strip()}")
        writer.write(data)
        await writer.drain()
    print(f"Connection closed: {client_address}")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle_client, "127.0.0.1", 8888)
    print("Server running on 127.0.0.1:8888")
    async with server:
        await server.serve_forever()

asyncio.run(main())
This server echoes back messages it receives from clients.
For UDP, asyncio
provides DatagramProtocol
for asynchronous
packet transmission and reception. Here’s an example of a UDP client and server:
UDP Server:
import asyncio

class UDPServerProtocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        print(f"Received {data.decode()} from {addr}")
        # Echo the data back
        self.transport.sendto(data, addr)

async def main():
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: UDPServerProtocol(), local_addr=("127.0.0.1", 9999)
    )
    print("UDP server running on 127.0.0.1:9999")
    try:
        await asyncio.Event().wait()  # Keep server running
    finally:
        transport.close()

asyncio.run(main())
UDP Client:
import asyncio

async def udp_client():
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: asyncio.DatagramProtocol(),
        remote_addr=("127.0.0.1", 9999),
    )
    message = b"Hello, UDP Server!"
    transport.sendto(message)
    print(f"Sent: {message.decode()}")
    await asyncio.sleep(1)  # Give time for server response (if needed)
    transport.close()

asyncio.run(udp_client())
The UDP client sends a message to the server, which echoes it back. Both the server and client are non-blocking and efficient.
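The client above uses the plain asyncio.DatagramProtocol, so the echoed datagram is silently discarded. One possible way to actually receive the reply is to resolve a future inside datagram_received, as in this sketch. It runs server and client in the same event loop for brevity and binds to port 0 so that the operating system picks a free port. The class names EchoServer and EchoClient are made up.

```python
import asyncio

class EchoServer(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        self.transport.sendto(data, addr)  # echo the datagram back

class EchoClient(asyncio.DatagramProtocol):
    def __init__(self, message, reply):
        self.message = message
        self.reply = reply  # future resolved with the first datagram received

    def connection_made(self, transport):
        transport.sendto(self.message)

    def datagram_received(self, data, addr):
        if not self.reply.done():
            self.reply.set_result(data)

    def error_received(self, exc):
        if not self.reply.done():
            self.reply.set_exception(exc)

async def main():
    loop = asyncio.get_running_loop()
    server, _ = await loop.create_datagram_endpoint(
        EchoServer, local_addr=("127.0.0.1", 0)
    )
    host, port = server.get_extra_info("sockname")[:2]
    reply = loop.create_future()
    client, _ = await loop.create_datagram_endpoint(
        lambda: EchoClient(b"Hello, UDP Server!", reply),
        remote_addr=(host, port),
    )
    try:
        # UDP gives no delivery guarantee, so always wait with a timeout.
        return await asyncio.wait_for(reply, timeout=2)
    finally:
        client.close()
        server.close()

echo = asyncio.run(main())
print(f"Echo received: {echo.decode()}")
```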
FreeSimpleGUI
FreeSimpleGUI is a user-friendly GUI framework designed for rapid development of Python utilities. It offers straightforward integration with libraries like matplotlib and supports 2D/3D drawing canvases along with intuitive mouse input handling. It is an actively maintained, free fork of PySimpleGUI, which is no longer open source. Since it’s a third-party library, it can be installed from PyPI:
pip install FreeSimpleGUI
When integrating asyncio with FreeSimpleGUI, several challenges arise:
FreeSimpleGUI uses its own blocking event loop to process GUI events. This can conflict with asyncio’s event loop, leading to unresponsive GUIs or stalled asyncio tasks.
Updating the GUI from an asyncio coroutine must be done in a thread-safe manner to avoid crashes or undefined behavior.
Frequent communication between asyncio and GUI components can introduce performance bottlenecks if not handled efficiently.
To avoid conflicts, the asyncio event loop and FreeSimpleGUI’s event loop must cooperate. A very simple approach is to yield execution instead of blocking while waiting on window events. This has the drawback of performing busy waiting.
asyncio
import asyncio
import FreeSimpleGUI as sg

async def long_running_task(window):
    for i in range(5):
        await asyncio.sleep(1)
        window.write_event_value("TASK_UPDATE", f"Step {i + 1} complete")
    window.write_event_value("TASK_DONE", "Task finished")

def get_window_events(window, future):
    # Poll the GUI once without blocking; if nothing happened,
    # read() returns sg.TIMEOUT_KEY as the event
    event, values = window.read(timeout=0)
    future.set_result((event, values))

async def main():
    layout = [[sg.Text("Asyncio Integration Example")],
              [sg.Output(size=(40, 10))],
              [sg.Button("Start Task"), sg.Button("Exit")]]
    window = sg.Window("Asyncio and FreeSimpleGUI", layout, finalize=True)
    loop = asyncio.get_running_loop()
    while True:
        future = loop.create_future()
        loop.call_soon(get_window_events, window, future)
        event, values = await future
        # Yield to the event loop so that other tasks can run
        await asyncio.sleep(0)
        if event in (sg.WINDOW_CLOSED, "Exit"):
            break
        if event == "Start Task":
            asyncio.create_task(long_running_task(window))
        if event == "TASK_UPDATE":
            print(values[event])
        if event == "TASK_DONE":
            print(values[event])
    window.close()

asyncio.run(main())
As one can see in this pattern the window.read
method has been wrapped in another
synchronous method that periodically polls the GUIs event loop without blocking (thus
setting timeout=0
). In case there is a result the future object is set to the result
and the method returns, else it yields control to the asyncio
framework by
executing asyncio.sleep(0)
.
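The busy waiting of the polling approach above can be reduced by sleeping for a short interval between two polls. The following is a minimal sketch of that idea, not part of the original example: `read_event`, `poll_interval` and the `FakeWindow` class are hypothetical names, and `TIMEOUT_KEY` is assumed to have the same value as `sg.TIMEOUT_KEY`. The fake window only exists so that the sketch runs without a display; with a real window one would pass the `sg.Window` instance instead.

```python
import asyncio

# Assumption: same value as sg.TIMEOUT_KEY, which a non-blocking
# window.read(timeout=0) returns when no event is pending.
TIMEOUT_KEY = "__TIMEOUT__"


class FakeWindow:
    """Hypothetical stand-in for sg.Window so the sketch runs without a GUI."""

    def __init__(self, events):
        self._events = list(events)

    def read(self, timeout=None):
        # Return the next scripted event, or the timeout key if none is left
        if self._events:
            return self._events.pop(0), {}
        return TIMEOUT_KEY, {}


async def read_event(window, poll_interval=0.02):
    """Wait for the next real window event without blocking the event loop."""
    while True:
        event, values = window.read(timeout=0)
        if event != TIMEOUT_KEY:
            return event, values
        # Sleep briefly instead of spinning, so other tasks get CPU time
        await asyncio.sleep(poll_interval)


async def demo():
    # Two polls without an event, then two real events
    window = FakeWindow([TIMEOUT_KEY, TIMEOUT_KEY, "Start Task", "Exit"])
    seen = []
    while True:
        event, _ = await read_event(window)
        seen.append(event)
        if event == "Exit":
            return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The interval trades GUI latency against CPU load: a value around 10 to 50 ms is usually not noticeable to a user, but this is a rule of thumb and not a measured recommendation.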
asyncio tasks in a background thread
A much better approach to letting a GUI event loop and asyncio cooperate is to
launch the asyncio tasks in a background thread. This way the foreground thread can run
the synchronous, blocking GUI loop and the asyncio framework does not interfere.
import asyncio
import threading
import FreeSimpleGUI as sg

# This is our background thread. It just sets the supplied
# event loop and then runs forever. It will get terminated
# by our foreground GUI thread
def start_asyncio_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

# This is the async task that will later be used to
# produce output that will be sent back to the
# GUI via the event queue.
async def async_task(window):
    for i in range(10):
        await asyncio.sleep(1)  # Simulate async work
        window.write_event_value("TASK_UPDATE", f"Async task iteration {i+1}")
    window.write_event_value("TASK_UPDATE", "Async task finished")

# Create our GUI as usual.
layout = [
    [sg.Text("PySimpleGUI with asyncio")],
    [sg.Output(size=(40,10))],
    [sg.Button("Start Async Task"), sg.Button("Exit")]
]
window = sg.Window("Asyncio + PySimpleGUI Example", layout)

# Start the asyncio event loop in a background thread. We create the
# loop here so we can later pass it to run_coroutine_threadsafe
# from the GUI thread.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=start_asyncio_loop, args=(loop,), daemon=True)
thread.start()

# Main event loop for the GUI
while True:
    event, values = window.read()
    if event == sg.WINDOW_CLOSED or event == "Exit":
        break
    if event == "Start Async Task":
        # Schedule the async task
        asyncio.run_coroutine_threadsafe(async_task(window), loop)
    if event == "TASK_UPDATE":
        print(values[event])

# Clean up
loop.call_soon_threadsafe(loop.stop)  # Stop the asyncio loop
thread.join()
window.close()
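The example above schedules the coroutine and forgets about it. `asyncio.run_coroutine_threadsafe` also returns a `concurrent.futures.Future`, which can be used to collect a return value or an exception in the calling thread. The sketch below shows this without any GUI so that it can be run anywhere: the `queue.Queue` is a stand-in for the GUI event queue fed by `window.write_event_value`, and `run_demo` and its `steps` parameter are hypothetical names chosen for illustration.

```python
import asyncio
import queue
import threading


def start_asyncio_loop(loop):
    # Runs in the background thread until loop.stop() is called
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def async_task(events, steps):
    # Stand-in for window.write_event_value: push updates to a thread-safe queue
    for i in range(steps):
        await asyncio.sleep(0.01)
        events.put(("TASK_UPDATE", f"iteration {i + 1}"))
    return steps


def run_demo(steps=3):
    events = queue.Queue()
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=start_asyncio_loop, args=(loop,), daemon=True)
    thread.start()

    # Schedule the coroutine from the foreground thread
    future = asyncio.run_coroutine_threadsafe(async_task(events, steps), loop)
    # Blocks only the calling thread; re-raises exceptions from the coroutine
    result = future.result(timeout=5)

    # Stop the loop, wait for the thread, then release the loop's resources
    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=5)
    loop.close()

    messages = []
    while not events.empty():
        messages.append(events.get())
    return result, messages


if __name__ == "__main__":
    print(run_demo())
```

In a real GUI thread one would usually avoid the blocking `future.result()` call and register `future.add_done_callback(...)` instead, so that the window stays responsive while the coroutine runs.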
Efficient coroutines are the cornerstone of asyncio programming. To write them effectively:
- Use asyncio.sleep() instead of time.sleep().
- Offload blocking calls with asyncio.to_thread or libraries like concurrent.futures. This keeps the event loop responsive.
- Use asyncio’s synchronization primitives (e.g., asyncio.Lock, asyncio.Queue) instead of their threading counterparts.
- Use asyncio.run with debug mode or external profilers to identify bottlenecks and optimize task scheduling.

Deadlocks and race conditions can arise when tasks contend for resources. To avoid them, bound every wait with a timeout so that a stalled task cannot block its caller indefinitely:
await asyncio.wait_for(task, timeout=5)
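Two of the points above, moving blocking calls off the event loop and bounding waits with a timeout, can be sketched together. This is an illustrative example under stated assumptions: `blocking_io` and `heartbeat` are hypothetical helpers, the durations are arbitrary, and `asyncio.to_thread` requires Python 3.9 or newer.

```python
import asyncio
import time


def blocking_io(seconds):
    # Hypothetical blocking call (e.g. a legacy library) that must not
    # run on the event loop thread
    time.sleep(seconds)
    return f"slept {seconds}s"


async def heartbeat(ticks):
    # Keeps ticking while the blocking call runs in a worker thread
    for _ in range(5):
        await asyncio.sleep(0.01)
        ticks.append(time.monotonic())


async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))

    # The blocking function runs in a thread; the event loop stays responsive
    result = await asyncio.to_thread(blocking_io, 0.05)
    await beat

    # Bound a wait so that a stalled operation cannot hang the caller forever
    try:
        await asyncio.wait_for(asyncio.sleep(10), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True

    return result, len(ticks), timed_out


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that `asyncio.wait_for` cancels the awaited operation when the timeout expires, so the wrapped coroutine should tolerate cancellation, for example by releasing resources in a `finally` block.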
Debugging asyncio programs requires specialized techniques due to their concurrent nature:
- Enable debug mode: asyncio.run(main(), debug=True)
- Use asyncio.all_tasks() and asyncio.current_task() to analyze task states.
- Use loop.set_task_factory() to monitor task creation.

We have explored the foundational aspects of asyncio, focusing on
how to manage the event loop and execute coroutines and tasks effectively.
This approach enabled us to achieve efficient non-blocking I/O and
cooperative multitasking, unlocking the full potential of concurrent
Python programming, particularly when combined with multiprocessing.
Additionally, we examined various patterns tailored to specific I/O scenarios, including file operations, HTTP communication, TCP and UDP networking, and serial port interactions. Each of these demonstrated how asyncio’s versatility simplifies handling diverse I/O needs.
Finally, we delved into best practices and common pitfalls. These included writing efficient coroutines, avoiding blocking operations, and properly utilizing synchronization primitives to manage shared resources without risking deadlocks. Together, these insights form a solid foundation for mastering asynchronous programming in Python.
asyncio is more than just a library; it’s a paradigm shift for Python
programming, empowering developers to write applications that are both scalable
and responsive. By mastering its tools and techniques, you can bridge the gap
between simplicity and performance, crafting solutions that handle complex
workloads with ease.
Remember, the key to mastering asyncio lies in understanding its core principles, practicing its usage, and experimenting with its powerful capabilities. Whether you’re building networked applications, integrating asynchronous I/O, or tackling real-time data processing, asyncio provides the framework to achieve it with elegance and efficiency.
Dive in, experiment, and let asyncio transform the way you approach programming challenges.
The best way to learn asyncio is by building projects that utilize
its features extensively.

By embracing asyncio’s potential and adhering to these guidelines,
you can create responsive and scalable Python applications.
Dipl.-Ing. Thomas Spielauer, Wien (webcomplains389t48957@tspi.at)