Skip to content

Commit

Permalink
BUGFIX: Watch more than 1 post/question; Re-watch answers on reboot
Browse files Browse the repository at this point in the history
Resolves Charcoal-SE#11539 and Charcoal-SE#11540

autopull
  • Loading branch information
makyen committed Oct 27, 2024
1 parent 1c1e02e commit ac4f9d6
Showing 1 changed file with 72 additions and 34 deletions.
106 changes: 72 additions & 34 deletions deletionwatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


PICKLE_FILENAME = "deletionIDs.p"
DELETION_WATCH_MIN_SECONDS = 7200


# noinspection PyClassHasNoInit,PyBroadException,PyMethodParameters
Expand All @@ -28,13 +29,14 @@ class DeletionWatcher:
def __init__(self):
if GlobalVars.no_deletion_watcher:
return
# posts is a dict with the WebSocket action as keys {site_id}-question-{question_id} as keys
# The values are a tuple: (post_id, post_site, post_type, post_url, [(callback, max_time)])
# NOTE: This is not sufficient, as it doesn't account for watching multiple posts per question.
# posts is a dict with the WebSocket action as keys: {site_id}-question-{question_id}
# The value of each is a dict with post IDs as the key.
# The value of each of those is a tuple:
# (post_id, post_site, post_type, post_url, question_id, max_watch_time, [(callback, max_callback_time)])
# Actions are added when a post is subscribed. A post entry is removed when a WebSocket message is
# received indicating that post was deleted; the action is removed once no watched posts remain for it.
# Upon reboot, questions are not resubscribed to if the questions has either been deleted or the later
# of its creation or last edit date is >= 7200 seconds ago. Answers are never resubscribed to.
# Upon reboot, questions are not resubscribed to if the last subscription was more than
# DELETION_WATCH_MIN_SECONDS ago (currently 7200).
self.posts = {}
self.posts_lock = threading.RLock()

Expand All @@ -49,9 +51,16 @@ def __init__(self):
return

if datahandling.has_pickle(PICKLE_FILENAME):
pickle_data = datahandling.load_pickle(PICKLE_FILENAME)
for post_url in DeletionWatcher._check_batch(pickle_data):
self.subscribe(post_url)
pickle_info = datahandling.load_pickle(PICKLE_FILENAME)
if 'version' not in pickle_info:
# original pickle version
for post_url in DeletionWatcher._check_batch(pickle_info):
self.subscribe(post_url)
elif pickle_info['version'] == '2':
with self.posts_lock:
self.posts = pickle_info['posts']
self.expunge_expired_posts(False)
self._subscribe_to_all_saved_posts()

threading.Thread(name=self.__class__.__name__, target=self._start, daemon=True).start()

Expand All @@ -72,18 +81,18 @@ def _start(self):

if data["a"] == "post-deleted":
try:
post_id = str(data["aId"] if "aId" in data else data["qId"])
with self.posts_lock:
post_id, _, _, post_url, callbacks = self.posts[action]

if post_id == str(data["aId"] if "aId" in data else data["qId"]):
with self.posts_lock:
_, _, _, post_url, _, _, callbacks = self.posts[action][post_id]
del self.posts[action][post_id]
if len(self.posts[action]) == 0:
del self.posts[action]
Tasks.do(self._unsubscribe, action)
Tasks.do(metasmoke.Metasmoke.send_deletion_stats_for_post, post_url, True)

for callback, max_time in callbacks:
if not max_time or time.time() < max_time:
callback()
Tasks.do(self._unsubscribe, action)
Tasks.do(metasmoke.Metasmoke.send_deletion_stats_for_post, post_url, True)
now = time.time()
for callback, max_time in callbacks:
if not max_time or now < max_time:
callback()
except KeyError:
pass
except websocket.WebSocketException as e:
Expand All @@ -94,6 +103,21 @@ def _start(self):
self.connect_time = time.time()
self.hb_time = None

def expunge_expired_posts(self, unsubscribe=True):
now = time.time()
with self.posts_lock:
actions = list(self.posts.keys())
for action in actions:
post_ids = list(self.posts[action].keys())
for post_id in post_ids:
_, _, _, _, _, max_time, _ = self.posts[action][post_id]
if now > max_time:
del self.posts[action][post_id]
if len(self.posts[action]) == 0:
del self.posts[action]
if unsubscribe:
Tasks.do(self._unsubscribe, action)

def _subscribe_to_all_saved_posts(self):
with self.posts_lock:
for action in self.posts:
Expand All @@ -120,18 +144,25 @@ def subscribe(self, post_url, callback=None, timeout=None):
question_id = post_id

action = "{}-question-{}".format(site_id, question_id)
max_time = (time.time() + timeout) if timeout else None
now = time.time()
max_time = (now + timeout) if timeout else None

needs_subscribe = False
post_id = str(post_id)
with self.posts_lock:
if action not in self.posts:
self.posts[action] = (post_id, post_site, post_type, post_url,
[(callback, max_time)] if callback else [])
Tasks.do(self._subscribe, action)
elif callback:
_, _, _, _, callbacks = self.posts[action]
self.posts[action] = {}
needs_subscribe = True
callbacks = []
if post_id in self.posts[action]:
_, _, _, _, _, _, callbacks = self.posts[action][post_id]
if callback:
callbacks.append((callback, max_time))
else:
return
# This is fully replaced in order to update the max_watch_time
self.posts[action][post_id] = (post_id, post_site, post_type, post_url, question_id,
now + DELETION_WATCH_MIN_SECONDS, callbacks)
if needs_subscribe:
Tasks.do(self._subscribe, action)

def _subscribe(self, action):
if self.socket:
Expand All @@ -144,21 +175,28 @@ def _subscribe(self, action):
action))

def save(self):
# We save a copy of the self.posts data, but with the callbacks removed.
if GlobalVars.no_deletion_watcher:
return
pickle_output = {}
pickle_data = {}

with self.posts_lock:
for post_id, post_site, _, _, _ in self.posts.values():
if post_site not in pickle_output:
pickle_output[post_site] = [post_id]
else:
pickle_output[post_site].append(post_id)

for action in self.posts:
pickle_data[action] = {}
for post in self.posts[action]:
(post_id, post_site, post_type, post_url, question_id, max_time, _) = self.posts[action][post]
pickle_data[action][post] = (post_id, post_site, post_type, post_url, question_id, max_time, [])
pickle_output = {
'version': '2',
'posts': pickle_data,
}
datahandling.dump_pickle(PICKLE_FILENAME, pickle_output)

@staticmethod
def _check_batch(saved):
# This was used with version 1 of the pickle. Version 2 of the pickle was in development on 2024-10-27.
# Once some time has passed (hours), this can be removed and handling of version 1 can change to ignoring
# it, as the max watch time will have elapsed and nothing from it would be used anyway.
if time.time() < DeletionWatcher.next_request_time:
time.sleep(DeletionWatcher.next_request_time - time.time())

Expand Down Expand Up @@ -188,7 +226,7 @@ def _check_batch(saved):

for post in response_data['items']:
compare_date = post["last_edit_date"] if "last_edit_date" in post else post["creation_date"]
if time.time() - compare_date < 7200:
if time.time() - compare_date < DELETION_WATCH_MIN_SECONDS:
yield to_protocol_relative(post["link"]).replace("/q/", "/questions/")

def _unsubscribe(self, action):
Expand Down

0 comments on commit ac4f9d6

Please sign in to comment.