Skip to content

Commit

Permalink
Add support for lock service API
Browse files Browse the repository at this point in the history
  • Loading branch information
dariko committed Feb 1, 2019
1 parent aceada8 commit 0f4fffc
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 279 deletions.
2 changes: 2 additions & 0 deletions etcd3/apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .extra import ExtraAPI
from .kv import KVAPI
from .lease import LeaseAPI
from .lock import LockAPI
from .maintenance import MaintenanceAPI
from .watch import WatchAPI

Expand All @@ -17,4 +18,5 @@
'MaintenanceAPI',
'LeaseAPI',
'BaseAPI'
'LockAPI'
]
46 changes: 46 additions & 0 deletions etcd3/apis/lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from .base import BaseAPI


class LockAPI(BaseAPI):
def lock(self, name, lease=0):
"""
Lock acquires a distributed shared lock on a given named lock.
On success, it will return a unique key that exists so long as
the lock is held by the caller. This key can be used in
conjunction with transactions to safely ensure updates to etcd
only occur while holding lock ownership. The lock is held until
Unlock is called on the key or the lease associate with the
owner expires.
:type name: str
:param name: name is the identifier for the distributed shared lock to be acquired.
:type lease: int
:param lease: lease is the lease ID to associate with the key in the key-value store. A lease
value of 0 indicates no lease.
"""

method = '/v3beta/lock/lock'
data = {
"name": name,
"lease": lease
}
return self.call_rpc(method, data=data)

def unlock(self, key):
"""
Unlock takes a key returned by Lock and releases the hold on
lock. The next Lock caller waiting for the lock will then be
woken up and given ownership of the lock.
:type key: str
:param key: key is the lock ownership key granted by Lock.
:type lease: int
:param lease: lease is the lease ID to associate with the key in the key-value store. A lease
value of 0 indicates no lease.
"""

method = '/v3beta/lock/unlock'
data = {
"key": key,
}
return self.call_rpc(method, data=data)
4 changes: 3 additions & 1 deletion etcd3/baseclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .apis import ExtraAPI
from .apis import KVAPI
from .apis import LeaseAPI
from .apis import LockAPI
from .apis import MaintenanceAPI
from .apis import WatchAPI
from .stateful import Lease
Expand Down Expand Up @@ -46,7 +47,8 @@ def __iter__(self):
raise NotImplementedError


class BaseClient(AuthAPI, ClusterAPI, KVAPI, LeaseAPI, MaintenanceAPI, WatchAPI, ExtraAPI):
class BaseClient(AuthAPI, ClusterAPI, KVAPI, LeaseAPI, MaintenanceAPI,
WatchAPI, ExtraAPI, LockAPI):
def __init__(self, host='localhost', port=2379, protocol='http',
cert=(), verify=None,
timeout=None, headers=None, user_agent=None, pool_size=30,
Expand Down
100 changes: 100 additions & 0 deletions etcd3/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,60 @@
]
}
},
"/v3beta/lock/lock": {
"post": {
"summary": "Lock acquires a distributed shared lock on a given named lock.\nOn success, it will return a unique key that exists so long asnthe lock is held by the caller. This key can be used in\nconjunction with transactions to safely ensure updates to etcdnonly occur while holding lock ownership. The lock is held until\nUnlock is called on the key or the lease associate with thenowner expires.",
"operationId": "Lock",
"responses": {
"200": {
"description": "",
"schema": {
"$ref": "#/definitions/etcdserverpbLockResponse"
}
}
},
"parameters": [
{
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/etcdserverpbLockRequest"
}
}
],
"tags": [
"Lock"
]
}
},
"/v3beta/lock/unlock": {
"post": {
"summary": "Unlock takes a key returned by Lock and releases the hold on\nlock. The next Lock caller waiting for the lock will then benwoken up and given ownership of the lock.",
"operationId": "Unlock",
"responses": {
"200": {
"description": "",
"schema": {
"$ref": "#/definitions/etcdserverpbUnlockResponse"
}
}
},
"parameters": [
{
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/etcdserverpbUnlockRequest"
}
}
],
"tags": [
"Lock"
]
}
},
"/v3alpha/maintenance/alarm": {
"post": {
"summary": "Alarm activates, deactivates, and queries alarms regarding cluster health.",
Expand Down Expand Up @@ -1664,6 +1718,52 @@
}
}
},
"etcdserverpbLockRequest": {
"type": "object",
"properties": {
"name": {
"type": "string",
"format": "byte",
"description": "name is the identifier for the distributed shared lock to be acquired."
},
"lease": {
"type": "string",
"format": "int64",
"description": "lease is the ID of the lease that will be attached to ownership of the lock. If the lease expires or is revoked and currently holds the lock, the lock is automatically released. Calls to Lock with the same lease will be treated as a single acquisition; locking twice with the same lease is a no-op."
}
}
},
"etcdserverpbLockResponse": {
"type": "object",
"properties": {
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
},
"key": {
"type": "string",
"format": "byte",
"description": "key is a key that will exist on etcd for the duration that the Lock caller owns the lock. Users should not modify this key or the lock may exhibit undefined behavior."
}
}
},
"etcdserverpbUnlockRequest": {
"type": "object",
"properties": {
"key": {
"type": "string",
"format": "byte",
"description": "key is the lock ownership key granted by Lock."
}
}
},
"etcdserverpbUnlockResponse": {
"type": "object",
"properties": {
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
}
}
},
"etcdserverpbMember": {
"type": "object",
"properties": {
Expand Down
121 changes: 121 additions & 0 deletions etcd3/stateful/apilock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import logging
import os
import six
import socket
import tempfile
import threading
import uuid

from .watch import EventType
from ..errors import ErrLeaseNotFound

log = logging.getLogger('etcd3.Lock')


class EtcdLockError(Exception):
pass


class EtcdLockAcquireTimeout(Exception):
pass


class ApiLock(object): # TODO: maybe we could improve the performance by reduce some HTTP requests
"""
Based on etcd lock API
"""

DEFAULT_LOCK_TTL = 60

HOST = 'host'
PROCESS = 'process'
THREAD = 'thread'

def __init__(self, client, lock_name, lock_ttl=DEFAULT_LOCK_TTL):
"""
:type client: BaseClient
:param client: instance of etcd.Client
:type lock_name: str
:param lock_name: the name of the lock
:type lock_ttl: int
:param lock_ttl: ttl of the lock, default is 60s
"""
self.client = client
self.name = lock_name
self.lock_ttl = lock_ttl
self.is_taken = False # if the lock is taken by someone
self.lease = None
self.lock_key = None
log.debug("Initiating lock %s", self.lock_name)

@property
def is_acquired(self):
"""
if the lock is acquired
"""
if not self.lease or self.lease.ttl < 0:
return False
return True

acquired = is_acquired

def acquire(self, lock_ttl=None):
"""
Acquire the lock.
:type lock_ttl: int
:param lock_ttl: The duration of the lock we acquired, set to None for eternal locks
:type timeout: int
:param timeout: The time to wait before giving up on getting a lock
"""

if self.lease and self.lock_key and self.lease.alive():
return True
else not is_acquired:
self.lease = client.Lease(lock_ttl or self.lock_ttl)
self.lock_key = self.client.lock(self.name, self.lease)
return self.lock_key

def wait(self, locker=None, timeout=None):
"""
Wait until the lock is lock is able to acquire
:param locker: kv of the lock
:param timeout: wait timeout
"""
locker = locker or self._get_locker()
if not locker:
return
self.watcher = watcher = self.client.Watcher(key=locker.key, max_retries=0)
return watcher.watch_once(lambda e: e.type == EventType.DELETE or e.value == self.uuid, timeout=timeout)

def release(self):
"""
Release the lock
"""
if self.reentrant:
n = self.decr_holder()
if n is not None and n == 0:
self.lease.revoke()
self.lease = None
self.is_taken = False
else:
self.lease.cancel_keepalive(join=False)
self.lease = None
self.is_taken = True
else:
self.lease.revoke()
self.lease = None
self.is_taken = False
log.debug("Lock released (lock_key: %s, value: %s)" % (self.lock_key, self.uuid))

def __enter__(self):
"""
You can use the lock as a contextmanager
"""
self.acquire()
return self

def __exit__(self, type, value, traceback):
self.release()
return False
Loading

0 comments on commit 0f4fffc

Please sign in to comment.