Skip to content

Commit

Permalink
Merge pull request #165 from RaRe-Technologies/Cologler_feat-weakref-…
Browse files Browse the repository at this point in the history
…reader

Introduce weak references
  • Loading branch information
mpenkov authored Dec 3, 2022
2 parents 01b13c3 + 8545a17 commit 109e5ea
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 18 deletions.
107 changes: 90 additions & 17 deletions sqlitedict.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import logging
import traceback
from base64 import b64decode, b64encode
import weakref

__version__ = '2.0.0'

Expand Down Expand Up @@ -65,6 +66,51 @@ def reraise(tp, value, tb=None):

logger = logging.getLogger(__name__)

#
# There's a thread that holds the actual SQL connection (SqliteMultithread).
# We communicate with this thread via queues (request and responses).
# The requests can either be SQL commands or one of the "special" commands
# below:
#
# _REQUEST_CLOSE: request that the SQL connection be closed
# _REQUEST_COMMIT: request that any changes be committed to the DB
#
# Responses are either SQL records (e.g. results of a SELECT) or the magic
# _RESPONSE_NO_MORE command, which indicates nothing else will ever be written
# to the response queue.
#
_REQUEST_CLOSE = '--close--'
_REQUEST_COMMIT = '--commit--'
_RESPONSE_NO_MORE = '--no more--'

#
# We work with weak references for better memory efficiency.
# Dereferencing, checking the referent queue still exists, and putting to it
# is boring and repetitive, so we have a _put function to handle it for us.
#
_PUT_OK, _PUT_REFERENT_DESTROYED, _PUT_NOOP = 0, 1, 2


def _put(queue_reference, item):
if queue_reference is not None:
queue = queue_reference()
if queue is None:
#
# We got a reference to a queue, but that queue no longer exists
#
retval = _PUT_REFERENT_DESTROYED
else:
queue.put(item)
retval = _PUT_OK

del queue
return retval

#
# We didn't get a reference to a queue, so do nothing (no-op).
#
return _PUT_NOOP


def open(*args, **kwargs):
"""See documentation of the SqliteDict class."""
Expand Down Expand Up @@ -454,16 +500,22 @@ def run(self):
finally:
self._lock.release()

res = None
res_ref = None
while True:
req, arg, res, outer_stack = self.reqs.get()
if req == '--close--':
assert res, ('--close-- without return queue', res)
#
# req: an SQL command or one of the --magic-- commands we use internally
# arg: arguments for the command
# res_ref: a weak reference to the queue into which responses must be placed
# outer_stack: the outer stack, for producing more informative traces in case of error
#
req, arg, res_ref, outer_stack = self.reqs.get()

if req == _REQUEST_CLOSE:
assert res_ref, ('--close-- without return queue', res_ref)
break
elif req == '--commit--':
elif req == _REQUEST_COMMIT:
conn.commit()
if res:
res.put('--no more--')
_put(res_ref, _RESPONSE_NO_MORE)
else:
try:
cursor.execute(req, arg)
Expand Down Expand Up @@ -504,17 +556,25 @@ def run(self):
'SqliteDict instance to show the outer stack.'
)

if res:
if res_ref:
for rec in cursor:
res.put(rec)
res.put('--no more--')
if _put(res_ref, rec) == _PUT_REFERENT_DESTROYED:
#
# The queue we are sending responses to got garbage
# collected. Nobody is listening anymore, so we
# stop sending responses.
#
break

_put(res_ref, _RESPONSE_NO_MORE)

if self.autocommit:
conn.commit()

self.log.debug('received: %s, send: --no more--', req)
conn.close()
res.put('--no more--')

_put(res_ref, _RESPONSE_NO_MORE)

def check_raise_error(self):
"""
Expand Down Expand Up @@ -548,6 +608,10 @@ def check_raise_error(self):
def execute(self, req, arg=None, res=None):
"""
`execute` calls are non-blocking: just queue up the request and return immediately.
:param req: The request (an SQL command)
:param arg: Arguments to the SQL command
:param res: A queue in which to place responses as they become available
"""
self.check_raise_error()
stack = None
Expand All @@ -559,7 +623,16 @@ def execute(self, req, arg=None, res=None):
# so often.
stack = traceback.extract_stack()[:-1]

self.reqs.put((req, arg or tuple(), res, stack))
#
# We pass a weak reference to the response queue instead of a regular
# reference, because we want the queues to be garbage-collected
# more aggressively.
#
res_ref = None
if res:
res_ref = weakref.ref(res)

self.reqs.put((req, arg or tuple(), res_ref, stack))

def executemany(self, req, items):
for item in items:
Expand All @@ -579,7 +652,7 @@ def select(self, req, arg=None):
while True:
rec = res.get()
self.check_raise_error()
if rec == '--no more--':
if rec == _RESPONSE_NO_MORE:
break
yield rec

Expand All @@ -596,10 +669,10 @@ def commit(self, blocking=True):
# blocking=False. This ensures any available exceptions for any
# previous statement are thrown before returning, and that the
# data has actually persisted to disk!
self.select_one('--commit--')
self.select_one(_REQUEST_COMMIT)
else:
# otherwise, we fire and forget as usual.
self.execute('--commit--')
self.execute(_REQUEST_COMMIT)

def close(self, force=False):
if force:
Expand All @@ -608,12 +681,12 @@ def close(self, force=False):
# can't process the request. Instead, push the close command to the requests
# queue directly. If run() is still alive, it will exit gracefully. If not,
# then there's nothing we can do anyway.
self.reqs.put(('--close--', None, Queue(), None))
self.reqs.put((_REQUEST_CLOSE, None, weakref.ref(Queue()), None))
else:
# we abuse 'select' to "iter" over a "--close--" statement so that we
# can confirm the completion of close before joining the thread and
# returning (by semaphore '--no more--'
self.select_one('--close--')
self.select_one(_REQUEST_CLOSE)
self.join()


Expand Down
2 changes: 1 addition & 1 deletion tests/test_autocommit.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

def test():
"Verify autocommit just before program exits."
assert os.system('PYTHONPATH=. %s tests/autocommit.py' % sys.executable) == 0
assert os.system('env PYTHONPATH=. %s tests/autocommit.py' % sys.executable) == 0
# The above script relies on the autocommit feature working correctly.
# Now, let's check if it actually worked.
d = sqlitedict.SqliteDict('tests/db/autocommit.sqlite')
Expand Down
35 changes: 35 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import unittest
import tempfile
import os
from unittest.mock import patch

# local
import sqlitedict
Expand Down Expand Up @@ -66,6 +67,40 @@ def test_commit_nonblocking(self):
d['key'] = 'value'
d.commit(blocking=False)

def test_cancel_iterate(self):
import time

class EndlessKeysIterator:
def __init__(self) -> None:
self.value = 0

def __iter__(self):
return self

def __next__(self):
self.value += 1
return [self.value]

with patch('sqlitedict.sqlite3') as mock_sqlite3:
ki = EndlessKeysIterator()
cursor = mock_sqlite3.connect().cursor()
cursor.__iter__.return_value = ki

with SqliteDict(autocommit=True) as d:
for i, k in enumerate(d.keys()):
assert i + 1 == k
if k > 100:
break
assert ki.value > 101

# Release GIL, let background threads run.
# Don't use gc.collect because this is simulate user code.
time.sleep(0.01)

current = ki.value
time.sleep(1)
assert current == ki.value, 'Will not read more after iterate stop'


class NamedSqliteDictCreateOrReuseTest(TempSqliteDictTest):
"""Verify default flag='c', and flag='n' of SqliteDict()."""
Expand Down

0 comments on commit 109e5ea

Please sign in to comment.