diff --git a/sqlitedict.py b/sqlitedict.py index 1117163..7efc3ee 100755 --- a/sqlitedict.py +++ b/sqlitedict.py @@ -68,6 +68,51 @@ def reraise(tp, value, tb=None): logger = logging.getLogger(__name__) +# +# There's a thread that holds the actual SQL connection (SqliteMultithread). +# We communicate with this thread via queues (request and responses). +# The requests can either be SQL commands or one of the "special" commands +# below: +# +# _REQUEST_CLOSE: request that the SQL connection be closed +# _REQUEST_COMMIT: request that any changes be committed to the DB +# +# Responses are either SQL records (e.g. results of a SELECT) or the magic +# _RESPONSES_NO_MORE command, which indicates nothing else will ever be written +# to the response queue. +# +_REQUEST_CLOSE = '--close--' +_REQUEST_COMMIT = '--commit--' +_RESPONSE_NO_MORE = '--no more--' + +# +# We work with weak references for better memory efficiency. +# Dereferencing, checking the referent queue still exists, and putting to it +# is boring and repetitive, so we have a _put function to handle it for us. +# +_PUT_OK, _PUT_REFERENT_DESTROYED, _PUT_NOOP = 0, 1, 2 + + +def _put(queue_reference, item): + if queue_reference is not None: + queue = queue_reference() + if queue is None: + # + # We got a reference to a queue, but that queue no longer exists + # + retval = _PUT_REFERENT_DESTROYED + else: + queue.put(item) + retval = _PUT_OK + + del queue + return retval + + # + # We didn't get a reference to a queue, so do nothing (no-op). + # + return _PUT_NOOP + def open(*args, **kwargs): """See documentation of the SqliteDict class.""" @@ -432,18 +477,20 @@ def run(self): res_ref = None while True: + # + # req: an SQL command or one of the --magic-- commands we use internally + # arg: arguments for the command + # res_ref: a weak reference to the queue into which responses must be placed + # outer_stack: the outer stack, for producing more informative traces in case of error + # req, arg, res_ref, outer_stack = self.reqs.get() - if req == '--close--': + if req == _REQUEST_CLOSE: assert res_ref, ('--close-- without return queue', res_ref) break - elif req == '--commit--': + elif req == _REQUEST_COMMIT: conn.commit() - if res_ref: - res = res_ref() - if res is not None: - res.put('--no more--') - del res + _put(res_ref, _RESPONSE_NO_MORE) else: try: cursor.execute(req, arg) @@ -484,16 +531,15 @@ def run(self): if res_ref: for rec in cursor: - res = res_ref() - if res is None: + if _put(res_ref, rec) == _PUT_REFERENT_DESTROYED: + # + # The queue we are sending responses to got + # garbage collected, so we stop processing the + # commands. + # break - res.put(rec) - del res - res = res_ref() - if res is not None: - res.put('--no more--') - del res + _put(res_ref, _RESPONSE_NO_MORE) if self.autocommit: conn.commit() @@ -501,10 +547,7 @@ def run(self): self.log.debug('received: %s, send: --no more--', req) conn.close() - res = res_ref() - if res is not None: - res.put('--no more--') - del res + _put(res_ref, _RESPONSE_NO_MORE) def check_raise_error(self): """ @@ -537,6 +580,10 @@ def check_raise_error(self): def execute(self, req, arg=None, res=None): """ `execute` calls are non-blocking: just queue up the request and return immediately. + + :param req: The request (an SQL command) + :param arg: Arguments to the SQL command + :param res: A queue in which to place responses as they become available """ self._wait_for_initialization() self.check_raise_error() @@ -549,7 +596,16 @@ def execute(self, req, arg=None, res=None): # so often. stack = traceback.extract_stack()[:-1] - self.reqs.put((req, arg or tuple(), res, stack)) + # + # We pass a weak reference to the response queue instead of a regular + # reference, because we want the queues to be garbage-collected + # more aggressively. + # + res_ref = None + if res: + res_ref = weakref.ref(res) + + self.reqs.put((req, arg or tuple(), res_ref, stack)) def executemany(self, req, items): for item in items: @@ -565,11 +621,11 @@ def select(self, req, arg=None): (`for res in self.select(): ...`), the entire result will be in memory. """ res = Queue() # results of the select will appear as items in this queue - self.execute(req, arg, weakref.ref(res)) + self.execute(req, arg, res) while True: rec = res.get() self.check_raise_error() - if rec == '--no more--': + if rec == _RESPONSE_NO_MORE: break yield rec @@ -586,10 +642,10 @@ def commit(self, blocking=True): # blocking=False. This ensures any available exceptions for any # previous statement are thrown before returning, and that the # data has actually persisted to disk! - self.select_one('--commit--') + self.select_one(_REQUEST_COMMIT) else: # otherwise, we fire and forget as usual. - self.execute('--commit--') + self.execute(_REQUEST_COMMIT) def close(self, force=False): if force: @@ -598,12 +654,12 @@ def close(self, force=False): # can't process the request. Instead, push the close command to the requests # queue directly. If run() is still alive, it will exit gracefully. If not, # then there's nothing we can do anyway. - self.reqs.put(('--close--', None, weakref.ref(Queue()), None)) + self.reqs.put((_REQUEST_CLOSE, None, weakref.ref(Queue()), None)) else: # we abuse 'select' to "iter" over a "--close--" statement so that we # can confirm the completion of close before joining the thread and # returning (by semaphore '--no more--' - self.select_one('--close--') + self.select_one(_REQUEST_CLOSE) self.join() def _wait_for_initialization(self):