由于 Django 是一个同步框架,它在一个请求-响应周期中处理所有事情,如果一个任务需要很长时间,它会阻塞整个 Web 服务器,导致其他用户请求无法处理,实现进度条的核心思想是:将耗时的任务从 Web 请求中分离出来,异步执行,并通过某种机制让前端可以轮询(poll)任务的进度。

下面我将为你介绍几种主流的实现方法,从简单到复杂,并给出详细的代码示例。
核心思想
- 用户发起请求:用户点击一个按钮,触发一个任务。
- 创建任务记录:后端立即创建一个任务记录(存入数据库),并为其分配一个唯一的 ID,同时设置初始状态(如
PENDING)和进度(0%)。 - 异步执行任务:后端将这个耗时的任务交给一个后台 worker(如 Celery)去处理,同时立即向用户返回一个响应,这个响应中包含了任务 ID。
- 前端轮询:前端收到任务 ID 后,开始通过 JavaScript 定期(例如每秒)向后端发送一个请求,询问该任务的当前进度。
- 后端更新进度:后台 worker 在执行任务的过程中,会定期更新数据库中对应任务的进度信息。
- 前端显示进度:前端每次轮询都获取到最新的进度,并用进度条组件展示出来。
- 任务完成:当任务完成后,后台 worker 将任务状态标记为
SUCCESS或FAILURE,前端轮询到最终状态后,停止轮询并显示最终结果。
使用数据库 + 简单轮询 (Django 内置)
这是最基础的方法,不需要额外的依赖,适合理解进度条的基本原理。
创建模型
创建一个模型来跟踪任务状态。
# myapp/models.py
from django.db import models
class Task(models.Model):
STATUS_CHOICES = (
('PENDING', '等待中'),
('PROCESSING', '处理中'),
('SUCCESS', '成功'),
('FAILURE', '失败'),
)
task_id = models.CharField(max_length=36, unique=True, primary_key=True)
status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='PENDING')
progress = models.IntegerField(default=0) # 0 to 100
created_at = models.DateTimeField(auto_now_add=True)
result = models.TextField(blank=True, null=True) # 存储任务结果,如文件路径
error_message = models.TextField(blank=True, null=True) # 存储错误信息
def __str__(self):
return f"Task {self.task_id} - {self.status}"
视图层
我们需要三个视图:

- 一个视图来启动任务。
- 一个视图来检查任务进度(轮询时调用)。
- 一个视图来显示包含进度条的前端页面。
# myapp/views.py
import uuid
import time
from django.http import JsonResponse
from django.shortcuts import render
from .models import Task
# 模拟一个耗时的任务
def long_running_task(task_id):
task = Task.objects.get(task_id=task_id)
task.status = 'PROCESSING'
task.save()
total_steps = 100
for i in range(total_steps):
# 模拟工作
time.sleep(0.1)
# 更新进度
progress = int((i + 1) / total_steps * 100)
task.progress = progress
task.save()
# 任务完成
task.status = 'SUCCESS'
task.result = f"任务 {task_id} 已完成,结果已保存。"
task.save()
def start_task(request):
if request.method == 'POST':
# 生成唯一的任务ID
task_id = str(uuid.uuid4())
# 创建任务记录
Task.objects.create(task_id=task_id)
# !!! 注意:这里为了演示,我们直接在主线程调用了。
# 在实际生产环境中,你需要使用 Celery 或其他任务队列。
# long_running_task(task_id)
# 返回任务ID,让前端可以开始轮询
return JsonResponse({'task_id': task_id})
return JsonResponse({'error': 'Invalid request'}, status=400)
def get_progress(request, task_id):
try:
task = Task.objects.get(task_id=task_id)
data = {
'status': task.status,
'progress': task.progress,
'result': task.result,
'error_message': task.error_message
}
return JsonResponse(data)
except Task.DoesNotExist:
return JsonResponse({'error': 'Task not found'}, status=404)
def task_page(request):
return render(request, 'myapp/task_page.html')
URL 配置
# myapp/urls.py
from django.urls import path
from . import views
urlpatterns = [
path('start-task/', views.start_task, name='start_task'),
path('progress/<str:task_id>/', views.get_progress, name='get_progress'),
path('task/', views.task_page, name='task_page'),
]
前端模板
这里使用 Bootstrap 来美化进度条,你也可以用其他 CSS 框架。
<!-- myapp/templates/myapp/task_page.html -->
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">Django 进度条示例</title>
<!-- 引入 Bootstrap CSS -->
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<div class="container mt-5">
<h1>任务进度条</h1>
<button id="startBtn" class="btn btn-primary">开始任务</button>
<div class="mt-4">
<div class="progress">
<div id="progressBar" class="progress-bar" role="progressbar" style="width: 0%;" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100">0%</div>
</div>
<p id="statusText" class="mt-2">等待开始...</p>
<p id="resultText" class="mt-2" style="display: none;"></p>
</div>
</div>
<script>
document.getElementById('startBtn').addEventListener('click', function() {
const startBtn = this;
const progressBar = document.getElementById('progressBar');
const statusText = document.getElementById('statusText');
const resultText = document.getElementById('resultText');
startBtn.disabled = true;
progressBar.style.width = '0%';
progressBar.setAttribute('aria-valuenow', 0);
progressBar.textContent = '0%';
statusText.textContent = '任务已启动,正在等待...';
resultText.style.display = 'none';
// 1. 发送请求启动任务
fetch('/start-task/', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRFToken': getCookie('csrftoken'), // Django 需要 CSRF token
},
})
.then(response => response.json())
.then(data => {
if (data.task_id) {
const taskId = data.task_id;
statusText.textContent = `任务 ID: ${taskId},正在处理中...`;
// 2. 开始轮询进度
pollProgress(taskId);
} else {
statusText.textContent = '启动任务失败: ' + (data.error || '未知错误');
startBtn.disabled = false;
}
})
.catch(error => {
console.error('Error:', error);
statusText.textContent = '启动任务时发生错误。';
startBtn.disabled = false;
});
});
function pollProgress(taskId) {
const progressBar = document.getElementById('progressBar');
const statusText = document.getElementById('statusText');
const resultText = document.getElementById('resultText');
fetch(`/progress/${taskId}/`)
.then(response => response.json())
.then(data => {
// 更新进度条
progressBar.style.width = `${data.progress}%`;
progressBar.setAttribute('aria-valuenow', data.progress);
progressBar.textContent = `${data.progress}%`;
// 更新状态文本
statusText.textContent = `状态: ${getStatusText(data.status)} (${data.progress}%)`;
if (data.status === 'SUCCESS') {
statusText.textContent = '任务完成!';
resultText.textContent = data.result;
resultText.style.display = 'block';
// 可以在这里停止轮询
return;
} else if (data.status === 'FAILURE') {
statusText.textContent = `任务失败: ${data.error_message}`;
// 可以在这里停止轮询
return;
}
// 如果任务还在进行中,继续轮询
setTimeout(() => pollProgress(taskId), 1000); // 每秒轮询一次
})
.catch(error => {
console.error('Error polling progress:', error);
statusText.textContent = '获取进度时发生错误。';
});
}
function getStatusText(status) {
const statusMap = {
'PENDING': '等待中',
'PROCESSING': '处理中',
'SUCCESS': '成功',
'FAILURE': '失败'
};
return statusMap[status] || status;
}
function getCookie(name) {
let cookieValue = null;
if (document.cookie && document.cookie !== '') {
const cookies = document.cookie.split(';');
for (let i = 0; i < cookies.length; i++) {
const cookie = cookies[i].trim();
if (cookie.substring(0, name.length + 1) === (name + '=')) {
cookieValue = decodeURIComponent(cookie.substring(name.length + 1));
break;
}
}
}
return cookieValue;
}
</script>
</body>
</html>
使用 Celery (生产环境推荐)
Celery 是一个强大的分布式任务队列,是处理后台任务的行业标准,它能确保任务在后台可靠地执行,即使服务器重启,任务也不会丢失。
安装 Celery 和 Redis/RabbitMQ
pip install celery redis # 使用 Redis 作为消息代理 # 或者 pip install celery flower # Flower 是一个 Celery 的监控工具
配置 Celery
在你的 Django 项目根目录下创建一个 celery.py 文件。
# your_project/celery.py
import os
from celery import Celery
# 设置默认的 Django 设置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
app = Celery('your_project')
# 使用 Django 的 settings 来配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现任务
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
在 your_project/__init__.py 中导入这个 app,确保它在 Django 启动时被加载。

# your_project/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
在 settings.py 中添加 Celery 配置:
# your_project/settings.py # ... 其他设置 # Celery 配置 CELERY_BROKER_URL = 'redis://localhost:6379/0' # 你的 Redis 地址 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 结果存储后端 CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = TIME_ZONE
修改视图以使用 Celery
我们将 start_task 视图修改为调用 Celery 任务。
# myapp/views.py
import uuid
from django.http import JsonResponse
from django.shortcuts import render
from .models import Task
from .tasks import run_long_running_task # 导入 Celery 任务
def start_task(request):
if request.method == 'POST':
task_id = str(uuid.uuid4())
Task.objects.create(task_id=task_id)
# 调用 Celery 任务,并传入任务 ID
run_long_running_task.delay(task_id)
return JsonResponse({'task_id': task_id})
return JsonResponse({'error': 'Invalid request'}, status=400)
# ... 其他视图保持不变 ...
创建 Celery 任务
创建一个 myapp/tasks.py 文件来定义耗时的任务。
# myapp/tasks.py
from celery import shared_task
from .models import Task
import time
@shared_task
def run_long_running_task(task_id):
try:
task = Task.objects.get(task_id=task_id)
task.status = 'PROCESSING'
task.save()
total_steps = 100
for i in range(total_steps):
time.sleep(0.1)
progress = int((i + 1) / total_steps * 100)
task.progress = progress
task.save()
task.status = 'SUCCESS'
task.result = f"任务 {task_id} 已完成,结果已保存。"
task.save()
except Task.DoesNotExist:
# 可以选择记录错误或忽略
pass
except Exception as e:
task.status = 'FAILURE'
task.error_message = str(e)
task.save()
启动 Celery Worker
在项目根目录下的终端运行:
celery -A your_project worker --loglevel=info
当你访问页面并点击“开始任务”时,任务会被发送到 Celery Worker 异步执行,而你的 Web 服务器可以立即响应其他请求。
使用 Django Channels (实时更新)
轮询虽然简单,但不够高效,它会产生很多不必要的 HTTP 请求,Django Channels 允许你建立 WebSocket 连接,实现服务器向客户端的实时推送,当进度更新时,服务器可以主动将新进度推送给前端,无需前端轮询。
这种方法更复杂,但体验最好。
安装 Channels
pip install channels channels_redis
配置 Channels
- 在
settings.py中添加channels:
# your_project/settings.py
INSTALLED_APPS = [
# ...
'channels',
'myapp',
]
ASGI_APPLICATION = 'your_project.asgi.application'
- 创建
asgi.py文件(如果不存在):
# your_project/asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import myapp.routing
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(
URLRouter(
myapp.routing.websocket_urlpatterns
)
),
})
创建消费者
消费者是 Channels 中的“视图”,它处理 WebSocket 连接。
# myapp/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from .models import Task
class ProgressConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.task_id = self.scope["url_route"]["kwargs"]["task_id"]
self.task_group_name = f'progress_{self.task_id}'
# 将频道添加到组
await self.channel_layer.group_add(
self.task_group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
# 从组中移除频道
await self.channel_layer.group_discard(
self.task_group_name,
self.channel_name
)
# 从接收到的消息(这里我们不需要)
async def receive(self, text_data):
pass
# 接收来自组(即任务)的消息
async def progress_update(self, event):
progress = event['progress']
status = event['status']
result = event.get('result', '')
error_message = event.get('error_message', '')
# 将进度数据发送到 WebSocket
await self.send(text_data=json.dumps({
'progress': progress,
'status': status,
'result': result,
'error_message': error_message
}))
定义 WebSocket 路由
# myapp/routing.py
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/progress/(?P<task_id>\w+)/$', consumers.ProgressConsumer.as_asgi()),
]
修改 Celery 任务以发送消息
我们需要修改 Celery 任务,让它通过 channel_layer 向对应的组发送消息。
# myapp/tasks.py
from celery import shared_task
from .models import Task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
import time
@shared_task
def run_long_running_task(task_id):
channel_layer = get_channel_layer()
try:
task = Task.objects.get(task_id=task_id)
task.status = 'PROCESSING'
task.save()
# 发送初始状态
async_to_sync(channel_layer.group_send)(
f'progress_{task_id}',
{
'type': 'progress_update', # 对应 consumer 中的 progress_update 方法
'progress': 0,
'status': 'PROCESSING'
}
)
total_steps = 100
for i in range(total_steps):
time.sleep(0.1)
progress = int((i + 1) / total_steps * 100)
task.progress = progress
task.save()
# 发送进度更新
async_to_sync(channel_layer.group_send)(
f'progress_{task_id}',
{
'type': 'progress_update',
'progress': progress,
'status': 'PROCESSING'
}
)
task.status = 'SUCCESS'
task.result = f"任务 {task_id} 已完成。"
task.save()
# 发送最终完成状态
async_to_sync(channel_layer.group_send)(
f'progress_{task_id}',
{
'type': 'progress_update',
'progress': 100,
'status': 'SUCCESS',
'result': task.result
}
)
except Task.DoesNotExist:
pass
except Exception as e:
task.status = 'FAILURE'
task.error_message = str(e)
task.save()
async_to_sync(channel_layer.group_send)(
f'progress_{task_id}',
{
'type': 'progress_update',
'status': 'FAILURE',
'error_message': str(e)
}
)
修改前端模板
前端需要使用 WebSocket API 来连接和接收消息。
<!-- myapp/templates/myapp/task_page.html -->
<!-- ... head 部分和 HTML 结构与之前相同 ... -->
<script>
document.getElementById('startBtn').addEventListener('click', function() {
const startBtn = this;
const progressBar = document.getElementById('progressBar');
const statusText = document.getElementById('statusText');
const resultText = document.getElementById('resultText');
startBtn.disabled = true;
progressBar.style.width = '0%';
progressBar.setAttribute('aria-valuenow', 0);
progressBar.textContent = '0%';
statusText.textContent = '等待开始...';
resultText.style.display = 'none';
fetch('/start-task/', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRFToken': getCookie('csrftoken'),
},
})
.then(response => response.json())
.then(data => {
if (data.task_id) {
const taskId = data.task_id;
statusText.textContent = `任务 ID: ${taskId},正在建立连接...`;
// 建立 WebSocket 连接
connectWebSocket(taskId);
} else {
statusText.textContent = '启动任务失败: ' + (data.error || '未知错误');
startBtn.disabled = false;
}
})
.catch(error => {
console.error('Error:', error);
statusText.textContent = '启动任务时发生错误。';
startBtn.disabled = false;
});
});
function connectWebSocket(taskId) {
const progressBar = document.getElementById('progressBar');
const statusText = document.getElementById('statusText');
const resultText = document.getElementById('resultText');
// 注意:ws:// 用于 HTTP,wss:// 用于 HTTPS
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.host}/ws/progress/${taskId}/`;
const socket = new WebSocket(wsUrl);
socket.onopen = function(e) {
statusText.textContent = `WebSocket 已连接,任务 ID: ${taskId},`;
};
socket.onmessage = function(event) {
const data = JSON.parse(event.data);
// 更新进度条
progressBar.style.width = `${data.progress}%`;
progressBar.setAttribute('aria-valuenow', data.progress);
progressBar.textContent = `${data.progress}%`;
// 更新状态文本
statusText.textContent = `状态: ${getStatusText(data.status)} (${data.progress}%)`;
if (data.status === 'SUCCESS') {
statusText.textContent = '任务完成!';
if (data.result) {
resultText.textContent = data.result;
resultText.style.display = 'block';
}
socket.close();
} else if (data.status === 'FAILURE') {
statusText.textContent = `任务失败: ${data.error_message}`;
socket.close();
}
};
socket.onclose = function(event) {
if (event.wasClean) {
console.log(`[close] Connection closed cleanly, code=${event.code} reason=${event.reason}`);
} else {
console.log('[close] Connection died');
}
startBtn.disabled = false; // 任务结束或断开后,重新启用按钮
};
socket.onerror = function(error) {
console.error(`[error] ${error.message}`);
statusText.textContent = 'WebSocket 连接发生错误。';
startBtn.disabled = false;
};
}
// getCookie 和 getStatusText 函数与之前相同
function getStatusText(status) { /* ... */ }
function getCookie(name) { /* ... */ }
</script>
</body>
</html>
总结与选择
| 方法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 数据库 + 轮询 | 简单,无需额外依赖,易于理解。 | 效率低,产生大量 HTTP 请求,有延迟。 | 学习、演示、或对实时性要求不高的简单任务。 |
| Celery + 轮询 | 生产环境标准,任务可靠,可扩展。 | 仍然依赖轮询,有延迟;需要额外部署 Worker。 | 几乎所有需要后台处理的 Web 应用。 |
| Django Channels | 实时性最好,效率高,用户体验好。 | 架构复杂,需要 ASGI 服务器(如 Daphne),学习曲线陡峭。 | 对实时性要求极高的应用,如聊天、在线协作、股票行情等。 |
对于大多数 Django 项目,强烈推荐从方法二(Celery + 轮询)开始,它提供了一个健壮的后台任务处理机制,并且前端实现相对简单,只有当你对延迟非常敏感,并且愿意承担更复杂的架构时,才考虑使用方法三(Django Channels)。
