diff --git a/src/controllers/webhooks.py b/src/controllers/webhooks.py
index a46eca21..8f648a83 100644
--- a/src/controllers/webhooks.py
+++ b/src/controllers/webhooks.py
@@ -24,31 +24,31 @@ async def overseerr(request: Request) -> Dict[str, Any]:
         response = await request.json()
         if response.get("subject") == "Test Notification":
             logger.log("API", "Received test notification, Overseerr configured properly")
-            return {"status": "success"}
+            return {"success": True}
         req = OverseerrWebhook.model_validate(response)
     except (Exception, pydantic.ValidationError) as e:
         logger.error(f"Failed to process request: {e}")
-        return {"status": "error", "message": str(e)}
+        return {"success": False, "message": str(e)}
 
     imdb_id = get_imdbid_from_overseerr(req)
     if not imdb_id:
         logger.error(f"Failed to get imdb_id from Overseerr: {req.media.tmdbId}")
-        return {"status": "error", "message": "Failed to get imdb_id from Overseerr"}
+        return {"success": False, "message": "Failed to get imdb_id from Overseerr"}
 
     overseerr: Overseerr = request.app.program.all_services[Overseerr]
     if not overseerr.initialized:
         logger.error("Overseerr not initialized")
-        return {"status": "error", "message": "Overseerr not initialized"}
+        return {"success": False, "message": "Overseerr not initialized"}
 
     try:
         new_item = MediaItem({"imdb_id": imdb_id, "requested_by": "overseerr", "requested_id": req.request.request_id})
    except Exception as e:
         logger.error(f"Failed to create item for {imdb_id}: {e}")
-        return {"status": "error", "message": str(e)}
+        return {"success": False, "message": str(e)}
 
     if _ensure_item_exists_in_db(new_item) or imdb_id in overseerr.recurring_items:
         logger.log("API", "Request already in queue or already exists in the database")
-        return {"status": "success"}
+        return {"success": True}
     else:
         overseerr.recurring_items.add(imdb_id)
@@ -57,7 +57,7 @@ async def overseerr(request: Request) -> Dict[str, Any]:
     except Exception as e:
         logger.error(f"Failed to add item for {imdb_id}: {e}")
 
-    return {"status": "success"}
+    return {"success": True}
 
 
 def get_imdbid_from_overseerr(req: OverseerrWebhook) -> str:
diff --git a/src/program/indexers/trakt.py b/src/program/indexers/trakt.py
index 4790a83a..31a75a99 100644
--- a/src/program/indexers/trakt.py
+++ b/src/program/indexers/trakt.py
@@ -52,7 +52,7 @@ def copy_items(self, itema: MediaItem, itemb: MediaItem):
             logger.error(f"Item types {itema.type} and {itemb.type} do not match cant copy metadata")
         return itemb
 
-    def run(self, in_item: MediaItem) -> Generator[Union[Movie, Show, Season, Episode], None, None]:
+    def run(self, in_item: MediaItem, log_msg: bool = True) -> Generator[Union[Movie, Show, Season, Episode], None, None]:
         """Run the Trakt indexer for the given item."""
         if not in_item:
             logger.error("Item is None")
@@ -79,7 +79,8 @@ def run(self, in_item: MediaItem) -> Generator[Union[Movie, Show, Season, Episod
         item = self.copy_items(in_item, item)
         item.indexed_at = datetime.now()
 
-        logger.debug(f"Indexed IMDb id ({in_item.imdb_id}) as {item.type.title()}: {item.log_string}")
+        if log_msg: # used for mapping symlinks to database, need to hide this log message
+            logger.debug(f"Indexed IMDb id ({in_item.imdb_id}) as {item.type.title()}: {item.log_string}")
         yield item
 
     @staticmethod
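Note: the webhook handler above now reports outcomes with a boolean `success` key instead of the old string-valued `status` key. The snippet below is a minimal, hypothetical client-side helper (not part of this PR) showing how a consumer could accept both shapes during a rollout; the function name and the legacy-compatibility branch are assumptions for illustration only.

```python
def is_webhook_success(body: dict) -> bool:
    """Return True when the Overseerr webhook response indicates success.

    Accepts the new shape ({"success": True/False, ...}) and, defensively,
    the old shape ({"status": "success"/"error", ...}).
    """
    if "success" in body:
        return bool(body["success"])
    return body.get("status") == "success"


# Example:
print(is_webhook_success({"success": True}))                      # True
print(is_webhook_success({"success": False, "message": "boom"}))  # False
print(is_webhook_success({"status": "success"}))                  # True (legacy shape)
```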
diff --git a/src/program/libraries/symlink.py b/src/program/libraries/symlink.py
index 5b5563dc..adb58ab1 100644
--- a/src/program/libraries/symlink.py
+++ b/src/program/libraries/symlink.py
@@ -3,7 +3,7 @@
 import time
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Generator
 
 from sqla_wrapper import Session
 
@@ -13,7 +13,7 @@ from utils.logger import logger
 
 if TYPE_CHECKING:
-    from program.media.item import Movie, Show, Episode
+    from program.media.item import Movie, Show, Episode, MediaItem
 
 imdbid_pattern = re.compile(r"tt\d+")
 season_pattern = re.compile(r"s(\d+)")
@@ -47,22 +47,25 @@ def validate(self) -> bool:
             return False
         return True
 
-    def run(self):
+    def run(self) -> list["MediaItem"]:
         """
         Create a library from the symlink paths. Return stub items that
         should be fed into an Indexer to have the rest of the metadata filled in.
         """
         from program.media.item import Movie
+
+        items = []
         for directory, item_type, is_anime in [("shows", "show", False), ("anime_shows", "anime show", True)]:
             if not self.settings.separate_anime_dirs and is_anime:
                 continue
-            yield from process_shows(self.settings.library_path / directory, item_type, is_anime)
+            items.extend(process_shows(self.settings.library_path / directory, item_type, is_anime))
 
         for directory, item_type, is_anime in [("movies", "movie", False), ("anime_movies", "anime movie", True)]:
             if not self.settings.separate_anime_dirs and is_anime:
                 continue
-            yield from process_items(self.settings.library_path / directory, Movie, item_type, is_anime)
+            items.extend(process_items(self.settings.library_path / directory, Movie, item_type, is_anime))
+        return items
 
 def process_items(directory: Path, item_class, item_type: str, is_anime: bool = False):
     """Process items in the given directory and yield MediaItem instances."""
@@ -110,7 +113,7 @@ def find_subtitles(item, path: Path):
                     item.subtitles.append(Subtitle({lang_code: (path.parent / file).__str__()}))
                     logger.debug(f"Found subtitle file {file}.")
 
-def process_shows(directory: Path, item_type: str, is_anime: bool = False) -> "Show":
+def process_shows(directory: Path, item_type: str, is_anime: bool = False) -> Generator["Show", None, None]:
     """Process shows in the given directory and yield Show instances."""
     from program.media.item import Episode, Season, Show
     for show in os.listdir(directory):
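Note: `SymlinkLibrary.run()` changes from a generator (`yield from`) to a method that materialises and returns a plain list. The standalone sketch below illustrates why that matters for the new startup path, which needs `len(items)` for a progress-bar total and may inspect the collection more than once; the stub functions are illustrative only, not Riven code.

```python
from typing import Generator, List

def stub_run_generator() -> Generator[str, None, None]:
    # old style: items are produced lazily and can only be consumed once
    yield from ("movie-1", "show-1")

def stub_run_list() -> List[str]:
    # new style: items are collected up front
    return list(stub_run_generator())

items = stub_run_list()
print(len(items))            # 2 -- usable as a progress-bar total

gen = stub_run_generator()
# len(gen) would raise TypeError: object of type 'generator' has no len()
print(sum(1 for _ in gen))   # consuming the generator is the only way to count it
```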
Skipping.") - continue - added.append(item.item_id) - item.store_state() - session.add(item) - session.commit() - movies_symlinks = session.execute(select(func.count(Movie._id)).where(Movie.symlinked == True)).scalar_one() # noqa episodes_symlinks = session.execute(select(func.count(Episode._id)).where(Episode.symlinked == True)).scalar_one() # noqa total_symlinks = movies_symlinks + episodes_symlinks @@ -348,3 +334,100 @@ def stop(self): if hasattr(self, "scheduler") and self.scheduler.running: self.scheduler.shutdown(wait=False) logger.log("PROGRAM", "Riven has been stopped.") + + # def _init_db_from_symlinks(self): + # with db.Session() as session: + # res = session.execute(select(func.count(MediaItem._id))).scalar_one() + # added = [] + # if res == 0: + # logger.log("PROGRAM", "Collecting items from symlinks") + # items = self.services[SymlinkLibrary].run() + # logger.log("PROGRAM", f"Found {len(items)} symlinks to add to database") + # if settings_manager.settings.map_metadata: + # console = Console() + # progress = Progress( + # SpinnerColumn(), + # TextColumn("[progress.description]{task.description}"), + # BarColumn(), + # TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + # TimeRemainingColumn(), + # console=console, + # transient=True, + # ) + + # task = progress.add_task("Enriching items with metadata", total=len(items)) + # with Live(progress, console=console, refresh_per_second=10): + # for item in items: + # if isinstance(item, (Movie, Show)): + # try: + # enhanced_item = next(self.services[TraktIndexer].run(item, log_msg=False)) + # except StopIteration as e: + # logger.error(f"Failed to enhance metadata for {item.title} ({item.item_id}): {e}") + # continue + # if enhanced_item.item_id in added: + # logger.error(f"Cannot enhance metadata, {item.title} ({item.item_id}) contains multiple folders. Manual resolution required. 
Skipping.") + # continue + # added.append(enhanced_item.item_id) + # enhanced_item.store_state() + # session.add(enhanced_item) + # progress.update(task, advance=1) + # session.commit() + # logger.log("PROGRAM", "Database initialized") + + def _enhance_item(self, item: MediaItem) -> MediaItem | None: + try: + enhanced_item = next(self.services[TraktIndexer].run(item, log_msg=False)) + return enhanced_item + except StopIteration as e: + logger.error(f"Failed to enhance metadata for {item.title} ({item.item_id}): {e}") + return None + + def _init_db_from_symlinks(self): + with db.Session() as session: + res = session.execute(select(func.count(MediaItem._id))).scalar_one() + added = [] + errors = [] + if res == 0: + logger.log("PROGRAM", "Collecting items from symlinks") + items = self.services[SymlinkLibrary].run() + logger.log("PROGRAM", f"Found {len(items)} Movie and Show symlinks to add to database") + if settings_manager.settings.map_metadata: + console = Console() + progress = Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TimeRemainingColumn(), + console=console, + transient=True, + ) + + task = progress.add_task("Enriching items with metadata", total=len(items)) + with Live(progress, console=console, refresh_per_second=10): + with ThreadPoolExecutor(max_workers=4) as executor: + future_to_item = {executor.submit(self._enhance_item, item): item for item in items if isinstance(item, (Movie, Show))} + for future in as_completed(future_to_item): + item = future_to_item[future] + try: + enhanced_item = future.result() + if enhanced_item: + if enhanced_item.item_id in added: + errors.append(f"Duplicate symlink found for {item.title} ({item.item_id}), skipping...") + continue + else: + added.append(enhanced_item.item_id) + enhanced_item.store_state() + session.add(enhanced_item) + except Exception as e: + errors.append(f"Error processing {item.title} ({item.item_id}): {e}") + finally: + progress.update(task, advance=1) + session.commit() + + if errors: + logger.error("Errors encountered during initialization") + for error in errors: + logger.error(error) + + logger.log("PROGRAM", "Database initialized") \ No newline at end of file diff --git a/src/program/state_transition.py b/src/program/state_transition.py index 6ba6891a..7670cf26 100644 --- a/src/program/state_transition.py +++ b/src/program/state_transition.py @@ -22,8 +22,8 @@ def process_event(existing_item: MediaItem | None, emitted_by: Service, item: Me no_further_processing: ProcessedEvent = (None, None, []) items_to_submit = [] - source_services = (Overseerr, PlexWatchlist, Listrr, Mdblist, SymlinkLibrary, TraktContent) - if emitted_by in source_services or item.last_state in [States.Requested]: + source_services = (Overseerr, PlexWatchlist, Listrr, Mdblist, SymlinkLibrary, TraktContent, "Manual") + if emitted_by in source_services or item.state in [States.Requested]: next_service = TraktIndexer if _imdb_exists_in_db(item.imdb_id) and item.last_state == States.Completed: logger.debug(f"Item {item.log_string} already exists in the database.") diff --git a/src/utils/event_manager.py b/src/utils/event_manager.py index 65765d09..d6dbaaf2 100644 --- a/src/utils/event_manager.py +++ b/src/utils/event_manager.py @@ -1,20 +1,20 @@ -import asyncio import concurrent.futures +import os +import traceback from datetime import datetime from queue import Empty -import os from threading import Lock -import time -import traceback 
diff --git a/src/utils/event_manager.py b/src/utils/event_manager.py
index 65765d09..d6dbaaf2 100644
--- a/src/utils/event_manager.py
+++ b/src/utils/event_manager.py
@@ -1,20 +1,20 @@
-import asyncio
 import concurrent.futures
+import os
+import traceback
 from datetime import datetime
 from queue import Empty
-import os
 from threading import Lock
-import time
-import traceback
 
-from subliminal import Episode, Movie
-from program.db.db import db
-from program.db.db_functions import _run_thread_with_db_item, _get_item_ids
 from loguru import logger
-import utils.websockets.manager as ws_manager
+from subliminal import Episode, Movie
 
+import utils.websockets.manager as ws_manager
+from program.db.db import db
+from program.db.db_functions import _get_item_ids, _run_thread_with_db_item
 from program.media.item import Season, Show
 from program.types import Event
+
+
 class EventManager:
     """
     Manages the execution of services and the handling of events.
@@ -195,7 +195,6 @@ def next(self):
         Returns:
             Event: The next event in the queue.
         """
-        start_time = time.time()
         while True:
             if self._queued_events:
                 with self.mutex:
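Note on the `state_transition.py` hunk above: `source_services` now mixes service classes with the string sentinel `"Manual"`, so `emitted_by` can be either a class or that literal string. A tiny standalone sketch of the membership test follows; the classes below are placeholders, not Riven's.

```python
class Overseerr: ...
class SymlinkLibrary: ...

SOURCE_SERVICES = (Overseerr, SymlinkLibrary, "Manual")

def is_source(emitted_by) -> bool:
    # `in` compares by equality, so class objects and the "Manual"
    # sentinel string both match without any special-casing
    return emitted_by in SOURCE_SERVICES

print(is_source(Overseerr))  # True
print(is_source("Manual"))   # True
print(is_source("Plex"))     # False
```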