-
Notifications
You must be signed in to change notification settings - Fork 0
/
lock.py
103 lines (91 loc) · 3.72 KB
/
lock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import redis
import pickle
import os
from time import sleep, time
from random import random
class Lock(object):
    """
    Shared/exclusive lock stored in Redis, used as a context manager.

    The exclusive holder is recorded under a single Redis key; shared
    holders are recorded as entries of a Redis list.  Every record is a
    pickled ``(expiry_timestamp, pid)`` tuple.
    """

    def __init__(self, key, shared=False, expires=25, timeout=30, step=0.5):
        """
        A lock mechanism, using Redis as a backend.

        key = identifier for this lock
        shared = True for a shared lock, False for an exclusive lock
        expires = lock expiration time in seconds (when it is considered stale)
        timeout = max time to wait for a lock to become available
        step = seconds to sleep between two acquisition attempts

        Multiple shared locks can exist simultaneously.
        Only one non-shared (exclusive) lock can exist at a time.
        Shared locks wait for exclusive locks to release.
        If an exclusive lock is set, new shared lock attempts will wait (block).
        Similarly, exclusive locks will wait (block) until all shared locks clear.
        """
        self.r = redis.StrictRedis()
        self.exclusive_key = "bm-lock-x-{0}".format(key)
        self.shared_key = "bm-lock-s-{0}".format(key)
        self.shared = shared
        # `expires` starts out as the lifetime in seconds; __enter__ replaces
        # it with the absolute expiry timestamp of the acquired lock.  The
        # lifetime itself is kept in `_ttl` so it is never lost.
        self.expires = expires
        self._ttl = expires
        self.timeout = timeout
        # Wait budget of the acquisition in progress; `timeout` itself is
        # left untouched so the same instance can be entered again.
        self._remaining = timeout
        self.step = step
        self.pid = os.getpid()

    def __enter__(self):
        """
        Attempt to acquire the lock.

        If the lock is unavailable, retry every `step` seconds until the
        `timeout` budget is used up.
        If a stale exclusive lock is found (past its expiry), remove it.

        Returns this Lock instance.
        Raises LockException when the lock could not be acquired in time.
        """
        self._remaining = self.timeout
        while self._remaining >= 0:
            # Always derive the expiry from the configured lifetime, never
            # from the timestamp computed by a previous attempt.
            self.expires = time() + self._ttl + 1
            if self.shared:
                # Make sure nobody has exclusive, but don't take it.
                if not self.r.get(self.exclusive_key):
                    # Nobody has exclusive. Get our shared lock.
                    self._set_shared()
                    return self
            elif self._set_exclusive():
                # Some shared locks may still exist. Wait for them to release.
                try:
                    self._wait_for_shared()
                except Exception:
                    # __exit__ is not called when __enter__ raises, so the
                    # exclusive key has to be given back here.
                    self.r.delete(self.exclusive_key)
                    raise
                return self
            # Lock not acquired! Check for a stale exclusive lock.
            self._clear_stale_exclusive()
            # Tick and repeat until timeout.
            self._remaining -= self.step
            sleep(self.step)
        # Timed out
        raise LockException("Could not acquire lock: {0}".format(self.exclusive_key))

    def __exit__(self, typ, value, traceback):
        """
        Release the lock.

        Returns None, so an exception raised inside the `with` body is
        propagated to the caller.
        """
        if self.shared:
            self.r.lrem(self.shared_key, 0, self._token())
        else:
            self.r.delete(self.exclusive_key)

    def _token(self):
        """
        Serialized (expiry, pid) record identifying this holder in Redis.
        """
        return pickle.dumps((self.expires, self.pid))

    def _set_shared(self):
        """
        Set a shared lock
        """
        return self.r.lpush(self.shared_key, self._token())

    def _set_exclusive(self):
        """
        Set an exclusive lock; the result is truthy only if the key was free.
        """
        return self.r.setnx(self.exclusive_key, self._token())

    def _clear_stale_exclusive(self):
        """
        Delete the exclusive lock record if its expiry time has passed.
        """
        raw = self.r.get(self.exclusive_key)
        if not raw:
            # The holder released the lock between our failed attempt and
            # this check; there is nothing to inspect.
            return
        # NOTE(security): pickle.loads runs on data read back from Redis;
        # this is only safe while every writer to that Redis is trusted.
        existing_expires, _existing_pid = pickle.loads(raw)
        if existing_expires and float(existing_expires) < time():
            # Stale exclusive lock found. Delete it.
            self.r.delete(self.exclusive_key)

    def _wait_for_shared(self):
        """
        Block until no shared lock records remain, purging expired ones.

        Uses the same wait budget as __enter__.
        Raises LockException if shared locks are still present once the
        budget is used up.
        """
        while self.r.llen(self.shared_key) > 0 and self._remaining >= 0:
            now = time()
            for raw in self.r.lrange(self.shared_key, 0, -1):
                # Decode into locals: this instance's own expires/pid must
                # keep describing the exclusive lock it holds.
                # NOTE(security): see _clear_stale_exclusive about pickle.
                holder_expires, _holder_pid = pickle.loads(raw)
                if float(holder_expires) < now:
                    # Remove by the exact stored bytes.
                    self.r.lrem(self.shared_key, 0, raw)
            if self.r.llen(self.shared_key) == 0:
                return
            self._remaining -= self.step
            sleep(self.step)
        if self.r.llen(self.shared_key) > 0:
            raise LockException("Shared locks still present: {0}".format(self.shared_key))
class LockException(Exception):
    """Raised when a lock could not be acquired before the timeout ran out."""