杰瑞科技汇

Python如何连接PostgreSQL?

安装 psycopg2

你需要安装 psycopg2 库,打开你的终端或命令提示符,运行以下命令:

Python如何连接PostgreSQL?-图1
(图片来源网络,侵删)
pip install psycopg2-binary

注意:

  • psycopg2-binary 是一个预编译的版本,非常适合开发和测试,安装非常简单。
  • 如果你在生产环境或有特殊需求(如性能优化),可以考虑安装 psycopg2 的源码版本 (pip install psycopg2),但这需要你的系统上安装了 C 编译器(如 GCC)和 PostgreSQL 的开发头文件。

基础连接与操作

下面是一个最简单的 Python 脚本,用于连接到 PostgreSQL 数据库,执行一个查询,并打印结果。

准备工作

假设你已经有一个 PostgreSQL 数据库正在运行,并且有一个数据库、一个用户和一个密码。

  • 数据库名: mydb
  • 用户名: myuser
  • 密码: mypassword
  • 主机: localhost (如果你数据库在同一台机器上)
  • 端口: 5432 (PostgreSQL 的默认端口)

示例代码 (basic_example.py)

import psycopg2
import os
# --- 方式一:直接在代码中硬编码连接信息 (不推荐用于生产环境) ---
# db_config = {
#     "host": "localhost",
#     "database": "mydb",
#     "user": "myuser",
#     "password": "mypassword",
#     "port": "5432"
# }
# --- 方式二:从环境变量中读取连接信息 (推荐) ---
# 这样更安全,避免密码泄露在代码中
db_config = {
    "host": os.getenv("DB_HOST", "localhost"),
    "database": os.getenv("DB_NAME", "mydb"),
    "user": os.getenv("DB_USER", "myuser"),
    "password": os.getenv("DB_PASSWORD", "mypassword"),
    "port": os.getenv("DB_PORT", "5432")
}
try:
    # 1. 建立连接
    # 使用 **db_config 将字典解包为关键字参数
    conn = psycopg2.connect(**db_config)
    # 2. 创建一个游标对象
    # 游标用于执行 SQL 语句并管理查询结果
    cursor = conn.cursor()
    # 3. 执行 SQL 查询
    # 假设我们有一个名为 'employees' 的表
    print("正在连接到数据库...")
    cursor.execute("SELECT version();")
    # 4. 获取并打印查询结果
    db_version = cursor.fetchone()
    print(f"PostgreSQL 数据库版本: {db_version[0]}")
    # 另一个查询示例
    print("\n正在查询员工信息...")
    cursor.execute("SELECT id, name, email FROM employees;")
    # fetchone() 获取下一行
    # row = cursor.fetchone()
    # print(f"第一行数据: {row}")
    # fetchall() 获取所有剩余的行
    all_employees = cursor.fetchall()
    print("所有员工信息:")
    for emp in all_employees:
        print(f"ID: {emp[0]}, 姓名: {emp[1]}, 邮箱: {emp[2]}")
except (Exception, psycopg2.Error) as error:
    # 5. 发生错误时打印错误信息
    print(f"数据库操作出错: {error}")
finally:
    # 6. 关闭游标和连接
    # 确保无论是否发生错误,资源都会被正确释放
    if 'conn' in locals() and conn is not None:
        if 'cursor' in locals() and cursor is not None:
            cursor.close()
        conn.close()
        print("\n数据库连接已关闭。")

如何运行:

Python如何连接PostgreSQL?-图2
(图片来源网络,侵删)
  1. 将上述代码保存为 basic_example.py
  2. 如果使用方式二,你需要先设置环境变量(例如在 Linux/macOS 中:export DB_PASSWORD="mypassword",在 Windows 中:set DB_PASSWORD="mypassword")。
  3. 在终端中运行:python basic_example.py

使用 with 语句(上下文管理器)

为了更安全、更方便地管理连接和游标,psycopg2 支持上下文管理器 (with 语句),这样可以确保资源(游标和连接)在使用完毕后自动关闭,即使发生错误也不例外。

示例代码 (with_example.py)

import psycopg2
import os
db_config = {
    "host": os.getenv("DB_HOST", "localhost"),
    "database": os.getenv("DB_NAME", "mydb"),
    "user": os.getenv("DB_USER", "myuser"),
    "password": os.getenv("DB_PASSWORD", "mypassword"),
    "port": os.getenv("DB_PORT", "5432")
}
try:
    # with 语句会自动处理连接的关闭
    with psycopg2.connect(**db_config) as conn:
        # with 语句会自动处理游标的关闭
        with conn.cursor() as cursor:
            print("数据库连接成功。")
            # 执行 SQL
            cursor.execute("SELECT id, name FROM employees WHERE id = %s;", (1,)) # 注意这里的参数化查询
            # 获取结果
            employee = cursor.fetchone()
            if employee:
                print(f"找到员工: ID={employee[0]}, Name={employee[1]}")
            else:
                print("未找到ID为1的员工。")
except (Exception, psycopg2.Error) as error:
    print(f"数据库操作出错: {error}")
# 不需要手动写 conn.close() 或 cursor.close()

参数化查询(防止 SQL 注入)

永远不要使用 Python 字符串拼接来构建 SQL 查询,这极易导致 SQL 注入攻击。psycopg2 提供了安全的参数化查询方式。

✅ 正确做法(使用 %s 占位符)

# 假设 user_id 是一个来自用户输入的变量
user_id = 1
# 使用 %s 作为占位符,然后将参数作为元组传递给第二个参数
# 元组中即使只有一个参数,末尾也必须有逗号,如 (user_id,)
sql_query = "SELECT * FROM users WHERE id = %s;"
cursor.execute(sql_query, (user_id,))

❌ 错误做法(易受 SQL 注入)

# 绝对不要这样做!
user_id = "1; DROP TABLE users; --"
sql_query = "SELECT * FROM users WHERE id = " + user_id + ";"
cursor.execute(sql_query) # 这将导致灾难性后果

插入、更新和删除数据

对于修改数据的操作(INSERT, UPDATE, DELETE),你需要提交事务才能使更改永久化。

示例代码 (crud_example.py)

import psycopg2
import os
db_config = {
    "host": os.getenv("DB_HOST", "localhost"),
    "database": os.getenv("DB_NAME", "mydb"),
    "user": os.getenv("DB_USER", "myuser"),
    "password": os.getenv("DB_PASSWORD", "mypassword"),
    "port": os.getenv("DB_PORT", "5432")
}
try:
    with psycopg2.connect(**db_config) as conn:
        with conn.cursor() as cursor:
            # --- 插入数据 ---
            insert_query = "INSERT INTO employees (name, email) VALUES (%s, %s) RETURNING id;"
            # RETURNING id 可以获取到刚插入记录的主键
            new_employee_name = "张三"
            new_employee_email = "zhangsan@example.com"
            cursor.execute(insert_query, (new_employee_name, new_employee_email))
            new_employee_id = cursor.fetchone()[0] # 获取返回的ID
            print(f"成功插入新员工,ID: {new_employee_id}")
            # --- 更新数据 ---
            update_query = "UPDATE employees SET email = %s WHERE id = %s;"
            updated_email = "zhangsan_new@example.com"
            cursor.execute(update_query, (updated_email, new_employee_id))
            print(f"成功更新ID为 {new_employee_id} 的员工邮箱。")
            # --- 删除数据 ---
            # delete_query = "DELETE FROM employees WHERE id = %s;"
            # cursor.execute(delete_query, (new_employee_id,))
            # print(f"成功删除ID为 {new_employee_id} 的员工。")
            # --- 提交事务 ---
            # 所有 DML 操作(INSERT, UPDATE, DELETE)都需要提交
            conn.commit() 
            print("事务已提交。")
except (Exception, psycopg2.Error) as error:
    # 发生错误时回滚事务
    if 'conn' in locals() and conn is not None:
        conn.rollback()
    print(f"数据库操作出错,已回滚: {error}")

高级用法:使用连接池 (psycopg2.pool)

在高并发应用中,频繁地创建和销毁数据库连接会带来巨大的性能开销,连接池可以复用已建立的连接,显著提高性能。

Python如何连接PostgreSQL?-图3
(图片来源网络,侵删)

示例代码 (pool_example.py)

import psycopg2
import psycopg2.pool
import os
db_config = {
    "host": os.getenv("DB_HOST", "localhost"),
    "database": os.getenv("DB_NAME", "mydb"),
    "user": os.getenv("DB_USER", "myuser"),
    "password": os.getenv("DB_PASSWORD", "mypassword"),
    "port": os.getenv("DB_PORT", "5432")
}
# 创建一个最小为1,最大为5的连接池
try:
    connection_pool = psycopg2.pool.SimpleConnectionPool(
        minconn=1,
        maxconn=5,
        **db_config
    )
    print("连接池创建成功。")
    # 从连接池中获取一个连接
    conn = connection_pool.getconn()
    print("从连接池中获取了一个连接。")
    with conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM employees;")
            records = cursor.fetchall()
            print(f"从连接池中查询到 {len(records)} 条记录。")
            # 模拟一些工作
            # ...
    # 将连接归还给连接池
    connection_pool.putconn(conn)
    print("连接已归还给连接池。")
except (Exception, psycopg2.Error) as error:
    print(f"连接池操作出错: {error}")
finally:
    # 关闭所有连接池中的连接
    if 'connection_pool' in locals():
        connection_pool.closeall()
        print("所有连接池连接已关闭。")

使用 ORM:SQLAlchemy

如果你更喜欢面向对象的方式来操作数据库,可以使用 ORM(Object-Relational Mapping)框架,最著名的就是 SQLAlchemy,它底层依然可以使用 psycopg2 作为驱动。

安装 SQLAlchemy

pip install sqlalchemy

示例代码 (sqlalchemy_example.py)

import os
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# --- 1. 定义模型 (Python 类) ---
Base = declarative_base()
class Employee(Base):
    __tablename__ = 'employees'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)
    def __repr__(self):
        return f"<Employee(id={self.id}, name='{self.name}', email='{self.email}')>"
# --- 2. 创建数据库引擎 ---
# 'postgresql+psycopg2://' 是指定使用 psycopg2 驱动
db_url = "postgresql+psycopg2://{}:{}@{}/{}".format(
    os.getenv("DB_USER", "myuser"),
    os.getenv("DB_PASSWORD", "mypassword"),
    os.getenv("DB_HOST", "localhost"),
    os.getenv("DB_NAME", "mydb")
)
engine = create_engine(db_url)
# --- 3. 创建表 (如果不存在) ---
Base.metadata.create_all(engine)
# --- 4. 创建 Session ---
Session = sessionmaker(bind=engine)
session = Session()
# --- 5. 使用 ORM 进行增删改查 ---
try:
    # 增
    new_emp = Employee(name='李四', email='lisi@example.com')
    session.add(new_emp)
    session.commit()
    print(f"添加新员工: {new_emp}")
    # 查
    employees = session.query(Employee).all()
    print("\n所有员工:")
    for emp in employees:
        print(emp)
    # 改
    employee_to_update = session.query(Employee).filter_by(name='李四').first()
    if employee_to_update:
        employee_to_update.email = 'lisi_updated@example.com'
        session.commit()
        print(f"\n更新员工: {employee_to_update}")
    # 删
    # employee_to_delete = session.query(Employee).filter_by(name='李四').first()
    # if employee_to_delete:
    #     session.delete(employee_to_delete)
    #     session.commit()
    #     print(f"\n删除员工: {employee_to_delete}")
except Exception as e:
    session.rollback()
    print(f"ORM 操作出错: {e}")
finally:
    # 关闭 Session
    session.close()
    print("\nSession 已关闭。")
场景 推荐方法 优点
简单脚本、快速原型 psycopg2 + with 语句 轻量级,代码直观,无需额外概念。
Web应用、高并发 psycopg2.pool 性能高,避免频繁创建销毁连接。
复杂应用、大型项目 SQLAlchemy ORM 代码更结构化、可维护,面向对象,减少SQL书写。

希望这份详细的指南能帮助你顺利地在 Python 中连接和使用 PostgreSQL!

分享:
扫描分享到社交APP
上一篇
下一篇