Skip to content

Commit

Permalink
feat(commands): schedule commands
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianLusina committed Sep 20, 2022
1 parent b0d5a8c commit 5f3cce4
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 4 deletions.
3 changes: 2 additions & 1 deletion kvault/commands/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@


class BaseCommand(object):
def __init__(self, kv: Optional[Dict[AnyStr, Value]], expiry_map: Dict, expiry: List):
def __init__(self, kv: Optional[Dict[AnyStr, Value]], expiry_map: Dict, expiry: List, schedule: List):
self._kv: Dict[AnyStr, Value] = kv
self._expiry_map = expiry_map
self._expiry = expiry
self._schedule = schedule

def check_expired(self, key, ts=None):
ts = ts or time.time()
Expand Down
35 changes: 34 additions & 1 deletion kvault/commands/schedule_commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,38 @@
import heapq
import datetime
from commands import BaseCommand
from ..utils import decode
from ..exceptions import CommandError


class ScheduleCommands(BaseCommand):
pass
def _decode_timestamp(self, timestamp):
timestamp = decode(timestamp)
fmt = '%Y-%m-%d %H:%M:%S'
if '.' in timestamp:
fmt = fmt + '.%f'
try:
return datetime.datetime.strptime(timestamp, fmt)
except ValueError:
raise CommandError('Timestamp must be formatted Y-m-d H:M:S')

def schedule_add(self, timestamp, data):
dt = self._decode_timestamp(timestamp)
heapq.heappush(self._schedule, (dt, data))
return 1

def schedule_read(self, timestamp=None):
dt = self._decode_timestamp(timestamp)
accum = []
while self._schedule and self._schedule[0][0] <= dt:
ts, data = heapq.heappop(self._schedule)
accum.append(data)
return accum

def schedule_flush(self):
schedule_len = self.schedule_length()
self._schedule = []
return schedule_len

def schedule_length(self):
return len(self._schedule)
4 changes: 2 additions & 2 deletions kvault/queue_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ def __init__(self, host: str = '127.0.0.1', port: int = 31337, max_clients: int
self._kv: Dict[AnyStr, Value] = {}
self._expiry_map = {}
self._expiry = []
super().__init__(kv=self._kv, expiry_map=self._expiry_map, expiry=self._expiry)
self._schedule = []
super().__init__(kv=self._kv, expiry_map=self._expiry_map, expiry=self._expiry, schedule=self._schedule)
self._host = host
self._port = port
self._max_clients = max_clients
self._pool = Pool(max_clients)
self._server = StreamServer(listener=(self._host, self._port), handle=self.connection_handler)
self._commands = self.get_commands()
self._protocol = ProtocolHandler()
self._schedule = []

self._active_connections = 0
self._commands_processed = 0
Expand Down

0 comments on commit 5f3cce4

Please sign in to comment.