Python调度框架APScheduler使用

APScheduler是Python下一个轻量级的任务调度框架,它允许在特定的时间间隔或者特定的时间执行某项任务。同时它也提供了各种灵活的选项来允许我们根据我们需要的方式进行任务计划。下面将从多个角度详细介绍python调度框架APScheduler的使用。
一、APScheduler的基本使用
APScheduler分为三个模块:触发器(trigger),任务(job)和调度器(scheduler),下面看一下基本使用方法。
1、触发器(trigger):
APScheduler允许我们以不同的单位:年(year),月(month),周(week),日(day),时(hour),分(minute),秒(second)和微秒(microsecond)来设置任务的执行时间。同时,也可以使用多种触发器:date,interval或者cron。以下是基本使用方法。
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def job():
print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
scheduler = BlockingScheduler()
scheduler.add_job(job, 'interval', seconds=3)
scheduler.start()
2、任务(job):
任务包括函数和它的参数,可以说任务的核心就是自己实现的函数。以下是基本使用方法。
from apscheduler.schedulers.blocking import BlockingScheduler
def job(*args, **kwargs):
print(args)
print(kwargs)
scheduler = BlockingScheduler()
scheduler.add_job(job, 'interval', args=('参数1', '参数2'), kwargs={'参数3': '参数4'})
scheduler.start()
3、调度器(scheduler):
调度器负责管理和调度任务,包括加入任务和删除任务。以下是基本使用方法。
from apscheduler.schedulers.blocking import BlockingScheduler
def job():
print('任务执行')
scheduler = BlockingScheduler()
scheduler.add_job(job, 'interval', seconds=3)
scheduler.remove_job(job_id=None)
scheduler.start()
二、APScheduler的高级用法
1、多任务和多个调度器管理
APScheduler允许在同一应用程序中创建多个调度器,并在不同的调度器中执行多个任务。同时,允许定义和引用不同的触发器。以下是基本使用方法。
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from datetime import datetime
def job():
print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
jobstores = {
'mongo': MongoDBJobStore(database='test', collection='example_jobs')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
schedulers = {
'background': BackgroundScheduler(jobstores=jobstores, executors=executors,
job_defaults={'coalesce': False, 'max_instances': 3},
timezone='Asia/Shanghai')
}
schedulers['background'].add_job(job, 'interval', seconds=5)
schedulers['background'].start()
2、任务监听器
APScheduler允许通过任务监听器来监控任务的状态,比如任务的异常和完成情况。以下是基本使用方法。
from apscheduler.schedulers.blocking import BlockingScheduler
def job_listener(event):
if event.exception:
print('任务执行失败')
else:
print('任务执行成功')
scheduler = BlockingScheduler()
scheduler.add_listener(job_listener, 'job_execution_failed | job_execution_success')
3、持久化存储
APScheduler提供了多种持久化存储的选项,用于存储任务和调度器的状态,这些选项包括内存存储、文件存储和数据库存储。以下是基本使用方法。
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
def job():
print('任务执行')
jobstores = {
'mongo': MongoDBJobStore(database='test', collection='example_jobs')
}
executors = {
'default': ThreadPoolExecutor(10)
}
schedulers = {
'background': BackgroundScheduler(jobstores=jobstores, executors=executors,
timezone='Asia/Shanghai')
}
schedulers['background'].start()
二、总结
APScheduler是Python中一个轻量级的任务调度框架,它提供了各种灵活的选项来允许我们根据我们需要的方式进行任务计划。同时,APScheduler还提供了多种高级用法,比如多任务和多个调度器管理、任务监听器和持久化存储等,让我们可以更好的管理和调度任务。