FastAPI 实战秘籍:从零构建高性能 API -数据库篇
前面我们讲了Fastapi日志、配置管理,在web开发中主要是针对数据库的增删改查,今天主要讲一下通过sqlalchemy连接数据库,数据库会话,数据库模型的创建和自动建表等。
《FastAPI实战秘籍:从零构建高性能API-日志篇》《FastAPI实战秘籍:从零构建高性能API-配置篇》对象关系映射操作库:
复制
pip install sqlalchemy1.
数据库连接配置,在config.yaml中增加如下如下配置:
复制
# config.yaml
database:
url: "mysql+pymysql://root:123456@127.0.0.1:3306/zadmin"
async_url: "mysql+asyncmy://root:123456@127.0.0.1:3306/zadmin"
pool_size: 20
echo_sql: false # 是否打印SQL日志1.2.3.4.5.6.
需要安装pymysql和asyncmy库,处理同步及异步请求:
复制
pip install pymysql asyncmy1.
在前面讲解配置中我们设置了配置加载及定义:
复制
# config.py
class DatabaseConfig(BaseModel):
url: str
async_url: str
pool_size: int = 10
echo_sql: bool = False
class Settings(BaseSettings):
...
database: DatabaseConfig
...1.2.3.4.5.6.7.8.9.10.11.12.
在core下创建database.py文件。使用sqlalchemy连接数据库,数据源的定义如下:
复制
database_url dialect+driver://username:password@host:port/database
# 数据库连接配置
# MySQL示例: mysql+pymysql://username:password@localhost/dbname
# PostgreSQL示例: postgresql+psycopg2://username:password@localhost/dbname1.2.3.4.5.
(1) 创建数据库引擎(Engine)
复制
# core/database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine
from config.config import get_settings
settings = get_settings()
# 异步引擎
async_engine = create_async_engine(
url = settings.database.async_url,
echo=settings.database.echo_sql,
pool_size=settings.database.pool_size
)
# 同步引擎
engine = create_engine(
url = settings.database.url,
echo=settings.database.echo_sql,
pool_size=settings.database.pool_size
)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.
(2) 创建会话工厂(Session Factory)
「会话工厂」是一个用于创建会话实例的工厂函数或类。它预先配置了会话的各种参数,但不会立即创建数据库连接。
复制
# core/database.py
# 同步会话工厂
session_local = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine)
# 异步会话工厂
session_factory = async_sessionmaker(
autocommit=False,
autoflush=False,
bind=async_engine,
expire_on_commit=True,
class_=AsyncSession
)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.
(3) 创建会话(Session)
「会话」是SQLAlchemy ORM的核心,它代表了一个与数据库的"对话",负责:
管理对象状态执行数据库操作处理事务维护对象标识映射复制
# 异步会话
async def get_async_db():
async with session_factory() as session:
async with session.begin():
yield session
# 同步会话
def get_db():
with session_local() as session:
yield session1.2.3.4.5.6.7.8.9.10.
不使用上下文管理器,也可以通过下面方式定义
复制
def get_db():
db = session_local()
try:
yield db
finally:
db.close()1.2.3.4.5.6.
复制
# 定义基类,所有其它数据实体都继承于它
Base = declarative_base()
# 如下示例:用户表模型,除了__*__属性,其它通过mapper或Column定义的都对应表的字段
# class User(Base):
# __tablename__ = users
# __table_args__ = ({comment: 用户表})
# id: Mapped[int] = mapped_column(Integer, primary_key=True, comment=主键ID)
# telephone: Mapped[str] = mapped_column(String(11), unique=True,comment="手机号码")1.2.3.4.5.6.7.8.9.
自动建表:
复制
def create_all_tables():
Base.metadata.create_all(bind=engine)
# 由于main.py中的函数代码都是同步的,所以下面的函数基本上用到。
async def async_create_all_tables():
async with async_engine.begin() as conn:
# 使用 run_sync 在异步上下文中执行同步操作
await conn.run_sync(Base.metadata.create_all)1.2.3.4.5.6.7.8.
在main.py中使用:
复制
from core import register_exception, database
def create_app():
"""启动项目"""
...
database.create_all_tables()1.2.3.4.5.6.
由于我们创建的数据表大部分u都有created_at、id、updated_at字段,我们可以自定义基类:
复制
# 定义一个公共模型
class BaseModel(Base):
"""
公共 ORM 模型,基表
"""
__abstract__ = True
id: Mapped[int] = mapped_column(Integer, primary_key=True, comment=主键ID)
created_at: Mapped[datetime] = mapped_column(
DateTime,
default=func.now(),
comment=创建时间)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=func.now(),
onupdate=func.now(),
comment=更新时间
)
deleted_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=True,
comment=删除时间)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.
这样上面的User模型就可以简化为下面的方式,去掉了ID主键的定义:
复制
# class User(BaseModel):
# __tablename__ = users
# __table_args__ = ({comment: 用户表})
# telephone: Mapped[str] = mapped_column(String(11), unique=True,comment="手机号码")1.2.3.4.
在models下创建文件user.py,定义模型内容如下:
复制
# models/user.py
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
from core.database import BaseModel, TableName
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=[bcrypt], deprecated=auto)
@TableName("users")
class User(BaseModel):
telephone: Mapped[str] = mapped_column(String(11), index=True, unique=True, comment="手机号码")
password: Mapped[str] = mapped_column(String(128), comment="密码")
username: Mapped[str] = mapped_column(String(50), index=True, nullable=False, comment="姓名")
@staticmethod
def get_password_hash(password: str) -> str:
"""
生成哈希密码
:param password: 原始密码
:return: 哈希密码
"""
return pwd_context.hash(password)
@staticmethod
def verify_password(password: str, hashed_password: str) -> bool:
"""
验证原始密码是否与哈希密码一致
:param password: 原始密码
:param hashed_password: 哈希密码
:return:
"""
return pwd_context.verify(password, hashed_password)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.
(1) 定义路由,实现增删改查操作
打开数据库连接工具比如navicat,然后创建数据库zadmin 启动服务。
(2) 异步操作
复制
@app.get("/curd/async")
asyncdef async_curd(
db: AsyncSession = Depends(get_async_db),
):
"""创建用户"""
hashed_password = User.get_password_hash("123456")
user = User(telephone="13800000090", password=hashed_password, username="admin")
db.add(user)
await db.commit()
await db.refresh(user)
print("add user {} {} {}".format(user.username, user.telephone, user.id))
"""根据ID获取用户"""
result = await db.execute(select(User).where(User.id == 3))
user = result.scalar()
print(" scalar_one_or_none user {} {} {}".format(user.username, user.telephone, user.id))
"""获取所有用户(分页)"""
result = await db.execute(select(User).offset(0).limit(10))
print(result.scalars().all())
"""更新用户信息"""
stmt = update(User).where(User.id == 1).values({"telephone":"13522023423"})
result = await db.execute(stmt)
print(result)
result = await db.execute(select(User).where(User.telephone == "13522023423"))
user = result.scalar_one_or_none()
print(user)
# 删除操作
stmt = delete(User).where(User.id == 3)
result = await db.execute(stmt)
await db.commit()1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.
同步操作:
复制
@app.get("/curd/sync")
asyncdef sync_curd(
db: Session = Depends(get_db),
):
"""同步创建用户"""
hashed_password = User.get_password_hash("123456")
user = User(telephone="13800000090", password=hashed_password, username="admin")
db.add(user)
db.commit()
db.refresh(user)
print("add user {} {} {}".format(user.username, user.telephone, user.id))
"""单行查询"""
user = db.query(User).filter(User.id == 3).first()
print("user by id {} {} {}".format(user.username, user.telephone, user.id))
"""更新操作"""
user = db.query(User).filter(User.id == 3).first()
user.telephone = "13522023423"
db.query(User).filter(User.id == 3).update({"telephone":"13522023423"})
db.commit()
db.refresh(user)
user = db.query(User).filter(User.id == 1).first()
db.delete(user)
db.commit()1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.
阅读剩余
THE END