
How do Python, Fabric, and Celery work together?

Of course! This is an excellent and very common combination in the world of Python web development. Let's break down what each component is, how they work together, and then build a practical, step-by-step example.


The High-Level Overview: What are they?

Imagine you're running a restaurant.

  • Fabric (The Restaurant Manager): Fabric is a tool for automating tasks on remote servers. It's like a manager who can give orders to the kitchen staff (the server) without having to walk over and shout instructions every time. "Hey server, deploy the new menu!", "Restart the cash register!", "Check the logs for any issues!".
  • Celery (The Kitchen Staff): Celery is a distributed task queue. It's your team of chefs in the kitchen. When an order (a task) comes in, the manager (your web app) doesn't cook it themselves. Instead, they write the order on a slip of paper (a message) and put it on a queue (a "to-do" list). The chefs (Celery workers) are constantly watching this queue, pick up orders, and cook them in the background. This way, the manager (your web app) can take new orders immediately without waiting for the old ones to be finished.
  • The Queue (The Order Ticket System): This is the message broker that connects the Manager and the Kitchen. It's where the order slips are stored. Common choices are Redis or RabbitMQ.
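This division of labour can be sketched with nothing but Python's standard library. It's a toy stand-in, not how Celery is implemented: here a queue.Queue plays the broker and a thread plays a worker, but the manager/ticket/chef roles are the same.

```python
import queue
import threading

# The "order ticket system": in production this is Redis or RabbitMQ.
broker = queue.Queue()
results = []

def worker():
    """The 'chef': pulls tasks off the queue and runs them in the background."""
    while True:
        func, args = broker.get()
        if func is None:            # sentinel value: shut the worker down
            broker.task_done()
            break
        results.append(func(*args))
        broker.task_done()

# Start one background worker thread (Celery would start a pool of processes).
threading.Thread(target=worker, daemon=True).start()

# The "manager" (your web app) enqueues work and carries on immediately.
broker.put((lambda x, y: x + y, (2, 3)))
broker.put((None, None))            # tell the worker to stop
broker.join()                       # wait for the demo; a web app would not block here
print(results)                      # → [5]
```

The key property is the same one Celery gives you: put() returns immediately, so the producer never waits for the work to finish.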

Fabric: Automating Server Management

Fabric is a Python library and command-line tool for streamlining the use of SSH for application deployment, administration, and system maintenance tasks.

Key Concepts:

  • fabfile.py: This is the heart of Fabric. It's a Python file where you define your tasks.
  • Connection: An object representing an SSH connection to a host. In Fabric 2.x this replaces the global env object from Fabric 1.x; host, user, and key details are passed on the command line (-H, -i) or to Connection() directly.
  • Tasks: Functions decorated with @task that receive a Connection as their first argument. You run them from the command line with fab <task_name>.

Example fabfile.py:

# fabfile.py
from fabric import task
# --- Environment Configuration ---
# Note: Fabric 2.x has no global "env" object; this is just a plain dict
# of project settings. Host and credentials are usually supplied on the
# command line (-H, -i) or via fabric.Config instead of hardcoding them.
env = {
    'hosts': ['user@your_server_ip'],
    'user': 'your_server_user',
    'key_filename': '/path/to/your/private_key',  # or use password auth
    'project_root': '/var/www/my_project',
    'repo_url': 'git@github.com:your_user/your_project.git'
}
@task
def deploy(c):
    """Deploys the latest version of the project."""
    print("Starting deployment...")
    with c.cd(env['project_root']):
        # 1. Pull the latest code from the repository
        print("Pulling latest code...")
        c.run('git pull origin main')
        # 2. Install/update Python dependencies
        print("Installing dependencies...")
        c.run('source venv/bin/activate && pip install -r requirements.txt')
        # 3. Collect static files
        print("Collecting static files...")
        c.run('source venv/bin/activate && python manage.py collectstatic --noinput')
        # 4. Apply database migrations
        print("Applying database migrations...")
        c.run('source venv/bin/activate && python manage.py migrate')
    # 5. Restart the application server (e.g., Gunicorn)
    print("Restarting Gunicorn...")
    c.sudo('systemctl restart myproject_gunicorn')
    # 6. Restart the Celery workers
    print("Restarting Celery workers...")
    c.sudo('systemctl restart myproject_celery')
    print("Deployment complete!")
# You can also define simpler tasks
@task
def restart_web(c):
    """Restarts the web server (Gunicorn)."""
    c.sudo('systemctl restart myproject_gunicorn')
@task
def restart_workers(c):
    """Restarts the Celery workers."""
    c.sudo('systemctl restart myproject_celery')

How to run it:

# Connect to the server and run the deploy task
fab -i /path/to/your/private_key -H user@your_server_ip deploy
# Or, if you set hosts on the task itself (e.g. @task(hosts=['user@your_server_ip'])),
fab deploy
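The deploy task above runs systemctl restart myproject_celery, which assumes a systemd unit for the workers already exists on the server. A minimal sketch of such a unit (the paths, user, and service name are placeholders matching this tutorial's examples, not a definitive setup):

```ini
# /etc/systemd/system/myproject_celery.service
[Unit]
Description=Celery workers for my_project
After=network.target redis.service

[Service]
User=your_server_user
WorkingDirectory=/var/www/my_project
ExecStart=/var/www/my_project/venv/bin/celery -A my_project.tasks worker --loglevel=info
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

After creating the file, run sudo systemctl daemon-reload && sudo systemctl enable --now myproject_celery once; from then on Fabric's restart command is all you need.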

Celery: Asynchronous Task Processing

Celery allows you to run time-consuming operations in the background so your main application can remain responsive.

Key Concepts:

  • Task: A Python function decorated with @app.task. This is the "recipe" for your background job.
  • Broker: The message transport system (e.g., Redis, RabbitMQ) that receives messages from the application and delivers them to the workers.
  • Worker: A process that listens on the broker for tasks and executes them.
  • Result Backend (Optional): Where Celery stores the results of your tasks (e.g., Redis, a database). This allows you to check if a task is complete and get its return value.

Example tasks.py (inside a Django/Flask app):

# your_project/tasks.py
from celery import Celery
import time
import requests
# --- Celery Configuration ---
# The broker_url tells Celery where to get tasks from.
# The result_backend tells Celery where to store results.
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)
# A simple task
@app.task
def add(x, y):
    """A simple task to add two numbers."""
    return x + y
# A more realistic, long-running task
@app.task
def send_notification_email(user_email, message):
    """Sends an email (simulated)."""
    print(f"Sending email to {user_email}...")
    # Simulate a network call
    time.sleep(5) 
    print(f"Email sent successfully to {user_email}!")
    return {"status": "success", "email": user_email}
# A task that calls an external API
@app.task
def fetch_weather_data(city):
    """Fetches weather data from an external API."""
    print(f"Fetching weather for {city}...")
    try:
        # Replace with a real API call
        # response = requests.get(f"http://api.weather.com/{city}")
        # response.raise_for_status()
        # return response.json()
        return {"city": city, "temp": 22, "condition": "Sunny"}
    except requests.exceptions.RequestException as e:
        print(f"Error fetching weather: {e}")
        return {"error": str(e)}

How to run it:

  1. Start the Celery Worker:

    # From your project's root directory (where manage.py is for Django)
    celery -A your_project.tasks worker --loglevel=info

    This command tells Celery to look for tasks in your_project.tasks and start listening for jobs.

  2. Calling the Task from your App: In your Django view or Flask route:

    # views.py
    from django.http import JsonResponse
    from .tasks import send_notification_email, fetch_weather_data
    def send_email_view(request):
        # The .delay() method sends the task to the queue and immediately returns
        # an AsyncResult object. The web server is not blocked.
        task = send_notification_email.delay("user@example.com", "Your order is ready!")
        return JsonResponse({
            "message": "Email sending initiated!",
            "task_id": task.id  # You can use this ID to check the task's status later
        })
    def get_weather_view(request, city):
        task = fetch_weather_data.delay(city)
        return JsonResponse({
            "message": "Weather data fetching initiated!",
            "task_id": task.id
        })

Putting It All Together: The Full Workflow

This is where Fabric and Celery shine together. You use Fabric to manage your Celery workers on the server.

Project Structure:

my_project/
├── fabfile.py
├── manage.py
├── my_project/
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── tasks.py  <-- Your Celery tasks
├── requirements.txt
└── venv/

Step-by-Step Workflow:

  1. Local Development:

    • You write your Django/Flask app and define Celery tasks in tasks.py.
    • You run celery -A my_project.tasks worker in your local terminal to start a worker.
  2. Deployment with Fabric:

    • When you run fab deploy, Fabric connects to your server and executes the steps:
      • Pulls the latest code (including your new tasks).
      • Installs dependencies.
      • Collects static files and applies database migrations.
      • Restarts Gunicorn and the Celery workers, so both the web app and the workers pick up the new task code.