安装 psycopg2
你需要安装 psycopg2 库,打开你的终端或命令提示符,运行以下命令:

pip install psycopg2-binary
注意:
psycopg2-binary是一个预编译的版本,非常适合开发和测试,安装非常简单。- 如果你在生产环境或有特殊需求(如性能优化),可以考虑安装
psycopg2的源码版本 (pip install psycopg2),但这需要你的系统上安装了 C 编译器(如 GCC)和 PostgreSQL 的开发头文件。
基础连接与操作
下面是一个最简单的 Python 脚本,用于连接到 PostgreSQL 数据库,执行一个查询,并打印结果。
准备工作
假设你已经有一个 PostgreSQL 数据库正在运行,并且有一个数据库、一个用户和一个密码。
- 数据库名:
mydb - 用户名:
myuser - 密码:
mypassword - 主机:
localhost(如果你数据库在同一台机器上) - 端口:
5432(PostgreSQL 的默认端口)
示例代码 (basic_example.py)
import psycopg2
import os
# --- 方式一:直接在代码中硬编码连接信息 (不推荐用于生产环境) ---
# db_config = {
# "host": "localhost",
# "database": "mydb",
# "user": "myuser",
# "password": "mypassword",
# "port": "5432"
# }
# --- 方式二:从环境变量中读取连接信息 (推荐) ---
# 这样更安全,避免密码泄露在代码中
db_config = {
"host": os.getenv("DB_HOST", "localhost"),
"database": os.getenv("DB_NAME", "mydb"),
"user": os.getenv("DB_USER", "myuser"),
"password": os.getenv("DB_PASSWORD", "mypassword"),
"port": os.getenv("DB_PORT", "5432")
}
try:
# 1. 建立连接
# 使用 **db_config 将字典解包为关键字参数
conn = psycopg2.connect(**db_config)
# 2. 创建一个游标对象
# 游标用于执行 SQL 语句并管理查询结果
cursor = conn.cursor()
# 3. 执行 SQL 查询
# 假设我们有一个名为 'employees' 的表
print("正在连接到数据库...")
cursor.execute("SELECT version();")
# 4. 获取并打印查询结果
db_version = cursor.fetchone()
print(f"PostgreSQL 数据库版本: {db_version[0]}")
# 另一个查询示例
print("\n正在查询员工信息...")
cursor.execute("SELECT id, name, email FROM employees;")
# fetchone() 获取下一行
# row = cursor.fetchone()
# print(f"第一行数据: {row}")
# fetchall() 获取所有剩余的行
all_employees = cursor.fetchall()
print("所有员工信息:")
for emp in all_employees:
print(f"ID: {emp[0]}, 姓名: {emp[1]}, 邮箱: {emp[2]}")
except (Exception, psycopg2.Error) as error:
# 5. 发生错误时打印错误信息
print(f"数据库操作出错: {error}")
finally:
# 6. 关闭游标和连接
# 确保无论是否发生错误,资源都会被正确释放
if 'conn' in locals() and conn is not None:
if 'cursor' in locals() and cursor is not None:
cursor.close()
conn.close()
print("\n数据库连接已关闭。")
如何运行:

- 将上述代码保存为
basic_example.py。 - 如果使用方式二,你需要先设置环境变量(例如在 Linux/macOS 中:
export DB_PASSWORD="mypassword",在 Windows 中:set DB_PASSWORD="mypassword")。 - 在终端中运行:
python basic_example.py
使用 with 语句(上下文管理器)
为了更安全、更方便地管理连接和游标,psycopg2 支持上下文管理器 (with 语句),这样可以确保资源(游标和连接)在使用完毕后自动关闭,即使发生错误也不例外。
示例代码 (with_example.py)
import psycopg2
import os
db_config = {
"host": os.getenv("DB_HOST", "localhost"),
"database": os.getenv("DB_NAME", "mydb"),
"user": os.getenv("DB_USER", "myuser"),
"password": os.getenv("DB_PASSWORD", "mypassword"),
"port": os.getenv("DB_PORT", "5432")
}
try:
# with 语句会自动处理连接的关闭
with psycopg2.connect(**db_config) as conn:
# with 语句会自动处理游标的关闭
with conn.cursor() as cursor:
print("数据库连接成功。")
# 执行 SQL
cursor.execute("SELECT id, name FROM employees WHERE id = %s;", (1,)) # 注意这里的参数化查询
# 获取结果
employee = cursor.fetchone()
if employee:
print(f"找到员工: ID={employee[0]}, Name={employee[1]}")
else:
print("未找到ID为1的员工。")
except (Exception, psycopg2.Error) as error:
print(f"数据库操作出错: {error}")
# 不需要手动写 conn.close() 或 cursor.close()
参数化查询(防止 SQL 注入)
永远不要使用 Python 字符串拼接来构建 SQL 查询,这极易导致 SQL 注入攻击。psycopg2 提供了安全的参数化查询方式。
✅ 正确做法(使用 %s 占位符)
# 假设 user_id 是一个来自用户输入的变量 user_id = 1 # 使用 %s 作为占位符,然后将参数作为元组传递给第二个参数 # 元组中即使只有一个参数,末尾也必须有逗号,如 (user_id,) sql_query = "SELECT * FROM users WHERE id = %s;" cursor.execute(sql_query, (user_id,))
❌ 错误做法(易受 SQL 注入)
# 绝对不要这样做! user_id = "1; DROP TABLE users; --" sql_query = "SELECT * FROM users WHERE id = " + user_id + ";" cursor.execute(sql_query) # 这将导致灾难性后果
插入、更新和删除数据
对于修改数据的操作(INSERT, UPDATE, DELETE),你需要提交事务才能使更改永久化。
示例代码 (crud_example.py)
import psycopg2
import os
db_config = {
"host": os.getenv("DB_HOST", "localhost"),
"database": os.getenv("DB_NAME", "mydb"),
"user": os.getenv("DB_USER", "myuser"),
"password": os.getenv("DB_PASSWORD", "mypassword"),
"port": os.getenv("DB_PORT", "5432")
}
try:
with psycopg2.connect(**db_config) as conn:
with conn.cursor() as cursor:
# --- 插入数据 ---
insert_query = "INSERT INTO employees (name, email) VALUES (%s, %s) RETURNING id;"
# RETURNING id 可以获取到刚插入记录的主键
new_employee_name = "张三"
new_employee_email = "zhangsan@example.com"
cursor.execute(insert_query, (new_employee_name, new_employee_email))
new_employee_id = cursor.fetchone()[0] # 获取返回的ID
print(f"成功插入新员工,ID: {new_employee_id}")
# --- 更新数据 ---
update_query = "UPDATE employees SET email = %s WHERE id = %s;"
updated_email = "zhangsan_new@example.com"
cursor.execute(update_query, (updated_email, new_employee_id))
print(f"成功更新ID为 {new_employee_id} 的员工邮箱。")
# --- 删除数据 ---
# delete_query = "DELETE FROM employees WHERE id = %s;"
# cursor.execute(delete_query, (new_employee_id,))
# print(f"成功删除ID为 {new_employee_id} 的员工。")
# --- 提交事务 ---
# 所有 DML 操作(INSERT, UPDATE, DELETE)都需要提交
conn.commit()
print("事务已提交。")
except (Exception, psycopg2.Error) as error:
# 发生错误时回滚事务
if 'conn' in locals() and conn is not None:
conn.rollback()
print(f"数据库操作出错,已回滚: {error}")
高级用法:使用连接池 (psycopg2.pool)
在高并发应用中,频繁地创建和销毁数据库连接会带来巨大的性能开销,连接池可以复用已建立的连接,显著提高性能。

示例代码 (pool_example.py)
import psycopg2
import psycopg2.pool
import os
db_config = {
"host": os.getenv("DB_HOST", "localhost"),
"database": os.getenv("DB_NAME", "mydb"),
"user": os.getenv("DB_USER", "myuser"),
"password": os.getenv("DB_PASSWORD", "mypassword"),
"port": os.getenv("DB_PORT", "5432")
}
# 创建一个最小为1,最大为5的连接池
try:
connection_pool = psycopg2.pool.SimpleConnectionPool(
minconn=1,
maxconn=5,
**db_config
)
print("连接池创建成功。")
# 从连接池中获取一个连接
conn = connection_pool.getconn()
print("从连接池中获取了一个连接。")
with conn:
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM employees;")
records = cursor.fetchall()
print(f"从连接池中查询到 {len(records)} 条记录。")
# 模拟一些工作
# ...
# 将连接归还给连接池
connection_pool.putconn(conn)
print("连接已归还给连接池。")
except (Exception, psycopg2.Error) as error:
print(f"连接池操作出错: {error}")
finally:
# 关闭所有连接池中的连接
if 'connection_pool' in locals():
connection_pool.closeall()
print("所有连接池连接已关闭。")
使用 ORM:SQLAlchemy
如果你更喜欢面向对象的方式来操作数据库,可以使用 ORM(Object-Relational Mapping)框架,最著名的就是 SQLAlchemy,它底层依然可以使用 psycopg2 作为驱动。
安装 SQLAlchemy
pip install sqlalchemy
示例代码 (sqlalchemy_example.py)
import os
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# --- 1. 定义模型 (Python 类) ---
Base = declarative_base()
class Employee(Base):
__tablename__ = 'employees'
id = Column(Integer, primary_key=True)
name = Column(String)
email = Column(String)
def __repr__(self):
return f"<Employee(id={self.id}, name='{self.name}', email='{self.email}')>"
# --- 2. 创建数据库引擎 ---
# 'postgresql+psycopg2://' 是指定使用 psycopg2 驱动
db_url = "postgresql+psycopg2://{}:{}@{}/{}".format(
os.getenv("DB_USER", "myuser"),
os.getenv("DB_PASSWORD", "mypassword"),
os.getenv("DB_HOST", "localhost"),
os.getenv("DB_NAME", "mydb")
)
engine = create_engine(db_url)
# --- 3. 创建表 (如果不存在) ---
Base.metadata.create_all(engine)
# --- 4. 创建 Session ---
Session = sessionmaker(bind=engine)
session = Session()
# --- 5. 使用 ORM 进行增删改查 ---
try:
# 增
new_emp = Employee(name='李四', email='lisi@example.com')
session.add(new_emp)
session.commit()
print(f"添加新员工: {new_emp}")
# 查
employees = session.query(Employee).all()
print("\n所有员工:")
for emp in employees:
print(emp)
# 改
employee_to_update = session.query(Employee).filter_by(name='李四').first()
if employee_to_update:
employee_to_update.email = 'lisi_updated@example.com'
session.commit()
print(f"\n更新员工: {employee_to_update}")
# 删
# employee_to_delete = session.query(Employee).filter_by(name='李四').first()
# if employee_to_delete:
# session.delete(employee_to_delete)
# session.commit()
# print(f"\n删除员工: {employee_to_delete}")
except Exception as e:
session.rollback()
print(f"ORM 操作出错: {e}")
finally:
# 关闭 Session
session.close()
print("\nSession 已关闭。")
| 场景 | 推荐方法 | 优点 |
|---|---|---|
| 简单脚本、快速原型 | psycopg2 + with 语句 |
轻量级,代码直观,无需额外概念。 |
| Web应用、高并发 | psycopg2.pool |
性能高,避免频繁创建销毁连接。 |
| 复杂应用、大型项目 | SQLAlchemy ORM |
代码更结构化、可维护,面向对象,减少SQL书写。 |
希望这份详细的指南能帮助你顺利地在 Python 中连接和使用 PostgreSQL!
