Skip to content

h0rn3t/fastapi-async-sqlalchemy

Repository files navigation

SQLAlchemy FastAPI middleware

ci ci codecov License: MIT pip Downloads Updates

Description

Provides SQLAlchemy middleware for FastAPI using AsyncSession and async engine.

Install

  pip install fastapi-async-sqlalchemy

It also works with sqlmodel

Examples

Note that the session object provided by db.session is based on the Python3.7+ ContextVar. This means that each session is linked to the individual request context in which it was created.

from fastapi import FastAPI
from fastapi_async_sqlalchemy import SQLAlchemyMiddleware
from fastapi_async_sqlalchemy import db  # provide access to a database session
from sqlalchemy import column
from sqlalchemy import table

app = FastAPI()
app.add_middleware(
    SQLAlchemyMiddleware,
    db_url="postgresql+asyncpg://user:[email protected]:5432/primary_db",
    engine_args={              # engine arguments example
        "echo": True,          # print all SQL statements
        "pool_pre_ping": True, # feature will normally emit SQL equivalent to “SELECT 1” each time a connection is checked out from the pool
        "pool_size": 5,        # number of connections to keep open at a time
        "max_overflow": 10,    # number of connections to allow to be opened above pool_size
    },
)
# once the middleware is applied, any route can then access the database session
# from the global ``db``

foo = table("ms_files", column("id"))

# Usage inside of a route
@app.get("/")
async def get_files():
    result = await db.session.execute(foo.select())
    return result.fetchall()

async def get_db_fetch():
    # It uses the same ``db`` object and use it as a context manager:
    async with db():
        result = await db.session.execute(foo.select())
        return result.fetchall()

# Usage inside of a route using a db context
@app.get("/db_context")
async def db_context():
    return await get_db_fetch()

# Usage outside of a route using a db context
@app.on_event("startup")
async def on_startup():
    # We are outside of a request context, therefore we cannot rely on ``SQLAlchemyMiddleware``
    # to create a database session for us.
    result = await get_db_fetch()


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8002)

Usage of multiple databases

databases.py

from fastapi import FastAPI
from fastapi_async_sqlalchemy import create_middleware_and_session_proxy

FirstSQLAlchemyMiddleware, first_db = create_middleware_and_session_proxy()
SecondSQLAlchemyMiddleware, second_db = create_middleware_and_session_proxy()

main.py

from fastapi import FastAPI

from databases import FirstSQLAlchemyMiddleware, SecondSQLAlchemyMiddleware
from routes import router

app = FastAPI()

app.include_router(router)

app.add_middleware(
    FirstSQLAlchemyMiddleware,
    db_url="postgresql+asyncpg://user:[email protected]:5432/primary_db",
    engine_args={
        "pool_size": 5,
        "max_overflow": 10,
    },
)
app.add_middleware(
    SecondSQLAlchemyMiddleware,
    db_url="mysql+aiomysql://user:[email protected]:5432/primary_db",
    engine_args={
        "pool_size": 5,
        "max_overflow": 10,
    },
)

routes.py

import asyncio

from fastapi import APIRouter
from sqlalchemy import column, table, text

from databases import first_db, second_db

router = APIRouter()

foo = table("ms_files", column("id"))

@router.get("/first-db-files")
async def get_files_from_first_db():
    result = await first_db.session.execute(foo.select())
    return result.fetchall()


@router.get("/second-db-files")
async def get_files_from_second_db():
    result = await second_db.session.execute(foo.select())
    return result.fetchall()


@router.get("/concurrent-queries")
async def parallel_select():
    async with first_db(multi_sessions=True):
        async def execute_query(query):
            return await first_db.session.execute(text(query))

        tasks = [
            asyncio.create_task(execute_query("SELECT 1")),
            asyncio.create_task(execute_query("SELECT 2")),
            asyncio.create_task(execute_query("SELECT 3")),
            asyncio.create_task(execute_query("SELECT 4")),
            asyncio.create_task(execute_query("SELECT 5")),
            asyncio.create_task(execute_query("SELECT 6")),
        ]

        await asyncio.gather(*tasks)