forked from gati/django-gcalsync
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tasks.py
34 lines (25 loc) · 1.05 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from sync import Synchronizer
from celery.utils.log import get_task_logger
from celery.task import PeriodicTask
from celery.registry import tasks
from datetime import timedelta, datetime
logger = get_task_logger(__name__)
_tasks = []
def run(calender_id, transformer):
def func(self, **kwargs):
synchronizer = Synchronizer(calendar_id=calender_id, transformer=transformer)
synchronizer.sync()
return func
class TaskManager(object):
def create_task(self, calendar_id, transformer):
class_name = "CeleryTask_%s_%s" %(calendar_id, type(transformer).__name__)
return type(class_name, (PeriodicTask,), {
"run_every": timedelta(seconds=120),
"run": run(calendar_id, transformer)
})
def setup_tasks(self, consumer_dict):
for calendar_id, transformers in consumer_dict.iteritems():
for Transformer in transformers:
_tasks.append(self.create_task(calendar_id, Transformer()))
for Task in _tasks:
tasks.register(Task)