www.zhblog.net

Flask 后台任务 - Celery

有时我们会处理一些比较耗时的任务,比如:处理文件数据、发送大量的短信、email。而这些耗时的操作不应该在一个request请求中等待完成。这时就需要返回结果给前端,耗时操作在后台运行。

在Flask后台任务使用Celery集成的。Celery是一个强大的任务队列,可以处理大量的后台任务。

Celery基础:

from celery import Celery

app = Celery('background_task_celery', broker='redis://localhost') # 大多数后台处理不需要返回结果,如果需要则必须配置 backend='redis://localhost'

@app.task
def test():
    print('task...')
    return 'abc'


if __name__ == '__main__':
    test.delay() # 调用 test 任务
    test.apply_async(countdown=10) # 10s后再次调用
    print('no result')    #    
    
    result = tasks.add.delay(1, 2)    
    print(result.get()) # 获取返回结果
    print('后台任务发送完毕,程序结束!')


参考Celery官网实例


Flask中使用Celery

app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379')


def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery


celery = make_celery(app)


@celery.task()
def copy_video():
    video_file = os.path.join('e:\\', '迅雷下载', 'The.Hunt.2015.S01.BluRay.1080p.x264.DTS-HD.MA.5.1-HDChina', 'large_8G.mkv') # 读取8G视频大文件
    with open(video_file, 'rb') as fr:
        while True:
            chunk = fr.read(1024 * 1024 * 5)
            if len(chunk) > 0:
                with open(r'e:\path\1\new.mkv', 'ab') as fw:
                    fw.write(chunk)
            else:
                break;
    print('ok...')


@app.route('/bg_task')
def bg_task():
    copy_video.delay()
    return '任务后台运行,当前请求结束!'

展开阅读全文

评论

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 心情