asyncio Basics in Python

A practical introduction to Python asyncio: the event loop, async/await, asyncio.run, gather, create_task, and when to pick async over threads.

By Yash Kesharwani · Intermediate

What you'll learn

  • A working mental model for the asyncio event loop
  • How async def and await differ from regular functions
  • How to launch coroutines with asyncio.run, gather, and create_task
  • How to handle cancellation and exceptions in async code
  • When asyncio is the right tool versus threads or processes

asyncio is one of those parts of Python that feels confusing until it clicks, and then it feels obvious. The keywords are small (async, await), the standard library has a handful of functions you actually use, and most real programs follow the same shape. This article gives you that shape.

The event loop mental model

Forget threads for a minute. Imagine a single worker that can juggle many tasks, but never two at the same instant. Each task runs until it hits something slow, usually I/O. At that point the task says “I am waiting on the network, come back to me later,” and the worker switches to another ready task. When the slow thing finishes, the original task is put back in line.

That worker is the event loop. The “come back to me later” is await. The tasks are coroutines.

This model is great when your program spends most of its time waiting: HTTP requests, database queries, reading from sockets, talking to a message queue. It is not great when your program is doing heavy CPU work. There is still only one worker.
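To make the single-worker picture concrete, here is a minimal sketch that records the order in which two tasks run. The names task and log are made up for illustration, and it uses asyncio.run and asyncio.gather, which are covered later in this article.

```python
import asyncio

log = []  # records the order in which steps actually run


async def task(name, delay):
    log.append(f"{name}: start")
    await asyncio.sleep(delay)  # "come back to me later"
    log.append(f"{name}: resume")


async def main():
    log.clear()
    # Both tasks share one loop; "b" waits less, so it resumes first.
    await asyncio.gather(task("a", 0.2), task("b", 0.1))
    return list(log)


order = asyncio.run(main())
print(order)
# ['a: start', 'b: start', 'b: resume', 'a: resume']
```

Task "a" starts first but does not finish first. While it waits, the loop hands control to "b", which is exactly the juggling described above.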

async def and await

A function defined with async def is a coroutine function. Calling it does not run it. It returns a coroutine object that has to be awaited or scheduled.

import asyncio

async def greet(name):
    await asyncio.sleep(1)
    return f"hello {name}"

# greet("ana")  -> <coroutine object>, does nothing on its own

You can await another coroutine inside an async def. Outside one, you need an entry point.
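A quick way to convince yourself that calling a coroutine function does not run it is to inspect what comes back. This sketch reuses greet from above with a shorter sleep, and borrows asyncio.run (explained in the next section) to actually execute the body.

```python
import asyncio
import inspect


async def greet(name):
    await asyncio.sleep(0.01)
    return f"hello {name}"


coro = greet("ana")                  # nothing has run yet
is_coro = inspect.iscoroutine(coro)
print(is_coro)                       # True

result = asyncio.run(coro)           # now the body executes
print(result)                        # hello ana
```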

asyncio.run is your entry point

asyncio.run(coro) creates an event loop, runs the coroutine to completion, and tears the loop down. You usually call it exactly once, at the top of your program.

import asyncio

async def main():
    result = await greet("ana")
    print(result)

if __name__ == "__main__":
    asyncio.run(main())

Two rules that save a lot of pain:

  • Do not call asyncio.run from inside an already-running loop (notebooks, web servers, etc.). It raises RuntimeError.
  • Do not put long blocking calls (like time.sleep, big CPU loops, or requests.get) inside an async function. They freeze the whole loop.
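The second rule is easy to see with a timer. The sketch below runs three workers concurrently, once with time.sleep and once with asyncio.sleep. The helper names (blocking_worker, friendly_worker, timed) are hypothetical, and the exact timings will vary by machine.

```python
import asyncio
import time


async def blocking_worker():
    time.sleep(0.2)            # blocks the whole event loop


async def friendly_worker():
    await asyncio.sleep(0.2)   # yields to the loop while waiting


async def timed(worker):
    start = time.perf_counter()
    await asyncio.gather(worker(), worker(), worker())
    return time.perf_counter() - start


async def main():
    blocked = await timed(blocking_worker)
    friendly = await timed(friendly_worker)
    return blocked, friendly


blocked, friendly = asyncio.run(main())
print(f"time.sleep: {blocked:.2f}s, asyncio.sleep: {friendly:.2f}s")
```

On a typical machine the blocking version takes about 0.6 seconds because the sleeps run back to back, while the asyncio.sleep version takes about 0.2 seconds.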

Running things concurrently with gather

A single coroutine awaiting one thing at a time is sequential. The point of asyncio is to run many awaits concurrently. asyncio.gather is the workhorse.

import asyncio, time

async def fetch(n):
    await asyncio.sleep(1)
    return n * 2

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(fetch(1), fetch(2), fetch(3))
    print(results, f"in {time.perf_counter() - start:.2f}s")

asyncio.run(main())

That prints [2, 4, 6] in ~1.00s rather than 3 seconds. The three sleeps overlap because each one yields control to the loop.

gather returns results in the same order as the inputs. If any coroutine raises, by default the first exception propagates to whoever awaits gather, but the other coroutines are not cancelled and keep running in the background. Pass return_exceptions=True if you want exceptions returned in place of results instead.
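Here is a minimal sketch of return_exceptions=True. The failing input (n == 2) is contrived for the demo; in real code the exception would come from a network call or similar.

```python
import asyncio


async def fetch(n):
    await asyncio.sleep(0.01)
    if n == 2:
        raise ValueError(f"bad input: {n}")  # contrived failure
    return n * 2


async def main():
    # Exceptions are returned in place of results, in input order.
    return await asyncio.gather(
        fetch(1), fetch(2), fetch(3), return_exceptions=True
    )


results = asyncio.run(main())
values = [r for r in results if not isinstance(r, Exception)]
errors = [r for r in results if isinstance(r, Exception)]
print(values)   # [2, 6]
print(errors)   # [ValueError('bad input: 2')]
```

This keeps one bad input from hiding the results of the others, at the cost of having to check each item's type yourself.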

create_task: fire and (eventually) wait

gather is fine when you know up front what you want to run. When you need to start a coroutine in the background and keep doing other work, use asyncio.create_task.

import asyncio

async def worker(name):
    await asyncio.sleep(2)
    print(f"{name} done")

async def main():
    t = asyncio.create_task(worker("background"))
    print("doing other work")
    await asyncio.sleep(0.5)
    print("still working")
    await t  # wait for the task at the end

asyncio.run(main())

A few things to remember:

  • Always keep a reference to your task. The event loop holds only weak references to tasks, so a task with no other reference can be garbage-collected mid-flight.
  • You can cancel a task with task.cancel(). That raises asyncio.CancelledError inside it, which you should let propagate unless you have a very good reason to swallow it.
  • await task re-raises whatever exception the task raised. Wrap it in try/except if you expect failures. See error handling for patterns.
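The cancellation point can be sketched like this. The worker catches CancelledError only to clean up and then re-raises it. The names and the cleanup list are placeholders for illustration.

```python
import asyncio

cleanup_log = []


async def worker():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        cleanup_log.append("cleaned up")  # release resources here
        raise                             # let the cancellation propagate


async def main():
    t = asyncio.create_task(worker())
    await asyncio.sleep(0.01)   # give the task a chance to start
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        # We cancelled t ourselves, so this is the expected outcome.
        pass
    return t.cancelled()


was_cancelled = asyncio.run(main())
print(was_cancelled, cleanup_log)   # True ['cleaned up']
```

Note the difference between the two except blocks: the worker re-raises because it is the thing being cancelled, while main handles the error because it requested the cancellation of a different task.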

Timeouts and cancellation

Async code without timeouts is async code that will eventually hang. asyncio.wait_for is the simplest way to bound an operation.

import asyncio

async def slow():
    await asyncio.sleep(10)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow(), timeout=1.0)
    except asyncio.TimeoutError:
        print("gave up")

asyncio.run(main())
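One detail worth seeing for yourself: when wait_for times out, it does not just stop waiting, it also cancels the coroutine it was wrapping. A small sketch, with a hypothetical state dict used only to observe the cancellation:

```python
import asyncio

state = {"cancelled": False}


async def slow():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        state["cancelled"] = True   # wait_for cancelled us on timeout
        raise


async def main():
    try:
        await asyncio.wait_for(slow(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"


outcome = asyncio.run(main())
print(outcome, state)   # timed out {'cancelled': True}
```

That means a timed-out operation gets a chance to clean up, as long as it lets CancelledError propagate.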

For more structured concurrency, Python 3.11+ has asyncio.TaskGroup, the preferred modern way to launch a group of tasks and clean them up on errors.

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker("a"))
        tg.create_task(worker("b"))
    # reached only if both succeeded; if one raises, the other is cancelled
    # and the error propagates out of the async with as an ExceptionGroup

If you are on 3.11 or newer, prefer TaskGroup over hand-rolling create_task plus cleanup.

A small real example

Here is a sketch of fetching several URLs concurrently with httpx, which is async-aware.

import asyncio
import httpx

async def fetch(client, url):
    r = await client.get(url, timeout=5)
    return url, r.status_code

async def main():
    urls = [
        "https://example.com",
        "https://www.python.org",
        "https://httpbin.org/get",
    ]
    async with httpx.AsyncClient() as client:
        results = await asyncio.gather(*(fetch(client, u) for u in urls))
    for url, status in results:
        print(status, url)

asyncio.run(main())

Three requests, roughly the time of the slowest one. Notice async with for the client. Async context managers follow the same pattern as their sync versions, using __aenter__ and __aexit__ in place of __enter__ and __exit__; if that is new to you, the post on context managers covers it.

When to use asyncio vs threads vs processes

A short, honest guide:

  • I/O-bound with many concurrent operations (network calls, sockets, queues): asyncio is usually the cleanest fit, especially if the libraries you use are async-native.
  • I/O-bound but using libraries that only have sync APIs: threads are simpler. concurrent.futures.ThreadPoolExecutor is fine, and you do not have to rewrite your code as coroutines.
  • CPU-bound work: asyncio runs everything on one thread, and in standard CPython builds the GIL keeps threads from executing Python code in parallel, so neither helps much. Use multiprocessing or ProcessPoolExecutor.

You can mix them. loop.run_in_executor lets you push a blocking call onto a thread or process pool from an async function without freezing the loop. On Python 3.9+, asyncio.to_thread is a shorter spelling for the thread-pool case.
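A minimal sketch of run_in_executor, assuming a sync function called blocking_io that stands in for whatever blocking library call you cannot rewrite. Passing None as the executor uses the loop's default thread pool.

```python
import asyncio
import time


def blocking_io(n):
    time.sleep(0.2)   # stands in for a sync library call
    return n * 2


async def main():
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Each call runs in a worker thread; the loop stays free meanwhile.
    results = await asyncio.gather(
        *(loop.run_in_executor(None, blocking_io, n) for n in (1, 2, 3))
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"in {elapsed:.2f}s")
```

The three calls overlap, so the total should be close to 0.2 seconds rather than 0.6. For CPU-bound functions you would pass a concurrent.futures.ProcessPoolExecutor instead of None.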

Common pitfalls

  • Calling a sync blocking function inside an async function (the classic requests.get in async code). Switch to an async client or run it in an executor.
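  • Forgetting await. Calling some_coro() on its own only creates a coroutine object; the body never runs, and you usually get a “coroutine was never awaited” warning.
  • Creating tasks but never awaiting or storing them. They can be garbage-collected before they finish, and their exceptions are easy to miss.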
  • Catching CancelledError and ignoring it. That breaks structured cancellation.
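For the "never stored" pitfall, one common pattern is to park background tasks in a set and let each task remove itself when it finishes. This is a sketch; spawn, job, and background_tasks are names chosen for illustration.

```python
import asyncio

background_tasks = set()


def spawn(coro):
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # strong reference
    task.add_done_callback(background_tasks.discard)  # drop it when done
    return task


async def job(n, done):
    await asyncio.sleep(0.01)
    done.append(n)


async def main():
    done = []
    for n in range(3):
        spawn(job(n, done))
    await asyncio.gather(*background_tasks)
    await asyncio.sleep(0)   # one extra loop turn for pending callbacks
    return sorted(done), len(background_tasks)


finished, remaining = asyncio.run(main())
print(finished, remaining)   # [0, 1, 2] 0
```

The set keeps every task alive while it runs, and the done callback stops the set from growing forever.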

Wrap up

asyncio gives you one worker that switches between tasks every time one of them waits on I/O. You write coroutines with async def, suspend them with await, kick off your program with asyncio.run, and run things concurrently with gather, create_task, or TaskGroup. Use timeouts, do not block the loop, and reach for processes when your bottleneck is CPU rather than I/O. Once these pieces are in your head, the rest of the ecosystem (httpx, aiosqlite, FastAPI, asyncpg) becomes easy to read.