首页 / 技术分享 / Python 实用工具集 /
Python Celery 异步任务

Python Celery 异步任务

码不停提

2026-02-02
20 次浏览
0 条评论

Celery 是 Python 生态下最流行的异步任务队列与调度系统之一,适合海量后台任务处理、系统扩展与解耦,是中大型分布式系统“必备组件”。

Python 实用工具集
Python
Celery
异步任务
分享:

1. 什么是 Celery?

Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息。它可以帮助你在后台异步地执行任务、定时调度任务和支持任务的结果跟踪。Celery 支持多种消息中间件 (Broker)、任务结果存储(Backend),适用于 Django、Flask 等多种 Python Web 框架,也能够独立运行。


2. 核心组件与原理

2.1 Task(任务)

Celery 可将一个普通 Python 函数定义为任务 (@app.task 修饰符),并且让 Worker 进程进行异步或者同步执行。每个任务都有唯一的 task_id

2.2 Broker(消息中间件)

Broker 提供任务队列,实现分布式通信。常见中间件有:

  • RabbitMQ:功能最全,性能高,但依赖多。
  • Redis:部署简单,易于维护,同时支持作为结果存储。
  • 其他:Amazon SQS, MongoDB, etc.

2.3 Worker(工作进程)

Worker 负责监听任务队列,获取任务后异步处理。可开启多个 worker 实现并行和分布式构建。

2.4 Beat(调度器)

用于定时、周期性调度任务(如定时邮件、周期性清理)。Beat 会根据调度方案,将任务发送到 Broker。

2.5 Result Backend(结果存储)

用于存储任务执行的结果(可选),支持 Redis、数据库(如 MySQL/PostgreSQL)、RabbitMQ 等。


3. Celery 的架构图

 ┌─────────────┐      ┌─────────────┐      ┌─────────────┐
 │ Producer    │ ---> │  Broker     │ ---> │  Worker     │
 │ (Client/API)│      │ (Queue)     │      │(Consumer)   │
 └─────────────┘      └─────────────┘      └─────────────┘
                                 |           |
                          ┌─────────────┐
                          │  Result     │
                          │  Backend    │
                          └─────────────┘

4. 安装与环境准备

安装 Celery

pip install celery
# 如果使用 Redis 作为 Broker,可以一起安装
pip install redis

启动 Redis(假设本地开发环境)

# Mac/Linux
brew install redis
redis-server

5. 快速实践示例

5.1 创建 Celery 实例

# 文件名:celery_app.py
from celery import Celery

app = Celery(
    'myproject',
    broker='redis://127.0.0.1:6379/0',          # 使用 Redis 作为消息中间件
    backend='redis://127.0.0.1:6379/1'          # 使用 Redis 存储结果
)

5.2 定义任务

# 文件名:tasks.py
from celery_app import app

@app.task(bind=True)  # bind=True 可获取任务状态等上下文
def add(self, x, y):
    return x + y

@app.task
def long_task():
    import time
    time.sleep(10)
    return 'Finished'

5.3 启动 Worker

celery -A celery_app worker --loglevel=info

5.4 调用异步任务

from tasks import add

# 投递任务到队列,立即返回 AsyncResult 实例
result = add.delay(10, 20)  

# 获取任务执行结果(阻塞,最多等待5秒)
print(result.get(timeout=5))  # 输出 30

6. 配置与高级用法

6.1 配置文件

可以集中配置 Celery 参数:

# celeryconfig.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Shanghai'
enable_utc = True

在主文件中:

app.config_from_object('celeryconfig')

6.2 定时任务/周期任务

from celery.schedules import crontab

app.conf.beat_schedule = {
    'add-every-5-seconds': {
        'task': 'tasks.add',
        'schedule': 5.0,
        'args': (4, 4)
    },
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}
app.conf.timezone = 'Asia/Shanghai'

启动 Beat 进程:

celery -A celery_app beat --loglevel=info

或者(同时启动 worker 和 beat):

celery -A celery_app worker -B --loglevel=info

6.3 任务结果与状态

result.status  # PENDING, STARTED, FAILURE, SUCCESS, RETRY
result.ready() # 是否已完成
result.get(timeout=xx) # 阻塞获取
result.id      # 任务唯一ID

6.4 失败重试、回调

@app.task(bind=True, max_retries=3)
def my_task(self, x, y):
    try:
        # 做一些事
        pass
    except Exception as exc:
        self.retry(exc=exc, countdown=5)  # 5秒后重试

7. 高级进阶

7.1 任务链

一组任务依次执行,前一个的返回值会作为后一个的参数:

from tasks import add

chain = (add.s(2,2) | add.s(4) | add.s(8))
result = chain()
print(result.get())  # (((2+2)+4)+8)

7.2 任务组/批量并行(group)

from celery import group
res = group(add.s(i, i) for i in range(10))()
print(res.get())  # [0, 2, 4, ..., 18]

7.3 chord(回调集)

所有组任务完成后自动调用回调:

from celery import chord

header = [add.s(2, 2), add.s(3, 3)]
callback = add.s(4)
result = chord(header)(callback)
print(result.get())  # callback(sum(header结果), 4)

8. 常见用例

  • 网站用户注册后的异步邮件/短信发送
  • 图片或视频的异步处理与转码
  • 定时数据同步/爬虫任务的调度
  • 大型电商平台订单结算、库存同步等后端批量任务
  • 财务报表/统计系统的定时汇总

9. 常用监控与运维工具

  • Flower:Celery 实时监控/管理利器

    pip install flower
    celery -A celery_app flower

    打开浏览器访问 http://localhost:5555

  • Prometheus/Grafana:数据指标监控及可视化


10. 参考资源

评论区 (0)

你需要先 登录 后才能发表评论。
还没有人评论,赶快成为第一个吧。