我们将分为几个部分来讲解,从基础到进阶,并包含大量示例代码。

SQLAlchemy 的两种主要风格
在开始查询之前,你需要了解 SQLAlchemy 的两种核心使用方式:
- SQLAlchemy Core (声明式风格):这是 SQLAlchemy 的核心,它提供了一种与数据库无关的、基于 Python 表达式的 SQL 构建方式,你操作的是 Python 对象(如
Table,Column),而不是手写字符串 SQL,这种方式更灵活,也更安全(能有效防止 SQL 注入)。 - SQLAlchemy ORM (对象关系映射):这是在 Core 之上构建的一层,它允许你将数据库表映射为 Python 类(模型),将行映射为类的实例(对象),这使得你可以用面向对象的方式与数据库交互,代码更符合 Pythonic 风格,也是最常见的用法。
本教程将主要围绕 SQLAlchemy ORM 展开,因为它在日常开发中使用更广泛,最后会简要介绍 Core 的查询方式。
环境准备与模型定义
你需要安装 SQLAlchemy 并创建一个数据库连接。
安装:

pip install sqlalchemy
模型定义
我们以一个简单的 User 和 Address 表为例。
import datetime
from sqlalchemy import (
create_engine,
Column,
Integer,
String,
ForeignKey,
DateTime,
)
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
# 1. 创建数据库引擎
# 'sqlite:///test.db' 表示使用 SQLite 数据库,文件名为 test.db
# 如果是 PostgreSQL: 'postgresql://user:password@host:port/dbname'
# 如果是 MySQL: 'mysql://user:password@host:port/dbname'
engine = create_engine("sqlite:///test.db", echo=False) # echo=True 会打印生成的 SQL
# 2. 创建基类
Base = declarative_base()
# 3. 定义模型 (Python 类映射到数据库表)
class User(Base):
__tablename__ = 'users' # 指定表名
id = Column(Integer, primary_key=True)
name = Column(String)
email = Column(String, unique=True)
created_at = Column(DateTime, default=datetime.datetime.utcnow)
# 定义关系
# 一个 User 可以有多个 Address
# back_populates="user" 表示 Address 模型中也有一个名为 "user" 的关系属性,用于反向查询
addresses = relationship("Address", back_populates="user", cascade="all, delete-orphan")
def __repr__(self):
return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"
class Address(Base):
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
email_address = Column(String, nullable=False)
user_id = Column(Integer, ForeignKey('users.id'))
# 定义关系
# back_populates="addresses" 表示 User 模型中也有一个名为 "addresses" 的关系属性
user = relationship("User", back_populates="addresses")
def __repr__(self):
return f"<Address(id={self.id}, email_address='{self.email_address}')>"
# 4. 创建表(如果不存在)
# 在实际应用中,你可能使用 Alembic 等工具来管理数据库迁移
Base.metadata.create_all(engine)
# 5. 创建 Session
# Session 是与数据库交互的主要接口
Session = sessionmaker(bind=engine)
session = Session()
基础查询操作
1 创建和添加数据 (INSERT)
在查询之前,我们先添加一些测试数据。
# 创建新用户
new_user = User(name="Alice", email="alice@example.com")
session.add(new_user)
# 创建多个新用户
session.add_all([
User(name="Bob", email="bob@example.com"),
User(name="Charlie", email="charlie@example.com")
])
# 提交事务,将更改保存到数据库
session.commit()
2 查询所有数据 (SELECT * FROM ...)
使用 session.query() 来构建查询。
# 查询 User 表中的所有用户
all_users = session.query(User).all()
for user in all_users:
print(user)
# 输出:
# <User(id=1, name='Alice', email='alice@example.com')>
# <User(id=2, name='Bob', email='bob@example.com')>
# <User(id=3, name='Charlie', email='charlie@example.com')>
3 查询特定列
如果你只需要某些列,而不是整个对象,可以在 query() 中指定。

# 只查询 User 的 name 和 email 列
# 查询结果是一个元组列表
user_names_emails = session.query(User.name, User.email).all()
for name, email in user_names_emails:
print(f"Name: {name}, Email: {email}")
# 输出:
# Name: Alice, Email: alice@example.com
# Name: Bob, Email: bob@example.com
# Name: Charlie, Email: charlie@example.com
4 过滤查询 (WHERE)
使用 filter() 方法来添加 WHERE 条件。
# 查询名字为 'Alice' 的用户
alice = session.query(User).filter_by(name="Alice").first()
print(alice)
# 输出: <User(id=1, name='Alice', email='alice@example.com')>
# 查询邮箱以 'example.com' 结尾的用户
users_from_example = session.query(User).filter(User.email.like('%example.com')).all()
for user in users_from_example:
print(user)
# 多个条件可以使用 and_(), or_()
from sqlalchemy import and_, or_
# 查询名字为 'Bob' 或邮箱为 'charlie@example.com' 的用户
special_users = session.query(User).filter(
or_(
User.name == 'Bob',
User.email == 'charlie@example.com'
)
).all()
filter() vs filter_by()
filter_by(): 使用关键字参数,语法更简洁,但功能有限(不支持like,in_等复杂操作)。filter(): 使用标准的 Python 表达式,功能更强大,是推荐的方式。
5 限制结果数量 (LIMIT)
使用 limit() 方法。
# 只查询前两个用户 first_two_users = session.query(User).limit(2).all() print(first_two_users) # 输出: [<User ...>, <User ...>]
6 排序 (ORDER BY)
使用 order_by() 方法。
# 按名字升序排列 users_sorted_by_name_asc = session.query(User).order_by(User.name.asc()).all() # 按创建时间降序排列 users_sorted_by_created_at_desc = session.query(User).order_by(User.created_at.desc()).all()
7 获取单个结果
first(): 获取查询结果中的第一个元素,如果没有则返回None。one(): 获取唯一的结果,如果结果数量不是1,会抛出异常。one_or_none(): 获取唯一的结果,如果没有则返回None,如果结果超过一个,抛出异常。
# 获取第一个用户
first_user = session.query(User).first()
# 获取ID为1的用户(确保存在)
user_with_id_1 = session.query(User).filter(User.id == 1).one()
# 尝试获取一个可能不存在的用户
user_not_found = session.query(User).filter(User.name == "NonExistentUser").one_or_none()
if user_not_found is None:
print("用户不存在")
高级查询操作
1 关系查询
这是 ORM 的强大之处。
# 1. 为 Alice 添加一个地址
alice = session.query(User).filter_by(name="Alice").one()
alice.addresses.append(Address(email_address="alice_home@example.com"))
session.commit()
# 2. 查询 Alice 的所有地址
# 通过关系属性 addresses 进行查询
alice_addresses = session.query(User).filter_by(name="Alice").one().addresses
print("Alice's addresses:")
for addr in alice_addresses:
print(addr)
# 输出:
# <Address(id=1, email_address='alice_home@example.com')>
# 3. 通过 Address 反向查询到 User
# 通过关系属性 user 进行查询
address = session.query(Address).filter_by(email_address="alice_home@example.com").one()
print("Address belongs to:", address.user)
# 输出: Address belongs to: <User(id=1, name='Alice', email='alice@example.com')>
2 连接查询 (JOIN)
使用 join() 方法来连接多个表。
# 内连接:查询所有有地址的用户及其地址 # SQLAlchemy 会自动根据外键关系判断如何连接 users_with
