jobs.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Copyright (c) 2012, Rui Carmo
Description: In-process job management
License: MIT (see LICENSE.md for details)
"""
import os, sys, logging, time, traceback, multiprocessing, gc

from cPickle import loads, dumps
from Queue import PriorityQueue, Empty
from threading import Thread, Semaphore, current_thread
from uuid import uuid4
from functools import partial
from collections import defaultdict

log = logging.getLogger(__name__)

default_priority = 0
max_workers = multiprocessing.cpu_count() * 2

class Pool:
    """Represents a thread pool"""

    def __init__(self, workers=max_workers, rate_limit=1000):
        self.max_workers = workers
        self.mutex = Semaphore()
        self.results = {}
        self.retries = defaultdict(int)
        self.queue = PriorityQueue()
        self.threads = []
        self.rate_limit = rate_limit

    def _tick(self):
        # Pace the main loop so at most rate_limit tasks are started per second
        time.sleep(1.0 / self.rate_limit)
        # Clean up finished threads
        self.threads = [t for t in self.threads if t.isAlive()]
        return (not self.queue.empty()) or (len(self.threads) > 0)

    def _loop(self):
        """Handle task submissions"""

        def run_task(priority, f, uuid, retries, args, kwargs):
            """Run a single task"""
            try:
                # Label the worker thread after the task for easier debugging
                current_thread().name = getattr(f, '__name__', None)
                result = f(*args, **kwargs)
            except Exception as e:
                log.error(traceback.format_exc())
                # Retry the task if applicable
                if retries > 0:
                    with self.mutex:
                        self.retries[uuid] += 1
                    # Re-queue the task with a lower (i.e., higher-valued) priority
                    self.queue.put((priority + 1, dumps((f, uuid, retries - 1, args, kwargs))))
                    self.queue.task_done()
                    return
                # Out of retries: the exception itself becomes the result
                result = e
            with self.mutex:
                self.results[uuid] = dumps(result)
                self.retries[uuid] += 1
            self.queue.task_done()

        while self._tick():
            # Spawn more threads to fill free slots
            log.debug("Running %d/%d threads" % (len(self.threads), self.max_workers))
            if len(self.threads) < self.max_workers:
                log.debug("Queue length: %d" % self.queue.qsize())
                try:
                    priority, data = self.queue.get(True, 1.0 / self.rate_limit)
                except Empty:
                    continue
                f, uuid, retries, args, kwargs = loads(data)
                t = Thread(target=run_task, args=[priority, f, uuid, retries, args, kwargs])
                t.setDaemon(True)
                self.threads.append(t)
                t.start()
        log.debug("Exited loop.")
        for t in self.threads:
            t.join()

    def stop(self):
        """Flush the job queue"""
        self.queue = PriorityQueue()

    def start(self, daemonize=False):
        """Pool entry point"""
        self.results = {}
        self.retries = defaultdict(int)
        if daemonize:
            # _loop is a bound method, so it needs no extra arguments
            t = Thread(target=self._loop)
            t.setDaemon(True)
            t.start()
            return
        self._loop()
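
# How work is queued: items travel through Pool.queue as (priority, payload)
# tuples, where payload is dumps((func, uuid, retries, args, kwargs)); a worker
# thread loads() the payload just before running it. The task() decorator
# below emits exactly this shape, so it is the usual way to enqueue work.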

default_pool = Pool()


class Deferred(object):
    """Allows lookup of task results and status"""

    def __init__(self, pool, uuid):
        self.uuid = uuid
        self.pool = pool
        self._result = None

    @property
    def result(self):
        if self._result is None:
            with self.pool.mutex:
                if self.uuid in self.pool.results:
                    self._result = loads(self.pool.results[self.uuid])
        return self._result

    @property
    def retries(self):
        return self.pool.retries[self.uuid]

def task(func=None, pool=None, max_retries=0, priority=default_priority):
    """Task decorator - sets up a .delay() attribute on the task function"""
    if func is None:
        return partial(task, pool=pool, max_retries=max_retries, priority=priority)
    if pool is None:
        pool = default_pool

    def delay(*args, **kwargs):
        uuid = str(uuid4())  # one per task invocation
        pool.queue.put((priority, dumps((func, uuid, max_retries, args, kwargs))))
        return Deferred(pool, uuid)

    func.delay = delay
    func.pool = pool
    return func

def start(daemonize=False):
    """Start the default pool"""
    default_pool.start(daemonize=daemonize)
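
# A minimal usage sketch, added for illustration (not part of the original
# module): `double` is a hypothetical task. The work is enqueued via .delay(),
# the default pool is started in a background thread, and the returned
# Deferred is polled until the result arrives.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    @task(max_retries=1)
    def double(n):
        return n * 2

    # Enqueue before starting the pool: the loop exits once the queue is
    # empty and no worker threads remain
    deferred = double.delay(21)
    start(daemonize=True)
    while deferred.result is None:  # result stays None until the task finishes
        time.sleep(0.05)
    print(deferred.result)  # 42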