FastAPI 实战秘籍:从零构建高性能 API -数据库篇

前面我们讲了Fastapi日志、配置管理,在web开发中主要是针对数据库的增删改查,今天主要讲一下通过sqlalchemy连接数据库,数据库会话,数据库模型的创建和自动建表等。

FastAPI实战秘籍:从零构建高性能API-日志篇》《FastAPI实战秘籍:从零构建高性能API-配置篇

准备工作

对象关系映射操作库:

复制
pip install sqlalchemy1.

数据库连接配置,在config.yaml中增加如下如下配置:

复制
# config.yaml database: url: "mysql+pymysql://root:123456@127.0.0.1:3306/zadmin" async_url: "mysql+asyncmy://root:123456@127.0.0.1:3306/zadmin" pool_size: 20 echo_sql: false # 是否打印SQL日志1.2.3.4.5.6.

需要安装pymysql和asyncmy库,处理同步及异步请求:

复制
pip install pymysql asyncmy1.

在前面讲解配置中我们设置了配置加载及定义:

复制
# config.py class DatabaseConfig(BaseModel): url: str async_url: str pool_size: int = 10 echo_sql: bool = False class Settings(BaseSettings): ... database: DatabaseConfig ...1.2.3.4.5.6.7.8.9.10.11.12.
数据库连接

在core下创建database.py文件。使用sqlalchemy连接数据库,数据源的定义如下:

复制
database_url dialect+driver://username:password@host:port/database # 数据库连接配置 # MySQL示例: mysql+pymysql://username:password@localhost/dbname # PostgreSQL示例: postgresql+psycopg2://username:password@localhost/dbname1.2.3.4.5.

(1) 创建数据库引擎(Engine)

复制
# core/database.py from sqlalchemy import create_engine from sqlalchemy.ext.asyncio import create_async_engine from config.config import get_settings settings = get_settings() # 异步引擎 async_engine = create_async_engine( url = settings.database.async_url, echo=settings.database.echo_sql, pool_size=settings.database.pool_size ) # 同步引擎 engine = create_engine( url = settings.database.url, echo=settings.database.echo_sql, pool_size=settings.database.pool_size )1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.

(2) 创建会话工厂(Session Factory)

「会话工厂」是一个用于创建会话实例的工厂函数或类。它预先配置了会话的各种参数,但不会立即创建数据库连接。

复制
# core/database.py # 同步会话工厂 session_local = sessionmaker( autocommit=False, autoflush=False, bind=engine) # 异步会话工厂 session_factory = async_sessionmaker( autocommit=False, autoflush=False, bind=async_engine, expire_on_commit=True, class_=AsyncSession )1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.

(3) 创建会话(Session)

「会话」是SQLAlchemy ORM的核心,它代表了一个与数据库的"对话",负责:

管理对象状态执行数据库操作处理事务维护对象标识映射
复制
# 异步会话 async def get_async_db(): async with session_factory() as session: async with session.begin(): yield session # 同步会话 def get_db(): with session_local() as session: yield session1.2.3.4.5.6.7.8.9.10.

不使用上下文管理器,也可以通过下面方式定义

复制
def get_db(): db = session_local() try: yield db finally: db.close()1.2.3.4.5.6.
基类定义及自动建表
复制
# 定义基类,所有其它数据实体都继承于它 Base = declarative_base() # 如下示例:用户表模型,除了__*__属性,其它通过mapper或Column定义的都对应表的字段 # class User(Base): # __tablename__ = users # __table_args__ = ({comment: 用户表}) # id: Mapped[int] = mapped_column(Integer, primary_key=True, comment=主键ID) # telephone: Mapped[str] = mapped_column(String(11), unique=True,comment="手机号码")1.2.3.4.5.6.7.8.9.

自动建表:

复制
def create_all_tables(): Base.metadata.create_all(bind=engine) # 由于main.py中的函数代码都是同步的,所以下面的函数基本上用到。 async def async_create_all_tables(): async with async_engine.begin() as conn: # 使用 run_sync 在异步上下文中执行同步操作 await conn.run_sync(Base.metadata.create_all)1.2.3.4.5.6.7.8.

在main.py中使用:

复制
from core import register_exception, database def create_app(): """启动项目""" ... database.create_all_tables()1.2.3.4.5.6.
高级应用

由于我们创建的数据表大部分u都有created_at、id、updated_at字段,我们可以自定义基类:

复制
# 定义一个公共模型 class BaseModel(Base): """ 公共 ORM 模型,基表 """ __abstract__ = True id: Mapped[int] = mapped_column(Integer, primary_key=True, comment=主键ID) created_at: Mapped[datetime] = mapped_column( DateTime, default=func.now(), comment=创建时间) updated_at: Mapped[datetime] = mapped_column( DateTime, default=func.now(), onupdate=func.now(), comment=更新时间 ) deleted_at: Mapped[datetime] = mapped_column( DateTime, nullable=True, comment=删除时间)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.

这样上面的User模型就可以简化为下面的方式,去掉了ID主键的定义:

复制
# class User(BaseModel): # __tablename__ = users # __table_args__ = ({comment: 用户表}) # telephone: Mapped[str] = mapped_column(String(11), unique=True,comment="手机号码")1.2.3.4.
演示

在models下创建文件user.py,定义模型内容如下:

复制
# models/user.py from sqlalchemy import String from sqlalchemy.orm import Mapped, mapped_column from core.database import BaseModel, TableName from passlib.context import CryptContext pwd_context = CryptContext(schemes=[bcrypt], deprecated=auto) @TableName("users") class User(BaseModel): telephone: Mapped[str] = mapped_column(String(11), index=True, unique=True, comment="手机号码") password: Mapped[str] = mapped_column(String(128), comment="密码") username: Mapped[str] = mapped_column(String(50), index=True, nullable=False, comment="姓名") @staticmethod def get_password_hash(password: str) -> str: """ 生成哈希密码 :param password: 原始密码 :return: 哈希密码 """ return pwd_context.hash(password) @staticmethod def verify_password(password: str, hashed_password: str) -> bool: """ 验证原始密码是否与哈希密码一致 :param password: 原始密码 :param hashed_password: 哈希密码 :return: """ return pwd_context.verify(password, hashed_password)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.

(1) 定义路由,实现增删改查操作

打开数据库连接工具比如navicat,然后创建数据库zadmin 启动服务。

(2) 异步操作

复制
@app.get("/curd/async") asyncdef async_curd( db: AsyncSession = Depends(get_async_db), ): """创建用户""" hashed_password = User.get_password_hash("123456") user = User(telephone="13800000090", password=hashed_password, username="admin") db.add(user) await db.commit() await db.refresh(user) print("add user {} {} {}".format(user.username, user.telephone, user.id)) """根据ID获取用户""" result = await db.execute(select(User).where(User.id == 3)) user = result.scalar() print(" scalar_one_or_none user {} {} {}".format(user.username, user.telephone, user.id)) """获取所有用户(分页)""" result = await db.execute(select(User).offset(0).limit(10)) print(result.scalars().all()) """更新用户信息""" stmt = update(User).where(User.id == 1).values({"telephone":"13522023423"}) result = await db.execute(stmt) print(result) result = await db.execute(select(User).where(User.telephone == "13522023423")) user = result.scalar_one_or_none() print(user) # 删除操作 stmt = delete(User).where(User.id == 3) result = await db.execute(stmt) await db.commit()1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.

同步操作:

复制
@app.get("/curd/sync") asyncdef sync_curd( db: Session = Depends(get_db), ): """同步创建用户""" hashed_password = User.get_password_hash("123456") user = User(telephone="13800000090", password=hashed_password, username="admin") db.add(user) db.commit() db.refresh(user) print("add user {} {} {}".format(user.username, user.telephone, user.id)) """单行查询""" user = db.query(User).filter(User.id == 3).first() print("user by id {} {} {}".format(user.username, user.telephone, user.id)) """更新操作""" user = db.query(User).filter(User.id == 3).first() user.telephone = "13522023423" db.query(User).filter(User.id == 3).update({"telephone":"13522023423"}) db.commit() db.refresh(user) user = db.query(User).filter(User.id == 1).first() db.delete(user) db.commit()1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.

阅读剩余
THE END