关于异步的知识网上很多,这里就直接上代码,目前结合Flask这个Python框架实现后台任务的执行操作;
1.需要了解的知识点:
- 了解生产消费模型或者发布订阅模式来实现消息队列
- 了解异步、同步之间的差别
2.实现过程
(1)目录结构
├── LICENSE
├── README.md
├── app // Flask APP应用
│ ├── __init__.py
│ ├── auth
│ ├── email.py
│ ├── main
│ ├── models.py
│ ├── salt
│ ├── static
│ └── templates
├── config.py // 通用配置
├── config.pyc
├── manager.py
├── migrations
│ ├── README
│ ├── alembic.ini
│ ├── env.py
│ └── versions
└── requirements.txt
(2)指定broker/backend(此处使用redis)
vim app/__init__.py
// 部分代码
from celery import Celery
app = Flask(__name__)
broker = 'redis://127.0.0.1:6379'
backend = 'redis://127.0.0.1:6379/0'
celery = Celery(app.name, broker=broker, backend=backend)
// 部分代码
以上引入了Celery,指定了Celery的生产消费端都为我们的redis
(3)这里我将任务写到了视图函数中
vim app/main/views.py
(1 创建一个任务函数, 并且将这个任务绑定上了celery的标记
部分代码
@celery.task(bind=True)
def update_cbt_resource(self):''' @summary: 创建一个需要后台去执行的任务,我这里只是举个栗子 ''' result=commands.getoutput("sleep 20 && echo 'ok'") return result
(2 创建一个执行任务请求函数
部分代码
@main.route(‘/execute_task/<update_env>’, methods=[‘GET’, ‘POST’])
def execute_task(update_env):""" @summary: 请求函数入口 """ # 这里用最笨的办法执行了在我们web中点击执行任务之前去检查celery这个服务是否存在,如果不存在会提示用户 result = commands.getoutput("ps -ef | grep celery | grep -v grep") if not result: return jsonify({"result":False,"message":u'未发现Celery进程,请检查该服务是否正常启动'}) # 将前面写的任务函数直接调用apply_async属性 task = update_cbt_resource.apply_async() # 这里就可以获取到这个任务一开始执行就返回的一些属性 # 比如任务ID, 任务状态 data = { "task_id": task.id, "task_status": task.status } # 将以上的信息作为这请求函数的返回 result = {"result":True,"data":data,"message":u'执行开始'} return jsonify(result)
- (3 创建一个任务状态查询的函数
一般任务触发之后我们想要知道这个任务的执行状态,是处于哪个阶段的,各个状态已经在官网详细说明了;有兴趣的直接看官网
### 部分的代码
@main.route('/task_result', methods=['GET'])
@login_required
def task_result():
'''
@summary: 任务的状态是通过task_id来获取,在触发了任务之后,通过前端js轮询请求这个函数,就可以得到该任务的当前执行状态
'''
# 点击执行按钮前端会传一个task_id到后端,这里使用form的方式获取到task_id
task_id = json.loads(request.form.get('data'))['task_id']
# AsyncResult,它的作用是被用来检查任务状态,等待任务执行完毕或获取任务结果,如果任务失败,它会返回异常信息或者调用栈。
the_task = update_cbt_resource.AsyncResult(task_id)
print("任务:{0} 当前的 state 为:{1}".format(task_id, the_task.state))
# 执the_task.state
if the_task.state == 'PROGRESS':
print the_task.info.get('i', 0)
result = {'state': 'progress',"result_data":the_task.result}
elif the_task.state == 'SUCCESS':
result = {'state': "success", "result_data":the_task.result}
elif the_task.state == 'PENDING':
result = {'state': 'waitting',"result_data":the_task.result}
elif the_task.state == 'REVOKED':
result = { 'state': 'revoke', "result_data":the_task.result}
print the_task.result
else:
result = {'state': the_task.state,'progress':0,"result_data":the_task.result}
return jsonify(result)
- (4 尝试在前端页面点击执行任务

Oops! 这就是上面进行celery进程判断之后得到的提示信息,那下面就把celery启动起来吧1
2
3
4
5
6
* (5 启动Celey服务
celery worker -A manager.celery -l debug
### 我这里是本地使用的virtualenv环境

ok, 现在启动了celery服务之后我们在前台执行一下:

通过点击执行后我们看日志:

任务执行完毕了;

再看看Flask的日志,这就是通过js轮询请求这个task_resul函数的结果

再来看看redis中的内容,这就将结果保存到了backend中

(4)如何撤销一个任务呢?
还是通过task_id来实现。视图函数中编写一个任务的函数
### 部分代码
@main.route('/cancel_task/', methods=['GET', 'POST'])
@login_required
def cancel_task():
# 通过前端的取消按钮来获取到这个任务的id
task_id = json.loads(request.form.get('data'))['task_id']
try:
celery.control.revoke(task_id, terminate=True, signal='SIGKILL')
return Response(json.dumps({'result':True,"message": "取消任务完成" }))
except Exception,e:
return Response(json.dumps({'result': True, "message": u'取消任务失败.{0}'.format(e)}))
执行过程如下:




需要注意的是,在上面视图函数中的这段代码其实有也可以撤销
task = update_cbt_resource.apply_async()
task.revoke()
但是这种方式只是撤销,如果任务已经在执行撤销则无效;所以我这里可以使用下面的方法来撤销
# 通过task_id撤销
celery.control.revoke(task_id)
# 撤销正在执行的任务,默认使用TERM信号
celery.control.revoke(task_id, terminate=True)
# 撤销正在执行的任务,默认使用KILL信号
celery.control.revoke(task_id, terminate=True, signal='SIGKILL')
#在官网文档中也可以将多个task_id组成列表形式然后同时撤销多个任务
celery.control.revoke([task_id1,task_id2,task_id3,task_id4.......])