码迷,mamicode.com
首页 > 其他好文 > 详细

【学无止境】使用Celery执行有依赖关系的任务

时间:2021-01-29 11:55:03      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:tar   back   name   RKE   实现   star   target   for   tps   

需求

有 A-F 6个任务,它们的执行关系如下:

  • Task A, Task B
    • Task C, Task D
      • Task E, Task F

同一级内的任务可以并行,上一级任务做完了,才能进行下一级任务。

实现这样一个具有依赖关系的系列任务有很多种方法。这里采用的是Celery。

  • 优点是:不用自己重新造一个轮子,功能性强,扩展性强。
  • 缺点是:Celery本身不是很复杂,但是也有一定的学习成本。

Celery基本介绍

Celery是分布式(异步)任务队列。

具体内容可以参考这里 -> https://www.cnblogs.com/maxstack/p/13571996.html

目录结构

示例程序的结构如下:

  • app.py:应用程序
  • tasks.py:任务程序
  • sleep.sh:实际调用的任务脚本
  • celeryconfig.py:配置文件
  • log:存放log的目录

下面从易到难分别讲解。

任务脚本

sleep.sh,这是一个dummy的任务脚本,模拟了一个耗时为3秒的测试任务。在任务程序tasks.py中,会调用到这个脚本。

echo start
sleep 3
echo done

注:在以下示例程序中,所有任务都调用了同一个脚本,实际情况可能是一个任务对应一个脚本。

任务程序

tasks.py,其中主要定义了一个名为job1的任务,在应用程序app.py中会调用到这个任务。

job1根据传入参数 name ,来分别调用脚本执行不同的任务,然后记录在对应的日志文件中。

import time
import subprocess
from celery import Celery

# ini
app = Celery(‘tasks‘)
app.config_from_object(‘celeryconfig‘)

# execute shell cmd
def execute(command):
    p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    if p.returncode:
        raise Exception("%s exited with status %r: %r " % (command, p.returncode, stderr))
    return stdout if stdout is None else stdout.strip()

@app.task
def job1(name):
    cmd_1 = ‘~/celery/sleep.sh > ~/celery/log/cmd1.log‘
    cmd_2 = ‘~/celery/sleep.sh > ~/celery/log/cmd2.log‘
    cmd_3 = ‘~/celery/sleep.sh > ~/celery/log/cmd3.log‘
    if name == "a":
        execute(cmd_1)
        return 1
    elif name == "b":
        execute(cmd_2)
        return 2
    elif name == "c":
        execute(cmd_3)
        return 3
    return 0

配置文件

celeryconfig.py,这里主要配置了 backend 和 broker 。

result_backend = ‘rpc://‘
broker_url = ‘redis://localhost‘

参考这里 -> https://docs.celeryproject.org/en/master/userguide/configuration.html

应用程序

app.py,这里首先启动了Task A, Task B,让它们并行执行,等它们都执行完了,再启动Task C

from tasks import job1
import time

TIME_INTERVAL = 0.5

def waitFor(input):
    job = input[0]
    name = input[1]
    while not job.ready():
        time.sleep(TIME_INTERVAL)
        print("waiting for job " + name + " - " + job.id + " to complete ...")
    print("done to run job " + name)
    return

def runJob(name):
    print("start to run job " + name)
    job = job1.delay(name)
    return [job,name]

# start job a and b
job_a = runJob("a")
job_b = runJob("b")
waitFor(job_a)
waitFor(job_b)

# start job c
job_c = runJob("c")
waitFor(job_c)

运行

需要注意的是, concurrency 至少要>2,这样才能两个 job 同时运行,一般配置为和机器的CPU核数相等。

service docker start
 
docker run -d -p 6379:6379 redis

celery -A tasks worker --loglevel=info --concurrency=6

python app.py

结果

start to run job a
start to run job b
waiting for job a - e5897b71-8841-4628-9401-b3fa519109a9 to complete ...
waiting for job a - e5897b71-8841-4628-9401-b3fa519109a9 to complete ...
waiting for job a - e5897b71-8841-4628-9401-b3fa519109a9 to complete ...
waiting for job a - e5897b71-8841-4628-9401-b3fa519109a9 to complete ...
waiting for job a - e5897b71-8841-4628-9401-b3fa519109a9 to complete ...
waiting for job a - e5897b71-8841-4628-9401-b3fa519109a9 to complete ...
done to run job a
done to run job b
start to run job c
waiting for job c - bf7d1a4f-c8d8-4f09-9600-bcfe877d8f10 to complete ...
waiting for job c - bf7d1a4f-c8d8-4f09-9600-bcfe877d8f10 to complete ...
waiting for job c - bf7d1a4f-c8d8-4f09-9600-bcfe877d8f10 to complete ...
waiting for job c - bf7d1a4f-c8d8-4f09-9600-bcfe877d8f10 to complete ...
waiting for job c - bf7d1a4f-c8d8-4f09-9600-bcfe877d8f10 to complete ...
waiting for job c - bf7d1a4f-c8d8-4f09-9600-bcfe877d8f10 to complete ...
done to run job c

【学无止境】使用Celery执行有依赖关系的任务

标签:tar   back   name   RKE   实现   star   target   for   tps   

原文地址:https://www.cnblogs.com/maxstack/p/14341330.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!