如何使用Celery(芹菜)异步神器执行后台任务
关于异步的知识网上很多,这里就直接上代码,目前结合Flask这个Python框架实现后台任务的执行操作;
1.需要了解的知识点:
- 了解生产消费模型或者发布订阅模式来实现消息队列
- 了解异步、同步之间的差别
2.实现过程
(1)目录结构
├── LICENSE
├── README.md
├── app // Flask APP应用
│ ├── __init__.py
│ ├── auth
│ ├── email.py
│ ├── main
│ ├── models.py
│ ├── salt
│ ├── static
│ └── templates
├── config.py // 通用配置
├── config.pyc
├── manager.py
├── migrations
│ ├── README
│ ├── alembic.ini
│ ├── env.py
│ └── versions
└── requirements.txt
(2)指定broker/backend(此处使用redis)
vim app/__init__.py
// 部分代码
from celery import Celery
app = Flask(__name__)
broker = 'redis://127.0.0.1:6379'
backend = 'redis://127.0.0.1:6379/0'
celery = Celery(app.name, broker=broker, backend=backend)
// 部分代码
以上引入了Celery,指定了Celery的生产消费端都为我们的redis
(3)这里我将任务写到了视图函数中
vim app/main/views.py
(1 创建一个任务函数, 并且将这个任务绑定上了celery的标记
部分代码
@celery.task(bind=True)
def update_cbt_resource(self):
‘’’
@summary: 创建一个需要后台去执行的任务,我这里只是举个栗子
‘’’
result=commands.getoutput(“sleep 20 && echo ‘ok’”)
return result(2 创建一个执行任务请求函数
部分代码
@main.route(‘/execute_task/
‘, methods=[‘GET’, ‘POST’])
def execute_task(update_env):
“””
@summary: 请求函数入口
“””
# 这里用最笨的办法执行了在我们web中点击执行任务之前去检查celery这个服务是否存在,如果不存在会提示用户
result = commands.getoutput(“ps -ef | grep celery | grep -v grep”)
if not result:
return jsonify({“result”:False,”message”:u’未发现Celery进程,请检查该服务是否正常启动’})
# 将前面写的任务函数直接调用apply_async属性
task = update_cbt_resource.apply_async()
# 这里就可以获取到这个任务一开始执行就返回的一些属性
# 比如任务ID, 任务状态
data = {
“task_id”: task.id,
“task_status”: task.status
}
# 将以上的信息作为这请求函数的返回
result = {“result”:True,”data”:data,”message”:u’执行开始’}
return jsonify(result)(3 创建一个任务状态查询的函数
一般任务触发之后我们想要知道这个任务的执行状态,是处于哪个阶段的,各个状态已经在官网详细说明了;有兴趣的直接看官网
1 | ### 部分的代码 |
(4 尝试在前端页面点击执行任务

Oops! 这就是上面进行celery进程判断之后得到的提示信息,那下面就把celery启动起来吧~~~(5 启动Celey服务
celery worker -A manager.celery -l debug
我这里是本地使用的virtualenv环境~~~

ok, 现在启动了celery服务之后我们在前台执行一下:
通过点击执行后我们看日志:
任务执行完毕了;
再看看Flask的日志,这就是通过js轮询请求这个task_resul函数的结果
再来看看redis中的内容,这就将结果保存到了backend中
(4)如何撤销一个任务呢?
还是通过task_id来实现。视图函数中编写一个任务的函数
### 部分代码
@main.route('/cancel_task/', methods=['GET', 'POST'])
@login_required
def cancel_task():
# 通过前端的取消按钮来获取到这个任务的id
task_id = json.loads(request.form.get('data'))['task_id']
try:
celery.control.revoke(task_id, terminate=True, signal='SIGKILL')
return Response(json.dumps({'result':True,"message": "取消任务完成" }))
except Exception,e:
return Response(json.dumps({'result': True, "message": u'取消任务失败.{0}'.format(e)}))
执行过程如下:



需要注意的是,在上面视图函数中的这段代码其实有也可以撤销
task = update_cbt_resource.apply_async()
task.revoke()
但是这种方式只是撤销,如果任务已经在执行撤销则无效;所以我这里可以使用下面的方法来撤销
# 通过task_id撤销
celery.control.revoke(task_id)
# 撤销正在执行的任务,默认使用TERM信号
celery.control.revoke(task_id, terminate=True)
# 撤销正在执行的任务,默认使用KILL信号
celery.control.revoke(task_id, terminate=True, signal='SIGKILL')
#在官网文档中也可以将多个task_id组成列表形式然后同时撤销多个任务
celery.control.revoke([task_id1,task_id2,task_id3,task_id4.......])