1. 什么是 Celery?
Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息。它可以帮助你在后台异步地执行任务、定时调度任务和支持任务的结果跟踪。Celery 支持多种消息中间件 (Broker)、任务结果存储(Backend),适用于 Django、Flask 等多种 Python Web 框架,也能够独立运行。
2. 核心组件与原理
2.1 Task(任务)
Celery 可将一个普通 Python 函数定义为任务 (@app.task 修饰符),并且让 Worker 进程进行异步或者同步执行。每个任务都有唯一的 task_id。
2.2 Broker(消息中间件)
Broker 提供任务队列,实现分布式通信。常见中间件有:
- RabbitMQ:功能最全,性能高,但依赖多。
- Redis:部署简单,易于维护,同时支持作为结果存储。
- 其他:Amazon SQS, MongoDB, etc.
2.3 Worker(工作进程)
Worker 负责监听任务队列,获取任务后异步处理。可开启多个 worker 实现并行和分布式构建。
2.4 Beat(调度器)
用于定时、周期性调度任务(如定时邮件、周期性清理)。Beat 会根据调度方案,将任务发送到 Broker。
2.5 Result Backend(结果存储)
用于存储任务执行的结果(可选),支持 Redis、数据库(如 MySQL/PostgreSQL)、RabbitMQ 等。
3. Celery 的架构图
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Producer │ ---> │ Broker │ ---> │ Worker │
│ (Client/API)│ │ (Queue) │ │(Consumer) │
└─────────────┘ └─────────────┘ └─────────────┘
| |
┌─────────────┐
│ Result │
│ Backend │
└─────────────┘
4. 安装与环境准备
安装 Celery
pip install celery
# 如果使用 Redis 作为 Broker,可以一起安装
pip install redis
启动 Redis(假设本地开发环境)
# Mac/Linux
brew install redis
redis-server
5. 快速实践示例
5.1 创建 Celery 实例
# 文件名:celery_app.py
from celery import Celery
app = Celery(
'myproject',
broker='redis://127.0.0.1:6379/0', # 使用 Redis 作为消息中间件
backend='redis://127.0.0.1:6379/1' # 使用 Redis 存储结果
)
5.2 定义任务
# 文件名:tasks.py
from celery_app import app
@app.task(bind=True) # bind=True 可获取任务状态等上下文
def add(self, x, y):
return x + y
@app.task
def long_task():
import time
time.sleep(10)
return 'Finished'
5.3 启动 Worker
celery -A celery_app worker --loglevel=info
5.4 调用异步任务
from tasks import add
# 投递任务到队列,立即返回 AsyncResult 实例
result = add.delay(10, 20)
# 获取任务执行结果(阻塞,最多等待5秒)
print(result.get(timeout=5)) # 输出 30
6. 配置与高级用法
6.1 配置文件
可以集中配置 Celery 参数:
# celeryconfig.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Shanghai'
enable_utc = True
在主文件中:
app.config_from_object('celeryconfig')
6.2 定时任务/周期任务
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-every-5-seconds': {
'task': 'tasks.add',
'schedule': 5.0,
'args': (4, 4)
},
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
app.conf.timezone = 'Asia/Shanghai'
启动 Beat 进程:
celery -A celery_app beat --loglevel=info
或者(同时启动 worker 和 beat):
celery -A celery_app worker -B --loglevel=info
6.3 任务结果与状态
result.status # PENDING, STARTED, FAILURE, SUCCESS, RETRY
result.ready() # 是否已完成
result.get(timeout=xx) # 阻塞获取
result.id # 任务唯一ID
6.4 失败重试、回调
@app.task(bind=True, max_retries=3)
def my_task(self, x, y):
try:
# 做一些事
pass
except Exception as exc:
self.retry(exc=exc, countdown=5) # 5秒后重试
7. 高级进阶
7.1 任务链
一组任务依次执行,前一个的返回值会作为后一个的参数:
from tasks import add
chain = (add.s(2,2) | add.s(4) | add.s(8))
result = chain()
print(result.get()) # (((2+2)+4)+8)
7.2 任务组/批量并行(group)
from celery import group
res = group(add.s(i, i) for i in range(10))()
print(res.get()) # [0, 2, 4, ..., 18]
7.3 chord(回调集)
所有组任务完成后自动调用回调:
from celery import chord
header = [add.s(2, 2), add.s(3, 3)]
callback = add.s(4)
result = chord(header)(callback)
print(result.get()) # callback(sum(header结果), 4)
8. 常见用例
- 网站用户注册后的异步邮件/短信发送
- 图片或视频的异步处理与转码
- 定时数据同步/爬虫任务的调度
- 大型电商平台订单结算、库存同步等后端批量任务
- 财务报表/统计系统的定时汇总
9. 常用监控与运维工具
-
Flower:Celery 实时监控/管理利器
pip install flower
celery -A celery_app flower
打开浏览器访问 http://localhost:5555
-
Prometheus/Grafana:数据指标监控及可视化
10. 参考资源