杰瑞科技汇

How to Connect to and Operate MySQL Efficiently with Python Tornado

Python with Tornado and MySQL is a common stack for building high-performance, non-blocking web applications. The key to making this combination work well is to run database operations asynchronously.

Here’s a comprehensive guide covering:

  1. Why not use a standard MySQL driver? (The Problem)
  2. The Solution: Asynchronous MySQL Drivers
  3. Setup and Installation
  4. Complete Code Example: A Simple REST API
  5. Best Practices and Advanced Topics

The Problem: Why Not mysql-connector-python or PyMySQL?

Tornado is an asynchronous (non-blocking) web framework: it serves every request from a single-threaded event loop, and while one handler awaits an I/O operation (such as a database query), the loop moves on to other requests. A standard, synchronous database driver breaks this model. Its calls do not yield to the loop, so the whole server stalls until the query finishes and no other request is served in the meantime. That defeats the purpose of using Tornado and severely limits your application's concurrency.
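
The effect can be seen without a database. The following is a minimal sketch, not a benchmark: time.sleep() stands in for a synchronous driver call and asyncio.sleep() for an asynchronous one, and the names blocking_query, non_blocking_query and timed are made up for this illustration.

```python
import asyncio
import time

QUERY_SECONDS = 0.2  # pretend every "query" takes this long


async def blocking_query():
    # Stand-in for a synchronous driver call: time.sleep() never yields,
    # so the event loop cannot run anything else in the meantime.
    time.sleep(QUERY_SECONDS)


async def non_blocking_query():
    # Stand-in for an asynchronous driver call: awaiting hands control
    # back to the event loop until the "query" is done.
    await asyncio.sleep(QUERY_SECONDS)


async def timed(query, n=3):
    """Run n simulated requests concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(query() for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"blocking:     {asyncio.run(timed(blocking_query)):.2f}s")
    print(f"non-blocking: {asyncio.run(timed(non_blocking_query)):.2f}s")
```

With three simulated requests, the blocking variant should take about three times as long as the non-blocking one, because the blocking calls run strictly one after another.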

The Solution: Use an asynchronous MySQL driver that integrates with Tornado's I/O loop.


The Solution: Asynchronous MySQL Drivers

Two libraries commonly come up for this, though only the first is actually asynchronous:

Option A: aiomysql (Recommended)

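  • What it is: A MySQL driver for asyncio, Python's built-in asynchronous I/O framework. It is built on top of PyMySQL and provides an async/await interface.
  • Why it's great: It's the modern, idiomatic way to do async in Python, and the code is clean and easy to read. Tornado has run on top of the asyncio event loop by default since version 5.0, so asyncio libraries such as aiomysql can be awaited directly inside Tornado handlers.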
  • How it works: It uses a pool of connections. When you need to query the database, you borrow a connection from the pool, execute the query, and return the connection to the pool. This is much more efficient than creating a new connection for every query.
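
The borrow-and-return cycle can be sketched with a toy pool. This is an illustration of the idea only, not how aiomysql is implemented: ToyPool, handle_request and the "conn-N" strings are hypothetical stand-ins for a real pool and real connections.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyPool:
    """Illustrative pool: a fixed set of 'connections' kept in an asyncio.Queue."""

    def __init__(self, size):
        self._free = asyncio.Queue()
        for i in range(size):
            self._free.put_nowait(f"conn-{i}")  # placeholder for a real connection

    @asynccontextmanager
    async def acquire(self):
        conn = await self._free.get()  # waits while every connection is in use
        try:
            yield conn
        finally:
            self._free.put_nowait(conn)  # always hand the connection back


async def handle_request(pool, request_id, log):
    async with pool.acquire() as conn:
        log.append((request_id, conn))
        await asyncio.sleep(0.01)  # pretend to run a query


async def main():
    pool = ToyPool(size=2)
    log = []
    # Five concurrent requests share two connections
    await asyncio.gather(*(handle_request(pool, i, log) for i in range(5)))
    return log


if __name__ == "__main__":
    for request_id, conn in asyncio.run(main()):
        print(f"request {request_id} used {conn}")
```

All five requests complete even though only two connections exist: a request that finds the pool empty simply waits until another request returns its connection.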

Option B: torndb (Legacy)

  • What it is: A thin wrapper around the synchronous MySQLdb library. It was bundled with early versions of Tornado (as tornado.database) and later split out into its own package.
  • Status: Deprecated and not recommended for new projects. It hasn't seen significant updates in years, and it doesn't support async/await: every query blocks the event loop. It is simple to use, but it relies on older patterns and cannot match aiomysql for concurrency or features.

Conclusion: For any new project, use aiomysql.


Setup and Installation

First, let's set up a virtual environment and install the necessary libraries.

# 1. Create and activate a virtual environment (recommended)
python -m venv venv
# On Windows:
# venv\Scripts\activate
# On macOS/Linux:
# source venv/bin/activate
# 2. Install the required packages
pip install tornado aiomysql

You will also need a MySQL server running. Make sure you have a database, a user, and a password ready.


Complete Code Example: A Simple REST API

Let's build a simple Tornado application that exposes an asynchronous API for fetching users from, and adding users to, a MySQL database.

Step 1: Database Setup

Connect to your MySQL server and create a database and a table.

CREATE DATABASE tornado_app;
USE tornado_app;
CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL UNIQUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Step 2: The Python Application (app.py)

This single file will contain our Tornado application, database connection logic, and request handlers.

import asyncio
import tornado.escape
import tornado.ioloop
import tornado.web
import tornado.httpserver
import aiomysql
from tornado.options import define, options, parse_command_line
# --- Configuration ---
# Each setting is a command-line flag, e.g. --port=8000 --mysql_user=app
define("port", default=8888, help="run on the given port", type=int)
define("mysql_host", default="127.0.0.1", help="MySQL host")
define("mysql_port", default=3306, help="MySQL port")
define("mysql_database", default="tornado_app", help="MySQL database")
define("mysql_user", default="root", help="MySQL user")
define("mysql_password", default="your_password", help="MySQL password")
# --- Database Connection Pool ---
async def create_db_pool():
    """Creates a connection pool for aiomysql."""
    pool = await aiomysql.create_pool(
        host=options.mysql_host,
        port=options.mysql_port,
        user=options.mysql_user,
        password=options.mysql_password,
        db=options.mysql_database,
        minsize=5,  # Minimum number of connections to keep in the pool
        maxsize=10, # Maximum number of connections in the pool
        autocommit=True # Automatically commit changes
    )
    print("Database connection pool created.")
    return pool
# --- Request Handlers ---
class BaseHandler(tornado.web.RequestHandler):
    """Base handler that gives every request a pooled database connection."""
    def initialize(self):
        # The pool is passed to Application(...) as a setting, so read it from there
        self.db_pool = self.application.settings["db_pool"]
        self.db = None
    async def prepare(self):
        """Called before get/post/etc."""
        # Borrow a connection from the pool for this request
        self.db = await self.db_pool.acquire()
        print(f"Connection acquired for {self.request.uri}")
    def on_finish(self):
        """Called after the response has been sent to the client."""
        # on_finish must be a regular method: Tornado does not await it
        if self.db is not None:
            # Return the connection to the pool
            self.db_pool.release(self.db)
            self.db = None
            print(f"Connection released for {self.request.uri}")
class MainHandler(BaseHandler):
    async def get(self):
        self.write("Welcome to the Tornado + MySQL API!")
class UserHandler(BaseHandler):
    async def get(self, user_id=None):
        """Get all users or a single user by ID."""
        if user_id:
            # Get a single user
            try:
                user_id = int(user_id)
            except ValueError:
                self.set_status(400)
                self.write({"error": "Invalid user ID"})
                return
            async with self.db.cursor(aiomysql.DictCursor) as cursor:
                await cursor.execute("SELECT id, name, email FROM users WHERE id = %s", (user_id,))
                user = await cursor.fetchone()
                if user:
                    self.write(user)
                else:
                    self.set_status(404)
                    self.write({"error": "User not found"})
        else:
            # Get all users
            async with self.db.cursor(aiomysql.DictCursor) as cursor:
                await cursor.execute("SELECT id, name, email FROM users")
                users = await cursor.fetchall()
                # Tornado's write() rejects lists, so wrap the rows in a dict
                self.write({"users": users})
    async def post(self):
        """Create a new user."""
        try:
            data = tornado.escape.json_decode(self.request.body)
        except ValueError:
            data = None
        if not isinstance(data, dict):
            self.set_status(400)
            self.write({"error": "Request body must be a JSON object"})
            return
        name = data.get('name')
        email = data.get('email')
        if not name or not email:
            self.set_status(400)
            self.write({"error": "Name and email are required"})
            return
        async with self.db.cursor() as cursor:
            try:
                await cursor.execute(
                    "INSERT INTO users (name, email) VALUES (%s, %s)",
                    (name, email)
                )
            except aiomysql.IntegrityError:
                # The UNIQUE constraint on email rejected a duplicate
                self.set_status(409)
                self.write({"error": "A user with this email already exists"})
                return
            # Get the ID of the newly created user
            user_id = cursor.lastrowid
            self.set_status(201)
            self.write({"id": user_id, "name": name, "email": email})
# --- Application Setup ---
def make_app(db_pool):
    # The pool is stored in the application settings so handlers can reach it
    return tornado.web.Application([
        (r"/", MainHandler),
        (r"/api/users", UserHandler),
        (r"/api/users/(\d+)", UserHandler),
    ], db_pool=db_pool)
async def main():
    parse_command_line()
    # Create the pool on the same event loop that serves requests
    db_pool = await create_db_pool()
    http_server = tornado.httpserver.HTTPServer(make_app(db_pool))
    http_server.listen(options.port)
    print(f"Server running on http://localhost:{options.port}")
    try:
        # Keep serving until the process is interrupted (e.g. Ctrl+C)
        await asyncio.Event().wait()
    finally:
        # Close pooled connections cleanly on shutdown
        db_pool.close()
        await db_pool.wait_closed()
if __name__ == "__main__":
    asyncio.run(main())
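
The handlers above hold one connection for the entire request, including requests such as MainHandler that never touch the database. One possible alternative is to borrow a connection only for the duration of each query. The sketch below relies on aiomysql's pool.acquire() and conn.cursor() being usable with async with; the helper name fetch_all is an assumption for this example, and FakePool, FakeConnection and FakeCursor are hypothetical stand-ins so that it runs without a MySQL server.

```python
import asyncio
from contextlib import asynccontextmanager


async def fetch_all(pool, sql, args=None):
    """Run one query, holding a pooled connection only while it executes."""
    async with pool.acquire() as conn:        # connection borrowed here
        async with conn.cursor() as cursor:
            await cursor.execute(sql, args)
            return await cursor.fetchall()    # connection returned on exit


# --- Stand-ins so the sketch runs without a database (hypothetical) ---
class FakeCursor:
    def __init__(self, rows):
        self.rows = rows

    async def execute(self, sql, args=None):
        self.last_query = (sql, args)  # a real cursor would send this to MySQL

    async def fetchall(self):
        return self.rows


class FakeConnection:
    def __init__(self, rows):
        self.rows = rows

    @asynccontextmanager
    async def cursor(self):
        yield FakeCursor(self.rows)


class FakePool:
    def __init__(self, rows):
        self.conn = FakeConnection(rows)
        self.in_use = 0  # number of connections currently borrowed

    @asynccontextmanager
    async def acquire(self):
        self.in_use += 1
        try:
            yield self.conn
        finally:
            self.in_use -= 1


async def demo():
    pool = FakePool(rows=[{"id": 1, "name": "Alice"}])
    rows = await fetch_all(pool, "SELECT id, name FROM users")
    return rows, pool.in_use


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a handler, the pool created by create_db_pool() would be passed in place of FakePool. The trade-off is that several statements that must share one transaction still need to run on a single borrowed connection.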

Step 3: Run the Application

  1. Make sure your MySQL server is running and the database/table are set up.
  2. Save the code above as app.py.
  3. Run it from your terminal:
    python app.py

    You should see Database connection pool created. followed by Server running on http://localhost:8888.

Step 4: Test the API

You can use curl or Postman to test your endpoints.

Create a new user (POST):

curl -X POST http://localhost:8888/api/users \
-H "Content-Type: application/json" \
-d '{"name": "Alice", "email": "alice@example.com"}'

Expected Output:

{"id": 1, "name": "Alice", "email": "alice@example.com"}
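
If you would rather test from Python than from curl, the same POST can be built with the standard library. This is a minimal sketch: build_create_user_request and create_user are names invented for this example, and the base URL assumes the server from Step 3 is running on its default port.

```python
import json
import urllib.request


def build_create_user_request(base_url, name, email):
    """Build the same POST request as the curl command above."""
    body = json.dumps({"name": name, "email": email}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api/users",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_user(base_url, name, email):
    """Send the request and return the decoded JSON response."""
    request = build_create_user_request(base_url, name, email)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


# Usage, with the server running:
# create_user("http://localhost:8888", "Alice", "alice@example.com")
```

urlopen raises urllib.error.HTTPError for 4xx and 5xx responses, so a rejected request surfaces as an exception rather than a return value.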

Get all users (GET):

curl http://localhost:8888/api