Flask异步任务 #
一、Celery概述 #
1.1 安装 #
bash
pip install celery redis
1.2 配置Celery #
python
from celery import Celery
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
二、定义任务 #
2.1 异步任务 #
python
@celery.task
def send_email(to, subject, body):
# 发送邮件逻辑
pass
# 调用任务
send_email.delay('user@example.com', '主题', '内容')
2.2 定时任务 #
python
@celery.task
def cleanup():
# 清理任务
pass
celery.conf.beat_schedule = {
'cleanup-every-hour': {
'task': 'cleanup',
'schedule': 3600.0
}
}
三、下一步 #
接下来让我们学习 信号机制,了解Flask信号系统!
最后更新:2026-03-28