杰瑞科技汇

Python如何连接PostgreSQL数据库?

下面我将为你提供一个从零开始的详细指南,包括安装、连接、基本操作(增删改查)、处理事务以及使用更高级的 psycopg2.pool 连接池。

Python如何连接PostgreSQL数据库?-图1
(图片来源网络,侵删)

第一步:安装必要的库

你需要安装 psycopg2 库,在你的终端或命令行中运行:

pip install psycopg2-binary

psycopg2 vs psycopg2-binary

  • psycopg2: 这是一个纯 Python 包,需要你预先安装好 PostgreSQL 的开发头文件(如 libpq-devpostgresql-devel),如果你是从源码编译安装 PostgreSQL,或者是在服务器环境中,这个版本更合适。
  • psycopg2-binary: 这是一个预编译的二进制包,包含了所有需要的依赖,安装非常简单,适合大多数开发者和快速原型开发。对于初学者,推荐直接使用 psycopg2-binary

第二步:连接到 PostgreSQL 数据库

连接数据库是所有操作的第一步,你需要提供数据库的主机、端口、数据库名、用户名和密码。

示例代码:

Python如何连接PostgreSQL数据库?-图2
(图片来源网络,侵删)
import psycopg2
import psycopg2.extras # 使用 DictCursor,让查询结果以字典形式返回,更方便
# --- 1. 定义数据库连接信息 ---
# 建议将敏感信息(如密码)放在环境变量或配置文件中,而不是硬编码在代码里
DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "testdb"
DB_USER = "postgres"
DB_PASSWORD = "your_password"
try:
    # --- 2. 建立连接 ---
    # 使用 with 语句可以确保连接在使用完毕后被正确关闭
    conn = psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        database=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD
    )
    # --- 3. 创建一个游标对象 ---
    # 游标用于执行 SQL 语句并获取结果
    # 使用 DictCursor 可以让查询结果像字典一样访问,row['column_name']
    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    print("成功连接到 PostgreSQL 数据库!")
    # --- 4. 执行一个简单的查询 ---
    cur.execute("SELECT version();")
    db_version = cur.fetchone()  # 获取一行结果
    print(f"PostgreSQL 数据库版本: {db_version[0]}")
except (Exception, psycopg2.Error) as error:
    print(f"连接数据库时出错: {error}")
finally:
    # --- 5. 关闭游标和连接 ---
    if 'cur' in locals():
        cur.close()
    if 'conn' in locals() and conn is not None:
        conn.close()
        print("数据库连接已关闭。")

第三步:基本数据库操作(CRUD)

CRUD 代表 Create(创建)、Read(读取)、Update(更新)和 Delete(删除)。

创建表 和 插入数据

import psycopg2
# (使用上面的连接信息...)
try:
    conn = psycopg2.connect(...) # 连接数据库
    cur = conn.cursor()
    # --- 创建表 ---
    # 使用 IF NOT EXISTS 避免重复创建时出错
    create_table_query = """
    CREATE TABLE IF NOT EXISTS employees (
        id SERIAL PRIMARY KEY,
        name VARCHAR(50) NOT NULL,
        age INT,
        department VARCHAR(50)
    );
    """
    cur.execute(create_table_query)
    print("表 'employees' 创建成功或已存在。")
    # --- 插入单条数据 ---
    insert_query = "INSERT INTO employees (name, age, department) VALUES (%s, %s, %s);"
    # 注意:数据必须以元组或列表的形式传递,即使只有一个值也要加逗号,如 ('John', 30, 'IT')
    employee_data = ('John Doe', 30, 'IT')
    cur.execute(insert_query, employee_data)
    # --- 插入多条数据 ---
    employees_data = [
        ('Jane Smith', 25, 'HR'),
        ('Peter Jones', 45, 'Finance')
    ]
    # executemany 用于批量插入,效率更高
    cur.executemany(insert_query, employees_data)
    # --- 提交事务 ---
    # 在 PostgreSQL 中,DML 操作(如 INSERT, UPDATE, DELETE)需要手动提交才能生效
    conn.commit()
    print(f"成功插入了 {cur.rowcount} 条数据。")
except (Exception, psycopg2.Error) as error:
    print(f"操作数据库时出错: {error}")
    # 如果出错,回滚事务
    if 'conn' in locals():
        conn.rollback()
finally:
    if 'cur' in locals():
        cur.close()
    if 'conn' in locals() and conn is not None:
        conn.close()

重要提示:SQL 注入防护 永远不要使用 Python 的字符串格式化(如 f"INSERT ... VALUES({name})")来拼接 SQL 语句,这会导致严重的 SQL 注入漏洞。始终使用 %s 作为占位符,并将数据作为第二个参数传递给 executeexecutemanypsycopg2 会为你安全地处理这些数据。

查询数据

# (连接代码...)
try:
    conn = psycopg2.connect(...)
    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) # 使用 DictCursor 更方便
    # --- 查询所有员工 ---
    cur.execute("SELECT * FROM employees;")
    all_employees = cur.fetchall() # 获取所有结果
    print("\n所有员工信息:")
    for emp in all_employees:
        # 使用 DictCursor 后,可以通过列名访问
        print(f"ID: {emp['id']}, Name: {emp['name']}, Age: {emp['age']}, Dept: {emp['department']}")
    # --- 查询特定部门的员工 ---
    dept = 'IT'
    cur.execute("SELECT * FROM employees WHERE department = %s;", (dept,)) # 注意元组后的逗号
    it_employees = cur.fetchall()
    print(f"\n{dept} 部门的员工:")
    for emp in it_employees:
        print(f"ID: {emp['id']}, Name: {emp['name']}")
except (Exception, psycopg2.Error) as error:
    print(f"查询数据时出错: {error}")
finally:
    # (关闭代码...)

更新数据

# (连接代码...)
try:
    conn = psycopg2.connect(...)
    cur = conn.cursor()
    employee_id_to_update = 1
    new_age = 31
    # 更新 ID 为 1 的员工的年龄
    update_query = "UPDATE employees SET age = %s WHERE id = %s;"
    cur.execute(update_query, (new_age, employee_id_to_update))
    conn.commit()
    print(f"成功更新 ID 为 {employee_id_to_update} 的员工的年龄为 {new_age}。")
except (Exception, psycopg2.Error) as error:
    print(f"更新数据时出错: {error}")
    if 'conn' in locals():
        conn.rollback()
finally:
    # (关闭代码...)

删除数据

# (连接代码...)
try:
    conn = psycopg2.connect(...)
    cur = conn.cursor()
    employee_id_to_delete = 2
    # 删除 ID 为 2 的员工
    delete_query = "DELETE FROM employees WHERE id = %s;"
    cur.execute(delete_query, (employee_id_to_delete,))
    conn.commit()
    print(f"成功删除 ID 为 {employee_id_to_delete} 的员工。")
except (Exception, psycopg2.Error) as error:
    print(f"删除数据时出错: {error}")
    if 'conn' in locals():
        conn.rollback()
finally:
    # (关闭代码...)

第四步:使用连接池(高级用法)

在高并发应用中,频繁地创建和销毁数据库连接会带来很大的性能开销,连接池(Connection Pooling)可以复用已建立的连接,显著提高性能。

psycopg2 自带了 psycopg2.pool 模块。

示例代码:

import psycopg2
from psycopg2 import pool
# --- 1. 创建一个连接池 ---
# 最小连接数: 2, 最大连接数: 5
try:
    connection_pool = psycopg2.pool.ThreadedConnectionPool(
        minconn=2,
        maxconn=5,
        host="localhost",
        database="testdb",
        user="postgres",
        password="your_password"
    )
    print("连接池创建成功!")
except (Exception, psycopg2.Error) as error:
    print(f"创建连接池时出错: {error}")
    exit(1)
# --- 2. 从连接池中获取连接 ---
def get_employee(employee_id):
    conn = None
    try:
        # 从池中获取一个连接
        conn = connection_pool.getconn()
        cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        cur.execute("SELECT * FROM employees WHERE id = %s;", (employee_id,))
        employee = cur.fetchone()
        return employee
    except (Exception, psycopg2.Error) as error:
        print(f"获取员工 {employee_id} 时出错: {error}")
        return None
    finally:
        # 将连接放回池中,而不是关闭它!
        if conn:
            connection_pool.putconn(conn)
# --- 3. 使用连接池 ---
emp = get_employee(1)
if emp:
    print(f"从连接池获取到的员工: {emp}")
# --- 4. 关闭连接池 ---
# 当你的应用程序关闭时,记得关闭连接池
if 'connection_pool' in locals():
    connection_pool.closeall()
    print("连接池已关闭。")

总结与最佳实践

  1. 使用 psycopg2-binary:对于大多数开发者来说,这是最简单快捷的选择。
  2. 使用 with 语句:确保连接和游标在使用完毕后被正确关闭,即使在发生异常时也能执行清理操作。
  3. 始终使用参数化查询 (%s):这是防止 SQL 注入的黄金法则。
  4. 显式提交 (conn.commit()):不要忘记在执行完 DML 操作后提交事务。
  5. 处理异常和回滚 (try...except...rollback):将数据库操作放在 try...except 块中,并在出错时调用 conn.rollback() 来撤销未提交的更改。
  6. 使用 DictCursor:让查询结果以字典形式返回,代码可读性更强。
  7. 考虑使用连接池:在 Web 应用或任何需要频繁连接数据库的场景下,连接池能极大地提升性能。
  8. 配置管理:不要将数据库密码等敏感信息硬编码在代码中,使用环境变量(如 os.getenv())或配置文件(如 configparser.env 文件)来管理它们。 你就可以在 Python 中游刃有余地操作 PostgreSQL 数据库了。
分享:
扫描分享到社交APP
上一篇
下一篇