Flask异步任务 #

一、Celery概述 #

1.1 安装 #

bash
pip install celery redis

1.2 配置Celery #

python
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

二、定义任务 #

2.1 异步任务 #

python
@celery.task
def send_email(to, subject, body):
    # 发送邮件逻辑
    pass

# 调用任务
send_email.delay('user@example.com', '主题', '内容')

2.2 定时任务 #

python
@celery.task
def cleanup():
    # 清理任务
    pass

celery.conf.beat_schedule = {
    'cleanup-every-hour': {
        'task': 'cleanup',
        'schedule': 3600.0
    }
}

三、下一步 #

接下来让我们学习 信号机制,了解Flask信号系统!

最后更新:2026-03-28