下面我将为你提供一个从零开始的详细指南,包括安装、连接、基本操作(增删改查)、处理事务以及使用更高级的 psycopg2.pool 连接池。

第一步:安装必要的库
你需要安装 psycopg2 库,在你的终端或命令行中运行:
pip install psycopg2-binary
psycopg2 vs psycopg2-binary:
psycopg2: 这是一个纯 Python 包,需要你预先安装好 PostgreSQL 的开发头文件(如libpq-dev或postgresql-devel),如果你是从源码编译安装 PostgreSQL,或者是在服务器环境中,这个版本更合适。psycopg2-binary: 这是一个预编译的二进制包,包含了所有需要的依赖,安装非常简单,适合大多数开发者和快速原型开发。对于初学者,推荐直接使用psycopg2-binary。
第二步:连接到 PostgreSQL 数据库
连接数据库是所有操作的第一步,你需要提供数据库的主机、端口、数据库名、用户名和密码。
示例代码:

import psycopg2
import psycopg2.extras # 使用 DictCursor,让查询结果以字典形式返回,更方便
# --- 1. 定义数据库连接信息 ---
# 建议将敏感信息(如密码)放在环境变量或配置文件中,而不是硬编码在代码里
DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "testdb"
DB_USER = "postgres"
DB_PASSWORD = "your_password"
try:
# --- 2. 建立连接 ---
# 使用 with 语句可以确保连接在使用完毕后被正确关闭
conn = psycopg2.connect(
host=DB_HOST,
port=DB_PORT,
database=DB_NAME,
user=DB_USER,
password=DB_PASSWORD
)
# --- 3. 创建一个游标对象 ---
# 游标用于执行 SQL 语句并获取结果
# 使用 DictCursor 可以让查询结果像字典一样访问,row['column_name']
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
print("成功连接到 PostgreSQL 数据库!")
# --- 4. 执行一个简单的查询 ---
cur.execute("SELECT version();")
db_version = cur.fetchone() # 获取一行结果
print(f"PostgreSQL 数据库版本: {db_version[0]}")
except (Exception, psycopg2.Error) as error:
print(f"连接数据库时出错: {error}")
finally:
# --- 5. 关闭游标和连接 ---
if 'cur' in locals():
cur.close()
if 'conn' in locals() and conn is not None:
conn.close()
print("数据库连接已关闭。")
第三步:基本数据库操作(CRUD)
CRUD 代表 Create(创建)、Read(读取)、Update(更新)和 Delete(删除)。
创建表 和 插入数据
import psycopg2
# (使用上面的连接信息...)
try:
conn = psycopg2.connect(...) # 连接数据库
cur = conn.cursor()
# --- 创建表 ---
# 使用 IF NOT EXISTS 避免重复创建时出错
create_table_query = """
CREATE TABLE IF NOT EXISTS employees (
id SERIAL PRIMARY KEY,
name VARCHAR(50) NOT NULL,
age INT,
department VARCHAR(50)
);
"""
cur.execute(create_table_query)
print("表 'employees' 创建成功或已存在。")
# --- 插入单条数据 ---
insert_query = "INSERT INTO employees (name, age, department) VALUES (%s, %s, %s);"
# 注意:数据必须以元组或列表的形式传递,即使只有一个值也要加逗号,如 ('John', 30, 'IT')
employee_data = ('John Doe', 30, 'IT')
cur.execute(insert_query, employee_data)
# --- 插入多条数据 ---
employees_data = [
('Jane Smith', 25, 'HR'),
('Peter Jones', 45, 'Finance')
]
# executemany 用于批量插入,效率更高
cur.executemany(insert_query, employees_data)
# --- 提交事务 ---
# 在 PostgreSQL 中,DML 操作(如 INSERT, UPDATE, DELETE)需要手动提交才能生效
conn.commit()
print(f"成功插入了 {cur.rowcount} 条数据。")
except (Exception, psycopg2.Error) as error:
print(f"操作数据库时出错: {error}")
# 如果出错,回滚事务
if 'conn' in locals():
conn.rollback()
finally:
if 'cur' in locals():
cur.close()
if 'conn' in locals() and conn is not None:
conn.close()
重要提示:SQL 注入防护
永远不要使用 Python 的字符串格式化(如 f"INSERT ... VALUES({name})")来拼接 SQL 语句,这会导致严重的 SQL 注入漏洞。始终使用 %s 作为占位符,并将数据作为第二个参数传递给 execute 或 executemany。psycopg2 会为你安全地处理这些数据。
查询数据
# (连接代码...)
try:
conn = psycopg2.connect(...)
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) # 使用 DictCursor 更方便
# --- 查询所有员工 ---
cur.execute("SELECT * FROM employees;")
all_employees = cur.fetchall() # 获取所有结果
print("\n所有员工信息:")
for emp in all_employees:
# 使用 DictCursor 后,可以通过列名访问
print(f"ID: {emp['id']}, Name: {emp['name']}, Age: {emp['age']}, Dept: {emp['department']}")
# --- 查询特定部门的员工 ---
dept = 'IT'
cur.execute("SELECT * FROM employees WHERE department = %s;", (dept,)) # 注意元组后的逗号
it_employees = cur.fetchall()
print(f"\n{dept} 部门的员工:")
for emp in it_employees:
print(f"ID: {emp['id']}, Name: {emp['name']}")
except (Exception, psycopg2.Error) as error:
print(f"查询数据时出错: {error}")
finally:
# (关闭代码...)
更新数据
# (连接代码...)
try:
conn = psycopg2.connect(...)
cur = conn.cursor()
employee_id_to_update = 1
new_age = 31
# 更新 ID 为 1 的员工的年龄
update_query = "UPDATE employees SET age = %s WHERE id = %s;"
cur.execute(update_query, (new_age, employee_id_to_update))
conn.commit()
print(f"成功更新 ID 为 {employee_id_to_update} 的员工的年龄为 {new_age}。")
except (Exception, psycopg2.Error) as error:
print(f"更新数据时出错: {error}")
if 'conn' in locals():
conn.rollback()
finally:
# (关闭代码...)
删除数据
# (连接代码...)
try:
conn = psycopg2.connect(...)
cur = conn.cursor()
employee_id_to_delete = 2
# 删除 ID 为 2 的员工
delete_query = "DELETE FROM employees WHERE id = %s;"
cur.execute(delete_query, (employee_id_to_delete,))
conn.commit()
print(f"成功删除 ID 为 {employee_id_to_delete} 的员工。")
except (Exception, psycopg2.Error) as error:
print(f"删除数据时出错: {error}")
if 'conn' in locals():
conn.rollback()
finally:
# (关闭代码...)
第四步:使用连接池(高级用法)
在高并发应用中,频繁地创建和销毁数据库连接会带来很大的性能开销,连接池(Connection Pooling)可以复用已建立的连接,显著提高性能。
psycopg2 自带了 psycopg2.pool 模块。
示例代码:
import psycopg2
from psycopg2 import pool
# --- 1. 创建一个连接池 ---
# 最小连接数: 2, 最大连接数: 5
try:
connection_pool = psycopg2.pool.ThreadedConnectionPool(
minconn=2,
maxconn=5,
host="localhost",
database="testdb",
user="postgres",
password="your_password"
)
print("连接池创建成功!")
except (Exception, psycopg2.Error) as error:
print(f"创建连接池时出错: {error}")
exit(1)
# --- 2. 从连接池中获取连接 ---
def get_employee(employee_id):
conn = None
try:
# 从池中获取一个连接
conn = connection_pool.getconn()
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
cur.execute("SELECT * FROM employees WHERE id = %s;", (employee_id,))
employee = cur.fetchone()
return employee
except (Exception, psycopg2.Error) as error:
print(f"获取员工 {employee_id} 时出错: {error}")
return None
finally:
# 将连接放回池中,而不是关闭它!
if conn:
connection_pool.putconn(conn)
# --- 3. 使用连接池 ---
emp = get_employee(1)
if emp:
print(f"从连接池获取到的员工: {emp}")
# --- 4. 关闭连接池 ---
# 当你的应用程序关闭时,记得关闭连接池
if 'connection_pool' in locals():
connection_pool.closeall()
print("连接池已关闭。")
总结与最佳实践
- 使用
psycopg2-binary:对于大多数开发者来说,这是最简单快捷的选择。 - 使用
with语句:确保连接和游标在使用完毕后被正确关闭,即使在发生异常时也能执行清理操作。 - 始终使用参数化查询 (
%s):这是防止 SQL 注入的黄金法则。 - 显式提交 (
conn.commit()):不要忘记在执行完 DML 操作后提交事务。 - 处理异常和回滚 (
try...except...rollback):将数据库操作放在try...except块中,并在出错时调用conn.rollback()来撤销未提交的更改。 - 使用
DictCursor:让查询结果以字典形式返回,代码可读性更强。 - 考虑使用连接池:在 Web 应用或任何需要频繁连接数据库的场景下,连接池能极大地提升性能。
- 配置管理:不要将数据库密码等敏感信息硬编码在代码中,使用环境变量(如
os.getenv())或配置文件(如configparser或.env文件)来管理它们。 你就可以在 Python 中游刃有余地操作 PostgreSQL 数据库了。
