-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.py
49 lines (40 loc) · 1.46 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Aug 12 12:42:41 2019
@author: pi
"""
import time
class Scheduler:
    """Minimal polling scheduler for named tasks.

    Tasks are registered under a name together with a callable and its
    positional arguments.  The schedule is a set of ``(name, run_time)``
    pairs, where ``run_time`` is compared against ``time.time()``.  A task
    whose callable returns an ``int`` or ``float`` is rescheduled at that
    value; any other return value makes the run a one-shot.
    """

    def __init__(self):
        # Maps task name -> (function, args).
        self.tasks = dict()
        # Pending runs as (name, run_time) tuples.
        self.schedule = set()
        # Cleared by stop_scheduler() to end the run_scheduler() loop.
        self._keep_running = True

    def register_task(self, name, function, args=(), next_run_time=None):
        """Register ``function`` (called as ``function(*args)``) under ``name``.

        If ``next_run_time`` is not None the task is also added to the
        schedule for that time.  Registering an existing name replaces its
        callable and arguments.
        """
        self.tasks[name] = (function, args)
        if next_run_time is not None:
            self.add_to_schedule(name, next_run_time)

    def add_to_schedule(self, name, next_run_time):
        """Add a pending run of task ``name`` at ``next_run_time``."""
        self.schedule.add((name, next_run_time))

    def run_tasklist_once(self):
        """Run every scheduled task whose run time is already in the past.

        Due entries are processed earliest first.  After a task finishes, its
        entry is removed and, if the callable returned an ``int`` or
        ``float``, the task is rescheduled at that value.

        Raises:
            KeyError: if a scheduled name was never registered.
            Exception: whatever a task raises is re-raised after printing the
                task name.  The failing entry stays in the schedule; entries
                of tasks that already completed in this pass are removed.
        """
        # Iterate over a sorted snapshot so tasks may modify the schedule
        # while running and so that due tasks execute in chronological order.
        pending = sorted(self.schedule, key=lambda entry: entry[1])
        for entry in pending:
            name, run_time = entry
            if not run_time < time.time():
                continue
            function, args = self.tasks[name]
            try:
                next_run_time = function(*args)
            except Exception:
                print('Crashed in task:', name)
                raise
            # Drop the finished entry right away (not after the loop) so a
            # crash in a later task cannot cause this one to run twice.
            # discard() tolerates a task that unscheduled itself.
            self.schedule.discard(entry)
            # Re-add only after the removal: a task returning the very same
            # timestamp would otherwise be deleted together with the old entry.
            if isinstance(next_run_time, (int, float)):
                self.schedule.add((name, next_run_time))

    def run_scheduler(self, poll_interval=0.5):
        """Repeatedly run due tasks until ``stop_scheduler`` is called.

        Args:
            poll_interval: seconds to sleep between passes over the schedule
                (defaults to the previous fixed value of 0.5).
        """
        while self._keep_running:
            self.run_tasklist_once()
            time.sleep(poll_interval)

    def stop_scheduler(self):
        """Make ``run_scheduler`` exit after its current pass.

        Schedule this method as a task at the time the program should stop.
        """
        self._keep_running = False