# db_async_nb.py (just paste in a notebook cell)
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text

# NOTE(review): credentials are hard-coded for notebook convenience;
# move them to an environment variable before sharing this file.
DATABASE_URL = "postgresql+asyncpg://ben:secret@localhost:5433/mydb"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,        # base connections kept open in the pool
    max_overflow=20,     # extra connections allowed under burst load
    pool_pre_ping=True,  # validate connections before use; drops stale ones
    # search_path makes unqualified table names resolve in "api" first
    connect_args={"server_settings": {"search_path": "api,public"}},
)
async def get_table_info(schema: str = "api"):
    """Return column metadata for every table in *schema*.

    Queries ``information_schema.columns`` and returns a list of dicts
    with table_name, column_name, ordinal_position, data_type,
    is_nullable and column_default, ordered by table then column
    position.
    """
    # engine.begin() opens a transaction and commits/rolls back on exit.
    async with engine.begin() as conn:
        result = await conn.execute(
            text("""
                SELECT
                    table_name,
                    column_name,
                    ordinal_position,
                    data_type,
                    is_nullable,
                    column_default
                FROM information_schema.columns
                WHERE table_schema = :schema
                ORDER BY table_name, ordinal_position
            """),
            {"schema": schema},
        )
        # .mappings() yields RowMapping objects that convert cleanly to dicts.
        return [dict(r) for r in result.mappings().all()]
SQLAlchemy
Here’s a compact-but-complete engineer’s guide to SQLAlchemy (v2.x mindset), with code you can lift into real projects.
What SQLAlchemy Is (and Isn’t)
Two layers
- Core: SQL expression language + dialects + connection/transaction APIs.
- ORM: Maps Python classes ↔︎ tables, manages identity map & unit-of-work via `Session`.
Batteries for SQL, not a DB: You still pick Postgres/MySQL/SQLite/etc. SQLAlchemy gives you portable SQL and well-designed state management.
Core vs ORM (when to use which)
Use case | Prefer |
---|---|
Hand-tuned SQL, ETL, admin scripts, DDL | Core |
App domain models, relations, change tracking, identity map | ORM |
Hybrid: complex read SQL + mapped writes | Both (Core selects, ORM for stateful writes) |
Quickstart – the modern (2.x) style
1) Define tables & models (Declarative)
from sqlalchemy import String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    """Shared declarative base for all mapped classes."""


class User(Base):
    __tablename__ = "user"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(256), unique=True, index=True)
    # One-to-many; delete-orphan removes posts detached from their author.
    posts: Mapped[list["Post"]] = relationship(
        back_populates="author", cascade="all, delete-orphan"
    )


class Post(Base):
    __tablename__ = "post"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    # FK indexed because it is used in joins back to User.
    user_id: Mapped[int] = mapped_column(ForeignKey("user.id"), index=True)
    author: Mapped[User] = relationship(back_populates="posts")
2) Engine & schema
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg://user:pass@localhost/dbname", echo=False)

# Create tables (dev/test use; prod uses Alembic)
Base.metadata.create_all(engine)
3) Sessions & basic CRUD
from sqlalchemy.orm import Session

# expire_on_commit=False keeps attributes readable after commit.
with Session(engine, expire_on_commit=False) as session:
    alice = User(email="alice@example.com")
    alice.posts.append(Post(title="Hello!"))
    session.add(alice)
    session.commit()

# Query (2.x style)
from sqlalchemy import select
q = select(User).where(User.email == "alice@example.com")
row = session.execute(q).scalar_one()
Querying (2.x patterns)
from sqlalchemy import select, func

# Filter + order + limit
stmt = (
    select(Post.title, func.count(Post.id).label("n"))
    .group_by(Post.title)
    .order_by(func.count(Post.id).desc())
    .limit(10)
)
rows = session.execute(stmt).all()

# Join + load related efficiently
from sqlalchemy.orm import joinedload, selectinload
stmt = select(User).options(selectinload(User.posts))  # great for 1-to-many
users = session.execute(stmt).scalars().all()
Loading strategies
- `selectinload`: many small SELECTs; excellent for 1-to-many fan-out (avoids big cartesian explosions).
- `joinedload`: single JOIN; good for 1-to-1 or small 1-to-many.
- Default lazy loading can cause N+1; prefer explicit `.options(...)`.
Transactions: do them right
with engine.begin() as conn:  # Core transaction
    conn.execute(...)

with Session(engine) as session:  # ORM unit-of-work
    with session.begin():  # commit/rollback handled
        session.add(obj)
        # multiple operations…
- Prefer `with session.begin()` blocks for atomic writes.
- `expire_on_commit=False` keeps objects usable after commit (common web pattern).
AsyncIO support
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

async_engine = create_async_engine("postgresql+asyncpg://user:pass@/dbname")
# Session factory; expire_on_commit=False keeps results usable post-commit.
AsyncSession = async_sessionmaker(async_engine, expire_on_commit=False)


async def run():
    """Fetch all users with an @example.com address (demo query)."""
    async with AsyncSession() as s:
        from sqlalchemy import select
        res = await s.execute(select(User).where(User.email.like("%@example.com")))
        users = res.scalars().all()


asyncio.run(run())
- Async is great for high-latency I/O workloads (APIs, microservices).
- ORM patterns are nearly identical, just `await` connection/session ops.
Migrations with Alembic (the standard)
alembic init migrations
- Set `sqlalchemy.url` in `alembic.ini`.

alembic revision --autogenerate -m "add post table"
alembic upgrade head
Tips
- Keep metadata import in
env.py
pointing to yourBase.metadata
. - Review autogen diffs (types, constraints, server defaults) before applying.
- For multi-DB setups, create envs or programmatic configs.
Relationships & Cascades (cheat sheet)
relationship(
    back_populates="...",
    cascade="save-update, merge, delete, delete-orphan",
    passive_deletes=True,  # pair with ON DELETE in FK for DB-side deletes
    lazy="selectin",       # sane default for 1-to-many
)
- `delete-orphan` only on the parent side of a one-to-many.
- Set `nullable=False` on FKs when the child must always have a parent.
- Add `index=True` to FKs used in joins.
Performance Playbook
- Pick the right loader (`selectinload` vs `joinedload`).
- Batch writes with `session.bulk_save_objects` (ORM) or Core `execute(many=True)`; measure correctness vs speed (bulk APIs skip some ORM events).
- Pagination: use keyset (a.k.a. “seek”) pagination with `WHERE (created_at, id) > (...) ORDER BY created_at, id LIMIT n` for large tables.
- Profiling: set `echo=True` in dev; or use `sqlalchemy.engine` logging + `EXPLAIN ANALYZE` in Postgres.
- Avoid N+1: always declare loader options for list views.
- Connection pool: tune `pool_size`, `max_overflow`, `pool_recycle` for your DB and workload.
Typing, Dataclasses, and Pydantic
- Use PEP-484 typed mappings (`Mapped[T]`, `mapped_column`) for editor help & safety.
- For API schemas, prefer Pydantic models at the boundary; map ORM ↔︎ DTOs explicitly (don’t bleed ORM into transport).
- With FastAPI, return Pydantic models; load ORM entities inside request handlers.
Patterns for Web Apps
Session scoping
- FastAPI: provide a session per request (dependency) and close it.
- Django (DRF) alongside SQLAlchemy: keep SQLAlchemy session separate from Django ORM; use middleware or per-view dependency.
Repositories (optional)
- Wrap DB access in small “repo” classes to decouple domain from ORM. Good for large codebases/testing.
Testing
- Use SQLite :memory: for fast unit tests (be careful with PostgreSQL-specific features).
- For integration tests, spin up a temp Postgres (Docker) + transactional rollbacks.
- Use `session.begin_nested()` and savepoint rollbacks per test to avoid re-creating schema.
- Factory libraries (e.g., `factory_boy`) help generate related graphs.
Advanced SQL with Core (portable power)
from sqlalchemy import Table, Column, Integer, String, MetaData, select, text

meta = MetaData()
t = Table(
    "thing", meta,
    Column("id", Integer, primary_key=True),
    Column("name", String(50), index=True),
)

with engine.connect() as conn:
    conn.execute(text("SET LOCAL statement_timeout = 2000"))  # Postgres hint
    rows = conn.execute(select(t.c.id, t.c.name).where(t.c.name.ilike("%foo%"))).all()
- Window functions (`func.row_number().over(...)`), CTEs (`stmt.cte()`), UNION/INTERSECT, array/json types via dialects.
Postgres-specific goodies
from sqlalchemy.dialects.postgresql import JSONB, ARRAY


class Event(Base):
    __tablename__ = "event"

    id: Mapped[int] = mapped_column(primary_key=True)
    tags: Mapped[list[str]] = mapped_column(ARRAY(String))
    payload: Mapped[dict] = mapped_column(JSONB)


# Indexes / constraints
from sqlalchemy import Index, func
# GIN index speeds up containment queries (@>, ?) on the JSONB payload.
Index("ix_event_gin_payload", Event.payload, postgresql_using="gin")

# Upserts
from sqlalchemy.dialects.postgresql import insert
stmt = insert(User).values(email="x@x.com").on_conflict_do_nothing(index_elements=[User.email])
session.execute(stmt)
Concurrency & Locks (pragmatic notes)
Use `SELECT ... FOR UPDATE` with the ORM:
from sqlalchemy import select
# Row lock held until the enclosing transaction commits/rolls back.
u = session.execute(
    select(User).where(User.id == uid).with_for_update()
).scalar_one()
Prefer app-level idempotency (unique keys) to avoid duplicate writes.
Use advisory locks (Postgres) for coarse critical sections.
Common Pitfalls (and fixes)
- N+1 queries → always set loader options for list endpoints.
- Stale objects after commit → `expire_on_commit=False` (but remember to refresh when needed).
- Implicit flush surprises → `autoflush=True` is default; call `session.flush()` where order matters, or disable via context manager temporarily.
- Leaking sessions → always `with Session(...) as s:` or ensure FastAPI dependency closes sessions.
- Over-eager cascades → review `cascade=` rules; be explicit with deletes.
Alembic Gotchas
- Column type changes may need server_default adjustments & data migrations.
- Enum changes in Postgres require `ALTER TYPE` (write revision manually).
- Renames: use `op.alter_column(..., new_column_name=...)`, and keep ORM property name aligned.
Choosing the right abstraction
- Small service, API-only, heavy I/O → ORM + async session, or Core if you prefer SQL control.
- Large domain model, rich relations → ORM with explicit loader strategies and clear aggregate boundaries.
- Analytics/ETL → Core for READs + bulk COPY; ORM optional for simple stateful writes.
Mini “starter template” (sync)
yourapp/
db.py # engine, SessionLocal, Base
models.py # declarative models
schema.py # Pydantic DTOs (if using FastAPI)
repo/ # optional repositories
services/ # business logic (or “use cases”)
api/ # FastAPI/Django views
migrations/ # Alembic
tests/
db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase


class Base(DeclarativeBase):
    """Declarative base shared by all models in the app."""


engine = create_engine("postgresql+psycopg://user:pass@localhost/app", future=True)
SessionLocal = sessionmaker(engine, expire_on_commit=False)


def get_session():
    """Yield a session and close it afterwards (FastAPI dependency pattern)."""
    with SessionLocal() as s:
        yield s
import pandas as pd
= await get_table_info("api") # <-- key change: use await, not asyncio.run
rows = pd.DataFrame(rows)
df df
table_name | column_name | ordinal_position | data_type | is_nullable | column_default | |
---|---|---|---|---|---|---|
0 | iotawatt | timestamp | 1 | timestamp with time zone | NO | None |
1 | iotawatt | device | 2 | text | NO | None |
2 | iotawatt | sensor | 3 | text | NO | None |
3 | iotawatt | power | 4 | double precision | YES | None |
4 | iotawatt | pf | 5 | double precision | YES | None |
5 | iotawatt | current | 6 | double precision | YES | None |
6 | iotawatt | v | 7 | double precision | YES | None |
from sqlalchemy import text


async def get_constraints(schema: str = "api"):
    """Return constraint metadata (PK/FK/UNIQUE/CHECK) for tables in *schema*.

    Joins information_schema.table_constraints with key_column_usage for
    the constrained columns, and LEFT JOINs constraint_column_usage so
    foreign-key rows also carry the referenced table/column.
    """
    async with engine.begin() as conn:
        res = await conn.execute(
            text("""
                SELECT
                    tc.table_name,
                    tc.constraint_type,
                    kcu.column_name,
                    ccu.table_name AS foreign_table,
                    ccu.column_name AS foreign_column
                FROM information_schema.table_constraints AS tc
                JOIN information_schema.key_column_usage AS kcu
                    ON tc.constraint_name = kcu.constraint_name
                    AND tc.table_schema = kcu.table_schema
                LEFT JOIN information_schema.constraint_column_usage AS ccu
                    ON ccu.constraint_name = tc.constraint_name
                    AND ccu.table_schema = tc.table_schema
                WHERE tc.table_schema = :schema
                ORDER BY tc.table_name, tc.constraint_type
            """),
            {"schema": schema},
        )
        return [dict(r) for r in res.mappings().all()]
= pd.DataFrame(await get_constraints("api"))
constraints_df constraints_df
async def get_index_ddl(schema: str = "api"):
    """Return index definitions for every table in *schema*.

    Reads the pg_index / pg_class / pg_namespace catalogs and uses
    pg_get_indexdef() to reconstruct the CREATE INDEX statement; also
    reports whether each index is UNIQUE.
    """
    async with engine.begin() as conn:
        res = await conn.execute(
            text("""
                SELECT n.nspname AS schema,
                       c.relname AS table,
                       i.relname AS index,
                       pg_get_indexdef(ix.indexrelid) AS definition,
                       ix.indisunique AS is_unique
                FROM pg_index ix
                JOIN pg_class i ON i.oid = ix.indexrelid
                JOIN pg_class c ON c.oid = ix.indrelid
                JOIN pg_namespace n ON n.oid = c.relnamespace
                WHERE n.nspname = :schema
                ORDER BY c.relname, i.relname
            """),
            {"schema": schema},
        )
        return [dict(r) for r in res.mappings().all()]
= pd.DataFrame(await get_index_ddl("api"))
indexes_df indexes_df
schema | table | index | definition | is_unique | |
---|---|---|---|---|---|
0 | api | iotawatt | iotawatt_timestamp_idx | CREATE INDEX iotawatt_timestamp_idx ON api.iot... | False |