环境准备
在开始之前,你需要确保以下几点:

a) 安装 MySQL 服务器
如果你的电脑上还没有 MySQL 服务器,你需要先安装它,你可以从 MySQL 官网 下载并安装。
b) 创建数据库和表
安装并启动 MySQL 服务后,创建一个数据库和一张用于测试的表。
-- 登录到 MySQL
mysql -u root -p
-- 创建一个名为 'test_db' 的数据库
CREATE DATABASE IF NOT EXISTS test_db;
-- 使用这个数据库
USE test_db;
-- 创建一张名为 'users' 的表
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL UNIQUE,
age INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 插入一些测试数据
INSERT INTO users (name, email, age) VALUES
('Alice', 'alice@example.com', 28),
('Bob', 'bob@example.com', 32),
('Charlie', 'charlie@example.com', 24);
c) 安装 Python MySQL 驱动
Python 本身不包含 MySQL 驱动,你需要安装一个第三方库,最常用的是 mysql-connector-python。
打开你的终端或命令提示符,运行以下命令:

pip install mysql-connector-python
基础代码示例
下面是一个最简单的 Python 脚本,用于连接到 MySQL 数据库并查询 users 表中的所有数据。
import mysql.connector
from mysql.connector import Error
def get_all_users():
"""连接到 MySQL 数据库并获取所有用户数据"""
try:
# 1. 建立连接
# 请将下面的占位符替换为你自己的数据库信息
connection = mysql.connector.connect(
host='localhost', # 数据库主机地址
database='test_db', # 数据库名称
user='root', # 数据库用户名
password='your_password' # 数据库密码
)
if connection.is_connected():
db_info = connection.get_server_info()
print(f"成功连接到 MySQL 服务器,版本: {db_info}")
# 2. 创建一个游标对象
# 游标用于执行 SQL 查询
cursor = connection.cursor()
# 3. 执行 SQL 查询
sql_query = "SELECT id, name, email, age FROM users"
cursor.execute(sql_query)
# 4. 获取查询结果
# fetchall() 获取所有结果行,返回一个元组列表
records = cursor.fetchall()
print(f"共找到 {cursor.rowcount} 条记录。")
# 5. 遍历并打印结果
print("用户列表:")
for row in records:
print(f"ID: {row[0]}, 姓名: {row[1]}, 邮箱: {row[2]}, 年龄: {row[3]}")
except Error as e:
# 6. 捕获并打印错误
print(f"连接或查询时出错: '{e}'")
finally:
# 7. 关闭游标和连接(非常重要!)
if connection.is_connected():
cursor.close()
connection.close()
print("MySQL 连接已关闭。")
# 调用函数
get_all_users()
代码解释:
mysql.connector.connect(...): 使用你的数据库凭证建立连接。connection.cursor(): 创建一个游标对象,它是执行 SQL 语句的句柄。cursor.execute(sql_query): 执行你编写的 SQL 查询。cursor.fetchall(): 获取查询返回的所有行,结果是一个元组列表,每个元组代表一行数据。cursor.fetchone(): 只获取第一行数据,适用于只期望返回一条结果的查询(按 ID 查询)。cursor.rowcount: 获取受影响的行数或返回的行数。try...except...finally: 这是处理数据库连接的标准模式。try: 尝试执行可能出错的代码。except: 捕获特定的错误(如连接失败、SQL 语法错误)。finally: 无论是否发生错误,这里的代码都会执行,我们用它来确保游标和连接被正确关闭,以释放资源。
使用参数化查询防止 SQL 注入
永远不要使用 Python 的字符串格式化(如 f"SELECT * FROM users WHERE name = '{user_name}'")来构建 SQL 查询,这极易受到 SQL 注入攻击。
正确的方法是使用参数化查询,将变量作为参数传递给 execute 方法。
def get_user_by_name(name_to_find):
"""根据用户名查找用户,使用参数化查询"""
try:
connection = mysql.connector.connect(
host='localhost',
database='test_db',
user='root',
password='your_password'
)
if connection.is_connected():
cursor = connection.cursor()
# 使用 %s 作为占位符,即使变量是数字也使用 %s
sql_query = "SELECT id, name, email, age FROM users WHERE name = %s"
# 将变量作为元组传递给 execute 方法
# 注意:即使只有一个参数,也必须写成 (name_to_find,)
cursor.execute(sql_query, (name_to_find,))
# 获取单条结果
record = cursor.fetchone()
if record:
print(f"找到用户: ID: {record[0]}, 姓名: {record[1]}, 邮箱: {record[2]}, 年龄: {record[3]}")
else:
print(f"未找到名为 '{name_to_find}' 的用户。")
except Error as e:
print(f"查询出错: '{e}'")
finally:
if connection.is_connected():
cursor.close()
connection.close()
# 调用函数
get_user_by_name('Bob')
将数据转换为字典列表
默认情况下,fetchall() 返回的是元组列表,如果你希望列名作为键,可以使用 cursor 的 dictionary 或 namedtuple 功能,这样处理数据会更方便。
def get_users_as_dict():
"""将用户数据获取为字典列表"""
try:
connection = mysql.connector.connect(
host='localhost',
database='test_db',
user='root',
password='your_password'
)
if connection.is_connected():
# 创建字典游标
cursor = connection.cursor(dictionary=True)
sql_query = "SELECT id, name, email, age FROM users"
cursor.execute(sql_query)
# 现在fetchall()返回的是字典列表
records = cursor.fetchall()
print("用户列表 (字典格式):")
for row in records:
print(row)
# 可以方便地通过键访问数据
# print(f"用户名: {row['name']}, 邮箱: {row['email']}")
except Error as e:
print(f"查询出错: '{e}'")
finally:
if connection.is_connected():
cursor.close()
connection.close()
# 调用函数
get_users_as_dict()
使用 with 语句(上下文管理器)
为了简化连接和游标的关闭操作,可以使用 with 语句。mysql.connector 从 8.0.12 版本开始支持上下文管理器。
from mysql.connector import pooling
def get_users_with_with():
"""使用 'with' 语句自动管理资源"""
try:
# 配置连接池
dbconfig = {
"host": "localhost",
"user": "root",
"password": "your_password",
"database": "test_db"
}
# 创建一个连接池
connection_pool = pooling.MySQLConnectionPool(pool_name="mypool", pool_size=5, **dbconfig)
# 从连接池中获取一个连接
with connection_pool.get_connection() as connection:
with connection.cursor(dictionary=True) as cursor:
sql_query = "SELECT id, name, email, age FROM users"
cursor.execute(sql_query)
records = cursor.fetchall()
print("用户列表 (使用 'with' 语句):")
for row in records:
print(row)
except Error as e:
print(f"查询出错: '{e}'")
# 调用函数
get_users_with_with()
使用连接池(高级)
在高并发应用中,频繁地创建和销毁数据库连接会带来很大的性能开销,连接池(Connection Pooling)可以解决这个问题,它预先创建一组数据库连接,并将它们保存在池中,当需要时从中获取,用完后放回池中,而不是关闭。
上面的 get_users_with_with 示例已经展示了如何使用连接池。
- 安装驱动:
pip install mysql-connector-python - 基本流程:
connect->cursor->execute->fetch->close。 - 安全第一: 始终使用参数化查询 (
%s占位符) 来防止 SQL 注入。 - 方便数据: 使用
cursor(dictionary=True)将结果转为字典,方便访问。 - 资源管理: 使用
try...finally或with语句确保连接和游标被正确关闭。 - 性能优化: 在生产环境中,强烈建议使用连接池来管理数据库连接。
希望这份详细的指南能帮助你顺利地在 Python 中使用 MySQL 获取数据!
