Aller au contenu

Planifier des Tâches avec APScheduler

Si vous cherchez à exécuter dans vos codes python des taches de manières répétitives ou à des heures fixes (cron) la bibliothèque APScheduler, pour Advanced Python Scheduler, peut vous économiser beaucoup de lignes de code et de temps. En effet, il regorge de beaucoup de fonctionnalités, par exemple si votre planificateur est redémarré, il exécutera toutes les tâches qu’il aurait dû exécuter pendant qu’il était hors ligne.

APScheduler dispose de trois systèmes de planification intégrés qui peuvent être combiné :

  • Planification de type crontab avec heures de début et de fin facultatives
  • Exécution basée sur des intervalles exécute les travaux à intervalles réguliers, avec des heures de début / fin facultatives
  • Exécution différée unique, exécute les travaux une fois, à une date / heure définie

En plus vous pouvez choisir le framework python gérant vos jobs :

  • BlockingScheduler
  • BackgroundScheduler
  • AsyncIOScheduler
  • GeventScheduler
  • TornadoScheduler
  • TwistedScheduler
  • QtScheduler

Comment lancer des taches répétitives avec APScheduler

Un petit exemple de code :

#!/usr/bin/env python
from apscheduler.schedulers.blocking import BlockingScheduler
import time
def init():
print('begin init...')
time.sleep(120)
print('end init...')
def scan():
print('begin netscan')
time.sleep(50)
print('end netscan...')
def register():
print('begin register...')
time.sleep(20)
print('end register...')
if __name__ == '__main__':
scheduler = BlockingScheduler(timezone='Europe/Paris')
scheduler.add_job(init)
scheduler.add_job(scan, "interval", seconds=90, misfire_grace_time=30)
scheduler.add_job(register, "interval", seconds=60, misfire_grace_time=30)
scheduler.print_jobs()
scheduler.start()
try:
while True:
time.sleep(5)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()

Dans cet exemple qui utilise Blocking comme framework, je génère trois jobs :

  • init qui est lancé une seule fois au démarrage de l’application
  • scan qui se lance toutes les 90s avec une tolérance de 30s
  • register qui se lance toutes les 60s avec une tolérance de 30s

À l’exécution on voit que le job

Jobstore default:
init (trigger: date[2020-11-04 11:11:28 CET], next run at: 2020-11-04 11:11:28 CET)
register (trigger: interval[0:01:00], next run at: 2020-11-04 11:12:28 CET)
scan (trigger: interval[0:01:30], next run at: 2020-11-04 11:12:58 CET)
begin init...
begin register...
end register...
begin scan..
begin register...
end scan..

Vous remarquerez que le job register démarre alors que celui d’init n’est pas terminé et que celles de netscan se mélange. Si vous souhaitez que l’exécution de vos jobs ne se chevauchent pas, il suffit de limiter le nombre de thread. Cela se fait en changeant la ligne suivante :

if __name__ == '__main__':
scheduler = BlockingScheduler(timezone='Europe/Paris')
scheduler.add_job(init)
scheduler.add_job(scan, "interval", seconds=90, misfire_grace_time=30)
scheduler.add_job(register, "interval", seconds=60, misfire_grace_time=30)
scheduler.print_jobs()
scheduler.start()

en :

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
if __name__ == '__main__':
scheduler = BlockingScheduler(timezone='Europe/Paris', executors={'default': ThreadPoolExecutor(1)})
scheduler.add_job(init)
scheduler.add_job(scan, "interval", seconds=90, misfire_grace_time=30)
scheduler.add_job(register, "interval", seconds=60, misfire_grace_time=30)
scheduler.print_jobs()
scheduler.start()

Désormais on voit que nos jobs register et scan fonctionnent de manières concurrentes et ne démarre qu’après celui d’init :

begin init...
Execution of job "register (trigger: interval[0:01:00], next run at: 2020-11-04 11:48:01 CET)" skipped: maximum number of running instances reached (1)
end init...
Run time of job "register (trigger: interval[0:01:00], next run at: 2020-11-04 11:49:01 CET)" was missed by 0:01:00.099997
Run time of job "scan (trigger: interval[0:01:30], next run at: 2020-11-04 11:49:01 CET)" was missed by 0:00:30.100325

Le lancement du job register a été repoussé, car nous limitons le nombre de thread. Les jobs register et scan ont été également repoussé, car l’heure de programmation initiale a été dépassé alors que nous tolérions 30s de bonus.

Lors de la définition de votre scheduler il est possible de définir d’autres paramètres évitant à devoir les saisir de manière répétitive ensuite dans les jobs avec job_defaults:

self.scheduler = BlockingScheduler(
job_defaults={
'coalesce': False,
'misfire_grace_time': 30
})

Comment lancer des taches en parallèle

Imaginons que nous voulions lancer en // des scripts et d’attendre la fin de celles-ci. Nous allons utiliser les events pour gérer une liste de tâches. Une fois la liste de tache vide le programme s’arrête.

import time
import sys
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler as Scheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_SUBMITTED, EVENT_JOB_EXECUTED, EVENT_JOB_ADDED
running_tasks=[]
def task_started(event):
if event.job_id not in running_tasks:
running_tasks.append(event.job_id)
def task_finished(event):
if event.job_id in running_tasks:
running_tasks.remove(event.job_id)
def myjob():
print('The job started at : %s' % datetime.now())
time.sleep(10)
print('The job ended at : %s' % datetime.now())
if __name__== '__main__':
scheduler = Scheduler(timezone='Europe/Paris', executors={'default': ProcessPoolExecutor(10)})
scheduler.add_listener(task_started, EVENT_JOB_SUBMITTED)
scheduler.add_listener(task_finished, EVENT_JOB_EXECUTED|EVENT_JOB_ADDED)
scheduler.start()
for i in range (0,10):
scheduler.add_job(myjob)
while len(running_tasks) !=0 :
time.sleep(1)
scheduler.shutdown()

Je pense que vous avez plein d’idées en tête pour utiliser cette superbe libraire python dont voici le lien pour la documentation. Amusez vous bien !