Skip to content

Commit

Permalink
fix: tidied push_event_queue. this func has been causing looping issu…
Browse files Browse the repository at this point in the history
…es we're seeing.
  • Loading branch information
dreulavelle committed Jul 26, 2024
1 parent 87c3241 commit 5c7943d
Showing 1 changed file with 26 additions and 36 deletions.
62 changes: 26 additions & 36 deletions src/program/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,54 +235,44 @@ def _schedule_services(self) -> None:
logger.log("PROGRAM", f"Scheduled {service_cls.__name__} to run every {update_interval} seconds.")

def _id_in_queue(self, id):
if id is None:
return False
return any(i._id == id for i in self.queued_items)

def _id_in_running_items(self, id):
if id is None:
return False
return any(i._id == id for i in self.running_items)

def _push_event_queue(self, event) -> bool:
def _push_event_queue(self, event):
with self.mutex:
if event.item in self.queued_items or event.item in self.running_items:
logger.debug(f"Item {event.item.log_string} is already in the queue or running, skipping.")
return False

if hasattr(event.item, "_id") and event.item._id is not None:
if isinstance(event.item, Show):
for season in event.item.seasons:
if season._id and (self._id_in_queue(season._id) or self._id_in_running_items(season._id)):
logger.debug(f"Season {season.log_string} of show {event.item.log_string} is already in the queue or running, skipping.")
return False
for episode in season.episodes:
if episode._id and (self._id_in_queue(episode._id) or self._id_in_running_items(episode._id)):
logger.debug(f"Episode {episode.log_string} of season {season.log_string} is already in the queue or running, skipping.")
if (not event.item in self.queued_items and not event.item in self.running_items):
if hasattr(event.item, "_id"):
if event.item.type == "show":
for s in event.item.seasons:
if self._id_in_queue(s._id) or self._id_in_running_items(s._id):
return False
elif isinstance(event.item, Season):
for episode in event.item.episodes:
if self._id_in_queue(episode._id) or self._id_in_running_items(episode._id):
logger.debug(f"Episode {episode.log_string} of season {event.item.log_string} is already in the queue or running, skipping.")
for e in s.episodes:
if self._id_in_queue(e._id) or self._id_in_running_items(e._id):
return False

elif event.item.type == "season":
for e in event.item.episodes:
if self._id_in_queue(e._id) or self._id_in_running_items(e._id):
return False

elif hasattr(event.item, "parent"):
parent = event.item.parent
if self._id_in_queue(parent._id) or self._id_in_running_items(parent._id):
return False
elif hasattr(event.item, "parent"):
parent = event.item.parent
if self._id_in_queue(parent._id) or self._id_in_running_items(parent._id):
logger.debug(f"Parent {parent.log_string} of item {event.item.log_string} is already in the queue or running, skipping.")
return False
if hasattr(parent, "parent") and parent.parent:
grandparent = parent.parent
if self._id_in_queue(grandparent._id) or self._id_in_running_items(grandparent._id):
logger.debug(f"Grandparent {grandparent.log_string} of item {event.item.log_string} is already in the queue or running, skipping.")
elif hasattr(parent, "parent") and self._id_in_queue(parent.parent._id) or self._id_in_running_items(parent.parent._id):
return False

self.queued_items.append(event.item)
self.event_queue.put(event)

if not isinstance(event.item, (Show, Movie, Episode, Season)):
logger.log("NEW", f"Added {event.item.log_string} to the queue")
else:
logger.log("DISCOVERY", f"Re-added {event.item.log_string} to the queue" )
return True

logger.debug(f"Item {event.item.log_string} is already in the queue or running, skipping.")
return False

Expand All @@ -302,8 +292,11 @@ def add_to_running(self, item, service_name):
return
with self.mutex:
if item not in self.running_items:
self.running_items.append(item)
logger.log("PROGRAM", f"Item {item.log_string} started running section {service_name} with state {item.state.value}")
if isinstance(item, MediaItem) and not self._id_in_running_items(item._id):
self.running_items.append(item)
elif not isinstance(item, MediaItem):
self.running_items.append(item)
logger.log("PROGRAM", f"Item {item.log_string} started running section {service_name} with state {item.state.value}" )

def _process_future_item(self, future: Future, service: Service, orig_item: MediaItem) -> None:
"""Callback to add the results from a future emitted by a service to the event queue."""
Expand Down Expand Up @@ -409,13 +402,10 @@ def run(self):

if items_to_submit:
for item_to_submit in items_to_submit:
logger.debug(f"Submitting {item_to_submit.log_string} to {next_service.__name__}")
self.add_to_running(item_to_submit, next_service.__name__)
self._submit_job(next_service, item_to_submit)
if isinstance(existing_item, MediaItem):
logger.debug(f"Storing state of {existing_item.log_string}")
existing_item.store_state()
logger.debug(f"Committing changes to the database")
session.commit()

def stop(self):
Expand Down

0 comments on commit 5c7943d

Please sign in to comment.