博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[Dynamic Language] Python定时任务框架
阅读量:5818 次
发布时间:2019-06-18

本文共 8656 字,大约阅读时间需要 28 分钟。

APScheduler是一个Python定时任务框架,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务、并以daemon方式运行应用。

在APScheduler中有四个组件:

触发器(trigger)包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。
作业存储(job store)存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。

执行器(executor)处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
调度器(scheduler)是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。

你需要选择合适的调度器,这取决于你的应用环境和你使用APScheduler的目的。通常最常用的两个:
– BlockingScheduler: 当调度器是你应用中唯一要运行的东西时使用。
– BackgroundScheduler: 当你不运行任何其他框架时使用,并希望调度器在你应用的后台执行。

 

测试计划任务

mac-abeen:timetask abeen$ vim testtask.py

1 # !/usr/bin/env python                                                                                                                                                                                    2 # -*- encoding:utf-8 -*-                                                                  3                                                                                           4 from datetime import datetime                                                             5                                                                                           6                                                                                           7 class TestTask(object):                                                                   8     """                                                                                   9         测试计划任务                                                                     10     """                                                                                  11                                                                                          12     def print_file(self, content):                                                       13         f = open("testtask.log", "a")                                                    14         f.write(content)                                                                 15         f.close()                                                                        16                                                                                          17                                                                                          18     def run_second(self):                                                                19         self.print_file("run_second is run on {0} \r\n ".format(datetime.now()))         20                                                                                          21                                                                                          22     def run_minute(self):                                                                23         self.print_file("run_minute is run on {0} \r\n ".format(datetime.now()))         24                                                                                          25                                                                                          26                                                                                          27                                                                                          28 if __name__ == "__main__":                                                               29     tt = TestTask()                                                                      30     tt.run_second()                                                                      31     tt.run_minute()

 

任务执行

mac-abeen:timetask abeen$ vim run_task.py

1 # !/usr/bin/env python                                                                                                                                                                                    2 # -*- encoding:utf-8 -*-                                                                  3                                                                                           4 from datetime import datetime                                                             5 import time                                                                               6 from apscheduler.schedulers.blocking import BlockingScheduler                             7 from testtask import TestTask                                                             8                                                                                           9 def run():                                                                               10     scheduler = BlockingScheduler()                                                      11     scheduler.add_job(func=TestTask().run_second, trigger='interval', seconds=3)         12     scheduler.add_job(func=TestTask().run_minute, trigger='interval', seconds=60)        13                                                                                          14     scheduler.start()                                                                    15                                                                                          16                                                                                          17 if __name__ == "__main__":                                                               18     print "scheduler is run ......"                                                      19     run()

 日志记录(测试部分日志信息)

1 run_second is run on 2017-06-29 15:13:16.070406                                                                                                                                                           2  run_minute is run on 2017-06-29 15:13:16.071498   3  run_second is run on 2017-06-29 15:13:32.378192   ...  22  run_minute is run on 2017-06-29 15:14:29.379333  23  run_second is run on 2017-06-29 15:14:29.379462  24  run_second is run on 2017-06-29 15:14:32.380961 ...  44  run_minute is run on 2017-06-29 15:15:29.380135  45  run_second is run on 2017-06-29 15:15:32.380093  46  run_second is run on 2017-06-29 15:15:35.378075....

 

 

作业运行的控制

add_job的第二个参数是trigger,它管理着作业的调度方式。它可以为date, interval或者cron。对于不同的trigger,对应的参数也相同。

(1). cron定时调度

year(int|str) – 4-digit year 

month(int|str) – month (1-12) 

day(int|str) – day of the (1-31) 

week(int|str) – ISO week (1-53) 

day_of_week(int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) 

hour(int|str) – hour (0-23) 

minute(int|str) – minute (0-59) 

second(int|str) – second (0-59) 

start_date(datetime|str) – earliest possible date/time to trigger on (inclusive) 

end_date(datetime|str) – latest possible date/time to trigger on (inclusive) 

timezone(datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) 

和Linux的Crontab一样,它的值格式为:

Expression Field Description
* any Fire on every value
*/a any Fire every  values, starting from the minimum 
a-b any Fire on any value within the  a-b range (a must be smaller than b) 
a-b/c any Fire every  values within the  a-b range 
xth y day Fire on the  -th occurrence of weekday  within the month 
last x day Fire on the last occurrence of weekday  within the month 
last day Fire on the last day within the month
x,y,z any Fire on any matching expression; can combine any number of any of the above expressions

几个例子如下:

# The job will be executed on November 6th, 2009sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text']) # The job will be executed on November 6th, 2009 at 16:30:05 sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])

(2). interval 间隔调度

它的参数如下:

weeks(int) – number of weeks to wait 

days(int) – number of days to wait 

hours(int) – number of hours to wait 

minutes(int) – number of minutes to wait 

seconds(int) – number of seconds to wait 

start_date(datetime|str) – starting point for the interval calculation 

end_date(datetime|str) – latest possible date/time to trigger on 

timezone(datetime.tzinfo|str) – time zone to use for the date/time calculations 

例子:

# Schedule job_function to be called every two hourssched.add_job(job_function, 'interval', hours=2)

(3). date 定时调度

最基本的一种调度,作业只会执行一次。它的参数如下:

run_date(datetime|str) – the date/time to run the job at 

timezone(datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already 

例子:

# The job will be executed on November 6th, 2009sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text']) # The job will be executed on November 6th, 2009 at 16:30:05 sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])

 

官方资料帮助:
https://pypi.python.org/pypi/APScheduler/#downloads

https://github.com/agronholm/apscheduler

转载地址:http://znwdx.baihongyu.com/

你可能感兴趣的文章
【官方文档】Nginx负载均衡学习笔记(三) TCP和UDP负载平衡官方参考文档
查看>>
矩阵常用归一化
查看>>
Oracle常用函数总结
查看>>
【聚能聊有奖话题】Boring隧道掘进机完成首段挖掘,离未来交通还有多远?
查看>>
USNews大学排名遭美国计算机研究学会怒怼,指排名荒谬要求撤回
查看>>
七大关键数据 移动安全迎来历史转折点
查看>>
盘点物联网网关现有联网技术及应用场景
查看>>
mui 总结2--新建第一个app项目
查看>>
nginx的lua api
查看>>
考研太苦逼没坚持下来!看苑老师视频有点上头
查看>>
HCNA——RIP的路由汇总
查看>>
zabbix监控php状态(四)
查看>>
定时任务的创建
查看>>
实战Django:小型CMS Part2
查看>>
原创]windows server 2012 AD架构试验系列 – 16更改DC计算机名
查看>>
统治世界的十大算法
查看>>
linux svn安装和配置
查看>>
SSH中调用另一action的方法(chain,redirect)
查看>>
数据库基础
查看>>
表格排序
查看>>