Provides SQLAlchemy middleware for FastAPI using AsyncSession and async engine.
pip install fastapi-async-sqlalchemy
It also works with SQLModel.
Note that the session object provided by db.session is based on the Python 3.7+ ContextVar.
This means that each session is linked to the individual request context in which it was created.
from fastapi import FastAPI
from fastapi_async_sqlalchemy import SQLAlchemyMiddleware
from fastapi_async_sqlalchemy import db # provide access to a database session
from sqlalchemy import column
from sqlalchemy import table
# Create the application and register the middleware that provides the
# request-bound database session.
app = FastAPI()
app.add_middleware(
    SQLAlchemyMiddleware,
    # Placeholder credentials and host -- replace them with your own values.
    db_url="postgresql+asyncpg://user:password@localhost:5432/primary_db",
    engine_args={  # engine arguments example
        "echo": True,  # print all SQL statements
        # emit the SQL equivalent of "SELECT 1" each time a connection is
        # checked out from the pool
        "pool_pre_ping": True,
        "pool_size": 5,  # number of connections to keep open at a time
        "max_overflow": 10,  # number of connections allowed above pool_size
    },
)

# Once the middleware is applied, any route can access the database session
# through the global ``db``.
foo = table("ms_files", column("id"))
# Usage inside of a route
@app.get("/")
async def get_files():
    """Return all rows selected from ``foo`` via the global ``db`` session."""
    query = foo.select()
    rows = await db.session.execute(query)
    return rows.fetchall()
async def get_db_fetch():
    """Return all rows selected from ``foo`` inside an explicit ``db`` context."""
    # The same ``db`` object is used here as an async context manager.
    async with db():
        query = foo.select()
        rows = await db.session.execute(query)
        return rows.fetchall()
# Usage inside of a route using a db context
@app.get("/db_context")
async def db_context():
    """Return the rows produced by ``get_db_fetch``."""
    rows = await get_db_fetch()
    return rows
# Usage outside of a route using a db context
@app.on_event("startup")
async def on_startup():
    """Query the database once when the application starts."""
    # There is no request context here, so ``SQLAlchemyMiddleware`` cannot
    # create a database session for us; ``get_db_fetch`` opens its own
    # ``db`` context instead.
    rows = await get_db_fetch()
if __name__ == "__main__":
    # Serve ``app`` with uvicorn when this module is executed directly.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8002)
databases.py
from fastapi import FastAPI
from fastapi_async_sqlalchemy import create_middleware_and_session_proxy
# One (middleware class, session proxy) pair is created per database; each
# call result is unpacked into the middleware to register and the proxy to
# use for queries.
FirstSQLAlchemyMiddleware, first_db = create_middleware_and_session_proxy()
SecondSQLAlchemyMiddleware, second_db = create_middleware_and_session_proxy()
main.py
from fastapi import FastAPI
from databases import FirstSQLAlchemyMiddleware, SecondSQLAlchemyMiddleware
from routes import router
# Create the application, attach the routes, and register one middleware per
# database.
app = FastAPI()
app.include_router(router)

# First database.
app.add_middleware(
    FirstSQLAlchemyMiddleware,
    # Placeholder credentials and host -- replace them with your own values.
    db_url="postgresql+asyncpg://user:password@localhost:5432/primary_db",
    engine_args={
        "pool_size": 5,
        "max_overflow": 10,
    },
)

# Second database.
app.add_middleware(
    SecondSQLAlchemyMiddleware,
    # Placeholder credentials and host -- replace them with your own values.
    db_url="mysql+aiomysql://user:password@localhost:5432/primary_db",
    engine_args={
        "pool_size": 5,
        "max_overflow": 10,
    },
)
routes.py
import asyncio
from fastapi import APIRouter
from sqlalchemy import column, table, text
from databases import first_db, second_db
# Router on which the example endpoints in this module are registered.
router = APIRouter()
# Table construct for "ms_files" declaring only its "id" column.
foo = table("ms_files", column("id"))
@router.get("/first-db-files")
async def get_files_from_first_db():
    """Return all rows selected from ``foo`` through the ``first_db`` session."""
    query = foo.select()
    rows = await first_db.session.execute(query)
    return rows.fetchall()
@router.get("/second-db-files")
async def get_files_from_second_db():
    """Return all rows selected from ``foo`` through the ``second_db`` session."""
    query = foo.select()
    rows = await second_db.session.execute(query)
    return rows.fetchall()
@router.get("/concurrent-queries")
async def parallel_select():
    """Run six ``SELECT`` statements as concurrent tasks on ``first_db``.

    The query results are awaited but not returned.
    """
    statements = (
        "SELECT 1",
        "SELECT 2",
        "SELECT 3",
        "SELECT 4",
        "SELECT 5",
        "SELECT 6",
    )

    async def execute_query(query):
        # ``first_db.session`` is resolved when the task runs, which happens
        # inside the ``multi_sessions`` context opened below.
        return await first_db.session.execute(text(query))

    # NOTE(review): ``multi_sessions=True`` presumably lets each task work
    # with its own session -- confirm against the library documentation.
    async with first_db(multi_sessions=True):
        pending = [
            asyncio.create_task(execute_query(statement))
            for statement in statements
        ]
        await asyncio.gather(*pending)