有时我们会处理一些比较耗时的任务,比如:处理文件数据、发送大量的短信、email。而这些耗时的操作不应该在一个request请求中等待完成。这时就需要返回结果给前端,耗时操作在后台运行。
在Flask后台任务使用Celery集成的。Celery是一个强大的任务队列,可以处理大量的后台任务。
Celery基础:
from celery import Celery
app = Celery('background_task_celery', broker='redis://localhost') # 大多数后台处理不需要返回结果,如果需要则必须配置 backend='redis://localhost'
@app.task
def test():
print('task...')
return 'abc'
if __name__ == '__main__':
test.delay() # 调用 test 任务
test.apply_async(countdown=10) # 10s后再次调用
print('no result') #
result = tasks.add.delay(1, 2)
print(result.get()) # 获取返回结果
print('后台任务发送完毕,程序结束!')
Flask中使用Celery
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379')
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
celery = make_celery(app)
@celery.task()
def copy_video():
video_file = os.path.join('e:\\', '迅雷下载', 'The.Hunt.2015.S01.BluRay.1080p.x264.DTS-HD.MA.5.1-HDChina', 'large_8G.mkv') # 读取8G视频大文件
with open(video_file, 'rb') as fr:
while True:
chunk = fr.read(1024 * 1024 * 5)
if len(chunk) > 0:
with open(r'e:\path\1\new.mkv', 'ab') as fw:
fw.write(chunk)
else:
break;
print('ok...')
@app.route('/bg_task')
def bg_task():
copy_video.delay()
return '任务后台运行,当前请求结束!'