码迷,mamicode.com
首页 > 编程语言 > 详细

python使用apscheduler实现定时任务

时间:2020-03-24 10:40:18      阅读:89      评论:0      收藏:0      [点我收藏+]

标签:executor   独立   user   执行   提交   message   tor   star   eve   

安装apscheduler

pip install apscheduler

基本概念

1、触发器triggers。 触发器包含调度逻辑。每个作业都有自己的触发器,用于确定下一个任务何时运行。触发器有三种内建的trigger:

  1. data: 特定的时间触发
  2. interval: 固定的时间间隔触发
  3. cron: 在特定时间周期性地触发

2、 任务存储器 job stores。 用于存放任务,把任务存放在内存(为默认MemoryJobStore)或数据库中

3、执行器 executors。 执行器是将任务提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件

4、调度器scheduler。 把上方三个组件作为参数,通过创建调度器实例来运行

根据开发需求选择对应的组件

BlockingScheduler            阻塞式调度器:适用于只跑调度器的程序。
BackgroundScheduler      后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。
AsyncIOScheduler            AsyncIO调度器,适用于应用使用AsnycIO的情况。
GeventScheduler              Gevent调度器,适用于应用通过Gevent的情况。
TornadoScheduler            Tornado调度器,适用于构建Tornado应用。
TwistedScheduler             Twisted调度器,适用于构建Twisted应用。
QtScheduler                      Qt调度器,适用于构建Qt应用。

使用步骤

1、新建一个调度器schedulers
2、添加调度任务
3、运行调度任务

使用示例

触发器date:特点的时间点触发,只执行一次

from datetime import datetime
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):    
    print(text)

scheduler = BlockingScheduler()
# 在 2019-8-30 运行一次 job 方法
scheduler.add_job(job, ‘date‘, run_date=date(2019, 8, 30), args=[‘text1‘])
# 在 2019-8-30 01:00:00 运行一次 job 方法
scheduler.add_job(job, ‘date‘, run_date=datetime(2019, 8, 30, 1, 0, 0), args=[‘text2‘])
# 在 2019-8-30 01:00:01 运行一次 job 方法
scheduler.add_job(job, ‘date‘, run_date=‘2019-8-30 01:00:00‘, args=[‘text3‘])

scheduler.start()

触发器interval:固定时间间隔触发。

参数 说明
weeks(int) 间隔几周
days(int) 间隔几天
hours(int) 间隔几小时
minutes(int) 间隔几分钟
seconds(int) 间隔多少秒
start_date(datetime或str) 开始日期
end_date(datetime或str) 结束日期
timezone(datetime.tzinfo 或str)
from apscheduler.schedulers.blocking import BlockingScheduler


def run(text):    
    print(text)


if __name__ == ‘__main__‘:
    scheduler = BlockingScheduler()
    # 每隔 1分钟 运行一次 job 方法
    scheduler.add_job(run, ‘interval‘, minutes=1, args=[‘job1‘])
    # 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法
    scheduler.add_job(run, ‘interval‘, minutes=1, seconds=30, start_date=‘2019-08-29 22:15:00‘, end_date=‘2019-08-29 22:17:00‘, args=[‘job2‘])
    scheduler.start()

触发器cron: 在特定的时间周期性地触发。

参数 说明
year (int 或 str) 年,4位数字
month (int 或 str) 月 (范围1-12)
day (int 或 str) 日 (范围1-31)
week (int 或 str) 周 (范围1-53)
day_of_week (int 或 str) 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str) 时 (范围0-23)
minute (int 或 str) 分 (范围0-59)
second (int 或 str) 秒 (范围0-59)
start_date (datetime 或 str) 最早开始日期(包含)
end_date (datetime 或 str) 最晚结束时间(包含)
timezone (datetime.tzinfo 或str) 指定时区
from apscheduler.schedulers.blocking import BlockingScheduler


def exec_job(text):
    print(text)
    

if __name__ == ‘__main__‘:
    scheduler = BlockingScheduler()
    # 在每天的22点,每隔一分钟 运行一次exec_job方法
    scheduler.add_job(exec_job, ‘cron‘, hour=22, minute=‘*/1‘, args=[‘job_one‘])
    # 在每天的22点和23点的25分, 运行一次exec_job方法
    scheduler.add_job(exec_job, ‘cron‘, hour=‘22-23‘, minute=‘25‘, args=[‘result‘])
    scheduler.start()

添加任务

添加任务的方法有两种
(1)、通过add_job()添加任务
(2)、通过装饰器scheduled_job()

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler


scheduler = BlockingScheduler()


def tick(message=‘working‘):
    print(‘Tick! The time is: %s‘ % datetime.now())
    print("worker status is:", message)


def tick_other(status="relaxing"):
    print(‘Tick! The time is: %s‘ % datetime.now())
    print("worker status is:", status)


# 添加任务的方法有两种 1、通过调用add_job(), 2、通过装饰器scheduled_job()
# add_job()方法返回一个apscheduler.job.Job实例,可以使用该实例稍后修改或删除该任务
@scheduler.scheduled_job(‘interval‘, seconds=5)
def tick_another(status="restart"):
    print(‘Tick! The time is: %s‘ % datetime.now())
    print("worker status is:", status)


if __name__ == ‘__main__‘:
    # 在指定期间内, 每隔1分10秒运行一次tick方法
    scheduler.add_job(tick, ‘interval‘, minutes=1, seconds=10,
                      start_date=‘2020-01-06 14:19:00‘, end_date=‘2020-01-06 14:24:00‘, args=[‘sleep‘])
    # 在每天的11点25分,运行一次tick_other方法
    scheduler.add_job(tick_other, ‘cron‘, hour=‘11‘, minute=‘25‘, args=[‘hard working‘])
    scheduler.start()

参考来源

在线文档:在线文档
转载来源:文档

python使用apscheduler实现定时任务

标签:executor   独立   user   执行   提交   message   tor   star   eve   

原文地址:https://www.cnblogs.com/ppwang06/p/12534754.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!