14. Handy Snippets
Small, reusable building blocks you can drop into real projects.
- Context manager template
from contextlib import contextmanager
@contextmanager
def managed():
try:
yield
finally:
...
- LRU cache for pure function
from functools import lru_cache
@lru_cache(maxsize=1024)
def compute(x: int) -> int:
return x * x
- cached_property
from functools import cached_property
class Service:
@cached_property
def config(self) -> dict:
return load_config()
- Thread and Process pools
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
with ThreadPoolExecutor(max_workers=8) as t:
...
with ProcessPoolExecutor() as p:
...
- asyncio gather with timeout
import asyncio
async def with_timeout(coros):
return await asyncio.wait_for(asyncio.gather(*coros), timeout=10)
- singledispatch
from functools import singledispatch
@singledispatch
def dump(obj):
return str(obj)
@dump.register
def _(obj: list):
return ",".join(map(str, obj))
- Dataclass with slots and immutability
from dataclasses import dataclass
@dataclass(slots=True, frozen=True)
class Settings:
url: str
debug: bool = False
- Pydantic v2 model
from pydantic import BaseModel
class Item(BaseModel):
name: str
price: float
- Limit concurrency with a semaphore
import asyncio
async def fetch(url: str) -> str: ...
async def crawl(urls: list[str], limit: int = 50):
sem = asyncio.Semaphore(limit)
async def one(u: str):
async with sem:
return await fetch(u)
return await asyncio.gather(*(one(u) for u in urls))
- Async producer/consumer with queue
import asyncio
async def producer(q):
for i in range(100):
await q.put(i)
await q.put(None)
async def consumer(q):
while (item := await q.get()) is not None:
...
q.task_done()
q.task_done()