杰瑞科技汇

Python SQLAlchemy查询怎么写?

我们将分为几个部分来讲解,从基础到进阶,并包含大量示例代码。

Python SQLAlchemy查询怎么写?-图1
(图片来源网络,侵删)

SQLAlchemy 的两种主要风格

在开始查询之前,你需要了解 SQLAlchemy 的两种核心使用方式:

  1. SQLAlchemy Core (声明式风格):这是 SQLAlchemy 的核心,它提供了一种与数据库无关的、基于 Python 表达式的 SQL 构建方式,你操作的是 Python 对象(如 Table, Column),而不是手写字符串 SQL,这种方式更灵活,也更安全(能有效防止 SQL 注入)。
  2. SQLAlchemy ORM (对象关系映射):这是在 Core 之上构建的一层,它允许你将数据库表映射为 Python 类(模型),将行映射为类的实例(对象),这使得你可以用面向对象的方式与数据库交互,代码更符合 Pythonic 风格,也是最常见的用法。

本教程将主要围绕 SQLAlchemy ORM 展开,因为它在日常开发中使用更广泛,最后会简要介绍 Core 的查询方式。


环境准备与模型定义

你需要安装 SQLAlchemy 并创建一个数据库连接。

安装:

Python SQLAlchemy查询怎么写?-图2
(图片来源网络,侵删)
pip install sqlalchemy

模型定义 我们以一个简单的 UserAddress 表为例。

import datetime
from sqlalchemy import (
    create_engine,
    Column,
    Integer,
    String,
    ForeignKey,
    DateTime,
)
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
# 1. 创建数据库引擎
# 'sqlite:///test.db' 表示使用 SQLite 数据库,文件名为 test.db
# 如果是 PostgreSQL: 'postgresql://user:password@host:port/dbname'
# 如果是 MySQL: 'mysql://user:password@host:port/dbname'
engine = create_engine("sqlite:///test.db", echo=False)  # echo=True 会打印生成的 SQL
# 2. 创建基类
Base = declarative_base()
# 3. 定义模型 (Python 类映射到数据库表)
class User(Base):
    __tablename__ = 'users'  # 指定表名
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String, unique=True)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    # 定义关系
    # 一个 User 可以有多个 Address
    # back_populates="user" 表示 Address 模型中也有一个名为 "user" 的关系属性,用于反向查询
    addresses = relationship("Address", back_populates="user", cascade="all, delete-orphan")
    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"
class Address(Base):
    __tablename__ = 'addresses'
    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))
    # 定义关系
    # back_populates="addresses" 表示 User 模型中也有一个名为 "addresses" 的关系属性
    user = relationship("User", back_populates="addresses")
    def __repr__(self):
        return f"<Address(id={self.id}, email_address='{self.email_address}')>"
# 4. 创建表(如果不存在)
# 在实际应用中,你可能使用 Alembic 等工具来管理数据库迁移
Base.metadata.create_all(engine)
# 5. 创建 Session
# Session 是与数据库交互的主要接口
Session = sessionmaker(bind=engine)
session = Session()

基础查询操作

1 创建和添加数据 (INSERT)

在查询之前,我们先添加一些测试数据。

# 创建新用户
new_user = User(name="Alice", email="alice@example.com")
session.add(new_user)
# 创建多个新用户
session.add_all([
    User(name="Bob", email="bob@example.com"),
    User(name="Charlie", email="charlie@example.com")
])
# 提交事务,将更改保存到数据库
session.commit()

2 查询所有数据 (SELECT * FROM ...)

使用 session.query() 来构建查询。

# 查询 User 表中的所有用户
all_users = session.query(User).all()
for user in all_users:
    print(user)
# 输出:
# <User(id=1, name='Alice', email='alice@example.com')>
# <User(id=2, name='Bob', email='bob@example.com')>
# <User(id=3, name='Charlie', email='charlie@example.com')>

3 查询特定列

如果你只需要某些列,而不是整个对象,可以在 query() 中指定。

Python SQLAlchemy查询怎么写?-图3
(图片来源网络,侵删)
# 只查询 User 的 name 和 email 列
# 查询结果是一个元组列表
user_names_emails = session.query(User.name, User.email).all()
for name, email in user_names_emails:
    print(f"Name: {name}, Email: {email}")
# 输出:
# Name: Alice, Email: alice@example.com
# Name: Bob, Email: bob@example.com
# Name: Charlie, Email: charlie@example.com

4 过滤查询 (WHERE)

使用 filter() 方法来添加 WHERE 条件。

# 查询名字为 'Alice' 的用户
alice = session.query(User).filter_by(name="Alice").first()
print(alice)
# 输出: <User(id=1, name='Alice', email='alice@example.com')>
# 查询邮箱以 'example.com' 结尾的用户
users_from_example = session.query(User).filter(User.email.like('%example.com')).all()
for user in users_from_example:
    print(user)
# 多个条件可以使用 and_(), or_()
from sqlalchemy import and_, or_
# 查询名字为 'Bob' 或邮箱为 'charlie@example.com' 的用户
special_users = session.query(User).filter(
    or_(
        User.name == 'Bob',
        User.email == 'charlie@example.com'
    )
).all()

filter() vs filter_by()

  • filter_by(): 使用关键字参数,语法更简洁,但功能有限(不支持 like, in_ 等复杂操作)。
  • filter(): 使用标准的 Python 表达式,功能更强大,是推荐的方式。

5 限制结果数量 (LIMIT)

使用 limit() 方法。

# 只查询前两个用户
first_two_users = session.query(User).limit(2).all()
print(first_two_users)
# 输出: [<User ...>, <User ...>]

6 排序 (ORDER BY)

使用 order_by() 方法。

# 按名字升序排列
users_sorted_by_name_asc = session.query(User).order_by(User.name.asc()).all()
# 按创建时间降序排列
users_sorted_by_created_at_desc = session.query(User).order_by(User.created_at.desc()).all()

7 获取单个结果

  • first(): 获取查询结果中的第一个元素,如果没有则返回 None
  • one(): 获取唯一的结果,如果结果数量不是1,会抛出异常。
  • one_or_none(): 获取唯一的结果,如果没有则返回 None,如果结果超过一个,抛出异常。
# 获取第一个用户
first_user = session.query(User).first()
# 获取ID为1的用户(确保存在)
user_with_id_1 = session.query(User).filter(User.id == 1).one()
# 尝试获取一个可能不存在的用户
user_not_found = session.query(User).filter(User.name == "NonExistentUser").one_or_none()
if user_not_found is None:
    print("用户不存在")

高级查询操作

1 关系查询

这是 ORM 的强大之处。

# 1. 为 Alice 添加一个地址
alice = session.query(User).filter_by(name="Alice").one()
alice.addresses.append(Address(email_address="alice_home@example.com"))
session.commit()
# 2. 查询 Alice 的所有地址
# 通过关系属性 addresses 进行查询
alice_addresses = session.query(User).filter_by(name="Alice").one().addresses
print("Alice's addresses:")
for addr in alice_addresses:
    print(addr)
# 输出:
# <Address(id=1, email_address='alice_home@example.com')>
# 3. 通过 Address 反向查询到 User
# 通过关系属性 user 进行查询
address = session.query(Address).filter_by(email_address="alice_home@example.com").one()
print("Address belongs to:", address.user)
# 输出: Address belongs to: <User(id=1, name='Alice', email='alice@example.com')>

2 连接查询 (JOIN)

使用 join() 方法来连接多个表。

# 内连接:查询所有有地址的用户及其地址
# SQLAlchemy 会自动根据外键关系判断如何连接
users_with
分享:
扫描分享到社交APP
上一篇
下一篇