码迷,mamicode.com
首页 > 其他好文 > 详细

Celery的介绍

时间:2019-03-24 18:48:06      阅读:113      评论:0      收藏:0      [点我收藏+]

标签:进程   错误   expires   实现   date   conf   组件   hub   ali   

1、Celery的简介以及基本使用

  Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用  的例子:

  1. 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。 
  2. 你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福

 

Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis,后面会讲

1.1 Celery有以下优点:

  1. 简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
  2. 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
  3. 快速:一个单进程的celery每分钟可处理上百万个任务
  4. 灵活: 几乎celery的各个组件都可以被扩展及自定制

Celery基本工作流程图

技术图片

Celery安装使用

Celery的默认broker是RabbitMQ, 仅需配置一行就可以

broker_url = ‘amqp://guest:guest@localhost:5672//‘

 rabbitMQ 没装的话请装一下,安装看这里  http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3

使用Redis做broker也可以

安装celery模块

pip3  install celery

 创建一个celery application 用来定义你的任务列表

创建一个任务文件就叫tasks.py吧

from celery import Celery
 
app = Celery(‘tasks‘,
             broker=‘redis://localhost‘,
           # broker=‘redis://:123123@106.13.104.194:6379‘,
             backend=‘redis://localhost‘)
 
@app.task
def add(x,y):
    print("running...",x,y)
    return x+y            

 启动Celery Worker来开始监听并执行任务

celery -A tasks worker --loglevel=info

 启动Celery报错问题的处理:

  File "/usr/local/python3/lib/python3.7/site-packages/celery/backends/redis.py", line 22
    from . import async, base
                      ^
SyntaxError: invalid syntax


搜索了一下错误原因,原来是async名称更换了,如下
[Rename `async` to `asynchronous` (async is a reserved keyword in Python 3.7) #4879](https://github.com/celery/celery/pull/4879)


开发人员已经处理了这个issue,合并了master,快速的解决方案是通过github安装celery,命令如下:
pip3 install --upgrade https://github.com/celery/celery/tarball/master
再次运次,那个应该可以看到如下正常输出:

 

调用任务

再打开一个终端, 进行命令行模式,调用任务

from tasks import add
 add.delay(4, 4)

 看你的worker终端会显示收到 一个任务,此时你想看任务结果的话,需要在调用 任务时 赋值个变量

result = add.delay(4, 4)
result.get(timeout=1)
result.ready()
result.get(propagate=False)

 2、在项目中如何使用celery

可以把celery配置成一个应用

目录格式如下

proj/__init__.py
    /celery.py
    /tasks.py

 proj/celery.py内容

from __future__ import absolute_import, unicode_literals
from celery import Celery
 
app = Celery(‘proj‘,
             broker=‘redis://:123123@106.13.104.194:6379‘,
             backend=‘redis://:123123@106.13.104.194:6379‘,
             include=[‘proj.tasks‘])
 
# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)
 
if __name__ == ‘__main__‘:
    app.start()

 proj/tasks.py中的内容

from __future__ import absolute_import, unicode_literals
from .celery import app

@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

 启动worker

celery -A proj worker -l info

 后台启动worker

celery multi start w1 -A proj -l info

 三、Celery 定时任务

 

参考:https://www.cnblogs.com/alex3714/articles/6351797.html

 

Celery的介绍

标签:进程   错误   expires   实现   date   conf   组件   hub   ali   

原文地址:https://www.cnblogs.com/weidaijie/p/10589438.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!