杰瑞科技汇

Django 如何实现进度条功能?

由于 Django 是一个同步框架,它在一个请求-响应周期中处理所有事情,如果一个任务需要很长时间,它会阻塞整个 Web 服务器,导致其他用户请求无法处理,实现进度条的核心思想是:将耗时的任务从 Web 请求中分离出来,异步执行,并通过某种机制让前端可以轮询(poll)任务的进度。

Django 如何实现进度条功能?-图1
(图片来源网络,侵删)

下面我将为你介绍几种主流的实现方法,从简单到复杂,并给出详细的代码示例。


核心思想

  1. 用户发起请求:用户点击一个按钮,触发一个任务。
  2. 创建任务记录:后端立即创建一个任务记录(存入数据库),并为其分配一个唯一的 ID,同时设置初始状态(如 PENDING)和进度(0%)。
  3. 异步执行任务:后端将这个耗时的任务交给一个后台 worker(如 Celery)去处理,同时立即向用户返回一个响应,这个响应中包含了任务 ID。
  4. 前端轮询:前端收到任务 ID 后,开始通过 JavaScript 定期(例如每秒)向后端发送一个请求,询问该任务的当前进度。
  5. 后端更新进度:后台 worker 在执行任务的过程中,会定期更新数据库中对应任务的进度信息。
  6. 前端显示进度:前端每次轮询都获取到最新的进度,并用进度条组件展示出来。
  7. 任务完成:当任务完成后,后台 worker 将任务状态标记为 SUCCESSFAILURE,前端轮询到最终状态后,停止轮询并显示最终结果。

使用数据库 + 简单轮询 (Django 内置)

这是最基础的方法,不需要额外的依赖,适合理解进度条的基本原理。

创建模型

创建一个模型来跟踪任务状态。

# myapp/models.py
from django.db import models
class Task(models.Model):
    STATUS_CHOICES = (
        ('PENDING', '等待中'),
        ('PROCESSING', '处理中'),
        ('SUCCESS', '成功'),
        ('FAILURE', '失败'),
    )
    task_id = models.CharField(max_length=36, unique=True, primary_key=True)
    status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='PENDING')
    progress = models.IntegerField(default=0)  # 0 to 100
    created_at = models.DateTimeField(auto_now_add=True)
    result = models.TextField(blank=True, null=True) # 存储任务结果,如文件路径
    error_message = models.TextField(blank=True, null=True) # 存储错误信息
    def __str__(self):
        return f"Task {self.task_id} - {self.status}"

视图层

我们需要三个视图:

Django 如何实现进度条功能?-图2
(图片来源网络,侵删)
  • 一个视图来启动任务。
  • 一个视图来检查任务进度(轮询时调用)。
  • 一个视图来显示包含进度条的前端页面。
# myapp/views.py
import uuid
import time
from django.http import JsonResponse
from django.shortcuts import render
from .models import Task
# 模拟一个耗时的任务
def long_running_task(task_id):
    task = Task.objects.get(task_id=task_id)
    task.status = 'PROCESSING'
    task.save()
    total_steps = 100
    for i in range(total_steps):
        # 模拟工作
        time.sleep(0.1) 
        # 更新进度
        progress = int((i + 1) / total_steps * 100)
        task.progress = progress
        task.save()
    # 任务完成
    task.status = 'SUCCESS'
    task.result = f"任务 {task_id} 已完成,结果已保存。"
    task.save()
def start_task(request):
    if request.method == 'POST':
        # 生成唯一的任务ID
        task_id = str(uuid.uuid4())
        # 创建任务记录
        Task.objects.create(task_id=task_id)
        # !!! 注意:这里为了演示,我们直接在主线程调用了。
        # 在实际生产环境中,你需要使用 Celery 或其他任务队列。
        # long_running_task(task_id)
        # 返回任务ID,让前端可以开始轮询
        return JsonResponse({'task_id': task_id})
    return JsonResponse({'error': 'Invalid request'}, status=400)
def get_progress(request, task_id):
    try:
        task = Task.objects.get(task_id=task_id)
        data = {
            'status': task.status,
            'progress': task.progress,
            'result': task.result,
            'error_message': task.error_message
        }
        return JsonResponse(data)
    except Task.DoesNotExist:
        return JsonResponse({'error': 'Task not found'}, status=404)
def task_page(request):
    return render(request, 'myapp/task_page.html')

URL 配置

# myapp/urls.py
from django.urls import path
from . import views
urlpatterns = [
    path('start-task/', views.start_task, name='start_task'),
    path('progress/<str:task_id>/', views.get_progress, name='get_progress'),
    path('task/', views.task_page, name='task_page'),
]

前端模板

这里使用 Bootstrap 来美化进度条,你也可以用其他 CSS 框架。

<!-- myapp/templates/myapp/task_page.html -->
<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">Django 进度条示例</title>
    <!-- 引入 Bootstrap CSS -->
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
    <div class="container mt-5">
        <h1>任务进度条</h1>
        <button id="startBtn" class="btn btn-primary">开始任务</button>
        <div class="mt-4">
            <div class="progress">
                <div id="progressBar" class="progress-bar" role="progressbar" style="width: 0%;" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100">0%</div>
            </div>
            <p id="statusText" class="mt-2">等待开始...</p>
            <p id="resultText" class="mt-2" style="display: none;"></p>
        </div>
    </div>
    <script>
        document.getElementById('startBtn').addEventListener('click', function() {
            const startBtn = this;
            const progressBar = document.getElementById('progressBar');
            const statusText = document.getElementById('statusText');
            const resultText = document.getElementById('resultText');
            startBtn.disabled = true;
            progressBar.style.width = '0%';
            progressBar.setAttribute('aria-valuenow', 0);
            progressBar.textContent = '0%';
            statusText.textContent = '任务已启动,正在等待...';
            resultText.style.display = 'none';
            // 1. 发送请求启动任务
            fetch('/start-task/', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'X-CSRFToken': getCookie('csrftoken'), // Django 需要 CSRF token
                },
            })
            .then(response => response.json())
            .then(data => {
                if (data.task_id) {
                    const taskId = data.task_id;
                    statusText.textContent = `任务 ID: ${taskId},正在处理中...`;
                    // 2. 开始轮询进度
                    pollProgress(taskId);
                } else {
                    statusText.textContent = '启动任务失败: ' + (data.error || '未知错误');
                    startBtn.disabled = false;
                }
            })
            .catch(error => {
                console.error('Error:', error);
                statusText.textContent = '启动任务时发生错误。';
                startBtn.disabled = false;
            });
        });
        function pollProgress(taskId) {
            const progressBar = document.getElementById('progressBar');
            const statusText = document.getElementById('statusText');
            const resultText = document.getElementById('resultText');
            fetch(`/progress/${taskId}/`)
            .then(response => response.json())
            .then(data => {
                // 更新进度条
                progressBar.style.width = `${data.progress}%`;
                progressBar.setAttribute('aria-valuenow', data.progress);
                progressBar.textContent = `${data.progress}%`;
                // 更新状态文本
                statusText.textContent = `状态: ${getStatusText(data.status)} (${data.progress}%)`;
                if (data.status === 'SUCCESS') {
                    statusText.textContent = '任务完成!';
                    resultText.textContent = data.result;
                    resultText.style.display = 'block';
                    // 可以在这里停止轮询
                    return; 
                } else if (data.status === 'FAILURE') {
                    statusText.textContent = `任务失败: ${data.error_message}`;
                    // 可以在这里停止轮询
                    return;
                }
                // 如果任务还在进行中,继续轮询
                setTimeout(() => pollProgress(taskId), 1000); // 每秒轮询一次
            })
            .catch(error => {
                console.error('Error polling progress:', error);
                statusText.textContent = '获取进度时发生错误。';
            });
        }
        function getStatusText(status) {
            const statusMap = {
                'PENDING': '等待中',
                'PROCESSING': '处理中',
                'SUCCESS': '成功',
                'FAILURE': '失败'
            };
            return statusMap[status] || status;
        }
        function getCookie(name) {
            let cookieValue = null;
            if (document.cookie && document.cookie !== '') {
                const cookies = document.cookie.split(';');
                for (let i = 0; i < cookies.length; i++) {
                    const cookie = cookies[i].trim();
                    if (cookie.substring(0, name.length + 1) === (name + '=')) {
                        cookieValue = decodeURIComponent(cookie.substring(name.length + 1));
                        break;
                    }
                }
            }
            return cookieValue;
        }
    </script>
</body>
</html>

使用 Celery (生产环境推荐)

Celery 是一个强大的分布式任务队列,是处理后台任务的行业标准,它能确保任务在后台可靠地执行,即使服务器重启,任务也不会丢失。

安装 Celery 和 Redis/RabbitMQ

pip install celery redis  # 使用 Redis 作为消息代理
# 或者
pip install celery flower # Flower 是一个 Celery 的监控工具

配置 Celery

在你的 Django 项目根目录下创建一个 celery.py 文件。

# your_project/celery.py
import os
from celery import Celery
# 设置默认的 Django 设置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
app = Celery('your_project')
# 使用 Django 的 settings 来配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现任务
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

your_project/__init__.py 中导入这个 app,确保它在 Django 启动时被加载。

Django 如何实现进度条功能?-图3
(图片来源网络,侵删)
# your_project/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)

settings.py 中添加 Celery 配置:

# your_project/settings.py
# ... 其他设置
# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 你的 Redis 地址
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 结果存储后端
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE

修改视图以使用 Celery

我们将 start_task 视图修改为调用 Celery 任务。

# myapp/views.py
import uuid
from django.http import JsonResponse
from django.shortcuts import render
from .models import Task
from .tasks import run_long_running_task # 导入 Celery 任务
def start_task(request):
    if request.method == 'POST':
        task_id = str(uuid.uuid4())
        Task.objects.create(task_id=task_id)
        # 调用 Celery 任务,并传入任务 ID
        run_long_running_task.delay(task_id)
        return JsonResponse({'task_id': task_id})
    return JsonResponse({'error': 'Invalid request'}, status=400)
# ... 其他视图保持不变 ...

创建 Celery 任务

创建一个 myapp/tasks.py 文件来定义耗时的任务。

# myapp/tasks.py
from celery import shared_task
from .models import Task
import time
@shared_task
def run_long_running_task(task_id):
    try:
        task = Task.objects.get(task_id=task_id)
        task.status = 'PROCESSING'
        task.save()
        total_steps = 100
        for i in range(total_steps):
            time.sleep(0.1)
            progress = int((i + 1) / total_steps * 100)
            task.progress = progress
            task.save()
        task.status = 'SUCCESS'
        task.result = f"任务 {task_id} 已完成,结果已保存。"
        task.save()
    except Task.DoesNotExist:
        # 可以选择记录错误或忽略
        pass
    except Exception as e:
        task.status = 'FAILURE'
        task.error_message = str(e)
        task.save()

启动 Celery Worker

在项目根目录下的终端运行:

celery -A your_project worker --loglevel=info

当你访问页面并点击“开始任务”时,任务会被发送到 Celery Worker 异步执行,而你的 Web 服务器可以立即响应其他请求。


使用 Django Channels (实时更新)

轮询虽然简单,但不够高效,它会产生很多不必要的 HTTP 请求,Django Channels 允许你建立 WebSocket 连接,实现服务器向客户端的实时推送,当进度更新时,服务器可以主动将新进度推送给前端,无需前端轮询。

这种方法更复杂,但体验最好。

安装 Channels

pip install channels channels_redis

配置 Channels

  1. settings.py 中添加 channels
# your_project/settings.py
INSTALLED_APPS = [
    # ...
    'channels',
    'myapp',
]
ASGI_APPLICATION = 'your_project.asgi.application'
  1. 创建 asgi.py 文件(如果不存在):
# your_project/asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import myapp.routing
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(
            myapp.routing.websocket_urlpatterns
        )
    ),
})

创建消费者

消费者是 Channels 中的“视图”,它处理 WebSocket 连接。

# myapp/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from .models import Task
class ProgressConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.task_id = self.scope["url_route"]["kwargs"]["task_id"]
        self.task_group_name = f'progress_{self.task_id}'
        # 将频道添加到组
        await self.channel_layer.group_add(
            self.task_group_name,
            self.channel_name
        )
        await self.accept()
    async def disconnect(self, close_code):
        # 从组中移除频道
        await self.channel_layer.group_discard(
            self.task_group_name,
            self.channel_name
        )
    # 从接收到的消息(这里我们不需要)
    async def receive(self, text_data):
        pass
    # 接收来自组(即任务)的消息
    async def progress_update(self, event):
        progress = event['progress']
        status = event['status']
        result = event.get('result', '')
        error_message = event.get('error_message', '')
        # 将进度数据发送到 WebSocket
        await self.send(text_data=json.dumps({
            'progress': progress,
            'status': status,
            'result': result,
            'error_message': error_message
        }))

定义 WebSocket 路由

# myapp/routing.py
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
    re_path(r'ws/progress/(?P<task_id>\w+)/$', consumers.ProgressConsumer.as_asgi()),
]

修改 Celery 任务以发送消息

我们需要修改 Celery 任务,让它通过 channel_layer 向对应的组发送消息。

# myapp/tasks.py
from celery import shared_task
from .models import Task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
import time
@shared_task
def run_long_running_task(task_id):
    channel_layer = get_channel_layer()
    try:
        task = Task.objects.get(task_id=task_id)
        task.status = 'PROCESSING'
        task.save()
        # 发送初始状态
        async_to_sync(channel_layer.group_send)(
            f'progress_{task_id}',
            {
                'type': 'progress_update', # 对应 consumer 中的 progress_update 方法
                'progress': 0,
                'status': 'PROCESSING'
            }
        )
        total_steps = 100
        for i in range(total_steps):
            time.sleep(0.1)
            progress = int((i + 1) / total_steps * 100)
            task.progress = progress
            task.save()
            # 发送进度更新
            async_to_sync(channel_layer.group_send)(
                f'progress_{task_id}',
                {
                    'type': 'progress_update',
                    'progress': progress,
                    'status': 'PROCESSING'
                }
            )
        task.status = 'SUCCESS'
        task.result = f"任务 {task_id} 已完成。"
        task.save()
        # 发送最终完成状态
        async_to_sync(channel_layer.group_send)(
            f'progress_{task_id}',
            {
                'type': 'progress_update',
                'progress': 100,
                'status': 'SUCCESS',
                'result': task.result
            }
        )
    except Task.DoesNotExist:
        pass
    except Exception as e:
        task.status = 'FAILURE'
        task.error_message = str(e)
        task.save()
        async_to_sync(channel_layer.group_send)(
            f'progress_{task_id}',
            {
                'type': 'progress_update',
                'status': 'FAILURE',
                'error_message': str(e)
            }
        )

修改前端模板

前端需要使用 WebSocket API 来连接和接收消息。

<!-- myapp/templates/myapp/task_page.html -->
<!-- ... head 部分和 HTML 结构与之前相同 ... -->
<script>
    document.getElementById('startBtn').addEventListener('click', function() {
        const startBtn = this;
        const progressBar = document.getElementById('progressBar');
        const statusText = document.getElementById('statusText');
        const resultText = document.getElementById('resultText');
        startBtn.disabled = true;
        progressBar.style.width = '0%';
        progressBar.setAttribute('aria-valuenow', 0);
        progressBar.textContent = '0%';
        statusText.textContent = '等待开始...';
        resultText.style.display = 'none';
        fetch('/start-task/', {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
                'X-CSRFToken': getCookie('csrftoken'),
            },
        })
        .then(response => response.json())
        .then(data => {
            if (data.task_id) {
                const taskId = data.task_id;
                statusText.textContent = `任务 ID: ${taskId},正在建立连接...`;
                // 建立 WebSocket 连接
                connectWebSocket(taskId);
            } else {
                statusText.textContent = '启动任务失败: ' + (data.error || '未知错误');
                startBtn.disabled = false;
            }
        })
        .catch(error => {
            console.error('Error:', error);
            statusText.textContent = '启动任务时发生错误。';
            startBtn.disabled = false;
        });
    });
    function connectWebSocket(taskId) {
        const progressBar = document.getElementById('progressBar');
        const statusText = document.getElementById('statusText');
        const resultText = document.getElementById('resultText');
        // 注意:ws:// 用于 HTTP,wss:// 用于 HTTPS
        const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
        const wsUrl = `${protocol}//${window.location.host}/ws/progress/${taskId}/`;
        const socket = new WebSocket(wsUrl);
        socket.onopen = function(e) {
            statusText.textContent = `WebSocket 已连接,任务 ID: ${taskId},`;
        };
        socket.onmessage = function(event) {
            const data = JSON.parse(event.data);
            // 更新进度条
            progressBar.style.width = `${data.progress}%`;
            progressBar.setAttribute('aria-valuenow', data.progress);
            progressBar.textContent = `${data.progress}%`;
            // 更新状态文本
            statusText.textContent = `状态: ${getStatusText(data.status)} (${data.progress}%)`;
            if (data.status === 'SUCCESS') {
                statusText.textContent = '任务完成!';
                if (data.result) {
                    resultText.textContent = data.result;
                    resultText.style.display = 'block';
                }
                socket.close();
            } else if (data.status === 'FAILURE') {
                statusText.textContent = `任务失败: ${data.error_message}`;
                socket.close();
            }
        };
        socket.onclose = function(event) {
            if (event.wasClean) {
                console.log(`[close] Connection closed cleanly, code=${event.code} reason=${event.reason}`);
            } else {
                console.log('[close] Connection died');
            }
            startBtn.disabled = false; // 任务结束或断开后,重新启用按钮
        };
        socket.onerror = function(error) {
            console.error(`[error] ${error.message}`);
            statusText.textContent = 'WebSocket 连接发生错误。';
            startBtn.disabled = false;
        };
    }
    // getCookie 和 getStatusText 函数与之前相同
    function getStatusText(status) { /* ... */ }
    function getCookie(name) { /* ... */ }
</script>
</body>
</html>

总结与选择

方法 优点 缺点 适用场景
数据库 + 轮询 简单,无需额外依赖,易于理解。 效率低,产生大量 HTTP 请求,有延迟。 学习、演示、或对实时性要求不高的简单任务。
Celery + 轮询 生产环境标准,任务可靠,可扩展。 仍然依赖轮询,有延迟;需要额外部署 Worker。 几乎所有需要后台处理的 Web 应用。
Django Channels 实时性最好,效率高,用户体验好。 架构复杂,需要 ASGI 服务器(如 Daphne),学习曲线陡峭。 对实时性要求极高的应用,如聊天、在线协作、股票行情等。

对于大多数 Django 项目,强烈推荐从方法二(Celery + 轮询)开始,它提供了一个健壮的后台任务处理机制,并且前端实现相对简单,只有当你对延迟非常敏感,并且愿意承担更复杂的架构时,才考虑使用方法三(Django Channels)。

分享:
扫描分享到社交APP
上一篇
下一篇