杰瑞科技汇

How do you connect to MySQL with Python on Ubuntu?

Python and MySQL are a common and powerful combination. This guide walks through using them together on an Ubuntu system.


This guide will cover:

  1. Prerequisites: Installing Python and MySQL on Ubuntu.
  2. Connecting Python to MySQL: Using the official mysql-connector-python library.
  3. Executing Queries: Running SELECT, INSERT, UPDATE, and DELETE statements.
  4. Best Practices: Using try...except...finally for robust code and with statements for automatic resource cleanup.
  5. An Alternative Library: A quick look at SQLAlchemy, a popular Object-Relational Mapper (ORM).
  6. A Complete Example: A full script that ties everything together.

Prerequisites: Installing Python and MySQL on Ubuntu

First, ensure you have Python and a MySQL server installed on your Ubuntu machine.

a) Install Python

Ubuntu usually comes with Python pre-installed. You can check by opening a terminal and running:

python3 --version

If it's not installed, you can install it with:

sudo apt update
sudo apt install python3 python3-pip

pip3 is the package installer for Python 3, which we'll need to install the MySQL connector library.

b) Install MySQL Server

If you don't have MySQL server installed, follow these steps:

  1. Install the MySQL Server package:

    sudo apt update
    sudo apt install mysql-server
  2. Run the security script: This is a highly recommended step that helps you secure your MySQL installation by setting a root password, removing anonymous users, disallowing remote root login, and more.

    sudo mysql_secure_installation

    Follow the prompts on the screen. For this guide, you will set a root password. Remember this password!

  3. Verify MySQL is running:

    sudo systemctl status mysql

    You should see active (running) in the output.
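
As a complementary check from Python, the sketch below tests whether anything is accepting TCP connections on the MySQL port. It assumes the server listens on localhost at port 3306 (the MySQL default). The helper name is_port_open is hypothetical, and an open port only shows that some service is listening, not that it is MySQL.

```python
import socket


def is_port_open(host="localhost", port=3306, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        # create_connection raises OSError on refusal, timeout, or DNS failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("MySQL port reachable:", is_port_open())
```

If this prints False while systemctl reports the service as running, the server may be bound to a different address or port.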


Connecting Python to MySQL

We'll use the official Oracle-developed MySQL connector for Python.

a) Install the MySQL Connector Library

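In your terminal, use pip3 to install the library. On recent Ubuntu releases that mark the system Python as externally managed (PEP 668), run the command inside a virtual environment created with python3 -m venv: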

pip3 install mysql-connector-python
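
To confirm that the install worked, a small check such as the following can help. It is a minimal sketch: the helper name is_module_available is hypothetical, and it assumes that mysql-connector-python provides the top-level package named mysql (which is what the import mysql.connector statements in this guide rely on).

```python
import importlib.util


def is_module_available(module_name):
    """Return True if a top-level module can be located without importing it."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # "mysql" is the top-level package that "import mysql.connector" needs.
    if is_module_available("mysql"):
        print("mysql package found")
    else:
        print("mysql package not found; try: pip3 install mysql-connector-python")
```

Run it with the same interpreter (or virtual environment) that will run your application, since each environment has its own set of installed packages.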

b) Create a Test Database and User

It's good practice not to use the root user for your applications. Let's create a new database and a user with privileges on that database.

  1. Log in to the MySQL shell as the root user:

    sudo mysql -u root -p

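    Enter the root password you set during mysql_secure_installation. On default Ubuntu installs, where the MySQL root account authenticates through the auth_socket plugin, running sudo mysql without -p is usually enough.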

  2. Run the following SQL commands in the MySQL shell:

    -- Create a new database
    CREATE DATABASE myapp_db;
    -- Create a new user and grant privileges on the new database
    -- Replace 'your_strong_password' with a secure password
    CREATE USER 'myapp_user'@'localhost' IDENTIFIED BY 'your_strong_password';
    -- Grant all privileges on the new database to the new user
    GRANT ALL PRIVILEGES ON myapp_db.* TO 'myapp_user'@'localhost';
    -- Apply the changes immediately
    FLUSH PRIVILEGES;
    -- Exit the MySQL shell
    EXIT;

Executing Queries (The Core Logic)

Now, let's write some Python code to interact with our new database.

a) Establishing a Connection

First, you need to import the library and create a connection object.

import mysql.connector
from mysql.connector import Error

connection = None
cursor = None
try:
    # Establish the connection
    connection = mysql.connector.connect(
        host='localhost',
        database='myapp_db',
        user='myapp_user',
        password='your_strong_password'
    )
    if connection.is_connected():
        db_info = connection.get_server_info()
        print(f"Successfully connected to MySQL Server version {db_info}")
        cursor = connection.cursor()
        cursor.execute("SELECT database();")
        record = cursor.fetchone()
        print(f"You're connected to database: {record[0]}")
except Error as e:
    print(f"Error while connecting to MySQL: {e}")
finally:
    # Close the cursor and connection only if they were actually created
    if cursor is not None:
        cursor.close()
    if connection is not None and connection.is_connected():
        connection.close()
        print("MySQL connection is closed.")

b) Creating a Table and Inserting Data (INSERT)

Here's how to create a table and add a new record.

import mysql.connector
from mysql.connector import Error
def create_table(connection, create_table_query):
    cursor = connection.cursor()
    try:
        cursor.execute(create_table_query)
        print("Table created successfully")
    except Error as e:
        print(f"Error creating table: {e}")
    finally:
        # Release the cursor whether or not the statement succeeded
        cursor.close()

def insert_data(connection, insert_query, data):
    cursor = connection.cursor()
    try:
        # Executing the SQL command
        cursor.execute(insert_query, data)
        # Committing the changes to the database
        connection.commit()
        print(f"{cursor.rowcount} record inserted successfully into table")
    except Error as e:
        print(f"Error inserting data: {e}")
    finally:
        cursor.close()

# --- Main part of the script ---
if __name__ == "__main__":
    connection = None
    try:
        connection = mysql.connector.connect(
            host='localhost',
            database='myapp_db',
            user='myapp_user',
            password='your_strong_password'
        )
        if connection.is_connected():
            create_table_query = """
            CREATE TABLE IF NOT EXISTS employees (
                id INT AUTO_INCREMENT PRIMARY KEY,
                name VARCHAR(255) NOT NULL,
                position VARCHAR(100),
                salary DECIMAL(10, 2),
                join_date DATE
            )
            """
            create_table(connection, create_table_query)
            insert_query = "INSERT INTO employees (name, position, salary, join_date) VALUES (%s, %s, %s, %s)"
            employee_data = ('Alice Smith', 'Software Engineer', 90000.00, '2025-01-15')
            insert_data(connection, insert_query, employee_data)
            # Insert another employee
            employee_data_2 = ('Bob Johnson', 'Project Manager', 105000.00, '2025-05-20')
            insert_data(connection, insert_query, employee_data_2)
    except Error as e:
        print(f"Error while connecting to MySQL or executing query: {e}")
    finally:
        if connection and connection.is_connected():
            connection.close()
            print("MySQL connection is closed.")
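
The script above calls insert_data once per employee, which means one commit per row. When several rows arrive together, the DB-API executemany() method lets you send them in one call and commit once. The following is a minimal sketch under that assumption; the helper name insert_many is hypothetical, and contextlib.closing is used so the cursor is closed even if the statement fails.

```python
from contextlib import closing


def insert_many(connection, insert_query, rows):
    """Insert several rows with one executemany() call and a single commit."""
    # closing() calls cursor.close() on exit, even if execution raises.
    with closing(connection.cursor()) as cursor:
        cursor.executemany(insert_query, rows)
        connection.commit()
        # Row count as reported by the driver for the whole batch.
        return cursor.rowcount
```

With the names from the script above, a call might look like insert_many(connection, insert_query, [employee_data, employee_data_2]).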

c) Fetching Data (SELECT)

To retrieve data, you execute a query and then use fetchone(), fetchall(), or fetchmany().

# Assuming 'connection' is already established and valid
def fetch_data(connection, query, params=None):
    # dictionary=True returns each row as a dict keyed by column name
    cursor = connection.cursor(dictionary=True)
    try:
        # params supplies the values for any %s placeholders in the query
        cursor.execute(query, params)
        records = cursor.fetchall()
        print(f"Total number of rows returned: {cursor.rowcount}")
        return records
    except Error as e:
        print(f"Error fetching data: {e}")
        return None
    finally:
        cursor.close()

# --- Inside your main script ---
if connection.is_connected():
    select_query = "SELECT id, name, position FROM employees WHERE salary > %s"
    min_salary = 95000
    employees = fetch_data(connection, select_query, (min_salary,)) # Pass as a tuple
    if employees:
        print("\nEmployees with salary greater than 95,000:")
        for emp in employees:
            print(f"ID: {emp['id']}, Name: {emp['name']}, Position: {emp['position']}")
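
fetchall() loads the whole result set into memory at once. For large tables, fetchmany() lets you process rows in batches instead. The generator below is a minimal sketch of that idea; the name iter_rows and the default batch size are assumptions, and it expects a cursor on which execute() has already been called.

```python
def iter_rows(cursor, batch_size=100):
    """Yield rows one at a time, fetching them from the cursor in batches."""
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            # An empty batch means the result set is exhausted.
            break
        yield from batch
```

After cursor.execute(select_query, (min_salary,)), you could then write for row in iter_rows(cursor): and handle each row without holding the full result in a list.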

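Note: Whenever a query includes values that come from variables, use parameter placeholders (%s) rather than building the SQL string yourself. This prevents SQL injection attacks. Pass the values as a tuple; a single value needs a trailing comma, as in (min_salary,).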
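
To see why placeholders matter, the sketch below compares the two approaches using plain strings, with no database involved. The function names and the malicious input are made up for illustration.

```python
def build_unsafe_query(name):
    """Anti-pattern: splices user input directly into the SQL text."""
    return f"SELECT id, name FROM employees WHERE name = '{name}'"


def build_safe_query(name):
    """Keeps the SQL text fixed and returns the value separately."""
    return "SELECT id, name FROM employees WHERE name = %s", (name,)


malicious_input = "x' OR '1'='1"

# The input rewrites the WHERE clause so that it matches every row.
print(build_unsafe_query(malicious_input))

# The SQL text is unchanged; the input travels only as a parameter value.
query, params = build_safe_query(malicious_input)
print(query)
print(params)
```

In the safe version you would pass both parts to the driver with cursor.execute(query, params), so the input is always treated as data and never as SQL.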

d) Updating and Deleting Data (UPDATE, DELETE)

The pattern is similar to INSERT. You execute the query and then call connection.commit().

def update_data(connection, query, data):
    cursor = connection.cursor()
    try:
        cursor.execute(query, data)
        connection.commit()
        print(f"{cursor.rowcount} record(s) updated successfully")
    except Error as e:
        print(f"Error updating data: {e}")
    finally:
        # Release the cursor whether or not the statement succeeded
        cursor.close()

def delete_data(connection, query, data):
    cursor = connection.cursor()
    try:
        cursor.execute(query, data)
        connection.commit()
        print(f"{cursor.rowcount} record(s) deleted successfully")
    except Error as e:
        print(f"Error deleting data: {e}")
    finally:
        cursor.close()

# --- Example usage ---
if connection.is_connected():
    # Update Bob's salary
    update_query = "UPDATE employees SET salary = %s WHERE name = %s"
    update_data(connection, update_query, (110000.00, 'Bob Johnson'))
    # Delete Alice from the table
    delete_query = "DELETE FROM employees WHERE name = %s"
    delete_data(connection, delete_query, ('Alice Smith',))
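
Because changes only become permanent at commit(), you can group several statements so that they succeed or fail together. The following is a minimal sketch of that pattern; the helper name run_in_transaction is hypothetical, and it assumes autocommit is off (the Connector/Python default) and that the tables use a transactional engine such as InnoDB.

```python
from contextlib import closing


def run_in_transaction(connection, statements):
    """Run (query, params) pairs as one unit: commit all or roll back all."""
    with closing(connection.cursor()) as cursor:
        try:
            for query, params in statements:
                cursor.execute(query, params)
            connection.commit()
        except Exception:
            # Undo the statements that already ran, then let the caller see the error.
            connection.rollback()
            raise
```

For example, passing [(update_query, (110000.00, 'Bob Johnson')), (delete_query, ('Alice Smith',))] would apply the update and the delete together, or neither if one of them fails.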

Best Practices: Using with Statements

Manually creating and closing cursors and connections is tedious and easy to get wrong. Recent versions of the mysql.connector library support context managers (with statements) for both connections and cursors, so resources are released automatically even when an error occurs.

import mysql.connector
from mysql.connector import Error
def execute_query_with_with(host, database, user, password, query, data=None):
    """Connects to the DB, executes a query, and closes connection automatically."""
    try:
        with mysql.connector.connect(
            host=host,
            database=database,
            user=user,
            password=password
        ) as connection:
            with connection.cursor() as cursor:
                if data:
                    cursor.execute(query, data)
                else:
                    cursor.execute(query)
                # For SELECT queries, fetch the results
                if query.strip().lower().startswith('select'):
                    result = cursor.fetchall()
                    return result
                else:
                    # For INSERT, UPDATE, DELETE, commit the changes
                    connection.commit()
                    return cursor.rowcount
    except Error as e:
        print(f"Error executing query: {e}")
        return None
# --- Example ---
if __name__ == "__main__":
    db_config = {
        'host': 'localhost',
        'database': 'myapp_db',
        'user': 'myapp_user',
        'password': 'your_strong_password'
    }
    # Select all employees
    select_all_query = "SELECT * FROM employees"
    all_employees = execute_query_with_with(**db_config, query=select_all_query)
    print("\nAll employees:")
    for emp in all_employees or []:  # the helper returns None on error
        print(emp)
    # Insert a new employee
    insert_query = "INSERT INTO employees (name, position, salary, join_date) VALUES (%s, %s, %s, %s)"
    new_employee = ('Charlie Brown', 'Data Analyst', 75000.00, '2025-03-10')
    rows_affected = execute_query_with_with(**db_config, query=insert_query, data=new_employee)
    print(f"\nInserted {rows_affected} new employee.")
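
The helper above decides whether to fetch or commit by checking if the query text starts with "select", which misses other row-returning statements such as SHOW or queries that begin with WITH. An alternative is to inspect cursor.description, which the Python DB-API (PEP 249) defines as None when a statement produces no result set. The sketch below assumes the driver follows that convention; the name run_query is hypothetical.

```python
from contextlib import closing


def run_query(connection, query, params=None):
    """Return rows if the statement produced a result set, else the row count."""
    with closing(connection.cursor()) as cursor:
        cursor.execute(query, params or ())
        # DB-API cursors set description to None when there is no result set.
        if cursor.description is not None:
            return cursor.fetchall()
        connection.commit()
        return cursor.rowcount
```

This version takes an already open connection, so it can be called inside the with mysql.connector.connect(...) block shown earlier.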

An Alternative: SQLAlchemy (ORM)

For larger applications, writing raw SQL can become cumbersome. An Object-Relational Mapper (ORM) like SQLAlchemy allows you to interact with your database using Python objects instead of SQL queries.

Installation:

pip3 install sqlalchemy

Conceptual Example:

from sqlalchemy import create_engine, Column, Integer, String, Date, Numeric
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4; the old
# sqlalchemy.ext.declarative import path is deprecated
from sqlalchemy.orm import declarative_base, sessionmaker
# 1. Define the database URL (format: dialect+driver://user:password@host:port/database)
DATABASE_URL = "mysql+mysqlconnector://myapp_user:your_strong_password@localhost/myapp_db"
# 2. Create the engine
engine = create_engine(DATABASE_URL)
# 3. Create a base class for our models
Base = declarative_base()
# 4. Define the Employee table as a Python class
class Employee(Base):
    __tablename__ = 'employees'
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)
    position = Column(String(100))
    salary = Column(Numeric(10, 2))
    join_date = Column(Date)
# 5. Create the table in the database (if it doesn't exist)
Base.metadata.create_all(engine)
# 6. Create a session to interact with the database
Session = sessionmaker(bind=engine)
session = Session()
# 7. Use the session to interact with the database (Pythonically!)
# Create a new employee object
new_employee = Employee(name='Diana Prince', position='Hero', salary=120000.00, join_date='2025-08-01')
session.add(new_employee)
# Query for employees
employees = session.query(Employee).filter(Employee.salary > 80000).all()
print("\nEmployees (via SQLAlchemy ORM):")
for emp in employees:
    print(f"Name: {emp.name}, Position: {emp.position}, Salary: {emp.salary}")
# Commit the transaction
session.commit()
session.close()
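
One practical detail about the database URL format used in step 1: characters such as @ or / in the user name or password have to be URL-encoded, otherwise the URL is parsed incorrectly. The sketch below builds the URL with urllib.parse.quote_plus from the standard library. The helper name build_mysql_url and the sample password are made up for illustration.

```python
from urllib.parse import quote_plus


def build_mysql_url(user, password, host, database, port=3306):
    """Build a mysql+mysqlconnector URL with the user and password URL-encoded."""
    return (
        f"mysql+mysqlconnector://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# "@" becomes %40 and "/" becomes %2F in the encoded password.
print(build_mysql_url("myapp_user", "p@ss/word", "localhost", "myapp_db"))
```

The returned string can be passed to create_engine in place of the hard-coded DATABASE_URL.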

Complete Example Script

Here is a single, complete script that demonstrates all the CRUD (Create, Read, Update, Delete) operations using the with statement best practice.

# main.py
import mysql.connector
from mysql.connector import Error
# --- Database Configuration ---
# It's better to load this from a config file or environment variables
DB_CONFIG = {
    'host': 'localhost',
    'database': 'myapp_db',
    'user': 'myapp_user',
    'password': 'your_strong_password'
}
def setup_database():
    """Creates the table if it doesn't exist."""
    create_table_query = """
    CREATE TABLE IF NOT EXISTS products (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        quantity INT DEFAULT 0,
        price DECIMAL(10, 2)
    )
    """
    execute_query(create_table_query)
def execute_query(query, data=None):
    """Connects to DB, executes a query, and closes connection."""
    try:
        with mysql.connector.connect(**DB_CONFIG) as connection:
            with connection.cursor() as cursor:
                cursor.execute(query, data)
                # For SELECT queries, fetch and return results
                if query.strip().lower().startswith('select'):
                    result = cursor.fetchall()
                    return result
                # For other queries, commit and return row count
                else:
                    connection.commit()
                    return cursor.rowcount
    except Error as e:
        print(f"Error: {e}")
        return None
def add_product(name, quantity, price):
    """Inserts a new product into the products table."""
    query = "INSERT INTO products (name, quantity, price) VALUES (%s, %s, %s)"
    data = (name, quantity, price)
    rows_affected = execute_query(query, data)
    if rows_affected is not None:
        print(f"Successfully added {name}. Rows affected: {rows_affected}")
def get_all_products():
    """Retrieves and prints all products from the table."""
    query = "SELECT id, name, quantity, price FROM products"
    products = execute_query(query)
    if products:
        print("\n--- Current Products ---")
        for product in products:
            print(f"ID: {product[0]}, Name: {product[1]}, Qty: {product[2]}, Price: ${product[3]:.2f}")
        print("------------------------\n")
    else:
        print("No products found.")
def update_product_quantity(product_id, new_quantity):
    """Updates the quantity of a specific product."""
    query = "UPDATE products SET quantity = %s WHERE id = %s"
    data = (new_quantity, product_id)
    rows_affected = execute_query(query, data)
    if rows_affected is not None:
        print(f"Updated quantity for product ID {product_id}. Rows affected: {rows_affected}")
def delete_product(product_id):
    """Deletes a product from the table."""
    query = "DELETE FROM products WHERE id = %s"
    data = (product_id,)
    rows_affected = execute_query(query, data)
    if rows_affected is not None:
        print(f"Deleted product ID {product_id}. Rows affected: {rows_affected}")
# --- Main execution block ---
if __name__ == "__main__":
    print("Setting up database...")
    setup_database()
    print("\nAdding initial products...")
    add_product("Laptop", 10, 1200.50)
    add_product("Mouse", 50, 25.00)
    add_product("Keyboard", 30, 75.75)
    print("\nFetching all products:")
    get_all_products()
    print("\nUpdating quantity for product ID 2 (Mouse) to 45...")
    update_product_quantity(2, 45)
    print("\nFetching all products after update:")
    get_all_products()
    print("\nDeleting product ID 3 (Keyboard)...")
    delete_product(3)
    print("\nFetching all products after deletion:")
    get_all_products()
    print("\nScript finished.")
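
The script's configuration comment suggests loading credentials from environment variables rather than hard-coding them. A minimal sketch of that approach follows. The variable names (MYAPP_DB_HOST, MYAPP_DB_NAME, MYAPP_DB_USER, MYAPP_DB_PASSWORD) and the helper name load_db_config are assumptions chosen for this example.

```python
import os


def load_db_config(env=None):
    """Build connection settings from environment variables."""
    env = os.environ if env is None else env
    return {
        "host": env.get("MYAPP_DB_HOST", "localhost"),
        "database": env.get("MYAPP_DB_NAME", "myapp_db"),
        "user": env.get("MYAPP_DB_USER", "myapp_user"),
        # No default: raise KeyError early if the password is not set.
        "password": env["MYAPP_DB_PASSWORD"],
    }
```

In main.py you could then replace the literal dictionary with DB_CONFIG = load_db_config() and export MYAPP_DB_PASSWORD in the shell before running the script.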