Skip to content

Commit

Permalink
feat: move symlink db init to progress bar. added threading to speed …
Browse files Browse the repository at this point in the history
…it up. needs testing!
  • Loading branch information
dreulavelle committed Sep 16, 2024
1 parent 797778c commit 71fb859
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 45 deletions.
14 changes: 7 additions & 7 deletions src/controllers/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,31 @@ async def overseerr(request: Request) -> Dict[str, Any]:
response = await request.json()
if response.get("subject") == "Test Notification":
logger.log("API", "Received test notification, Overseerr configured properly")
return {"status": "success"}
return {"success": True}
req = OverseerrWebhook.model_validate(response)
except (Exception, pydantic.ValidationError) as e:
logger.error(f"Failed to process request: {e}")
return {"status": "error", "message": str(e)}
return {"success": False, "message": str(e)}

imdb_id = get_imdbid_from_overseerr(req)
if not imdb_id:
logger.error(f"Failed to get imdb_id from Overseerr: {req.media.tmdbId}")
return {"status": "error", "message": "Failed to get imdb_id from Overseerr"}
return {"success": False, "message": "Failed to get imdb_id from Overseerr"}

overseerr: Overseerr = request.app.program.all_services[Overseerr]
if not overseerr.initialized:
logger.error("Overseerr not initialized")
return {"status": "error", "message": "Overseerr not initialized"}
return {"success": False, "message": "Overseerr not initialized"}

try:
new_item = MediaItem({"imdb_id": imdb_id, "requested_by": "overseerr", "requested_id": req.request.request_id})
except Exception as e:
logger.error(f"Failed to create item for {imdb_id}: {e}")
return {"status": "error", "message": str(e)}
return {"success": False, "message": str(e)}

if _ensure_item_exists_in_db(new_item) or imdb_id in overseerr.recurring_items:
logger.log("API", "Request already in queue or already exists in the database")
return {"status": "success"}
return {"success": True}
else:
overseerr.recurring_items.add(imdb_id)

Expand All @@ -57,7 +57,7 @@ async def overseerr(request: Request) -> Dict[str, Any]:
except Exception as e:
logger.error(f"Failed to add item for {imdb_id}: {e}")

return {"status": "success"}
return {"success": True}


def get_imdbid_from_overseerr(req: OverseerrWebhook) -> str:
Expand Down
5 changes: 3 additions & 2 deletions src/program/indexers/trakt.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def copy_items(self, itema: MediaItem, itemb: MediaItem):
logger.error(f"Item types {itema.type} and {itemb.type} do not match cant copy metadata")
return itemb

def run(self, in_item: MediaItem) -> Generator[Union[Movie, Show, Season, Episode], None, None]:
def run(self, in_item: MediaItem, log_msg: bool = True) -> Generator[Union[Movie, Show, Season, Episode], None, None]:
"""Run the Trakt indexer for the given item."""
if not in_item:
logger.error("Item is None")
Expand All @@ -79,7 +79,8 @@ def run(self, in_item: MediaItem) -> Generator[Union[Movie, Show, Season, Episod
item = self.copy_items(in_item, item)
item.indexed_at = datetime.now()

logger.debug(f"Indexed IMDb id ({in_item.imdb_id}) as {item.type.title()}: {item.log_string}")
if log_msg: # used for mapping symlinks to database, need to hide this log message
logger.debug(f"Indexed IMDb id ({in_item.imdb_id}) as {item.type.title()}: {item.log_string}")
yield item

@staticmethod
Expand Down
15 changes: 9 additions & 6 deletions src/program/libraries/symlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Generator

from sqla_wrapper import Session

Expand All @@ -13,7 +13,7 @@
from utils.logger import logger

if TYPE_CHECKING:
from program.media.item import Movie, Show, Episode
from program.media.item import Movie, Show, Episode, MediaItem

imdbid_pattern = re.compile(r"tt\d+")
season_pattern = re.compile(r"s(\d+)")
Expand Down Expand Up @@ -47,22 +47,25 @@ def validate(self) -> bool:
return False
return True

def run(self):
def run(self) -> list["MediaItem"]:
"""
Create a library from the symlink paths. Return stub items that should
be fed into an Indexer to have the rest of the metadata filled in.
"""
from program.media.item import Movie

items = []
for directory, item_type, is_anime in [("shows", "show", False), ("anime_shows", "anime show", True)]:
if not self.settings.separate_anime_dirs and is_anime:
continue
yield from process_shows(self.settings.library_path / directory, item_type, is_anime)
items.extend(process_shows(self.settings.library_path / directory, item_type, is_anime))

for directory, item_type, is_anime in [("movies", "movie", False), ("anime_movies", "anime movie", True)]:
if not self.settings.separate_anime_dirs and is_anime:
continue
yield from process_items(self.settings.library_path / directory, Movie, item_type, is_anime)
items.extend(process_items(self.settings.library_path / directory, Movie, item_type, is_anime))

return items

def process_items(directory: Path, item_class, item_type: str, is_anime: bool = False):
"""Process items in the given directory and yield MediaItem instances."""
Expand Down Expand Up @@ -110,7 +113,7 @@ def find_subtitles(item, path: Path):
item.subtitles.append(Subtitle({lang_code: (path.parent / file).__str__()}))
logger.debug(f"Found subtitle file {file}.")

def process_shows(directory: Path, item_type: str, is_anime: bool = False) -> "Show":
def process_shows(directory: Path, item_type: str, is_anime: bool = False) -> Generator["Show", None, None]:
"""Process shows in the given directory and yield Show instances."""
from program.media.item import Episode, Season, Show
for show in os.listdir(directory):
Expand Down
121 changes: 102 additions & 19 deletions src/program/program.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
import linecache
import os
import threading
Expand All @@ -7,6 +8,9 @@
from queue import Empty

from apscheduler.schedulers.background import BackgroundScheduler
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeRemainingColumn
from rich.live import Live

from program.content import Listrr, Mdblist, Overseerr, PlexWatchlist, TraktContent
from program.downloaders import Downloader
Expand Down Expand Up @@ -140,27 +144,9 @@ def start(self):
return

run_migrations()
self._init_db_from_symlinks()

with db.Session() as session:
res = session.execute(select(func.count(MediaItem._id))).scalar_one()
added = []
if res == 0:
for item in self.services[SymlinkLibrary].run():
if settings_manager.settings.map_metadata:
if isinstance(item, (Movie, Show)):
try:
item = next(self.services[TraktIndexer].run(item))
except StopIteration as e:
logger.error(f"Failed to enhance metadata for {item.title} ({item.item_id}): {e}")
continue
if item.item_id in added:
logger.error(f"Cannot enhance metadata, {item.title} ({item.item_id}) contains multiple folders. Manual resolution required. Skipping.")
continue
added.append(item.item_id)
item.store_state()
session.add(item)
session.commit()

movies_symlinks = session.execute(select(func.count(Movie._id)).where(Movie.symlinked == True)).scalar_one() # noqa
episodes_symlinks = session.execute(select(func.count(Episode._id)).where(Episode.symlinked == True)).scalar_one() # noqa
total_symlinks = movies_symlinks + episodes_symlinks
Expand Down Expand Up @@ -348,3 +334,100 @@ def stop(self):
if hasattr(self, "scheduler") and self.scheduler.running:
self.scheduler.shutdown(wait=False)
logger.log("PROGRAM", "Riven has been stopped.")

# def _init_db_from_symlinks(self):
# with db.Session() as session:
# res = session.execute(select(func.count(MediaItem._id))).scalar_one()
# added = []
# if res == 0:
# logger.log("PROGRAM", "Collecting items from symlinks")
# items = self.services[SymlinkLibrary].run()
# logger.log("PROGRAM", f"Found {len(items)} symlinks to add to database")
# if settings_manager.settings.map_metadata:
# console = Console()
# progress = Progress(
# SpinnerColumn(),
# TextColumn("[progress.description]{task.description}"),
# BarColumn(),
# TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
# TimeRemainingColumn(),
# console=console,
# transient=True,
# )

# task = progress.add_task("Enriching items with metadata", total=len(items))
# with Live(progress, console=console, refresh_per_second=10):
# for item in items:
# if isinstance(item, (Movie, Show)):
# try:
# enhanced_item = next(self.services[TraktIndexer].run(item, log_msg=False))
# except StopIteration as e:
# logger.error(f"Failed to enhance metadata for {item.title} ({item.item_id}): {e}")
# continue
# if enhanced_item.item_id in added:
# logger.error(f"Cannot enhance metadata, {item.title} ({item.item_id}) contains multiple folders. Manual resolution required. Skipping.")
# continue
# added.append(enhanced_item.item_id)
# enhanced_item.store_state()
# session.add(enhanced_item)
# progress.update(task, advance=1)
# session.commit()
# logger.log("PROGRAM", "Database initialized")

def _enhance_item(self, item: MediaItem) -> MediaItem | None:
try:
enhanced_item = next(self.services[TraktIndexer].run(item, log_msg=False))
return enhanced_item
except StopIteration as e:
logger.error(f"Failed to enhance metadata for {item.title} ({item.item_id}): {e}")
return None

def _init_db_from_symlinks(self):
with db.Session() as session:
res = session.execute(select(func.count(MediaItem._id))).scalar_one()
added = []
errors = []
if res == 0:
logger.log("PROGRAM", "Collecting items from symlinks")
items = self.services[SymlinkLibrary].run()
logger.log("PROGRAM", f"Found {len(items)} Movie and Show symlinks to add to database")
if settings_manager.settings.map_metadata:
console = Console()
progress = Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeRemainingColumn(),
console=console,
transient=True,
)

task = progress.add_task("Enriching items with metadata", total=len(items))
with Live(progress, console=console, refresh_per_second=10):
with ThreadPoolExecutor(max_workers=4) as executor:
future_to_item = {executor.submit(self._enhance_item, item): item for item in items if isinstance(item, (Movie, Show))}
for future in as_completed(future_to_item):
item = future_to_item[future]
try:
enhanced_item = future.result()
if enhanced_item:
if enhanced_item.item_id in added:
errors.append(f"Duplicate symlink found for {item.title} ({item.item_id}), skipping...")
continue
else:
added.append(enhanced_item.item_id)
enhanced_item.store_state()
session.add(enhanced_item)
except Exception as e:
errors.append(f"Error processing {item.title} ({item.item_id}): {e}")
finally:
progress.update(task, advance=1)
session.commit()

if errors:
logger.error("Errors encountered during initialization")
for error in errors:
logger.error(error)

logger.log("PROGRAM", "Database initialized")
4 changes: 2 additions & 2 deletions src/program/state_transition.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ def process_event(existing_item: MediaItem | None, emitted_by: Service, item: Me
no_further_processing: ProcessedEvent = (None, None, [])
items_to_submit = []

source_services = (Overseerr, PlexWatchlist, Listrr, Mdblist, SymlinkLibrary, TraktContent)
if emitted_by in source_services or item.last_state in [States.Requested]:
source_services = (Overseerr, PlexWatchlist, Listrr, Mdblist, SymlinkLibrary, TraktContent, "Manual")
if emitted_by in source_services or item.state in [States.Requested]:
next_service = TraktIndexer
if _imdb_exists_in_db(item.imdb_id) and item.last_state == States.Completed:
logger.debug(f"Item {item.log_string} already exists in the database.")
Expand Down
17 changes: 8 additions & 9 deletions src/utils/event_manager.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import asyncio
import concurrent.futures
import os
import traceback
from datetime import datetime
from queue import Empty
import os
from threading import Lock
import time
import traceback
from subliminal import Episode, Movie

from program.db.db import db
from program.db.db_functions import _run_thread_with_db_item, _get_item_ids
from loguru import logger
import utils.websockets.manager as ws_manager
from subliminal import Episode, Movie

import utils.websockets.manager as ws_manager
from program.db.db import db
from program.db.db_functions import _get_item_ids, _run_thread_with_db_item
from program.media.item import Season, Show
from program.types import Event


class EventManager:
"""
Manages the execution of services and the handling of events.
Expand Down Expand Up @@ -195,7 +195,6 @@ def next(self):
Returns:
Event: The next event in the queue.
"""
start_time = time.time()
while True:
if self._queued_events:
with self.mutex:
Expand Down

0 comments on commit 71fb859

Please sign in to comment.