Skip to content

Commit

Permalink
reduce duplication, clean up code, introduce _constants
Browse files Browse the repository at this point in the history
  • Loading branch information
mpenkov committed Nov 25, 2022
1 parent 985a513 commit 5957d4d
Showing 1 changed file with 82 additions and 26 deletions.
108 changes: 82 additions & 26 deletions sqlitedict.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,51 @@ def reraise(tp, value, tb=None):

logger = logging.getLogger(__name__)

#
# There's a thread that holds the actual SQL connection (SqliteMultithread).
# We communicate with this thread via queues (request and responses).
# The requests can either be SQL commands or one of the "special" commands
# below:
#
# _REQUEST_CLOSE: request that the SQL connection be closed
# _REQUEST_COMMIT: request that any changes be committed to the DB
#
# Responses are either SQL records (e.g. results of a SELECT) or the magic
# _RESPONSES_NO_MORE command, which indicates nothing else will ever be written
# to the response queue.
#
_REQUEST_CLOSE = '--close--'
_REQUEST_COMMIT = '--commit--'
_RESPONSE_NO_MORE = '--no more--'

#
# We work with weak references for better memory efficiency.
# Dereferencing, checking the referent queue still exists, and putting to it
# is boring and repetitive, so we have a _put function to handle it for us.
#
_PUT_OK, _PUT_REFERENT_DESTROYED, _PUT_NOOP = 0, 1, 2


def _put(queue_reference, item):
if queue_reference is not None:
queue = queue_reference()
if queue is None:
#
# We got a reference to a queue, but that queue no longer exists
#
retval = _PUT_REFERENT_DESTROYED
else:
queue.put(item)
retval = _PUT_OK

del queue
return retval

#
# We didn't get a reference to a queue, so do nothing (no-op).
#
return _PUT_NOOP


def open(*args, **kwargs):
"""See documentation of the SqliteDict class."""
Expand Down Expand Up @@ -432,18 +477,20 @@ def run(self):

res_ref = None
while True:
#
# req: an SQL command or one of the --magic-- commands we use internally
# arg: arguments for the command
# res_ref: a weak reference to the queue into which responses must be placed
# outer_stack: the outer stack, for producing more informative traces in case of error
#
req, arg, res_ref, outer_stack = self.reqs.get()

if req == '--close--':
if req == _REQUEST_CLOSE:
assert res_ref, ('--close-- without return queue', res_ref)
break
elif req == '--commit--':
elif req == _REQUEST_COMMIT:
conn.commit()
if res_ref:
res = res_ref()
if res is not None:
res.put('--no more--')
del res
_put(res_ref, _RESPONSE_NO_MORE)
else:
try:
cursor.execute(req, arg)
Expand Down Expand Up @@ -484,27 +531,23 @@ def run(self):

if res_ref:
for rec in cursor:
res = res_ref()
if res is None:
if _put(res_ref, rec) == _PUT_REFERENT_DESTROYED:
#
# The queue we are sending responses to got
# garbage collected, so we stop processing the
# commands.
#
break
res.put(rec)
del res

res = res_ref()
if res is not None:
res.put('--no more--')
del res
_put(res_ref, _RESPONSE_NO_MORE)

if self.autocommit:
conn.commit()

self.log.debug('received: %s, send: --no more--', req)
conn.close()

res = res_ref()
if res is not None:
res.put('--no more--')
del res
_put(res_ref, _RESPONSE_NO_MORE)

def check_raise_error(self):
"""
Expand Down Expand Up @@ -537,6 +580,10 @@ def check_raise_error(self):
def execute(self, req, arg=None, res=None):
"""
`execute` calls are non-blocking: just queue up the request and return immediately.
:param req: The request (an SQL command)
:param arg: Arguments to the SQL command
:param res: A queue in which to place responses as they become available
"""
self._wait_for_initialization()
self.check_raise_error()
Expand All @@ -549,7 +596,16 @@ def execute(self, req, arg=None, res=None):
# so often.
stack = traceback.extract_stack()[:-1]

self.reqs.put((req, arg or tuple(), res, stack))
#
# We pass a weak reference to the response queue instead of a regular
# reference, because we want the queues to be garbage-collected
# more aggressively.
#
res_ref = None
if res:
res_ref = weakref.ref(res)

self.reqs.put((req, arg or tuple(), res_ref, stack))

def executemany(self, req, items):
for item in items:
Expand All @@ -565,11 +621,11 @@ def select(self, req, arg=None):
(`for res in self.select(): ...`), the entire result will be in memory.
"""
res = Queue() # results of the select will appear as items in this queue
self.execute(req, arg, weakref.ref(res))
self.execute(req, arg, res)
while True:
rec = res.get()
self.check_raise_error()
if rec == '--no more--':
if rec == _RESPONSE_NO_MORE:
break
yield rec

Expand All @@ -586,10 +642,10 @@ def commit(self, blocking=True):
# blocking=False. This ensures any available exceptions for any
# previous statement are thrown before returning, and that the
# data has actually persisted to disk!
self.select_one('--commit--')
self.select_one(_REQUEST_COMMIT)
else:
# otherwise, we fire and forget as usual.
self.execute('--commit--')
self.execute(_REQUEST_COMMIT)

def close(self, force=False):
if force:
Expand All @@ -598,12 +654,12 @@ def close(self, force=False):
# can't process the request. Instead, push the close command to the requests
# queue directly. If run() is still alive, it will exit gracefully. If not,
# then there's nothing we can do anyway.
self.reqs.put(('--close--', None, weakref.ref(Queue()), None))
self.reqs.put((_REQUEST_CLOSE, None, weakref.ref(Queue()), None))
else:
# we abuse 'select' to "iter" over a "--close--" statement so that we
# can confirm the completion of close before joining the thread and
# returning (by semaphore '--no more--'
self.select_one('--close--')
self.select_one(_REQUEST_CLOSE)
self.join()

def _wait_for_initialization(self):
Expand Down

0 comments on commit 5957d4d

Please sign in to comment.