-
Notifications
You must be signed in to change notification settings - Fork 83
/
blynktimer.py
133 lines (110 loc) · 4.36 KB
/
blynktimer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright (c) 2019-2020 Anton Morozenko
"""
Polling timers for functions.
Registers timers and performs run once or periodical function execution after defined time intervals.
"""
# select.select call used as polling waiter where it is possible
# cause time.sleep sometimes may load CPU up to 100% with small polling wait interval
try:
# cpython
import time
import select
polling_wait = lambda x: select.select([], [], [], x)
polling_wait(0.01)
except OSError:
# windows case where select.select call fails
polling_wait = lambda x: time.sleep(x)
except ImportError:
# micropython
import utime as time
try:
from uselect import select as s_select
polling_wait = lambda x: s_select([], [], [], x)
except ImportError:
# case when micropython port does not support select.select
polling_wait = lambda x: time.sleep(x)
WAIT_SEC = 0.05
MAX_TIMERS = 16
DEFAULT_INTERVAL = 10
class TimerError(Exception):
pass
class Timer(object):
timers = {}
def __init__(self, no_timers_err=True):
self.no_timers_err = no_timers_err
def _get_func_name(self, obj):
"""retrieves a suitable name for a function"""
if hasattr(obj, 'func'):
# handles nested decorators
return self._get_func_name(obj.func)
# simply returns 'timer' if on port without function attrs
return getattr(obj, '__name__', 'timer')
def register(blynk, *args, **kwargs):
# kwargs with defaults are used cause PEP 3102 no supported by Python2
interval = kwargs.pop('interval', DEFAULT_INTERVAL)
run_once = kwargs.pop('run_once', False)
stopped = kwargs.pop('stopped', False)
class Deco(object):
def __init__(self, func):
self.func = func
if len(list(Timer.timers.keys())) >= MAX_TIMERS:
raise TimerError('Max allowed timers num={}'.format(MAX_TIMERS))
_timer = _Timer(interval, func, run_once, stopped, *args, **kwargs)
Timer.timers['{}_{}'.format(len(list(Timer.timers.keys())), blynk._get_func_name(func))] = _timer
def __call__(self, *f_args, **f_kwargs):
return self.func(*f_args, **f_kwargs)
return Deco
@staticmethod
def stop(t_id):
timer = Timer.timers.get(t_id, None)
if timer is None:
raise TimerError('Timer id={} not found'.format(t_id))
Timer.timers[t_id].stopped = True
@staticmethod
def start(t_id):
timer = Timer.timers.get(t_id, None)
if timer is None:
raise TimerError('Timer id={} not found'.format(t_id))
Timer.timers[t_id].stopped = False
Timer.timers[t_id].fire_time = None
Timer.timers[t_id].fire_time_prev = None
@staticmethod
def is_stopped(t_id):
timer = Timer.timers.get(t_id, None)
if timer is None:
raise TimerError('Timer id={} not found'.format(t_id))
return timer.stopped
def get_timers(self):
states = {True: 'Stopped', False: 'Running'}
return {k: states[v.stopped] for k, v in self.timers.items()}
def run(self):
polling_wait(WAIT_SEC)
timers_intervals = [curr_timer.run() for curr_timer in Timer.timers.values() if not curr_timer.stopped]
if not timers_intervals and self.no_timers_err:
raise TimerError('Running timers not found')
return timers_intervals
class _Timer(object):
def __init__(self, interval, deco, run_once, stopped, *args, **kwargs):
self.interval = interval
self.deco = deco
self.args = args
self.run_once = run_once
self.kwargs = kwargs
self.fire_time = None
self.fire_time_prev = None
self.stopped = stopped
def run(self):
timer_real_interval = 0
if self.fire_time is None:
self.fire_time = time.time() + self.interval
if self.fire_time_prev is None:
self.fire_time_prev = time.time()
curr_time = time.time()
if curr_time >= self.fire_time:
self.deco(*self.args, **self.kwargs)
if self.run_once:
self.stopped = True
timer_real_interval = curr_time - self.fire_time_prev
self.fire_time_prev = self.fire_time
self.fire_time = curr_time + self.interval
return timer_real_interval