diff --git a/.github/workflows/docker-build-dev.yml b/.github/workflows/docker-build-dev.yml
index 6631e800..e4568475 100644
--- a/.github/workflows/docker-build-dev.yml
+++ b/.github/workflows/docker-build-dev.yml
@@ -3,7 +3,7 @@ name: Docker Build and Push Dev
 on:
   push:
     branches:
-      - main
+      - dev
 
 jobs:
   build-and-push-dev:
diff --git a/.gitignore b/.gitignore
index 80e556ba..a8d4f80a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 data/
 logs/
 settings.json
+ignore.txt
 .vscode
 .git
 makefile
diff --git a/src/controllers/default.py b/src/controllers/default.py
index 0ea8ccbd..e967e49f 100644
--- a/src/controllers/default.py
+++ b/src/controllers/default.py
@@ -134,30 +134,4 @@ async def get_stats(_: Request):
     payload["incomplete_retries"] = incomplete_retries
     payload["states"] = states
 
-    return {"success": True, "data": payload}
-
-@router.get("/scrape/{item_id:path}")
-async def scrape_item(item_id: str, request: Request):
-    with db.Session() as session:
-        item = DB._get_item_from_db(session, MediaItem({"imdb_id":str(item_id)}))
-        if item is None:
-            raise HTTPException(status_code=404, detail="Item not found")
-
-        scraper = request.app.program.services.get(Scraping)
-        if scraper is None:
-            raise HTTPException(status_code=404, detail="Scraping service not found")
-
-        time_now = time.time()
-        scraped_results = scraper.scrape(item, log=False)
-        time_end = time.time()
-        duration = time_end - time_now
-
-        results = {}
-        for hash, torrent in scraped_results.items():
-            results[hash] = {
-                "title": torrent.data.parsed_title,
-                "raw_title": torrent.raw_title,
-                "rank": torrent.rank,
-            }
-
-        return {"success": True, "total": len(results), "duration": round(duration, 3), "results": results}
\ No newline at end of file
+    return {"success": True, "data": payload}
\ No newline at end of file
diff --git a/src/controllers/items.py b/src/controllers/items.py
index 171d593d..5f6258d3 100644
--- a/src/controllers/items.py
+++ b/src/controllers/items.py
@@ -1,14 +1,16 @@
-from typing import List, Optional
+from copy import copy
+from typing import Optional
 
 import Levenshtein
 import program.db.db_functions as DB
 from fastapi import APIRouter, HTTPException, Request
 from program.db.db import db
-from program.media.item import Episode, MediaItem, Season
+from program.downloaders import Downloader
+from program.media.item import MediaItem
 from program.media.state import States
-from program.symlink import Symlinker
-from pydantic import BaseModel
 from sqlalchemy import func, select
+from program.media.stream import Stream
+from program.scrapers import Scraping
 from utils.logger import logger
 
 router = APIRouter(
@@ -17,11 +19,6 @@
     responses={404: {"description": "Not found"}},
 )
 
-
-class IMDbIDs(BaseModel):
-    imdb_ids: Optional[List[str]] = None
-
-
 @router.get("/states")
 async def get_states():
     return {
@@ -29,7 +26,6 @@ async def get_states():
         "states": [state for state in States],
     }
 
-
 @router.get(
     "",
     summary="Retrieve Media Items",
@@ -43,6 +39,7 @@ async def get_items(
     state: Optional[str] = None,
     sort: Optional[str] = "desc",
     search: Optional[str] = None,
+    extended: Optional[bool] = False,
 ):
     if page < 1:
         raise HTTPException(status_code=400, detail="Page number must be 1 or greater.")
@@ -85,9 +82,10 @@ async def get_items(
         if type not in ["movie", "show", "season", "episode"]:
             raise HTTPException(
                 status_code=400,
-                detail=f"Invalid type: {type}. Valid types are: ['movie', 'show', 'season', 'episode']",
-            )
-        query = query.where(MediaItem.type.in_(types))
+                detail=f"Invalid type: {type}. Valid types are: ['movie', 'show', 'season', 'episode']")
+        else:
+            types = [type]
+        query = query.where(MediaItem.type.in_(types))
 
     if sort and not search:
         if sort.lower() == "asc":
@@ -108,7 +106,7 @@ async def get_items(
 
     return {
         "success": True,
-        "items": [item.to_dict() for item in items],
+        "items": [item.to_extended_dict() if extended else item.to_dict() for item in items],
        "page": page,
         "limit": limit,
         "total_items": total_items,
@@ -116,16 +114,11 @@ async def get_items(
     }
 
 
-@router.get("/extended/{item_id}")
-async def get_extended_item_info(_: Request, item_id: str):
-    with db.Session() as session:
-        item = session.execute(select(MediaItem).where(MediaItem.imdb_id == item_id)).unique().scalar_one_or_none()
-        if item is None:
-            raise HTTPException(status_code=404, detail="Item not found")
-        return {"success": True, "item": item.to_extended_dict()}
-
-
-@router.post("/add")
+@router.post(
+    "/add",
+    summary="Add Media Items",
+    description="Add media items based on IMDb IDs",
+)
 async def add_items(
     request: Request, imdb_ids: str = None
 ):
@@ -151,51 +144,108 @@ async def add_items(
 
     return {"success": True, "message": f"Added {len(valid_ids)} item(s) to the queue"}
 
-
-@router.delete("/remove")
-async def remove_item(
-    _: Request, imdb_id: str
+@router.post(
+    "/reset",
+    summary="Reset Media Items",
+    description="Reset media items based on item IDs",
+)
+async def reset_items(
+    request: Request, ids: str
 ):
-    if not imdb_id:
-        raise HTTPException(status_code=400, detail="No IMDb ID provided")
-    if DB._remove_item_from_db(imdb_id):
-        return {"success": True, "message": f"Removed item with imdb_id {imdb_id}"}
-    return {"success": False, "message": f"No item with imdb_id ({imdb_id}) found"}
-
-
-@router.get("/imdb/{imdb_id}")
-async def get_imdb_info(
-    _: Request,
-    imdb_id: str,
-    season: Optional[int] = None,
-    episode: Optional[int] = None,
+    ids = [int(id) for id in ids.split(",")] if "," in ids else [int(ids)]
+    if not ids:
+        raise HTTPException(status_code=400, detail="No item ID provided")
+    with db.Session() as session:
+        items = []
+        for id in ids:
+            item = session.execute(select(MediaItem).where(MediaItem._id == id)).unique().scalar_one()
+            item.streams = session.execute(select(Stream).where(Stream.parent_id == item._id)).scalars().all()
+            items.append(item)
+        for item in items:
+            request.app.program._remove_from_running_events(item)
+            if item.type == "show":
+                for season in item.seasons:
+                    for episode in season.episodes:
+                        episode.reset()
+                    season.reset()
+            elif item.type == "season":
+                for episode in item.episodes:
+                    episode.reset()
+            item.reset()
+
+        session.commit()
+    return {"success": True, "message": f"Reset items with ids {ids}"}
+
+@router.post(
+    "/retry",
+    summary="Retry Media Items",
+    description="Retry media items based on item IDs",
+)
+async def retry_items(
+    request: Request, ids: str
 ):
-    """
-    Get the item with the given IMDb ID.
-    If the season and episode are provided, get the item with the given season and episode.
- """ + ids = [int(id) for id in ids.split(",")] if "," in ids else [int(ids)] + if not ids: + raise HTTPException(status_code=400, detail="No item ID provided") with db.Session() as session: - if season is not None and episode is not None: - item = session.execute( - select(Episode).where( - (Episode.imdb_id == imdb_id) & - (Episode.season_number == season) & - (Episode.episode_number == episode) - ) - ).scalar_one_or_none() - elif season is not None: - item = session.execute( - select(Season).where( - (Season.imdb_id == imdb_id) & - (Season.season_number == season) - ) - ).scalar_one_or_none() - else: - item = session.execute( - select(MediaItem).where(MediaItem.imdb_id == imdb_id) - ).scalar_one_or_none() - - if item is None: - raise HTTPException(status_code=404, detail="Item not found") - - return {"success": True, "item": item.to_extended_dict()} + items = [] + for id in ids: + items.append(session.execute(select(MediaItem).where(MediaItem._id == id)).unique().scalar_one()) + for item in items: + request.app.program._remove_from_running_events(item) + request.app.program.add_to_queue(item) + + return {"success": True, "message": f"Retried items with id {ids}"} + +@router.delete( + "", + summary="Remove Media Items", + description="Remove media items with bases on item IDs",) +async def remove_item( + _: Request, ids: str +): + ids = [int(id) for id in ids.split(",")] if "," in ids else [int(ids)] + if not ids: + raise HTTPException(status_code=400, detail="No item ID provided") + for id in ids: + DB._remove_item_from_db(id) + return {"success": True, "message": f"Removed item with id {id}"} + +# These require downloaders to be refactored + +# @router.get("/cached") +# async def manual_scrape(request: Request, ids: str): +# scraper = request.app.program.services.get(Scraping) +# downloader = request.app.program.services.get(Downloader).service +# if downloader.__class__.__name__ not in ["RealDebridDownloader", "TorBoxDownloader"]: +# raise HTTPException(status_code=400, detail="Only Real-Debrid is supported for manual scraping currently") +# ids = [int(id) for id in ids.split(",")] if "," in ids else [int(ids)] +# if not ids: +# raise HTTPException(status_code=400, detail="No item ID provided") +# with db.Session() as session: +# items = [] +# return_dict = {} +# for id in ids: +# items.append(session.execute(select(MediaItem).where(MediaItem._id == id)).unique().scalar_one()) +# if any(item for item in items if item.type in ["Season", "Episode"]): +# raise HTTPException(status_code=400, detail="Only shows and movies can be manually scraped currently") +# for item in items: +# new_item = item.__class__({}) +# # new_item.parent = item.parent +# new_item.copy(item) +# new_item.copy_other_media_attr(item) +# scraped_results = scraper.scrape(new_item, log=False) +# cached_hashes = downloader.get_cached_hashes(new_item, scraped_results) +# for hash, stream in scraped_results.items(): +# return_dict[hash] = {"cached": hash in cached_hashes, "name": stream.raw_title} +# return {"success": True, "data": return_dict} + +# @router.post("/download") +# async def download(request: Request, id: str, hash: str): +# downloader = request.app.program.services.get(Downloader).service +# with db.Session() as session: +# item = session.execute(select(MediaItem).where(MediaItem._id == id)).unique().scalar_one() +# item.reset(True) +# downloader.download_cached(item, hash) +# request.app.program.add_to_queue(item) +# return {"success": True, "message": f"Downloading {item.title} with hash {hash}"} \ No 
newline at end of file diff --git a/src/controllers/settings.py b/src/controllers/settings.py index 64de995e..eca74cc4 100644 --- a/src/controllers/settings.py +++ b/src/controllers/settings.py @@ -1,5 +1,5 @@ from copy import copy -from typing import Any, List +from typing import Any, Dict, List from fastapi import APIRouter, HTTPException from program.settings.manager import settings_manager @@ -65,6 +65,31 @@ async def get_settings(paths: str): } +@router.post("/set/all") +async def set_all_settings(new_settings: Dict[str, Any]): + current_settings = settings_manager.settings.model_dump() + + def update_settings(current_obj, new_obj): + for key, value in new_obj.items(): + if isinstance(value, dict) and key in current_obj: + update_settings(current_obj[key], value) + else: + current_obj[key] = value + + update_settings(current_settings, new_settings) + + # Validate and save the updated settings + try: + settings_manager.settings = settings_manager.settings.model_validate(current_settings) + await save_settings() + except Exception as e: + raise HTTPException(status_code=400, detail=str(e)) + + return { + "success": True, + "message": "All settings updated successfully!", + } + @router.post("/set") async def set_settings(settings: List[SetSettings]): current_settings = settings_manager.settings.model_dump() diff --git a/src/controllers/ws.py b/src/controllers/ws.py new file mode 100644 index 00000000..e601ebfd --- /dev/null +++ b/src/controllers/ws.py @@ -0,0 +1,53 @@ +import json +from loguru import logger +from fastapi import APIRouter, WebSocket, WebSocketDisconnect + +router = APIRouter( + prefix="/ws", + tags=["websocket"], + responses={404: {"description": "Not found"}}) + +class ConnectionManager: + def __init__(self): + self.active_connections: list[WebSocket] = [] + + async def connect(self, websocket: WebSocket): + await websocket.accept() + logger.debug("Frontend connected!") + self.active_connections.append(websocket) + await websocket.send_json({"type": "health", "status": "running"}) + + def disconnect(self, websocket: WebSocket): + logger.debug("Frontend disconnected!") + self.active_connections.remove(websocket) + + async def send_personal_message(self, message: str, websocket: WebSocket): + await websocket.send_text(message) + + async def send_log_message(self, message: str): + await self.broadcast({"type": "log", "message": message}) + + async def send_item_update(self, item: json): + await self.broadcast({"type": "item_update", "item": item}) + + async def broadcast(self, message: json): + for connection in self.active_connections: + try: + await connection.send_json(message) + except RuntimeError: + self.active_connections.remove(connection) + + +manager = ConnectionManager() + + +@router.websocket("") +async def websocket_endpoint(websocket: WebSocket): + await manager.connect(websocket) + try: + while True: + await websocket.receive_text() + except WebSocketDisconnect: + manager.disconnect(websocket) + except RuntimeError: + manager.disconnect(websocket) \ No newline at end of file diff --git a/src/main.py b/src/main.py index 3c482602..59a41221 100644 --- a/src/main.py +++ b/src/main.py @@ -10,6 +10,7 @@ from controllers.actions import router as actions_router from controllers.default import router as default_router from controllers.items import router as items_router +from controllers.ws import router as ws_router # from controllers.metrics import router as metrics_router from controllers.settings import router as settings_router @@ -86,6 +87,7 @@ async def 
dispatch(self, request: Request, call_next): app.include_router(webhooks_router) app.include_router(tmdb_router) app.include_router(actions_router) +app.include_router(ws_router) # app.include_router(metrics_router) diff --git a/src/program/db/db_functions.py b/src/program/db/db_functions.py index bb69dccd..a4d44e1b 100644 --- a/src/program/db/db_functions.py +++ b/src/program/db/db_functions.py @@ -61,10 +61,10 @@ def _get_item_from_db(session, item: MediaItem): logger.error(f"_get_item_from_db Failed to create item from type: {type}") return None -def _remove_item_from_db(imdb_id): +def _remove_item_from_db(id): try: with db.Session() as session: - item = session.execute(select(MediaItem).where(MediaItem.imdb_id == imdb_id)).unique().scalar_one() + item = session.execute(select(MediaItem).where(MediaItem._id == id)).unique().scalar_one() item_type = None if item.type == "movie": item_type = Movie @@ -98,52 +98,33 @@ def _run_thread_with_db_item(fn, service, program, input_item: MediaItem | None) if input_item is not None: with db.Session() as session: if isinstance(input_item, (Movie, Show, Season, Episode)): - item = input_item - if not _check_for_and_run_insertion_required(session, item): + if not _check_for_and_run_insertion_required(session, input_item): pass - item = _get_item_from_db(session, item) - - # session.merge(item) - for res in fn(item): - if isinstance(res, list): - all_media_items = True - for i in res: - if not isinstance(i, MediaItem): - all_media_items = False - - program._remove_from_running_items(item, service.__name__) - if all_media_items is True: - for i in res: - program._push_event_queue(Event(emitted_by="_run_thread_with_db_item", item=i)) - session.commit() - return - elif not isinstance(res, MediaItem): - logger.log("PROGRAM", f"Service {service.__name__} emitted {res} from input item {item} of type {type(res).__name__}, backing off.") - program._remove_from_running_items(item, service.__name__) - if res is not None and isinstance(res, MediaItem): - program._push_event_queue(Event(emitted_by=service, item=res)) - # self._check_for_and_run_insertion_required(item) - - item.store_state() + input_item = _get_item_from_db(session, input_item) + + for res in fn(input_item): + if not isinstance(res, MediaItem): + logger.log("PROGRAM", f"Service {service.__name__} emitted {res} from input item {input_item} of type {type(res).__name__}, backing off.") + program._remove_from_running_events(input_item, service.__name__) + + input_item.store_state() session.commit() session.expunge_all() - return res - for i in fn(input_item): - if isinstance(i, (Show, Movie, Season, Episode)): - with db.Session() as session: - _check_for_and_run_insertion_required(session, i) - program._push_event_queue(Event(emitted_by=service, item=i)) - yield i + yield res + else: + for i in fn(input_item): + if isinstance(i, (MediaItem)): + with db.Session() as session: + _check_for_and_run_insertion_required(session, i) + yield i return else: for i in fn(): if isinstance(i, (Show, Movie, Season, Episode)): with db.Session() as session: _check_for_and_run_insertion_required(session, i) - program._push_event_queue(Event(emitted_by=service, item=i)) - else: - program._push_event_queue(Event(emitted_by=service, item=i)) + yield i return def hard_reset_database(): diff --git a/src/program/downloaders/__init__.py b/src/program/downloaders/__init__.py index 67c3f809..c1a7dd9a 100644 --- a/src/program/downloaders/__init__.py +++ b/src/program/downloaders/__init__.py @@ -16,6 +16,10 @@ def __init__(self): 
             AllDebridDownloader: AllDebridDownloader(),
         }
         self.initialized = self.validate()
+
+    @property
+    def service(self):
+        return next(service for service in self.services.values() if service.initialized)
 
     def validate(self):
         initialized_services = [service for service in self.services.values() if service.initialized]
@@ -25,12 +29,5 @@ def validate(self):
         return len(initialized_services) == 1
 
     def run(self, item: MediaItem):
-        for service in self.services.values():
-            if service.initialized:
-                downloaded = service.run(item)
-                if not downloaded:
-                    if item.type == "show":
-                        yield [season for season in item.seasons]
-                    elif item.type == "season":
-                        yield [episode for episode in item.episodes]
+        self.service.run(item)
         yield item
\ No newline at end of file
diff --git a/src/program/downloaders/shared.py b/src/program/downloaders/shared.py
new file mode 100644
index 00000000..38dea689
--- /dev/null
+++ b/src/program/downloaders/shared.py
@@ -0,0 +1,144 @@
+import contextlib
+from posixpath import splitext
+
+from RTN import parse
+from RTN.exceptions import GarbageTorrent
+
+from program.media.state import States
+
+WANTED_FORMATS = {".mkv", ".mp4", ".avi"}
+
+class FileFinder:
+    """
+    A class that helps you find files.
+
+    Attributes:
+        filename_attr (str): The name of the file attribute.
+        filesize_attr (str): The size of the file attribute.
+        min_filesize (int): The minimum file size.
+        max_filesize (int): The maximum file size.
+    """
+
+    def __init__(self, name, size, min, max):
+        self.filename_attr = name
+        self.filesize_attr = size
+        self.min_filesize = min
+        self.max_filesize = max
+
+    def find_required_files(self, item, container):
+        """
+        Find the required files based on the given item and container.
+
+        Args:
+            item (Item): The item object representing the movie, show, season, or episode.
+            container (list): The list of files to search through.
+
+        Returns:
+            list: A list of files that match the criteria based on the item type.
+            Returns an empty list if no files match the criteria.
+ + """ + files = [ + file + for file in container + if file and self.min_filesize < file[self.filesize_attr] < self.max_filesize + and file[self.filesize_attr] > 10000 + and splitext(file[self.filename_attr].lower())[1] in WANTED_FORMATS + ] + return_files = [] + + if not files: + return [] + + if item.type == "movie": + for file in files: + with contextlib.suppress(GarbageTorrent, TypeError): + parsed_file = parse(file[self.filename_attr], remove_trash=True) + if parsed_file.type == "movie": + return_files.append(file) + if item.type == "show": + needed_episodes = {} + acceptable_states = [States.Indexed, States.Scraped, States.Unknown, States.Failed, States.PartiallyCompleted] + + for season in item.seasons: + if season.state in acceptable_states and season.is_released_nolog: + needed_episode_numbers = {episode.number for episode in season.episodes if episode.state in acceptable_states and episode.is_released_nolog} + if needed_episode_numbers: + needed_episodes[season.number] = needed_episode_numbers + + if not any(needed_episodes.values()): + return return_files + + matched_files = {} + one_season = len(item.seasons) == 1 + + for file in files: + with contextlib.suppress(GarbageTorrent, TypeError): + parsed_file = parse(file[self.filename_attr], remove_trash=True) + if not parsed_file or not parsed_file.episode or 0 in parsed_file.season: + continue + + # Check each season and episode to find a match + for season_number, episodes in needed_episodes.items(): + if one_season or season_number in parsed_file.season: + for episode_number in parsed_file.episode: + if episode_number in episodes: + # Store the matched file for this episode + matched_files.setdefault((season_number, episode_number), []).append(file) + + # total_needed_episodes = sum(len(episodes) for episodes in needed_episodes.values()) + # matched_episodes = sum(len(files) for files in matched_files.values()) + + if set(needed_episodes).issubset(matched_files): + for key, files in matched_files.items(): + season_number, episode_number = key + for file in files: + if not file or "sample" in file[self.filename_attr].lower(): + continue + return_files.append(file) + + if item.type == "season": + acceptable_states = [States.Indexed, States.Scraped, States.Unknown, States.Failed, States.PartiallyCompleted] + needed_episodes = [] + for episode in item.episodes: + if episode.state in acceptable_states and episode.is_released_nolog: + needed_episodes.append(episode.number) + + if not needed_episodes: + return return_files + + matched_files = {} + one_season = len(item.parent.seasons) == 1 + + for file in files: + with contextlib.suppress(GarbageTorrent, TypeError): + parsed_file = parse(file[self.filename_attr], remove_trash=True) + if not parsed_file or not parsed_file.episode or 0 in parsed_file.season: + continue + + if one_season or item.number in parsed_file.season: + for episode_number in parsed_file.episode: + if episode_number in needed_episodes: + matched_files.setdefault(episode_number, []).append(file) + + matched_episodes = sum(len(files) for files in matched_files.values()) + + if set(needed_episodes).issubset(matched_files): + for files in matched_files.values(): + for file in files: + if not file or "sample" in file[self.filename_attr].lower(): + continue + return_files.append(file) + + if item.type == "episode": + for file in files: + if not file or not file.get(self.filename_attr): + continue + with contextlib.suppress(GarbageTorrent, TypeError): + parsed_file = parse(file[self.filename_attr], remove_trash=True) + 
if ( + item.number in parsed_file.episode + and item.parent.number in parsed_file.season + ): + return_files.append(file) + + return return_files diff --git a/src/program/downloaders/torbox.py b/src/program/downloaders/torbox.py index 6c739954..e742aa27 100644 --- a/src/program/downloaders/torbox.py +++ b/src/program/downloaders/torbox.py @@ -6,6 +6,7 @@ from program.media.item import MediaItem from program.media.state import States +from program.media.stream import Stream from program.settings.manager import settings_manager from requests import ConnectTimeout from RTN import parse @@ -92,10 +93,21 @@ def run(self, item: MediaItem) -> bool: logger.log("DEBRID", f"Item is not cached: {item.log_string}") for stream in item.streams: logger.log( - "DEBUG", f"Blacklisting hash ({stream.infohash}) for item: {item.log_string}" + "DEBUG", f"Blacklisting uncached hash ({stream.infohash}) for item: {item.log_string}" ) stream.blacklisted = True return return_value + + def get_cached_hashes(self, item: MediaItem, streams: list[str]) -> list[str]: + """Check if the item is cached in torbox.app""" + cached_hashes = self.get_torrent_cached(streams) + return {stream: cached_hashes[stream]["files"] for stream in streams if stream in cached_hashes} + + def download_cached(self, item: MediaItem, stream: str) -> None: + """Download the cached item from torbox.app""" + cache = self.get_torrent_cached([stream])[stream] + item.active_stream = cache + self.download(item) def find_required_files(self, item, container): diff --git a/src/program/indexers/trakt.py b/src/program/indexers/trakt.py index 2713e6f5..3b41f674 100644 --- a/src/program/indexers/trakt.py +++ b/src/program/indexers/trakt.py @@ -22,22 +22,22 @@ def __init__(self): self.initialized = True self.settings = settings_manager.settings.indexer + def copy_attributes(self, source, target): + """Copy attributes from source to target.""" + attributes = ["file", "folder", "update_folder", "symlinked", "is_anime", "symlink_path"] + for attr in attributes: + target.set(attr, getattr(source, attr, None)) + def copy_items(self, itema: MediaItem, itemb: MediaItem): + """Copy attributes from itema to itemb recursively.""" if isinstance(itema, Show) and isinstance(itemb, Show): - for (seasona, seasonb) in zip(itema.seasons, itemb.seasons): - for (episodea, episodeb) in zip(seasona.episodes, seasonb.episodes): - episodeb.set("update_folder", episodea.update_folder) - episodeb.set("symlinked", episodea.symlinked) - episodeb.set("file", episodea.file) - episodeb.set("folder", episodea.folder) - seasonb.set("is_anime", itema.is_anime) - episodeb.set("is_anime", itema.is_anime) - elif isinstance(itema, Movie) and isinstance(itemb, Movie): - itemb.set("file", itema.file) - itemb.set("folder", itema.folder) - itemb.set("update_folder", itema.update_folder) - itemb.set("symlinked", itema.symlinked) + for seasona, seasonb in zip(itema.seasons, itemb.seasons): + for episodea, episodeb in zip(seasona.episodes, seasonb.episodes): + self.copy_attributes(episodea, episodeb) + seasonb.set("is_anime", itema.is_anime) itemb.set("is_anime", itema.is_anime) + elif isinstance(itema, Movie) and isinstance(itemb, Movie): + self.copy_attributes(itema, itemb) return itemb def run(self, in_item: MediaItem) -> Generator[Union[Movie, Show, Season, Episode], None, None]: diff --git a/src/program/libraries/symlink.py b/src/program/libraries/symlink.py index 85bcd0b8..e6b683f2 100644 --- a/src/program/libraries/symlink.py +++ b/src/program/libraries/symlink.py @@ -84,6 +84,7 @@ def 
resolve_symlink_and_set_attrs(item, path: Path) -> Path: resolved_path = (path).resolve() item.set("file", str(resolved_path.stem)) item.set("folder", str(resolved_path.parent.stem)) + item.set("symlink_path", str(path)) def process_shows(directory: Path, item_type: str, is_anime: bool = False) -> Show: """Process shows in the given directory and yield Show instances.""" diff --git a/src/program/media/item.py b/src/program/media/item.py index d8f4f737..a3b03226 100644 --- a/src/program/media/item.py +++ b/src/program/media/item.py @@ -1,6 +1,9 @@ """MediaItem class""" from datetime import datetime +import json +from pathlib import Path from typing import List, Optional, Self +import asyncio import sqlalchemy from program.db.db import db @@ -8,6 +11,7 @@ from RTN import parse from sqlalchemy.orm import Mapped, mapped_column, relationship from .stream import Stream +from controllers.ws import manager from utils.logger import logger @@ -24,12 +28,13 @@ class MediaItem(db.Model): indexed_at: Mapped[Optional[datetime]] = mapped_column(sqlalchemy.DateTime, nullable=True) scraped_at: Mapped[Optional[datetime]] = mapped_column(sqlalchemy.DateTime, nullable=True) scraped_times: Mapped[Optional[int]] = mapped_column(sqlalchemy.Integer, default=0) - active_stream: Mapped[Optional[dict[str, str]]] = mapped_column(sqlalchemy.JSON, nullable=True) + active_stream: Mapped[Optional[dict[str]]] = mapped_column(sqlalchemy.JSON, nullable=True) streams: Mapped[List[Stream]] = relationship("Stream", back_populates='parent', lazy="select", cascade="all, delete-orphan") - blacklisted_streams: Mapped[Optional[List[Stream]]] = mapped_column(sqlalchemy.JSON, nullable=True) + blacklisted_streams: Mapped[Optional[List[Stream]]] = mapped_column(sqlalchemy.JSON, default=[]) symlinked: Mapped[Optional[bool]] = mapped_column(sqlalchemy.Boolean, default=False) symlinked_at: Mapped[Optional[datetime]] = mapped_column(sqlalchemy.DateTime, nullable=True) symlinked_times: Mapped[Optional[int]] = mapped_column(sqlalchemy.Integer, default=0) + symlink_path: Mapped[Optional[str]] = mapped_column(sqlalchemy.String, nullable=True) file: Mapped[Optional[str]] = mapped_column(sqlalchemy.String, nullable=True) folder: Mapped[Optional[str]] = mapped_column(sqlalchemy.String, nullable=True) alternative_folder: Mapped[Optional[str]] = mapped_column(sqlalchemy.String, nullable=True) @@ -99,7 +104,10 @@ def __init__(self, item: dict) -> None: self.overseerr_id = item.get("overseerr_id") def store_state(self) -> None: + if self.last_state != self._determine_state().name: + asyncio.run(manager.send_item_update(json.dumps(self.to_dict()))) self.last_state = self._determine_state().name + @property def is_released(self) -> bool: @@ -159,13 +167,13 @@ def copy_other_media_attr(self, other): def is_scraped(self): return (len(self.streams) > 0 - and - all(stream.blacklisted == False for stream in self.streams)) + and + any(stream.blacklisted == False for stream in self.streams)) def to_dict(self): """Convert item to dictionary (API response)""" return { - "item_id": str(self.item_id), + "id": str(self._id), "title": self.title, "type": self.__class__.__name__, "imdb_id": self.imdb_id if hasattr(self, "imdb_id") else None, @@ -173,13 +181,13 @@ def to_dict(self): "tmdb_id": self.tmdb_id if hasattr(self, "tmdb_id") else None, "state": self.state.value, "imdb_link": self.imdb_link if hasattr(self, "imdb_link") else None, - "aired_at": self.aired_at, + "aired_at": str(self.aired_at), "genres": self.genres if hasattr(self, "genres") else None, 
"is_anime": self.is_anime if hasattr(self, "is_anime") else False, "guid": self.guid, "requested_at": str(self.requested_at), "requested_by": self.requested_by, - "scraped_at": self.scraped_at, + "scraped_at": str(self.scraped_at), "scraped_times": self.scraped_times, } @@ -205,6 +213,8 @@ def to_extended_dict(self, abbreviated_children=False): dict["active_stream"] = ( self.active_stream if hasattr(self, "active_stream") else None ) + dict["streams"] = self.streams if hasattr(self, "streams") else None + dict["number"] = self.number if hasattr(self, "number") else None dict["symlinked"] = self.symlinked if hasattr(self, "symlinked") else None dict["symlinked_at"] = ( self.symlinked_at if hasattr(self, "symlinked_at") else None @@ -218,6 +228,7 @@ def to_extended_dict(self, abbreviated_children=False): ) dict["file"] = self.file if hasattr(self, "file") else None dict["folder"] = self.folder if hasattr(self, "folder") else None + dict["symlink_path"] = self.symlink_path if hasattr(self, "symlink_path") else None return dict def __iter__(self): @@ -225,8 +236,8 @@ def __iter__(self): yield attr def __eq__(self, other): - if isinstance(other, type(self)): - return self.imdb_id == other.imdb_id + if type(other) == type(self): + return self._id == other._id return False def copy(self, other): @@ -258,6 +269,34 @@ def get_top_title(self) -> str: def __hash__(self): return hash(self.item_id) + + def reset(self, reset_times: bool = True): + """Reset item attributes for rescraping.""" + if self.symlink_path: + if Path(self.symlink_path).exists(): + Path(self.symlink_path).unlink() + self.set("symlink_path", None) + + self.set("file", None) + self.set("folder", None) + self.set("alternative_folder", None) + + if hasattr(self, "active_stream"): + stream: Stream = next((stream for stream in self.streams if stream.infohash == self.active_stream), None) + if stream: + stream.blacklisted = True + + self.set("active_stream", None) + self.set("symlinked", False) + self.set("symlinked_at", None) + self.set("update_folder", None) + self.set("scraped_at", None) + + if reset_times: + self.set("symlinked_times", 0) + self.set("scraped_times", 0) + + logger.debug(f"Item {self.log_string} reset for rescraping") @property def log_string(self): @@ -322,17 +361,17 @@ def get_season_index_by_id(self, item_id): def _determine_state(self): if all(season.state == States.Completed for season in self.seasons): return States.Completed + if any( + season.state in (States.Completed, States.PartiallyCompleted) + for season in self.seasons + ): + return States.PartiallyCompleted if all(season.state == States.Symlinked for season in self.seasons): return States.Symlinked if all(season.state == States.Downloaded for season in self.seasons): return States.Downloaded if self.is_scraped(): return States.Scraped - if any( - season.state in (States.Completed, States.PartiallyCompleted) - for season in self.seasons - ): - return States.PartiallyCompleted if any(season.state == States.Indexed for season in self.seasons): return States.Indexed if any(season.state == States.Requested for season in self.seasons): @@ -342,6 +381,8 @@ def _determine_state(self): def store_state(self) -> None: for season in self.seasons: season.store_state() + if self.last_state != self._determine_state().name: + asyncio.run(manager.send_item_update(json.dumps(self.to_dict()))) self.last_state = self._determine_state().name def __repr__(self): @@ -414,6 +455,8 @@ class Season(MediaItem): def store_state(self) -> None: for episode in self.episodes: 
             episode.store_state()
+        if self.last_state != self._determine_state().name:
+            asyncio.run(manager.send_item_update(json.dumps(self.to_dict())))
         self.last_state = self._determine_state().name
 
     def __init__(self, item):
@@ -430,14 +473,14 @@ def _determine_state(self):
         if len(self.episodes) > 0:
             if all(episode.state == States.Completed for episode in self.episodes):
                 return States.Completed
+            if any(episode.state == States.Completed for episode in self.episodes):
+                return States.PartiallyCompleted
             if all(episode.state == States.Symlinked for episode in self.episodes):
                 return States.Symlinked
             if all(episode.file and episode.folder for episode in self.episodes):
                 return States.Downloaded
             if self.is_scraped():
                 return States.Scraped
-            if any(episode.state == States.Completed for episode in self.episodes):
-                return States.PartiallyCompleted
             if any(episode.state == States.Indexed for episode in self.episodes):
                 return States.Indexed
             if any(episode.state == States.Requested for episode in self.episodes):
@@ -448,13 +491,6 @@ def _determine_state(self):
     def is_released(self) -> bool:
         return any(episode.is_released for episode in self.episodes)
 
-    def __eq__(self, other):
-        if (
-            type(self) == type(other)
-            and self.parent_id == other.parent_id
-        ):
-            return self.number == other.get("number", None)
-
     def __repr__(self):
         return f"Season:{self.number}:{self.state.name}"
@@ -526,14 +562,6 @@ def __init__(self, item):
         if self.parent and isinstance(self.parent, Season):
             self.is_anime = self.parent.parent.is_anime
 
-    def __eq__(self, other):
-        if (
-            type(self) == type(other)
-            and self.item_id == other.item_id
-            and self.parent.parent.item_id == other.parent.parent.item_id
-        ):
-            return self.number == other.get("number", None)
-
     def __repr__(self):
         return f"Episode:{self.number}:{self.state.name}"
diff --git a/src/program/program.py b/src/program/program.py
index 90f6ccba..8f8ee9df 100644
--- a/src/program/program.py
+++ b/src/program/program.py
@@ -45,8 +45,8 @@ def __init__(self, args):
         self.initialized = False
         self.event_queue = Queue()
         self.services = {}
-        self.queued_items = []
-        self.running_items = []
+        self.queued_events = []
+        self.running_events = []
         self.mutex = Lock()
         self.enable_trace = settings_manager.settings.tracemalloc
         self.sql_Session = db.Session
@@ -208,7 +208,7 @@ def _retry_library(self) -> None:
         ).unique().scalars().all()
 
         for item in items_to_submit:
-            self._push_event_queue(Event(emitted_by=self.__class__, item=item))
+            self._push_event_queue(Event(emitted_by="RetryLibrary", item=item))
 
     def _schedule_functions(self) -> None:
         """Schedule each service based on its update interval."""
@@ -252,85 +252,89 @@ def _schedule_services(self) -> None:
         logger.log("PROGRAM", f"Scheduled {service_cls.__name__} to run every {update_interval} seconds.")
 
     def _id_in_queue(self, id):
-        return any(i._id == id for i in self.queued_items)
+        return any(event.item._id == id for event in self.queued_events)
 
-    def _id_in_running_items(self, id):
-        return any(i._id == id for i in self.running_items)
+    def _id_in_running_events(self, id):
+        return any(event.item._id == id for event in self.running_events)
 
     def _push_event_queue(self, event):
         with self.mutex:
-            if event.item in self.queued_items or event.item in self.running_items:
-                logger.debug(f"Item {event.item.log_string} is already in the queue or running, skipping.")
+            if any(qi.item.imdb_id == event.item.imdb_id and qi.emitted_by == event.emitted_by for qi in self.queued_events):
+                logger.debug(f"Item {event.item.log_string} is already in the queue, skipping.")
+                return False
+            elif any(ri.item.imdb_id == event.item.imdb_id and ri.emitted_by == event.emitted_by for ri in self.running_events):
+                logger.debug(f"Item {event.item.log_string} is already running, skipping.")
                 return False
 
             if isinstance(event.item, MediaItem) and hasattr(event.item, "_id"):
                 if event.item.type == "show":
                     for s in event.item.seasons:
-                        if self._id_in_queue(s._id) or self._id_in_running_items(s._id):
+                        if self._id_in_queue(s._id) or self._id_in_running_events(s._id):
                             return False
                         for e in s.episodes:
-                            if self._id_in_queue(e._id) or self._id_in_running_items(e._id):
+                            if self._id_in_queue(e._id) or self._id_in_running_events(e._id):
                                 return False
                 elif event.item.type == "season":
                     for e in event.item.episodes:
-                        if self._id_in_queue(e._id) or self._id_in_running_items(e._id):
+                        if self._id_in_queue(e._id) or self._id_in_running_events(e._id):
                             return False
                 elif hasattr(event.item, "parent"):
                     parent = event.item.parent
-                    if self._id_in_queue(parent._id) or self._id_in_running_items(parent._id):
+                    if self._id_in_queue(parent._id) or self._id_in_running_events(parent._id):
                         return False
-                    elif hasattr(parent, "parent") and (self._id_in_queue(parent.parent._id) or self._id_in_running_items(parent.parent._id)):
+                    elif hasattr(parent, "parent") and (self._id_in_queue(parent.parent._id) or self._id_in_running_events(parent.parent._id)):
                         return False
 
             if not isinstance(event.item, (Show, Movie, Episode, Season)):
                 logger.log("NEW", f"Added {event.item.log_string} to the queue")
             else:
                 logger.log("DISCOVERY", f"Re-added {event.item.log_string} to the queue")
-
-            event.item = copy_item(event.item)
-            self.queued_items.append(event.item)
+            if not isinstance(event.item, MediaItem):
+                event.item = copy_item(event.item)
+            self.queued_events.append(event)
             self.event_queue.put(event)
             return True
 
     def _pop_event_queue(self, event):
         with self.mutex:
             # DB._store_item(event.item)  # possibly causing duplicates
-            self.queued_items.remove(event.item)
+            self.queued_events.remove(event)
 
-    def _remove_from_running_items(self, item, service_name=""):
+    def _remove_from_running_events(self, item, service_name=""):
         with self.mutex:
-            if item in self.running_items:
-                self.running_items.remove(item)
+            event = next((event for event in self.running_events if event.item._id == item._id), None)
+            if event:
+                self.running_events.remove(event)
                 logger.log("PROGRAM", f"Item {item.log_string} finished running section {service_name}")
 
-    def add_to_running(self, item, service_name):
-        if item is None:
+    def add_to_running(self, e):
+        if e.item is None:
             return
         with self.mutex:
-            if item not in self.running_items:
-                if isinstance(item, MediaItem) and not self._id_in_running_items(item._id):
-                    self.running_items.append(copy_item(item))
-                elif not isinstance(item, MediaItem):
-                    self.running_items.append(copy_item(item))
-                logger.log("PROGRAM", f"Item {item.log_string} started running section {service_name}")
+            if all(event.item._id != e.item._id for event in self.running_events):
+                if isinstance(e.item, MediaItem) and not self._id_in_running_events(e.item._id):
+                    self.running_events.append(e)
+                elif not isinstance(e.item, MediaItem):
+                    self.running_events.append(e)
+                logger.log("PROGRAM", f"Item {e.item.log_string} started running section {e.emitted_by.__name__ if type(e.emitted_by) != str else e.emitted_by}")
 
     def _process_future_item(self, future: Future, service: Service, orig_item: MediaItem) -> None:
         """Callback to add the results from a future emitted by a service to the event queue."""
         try:
-            for _item in future.result():
-                pass
+            for i in future.result():
+                if i is not None:
+                    self._remove_from_running_events(i, service.__name__)
+                    self._push_event_queue(Event(emitted_by=service, item=i))
             if orig_item is not None:
                 logger.log("PROGRAM", f"Service {service.__name__} finished running on {orig_item.log_string}")
-            else:
-                logger.log("PROGRAM", f"Service {service.__name__} finished running.")
         except TimeoutError:
             logger.debug("Service {service.__name__} timeout waiting for result on {orig_item.log_string}")
-            self._remove_from_running_items(orig_item, service.__name__)
+            self._remove_from_running_events(orig_item, service.__name__)
         except Exception:
            logger.exception(f"Service {service.__name__} failed with exception {traceback.format_exc()}")
-            self._remove_from_running_items(orig_item, service.__name__)
+            self._remove_from_running_events(orig_item, service.__name__)
 
     def _submit_job(self, service: Service, item: MediaItem | None) -> None:
         if item and service:
@@ -402,7 +406,7 @@ def run(self):
                 event: Event = self.event_queue.get(timeout=10)
                 if self.enable_trace:
                     self.dump_tracemalloc()
-                self.add_to_running(event.item, "program.run")
+                self.add_to_running(event)
                 self._pop_event_queue(event)
             except Empty:
                 if self.enable_trace:
@@ -411,25 +415,25 @@ def run(self):
 
             with db.Session() as session:
                 existing_item: MediaItem | None = DB._get_item_from_db(session, event.item)
-                updated_item, next_service, items_to_submit = process_event(
-                    existing_item, event.emitted_by, existing_item if existing_item is not None else event.item
+                processed_item, next_service, items_to_submit = process_event(
+                    existing_item, event.emitted_by, event.item
                 )
 
-                if updated_item and isinstance(existing_item, MediaItem) and updated_item.state == States.Symlinked:
-                    if updated_item.type in ["show", "movie"]:
-                        logger.success(f"Item has been completed: {updated_item.log_string}")
+                if processed_item and processed_item.state == States.Completed:
+                    if processed_item.type in ["show", "movie"]:
+                        logger.success(f"Item has been completed: {processed_item.log_string}")
 
                     if settings_manager.settings.notifications.enabled:
-                        notify_on_complete(updated_item)
+                        notify_on_complete(processed_item)
 
-                self._remove_from_running_items(event.item, "program.run")
+                self._remove_from_running_events(event.item, "program.run")
 
                 if items_to_submit:
                     for item_to_submit in items_to_submit:
-                        self.add_to_running(item_to_submit, next_service.__name__)
+                        self.add_to_running(Event(next_service.__name__, item_to_submit))
                         self._submit_job(next_service, item_to_submit)
 
-                if isinstance(existing_item, MediaItem):
-                    existing_item.store_state()
+                if isinstance(processed_item, MediaItem):
+                    processed_item.store_state()
 
                 session.commit()
 
     def stop(self):
@@ -446,9 +450,10 @@ def stop(self):
         self.scheduler.shutdown(wait=False)
         logger.log("PROGRAM", "Riven has been stopped.")
 
-    def add_to_queue(self, item: MediaItem) -> bool:
+    def add_to_queue(self, item: MediaItem, emitted_by="Manual") -> bool:
         """Add item to the queue for processing."""
-        return self._push_event_queue(Event(emitted_by=self.__class__, item=item))
+        logger.log("PROGRAM", f"Adding {item.log_string} to the queue.")
+        return self._push_event_queue(Event(emitted_by=emitted_by, item=item))
 
     def clear_queue(self):
         """Clear the event queue."""
diff --git a/src/program/scrapers/__init__.py b/src/program/scrapers/__init__.py
index af6656a0..a40dd67c 100644
--- a/src/program/scrapers/__init__.py
+++ b/src/program/scrapers/__init__.py
@@ -71,26 +71,16 @@ def partial_state(self, item: MediaItem) -> bool:
 
     def run(self, item: Union[Show, Season, Episode, Movie]) -> Generator[Union[Show, Season, Episode, Movie], None, None]:
         """Scrape an item."""
-        if not item or not self.can_we_scrape(item):
-            yield self.yield_incomplete_children(item)
-            return
-
-        partial_state = self.partial_state(item)
-        if partial_state is not False:
-            yield partial_state
-            return
-
-        sorted_streams = self.scrape(item)
-        for stream in sorted_streams.values():
-            if stream not in item.streams:
-                item.streams.append(stream)
-        item.set("scraped_at", datetime.now())
-        item.set("scraped_times", item.scraped_times + 1)
-
-        if not item.get("streams", {}):
+        if self.can_we_scrape(item):
+            sorted_streams = self.scrape(item)
+            for stream in sorted_streams.values():
+                if stream not in item.streams:
+                    item.streams.append(stream)
+            item.set("scraped_at", datetime.now())
+            item.set("scraped_times", item.scraped_times + 1)
+
+        if not item.get("streams", []):
             logger.log("NOT_FOUND", f"Scraping returned no good results for {item.log_string}")
-            yield self.yield_incomplete_children(item)
-            return
 
         yield item
@@ -137,7 +127,10 @@ def run_service(service, item,):
 
     @classmethod
     def can_we_scrape(cls, item: MediaItem) -> bool:
         """Check if we can scrape an item."""
-        return item.is_released and cls.should_submit(item)
+        if item.is_released and cls.should_submit(item):
+            return True
+        logger.debug(f"Conditions not met, will not scrape {item.log_string}")
+        return False
 
     @staticmethod
     def should_submit(item: MediaItem) -> bool:
diff --git a/src/program/state_transition.py b/src/program/state_transition.py
index d8d8143a..b8d8d594 100644
--- a/src/program/state_transition.py
+++ b/src/program/state_transition.py
@@ -14,12 +14,11 @@
 def process_event(existing_item: MediaItem | None, emitted_by: Service, item: MediaItem) -> ProcessedEvent:
     """Process an event and return the updated item, next service and items to submit."""
     next_service: Service = None
-    updated_item = item
     no_further_processing: ProcessedEvent = (None, None, [])
     items_to_submit = []
 
-    source_services = (Overseerr, PlexWatchlist, Listrr, Mdblist, SymlinkLibrary, TraktContent)
-    if emitted_by in source_services or item.state in [States.Requested, States.Unknown]:
+    source_services = (Overseerr, PlexWatchlist, Listrr, Mdblist, SymlinkLibrary, TraktContent, "RetryLibrary", "Manual")
+    if not existing_item and emitted_by in source_services or existing_item and existing_item.state in [States.Requested, States.Unknown]:
         next_service = TraktIndexer
         if isinstance(item, Season):
             item = item.parent
@@ -28,58 +27,98 @@ def process_event(existing_item: MediaItem | None, emitted_by: Service, item: MediaItem) -> ProcessedEvent:
             return no_further_processing
         return None, next_service, [item]
 
-    elif item.state in (States.Indexed, States.PartiallyCompleted):
-        next_service = Scraping
-        if existing_item:
-            if not existing_item.indexed_at:
-                if isinstance(item, (Show, Season)):
-                    existing_item.fill_in_missing_children(item)
-                existing_item.copy_other_media_attr(item)
-                existing_item.indexed_at = item.indexed_at
-                updated_item = item = existing_item
-            if existing_item.state == States.Completed:
-                return existing_item, None, []
-        items_to_submit = [item] if Scraping.can_we_scrape(item) else []
+    if existing_item:
+        if existing_item.state in (States.Indexed, States.PartiallyCompleted):
+            next_service = Scraping
+            if existing_item:
+                if not existing_item.indexed_at:
+                    if isinstance(item, (Show, Season)):
+                        existing_item.fill_in_missing_children(item)
+                    existing_item.copy_other_media_attr(item)
+                    existing_item.indexed_at = item.indexed_at
+                if existing_item.state == States.Completed:
+                    return existing_item, None, []
+            if existing_item.type in ("movie", "episode"):
+                items_to_submit = [existing_item] if Scraping.can_we_scrape(existing_item) else []
+            elif existing_item.type == "show":
+                if Scraping.can_we_scrape(existing_item):
+                    items_to_submit = [existing_item]
+                else:
+                    for season in existing_item.seasons:
+                        if season.state in (States.Indexed, States.PartiallyCompleted) and Scraping.can_we_scrape(season):
+                            items_to_submit.append(season)
+                        elif season.state == States.Scraped:
+                            next_service = Downloader
+                            items_to_submit.append(season)
+            elif existing_item.type == "season":
+                if Scraping.can_we_scrape(existing_item):
+                    items_to_submit = [existing_item]
+                else:
+                    for episode in existing_item.episodes:
+                        if episode.state == States.Indexed and Scraping.can_we_scrape(episode):
+                            items_to_submit.append(episode)
+                        elif episode.state == States.Scraped:
+                            next_service = Downloader
+                            items_to_submit.append(episode)
+                        elif episode.state == States.Downloaded:
+                            next_service = Symlinker
+                            items_to_submit.append(episode)
 
-    elif item.state == States.Scraped:
-        next_service = Downloader
-        items_to_submit = [item]
+        elif existing_item.state == States.Scraped:
+            next_service = Downloader
+            items_to_submit = []
+            if existing_item.type in ["movie", "episode"]:
+                items_to_submit.append(existing_item)
+            elif existing_item.type == "show":
+                seasons_to_promote = [s for s in existing_item.seasons if s.state == States.Downloaded]
+                episodes_to_promote = [e for s in existing_item.seasons if s.state != States.Completed for e in s.episodes if e.state == States.Downloaded]
+                if seasons_to_promote:
+                    next_service = Symlinker
+                    items_to_submit = seasons_to_promote
+                items_to_submit.append(existing_item)
+            elif existing_item.type == "season":
+                episodes_to_promote = [e for e in existing_item.episodes if e.state == States.Downloaded]
+                if episodes_to_promote:
+                    next_service = Symlinker
+                    items_to_submit = episodes_to_promote
+                items_to_submit.append(existing_item)
 
-    elif item.state == States.Downloaded:
-        next_service = Symlinker
-        proposed_submissions = []
-        if isinstance(item, Show):
-            all_found = all(
-                all(e.file and e.folder for e in season.episodes if not e.symlinked)
-                for season in item.seasons
-            )
-            if all_found:
-                proposed_submissions = [item]
-            else:
-                proposed_submissions = [
-                    e for season in item.seasons
-                    for e in season.episodes
-                    if not e.symlinked and e.file and e.folder
-                ]
-        elif isinstance(item, Season):
-            if all(e.file and e.folder for e in item.episodes if not e.symlinked):
-                proposed_submissions = [item]
-            else:
-                proposed_submissions = [e for e in item.episodes if not e.symlinked and e.file and e.folder]
-        elif isinstance(item, (Movie, Episode)):
-            proposed_submissions = [item]
-        items_to_submit = []
-        for sub_item in proposed_submissions:
-            if Symlinker.should_submit(sub_item):
-                items_to_submit.append(sub_item)
-            else:
-                logger.debug(f"{sub_item.log_string} not submitted to Symlinker because it is not eligible")
-    elif item.state == States.Symlinked:
-        next_service = Updater
-        items_to_submit = [item]
+        elif existing_item.state == States.Downloaded:
+            next_service = Symlinker
+            proposed_submissions = []
+            if isinstance(existing_item, Show):
+                all_found = all(
+                    all(e.file and e.folder for e in season.episodes if not e.symlinked)
+                    for season in existing_item.seasons
+                )
+                if all_found:
+                    proposed_submissions = [existing_item]
+                else:
+                    proposed_submissions = [
+                        e for season in existing_item.seasons
+                        for e in season.episodes
+                        if not e.symlinked and e.file and e.folder
+                    ]
+            elif isinstance(existing_item, Season):
+                if all(e.file and e.folder for e in item.episodes if not e.symlinked):
+                    proposed_submissions = [existing_item]
+                else:
+                    proposed_submissions = [e for e in existing_item.episodes if not e.symlinked and e.file and e.folder]
+            elif isinstance(existing_item, (Movie, Episode)):
+                proposed_submissions = [existing_item]
+            items_to_submit = []
+            for sub_item in proposed_submissions:
+                if Symlinker.should_submit(sub_item):
+                    items_to_submit.append(sub_item)
+                else:
+                    logger.debug(f"{sub_item.log_string} not submitted to Symlinker because it is not eligible")
 
-    elif item.state == States.Completed:
-        return no_further_processing
+        elif existing_item.state == States.Symlinked:
+            next_service = Updater
+            items_to_submit = [existing_item]
 
-    return updated_item, next_service, items_to_submit
\ No newline at end of file
+        elif existing_item.state == States.Completed:
+            return no_further_processing
+
+    return existing_item, next_service, items_to_submit
\ No newline at end of file
diff --git a/src/program/symlink.py b/src/program/symlink.py
index 67b6c283..34c1978d 100644
--- a/src/program/symlink.py
+++ b/src/program/symlink.py
@@ -88,10 +88,6 @@ def create_initial_folders(self):
 
     def run(self, item: Union[Movie, Show, Season, Episode]):
         """Check if the media item exists and create a symlink if it does"""
-        if not item:
-            logger.error("Invalid item sent to Symlinker: None")
-            return
-
         try:
             if isinstance(item, Show):
                 self._symlink_show(item)
@@ -103,8 +99,7 @@ def run(self, item: Union[Movie, Show, Season, Episode]):
             logger.error(f"Exception thrown when creating symlink for {item.log_string}: {e}")
             item.set("symlinked_times", item.symlinked_times + 1)
 
-        if self.should_submit(item):
-            yield item
+        yield item
 
     @staticmethod
     def should_submit(item: Union[Movie, Show, Season, Episode]) -> bool:
@@ -266,6 +261,7 @@ def _symlink(self, item: Union[Movie, Episode]) -> bool:
         item.set("symlinked", True)
         item.set("symlinked_at", datetime.now())
         item.set("symlinked_times", item.symlinked_times + 1)
+        item.set("symlink_path", destination)
         return True
 
     def _create_item_folders(self, item: Union[Movie, Show, Season, Episode], filename: str) -> str:
@@ -379,17 +375,17 @@ def delete_item_symlinks(self, id: int) -> bool:
             logger.debug(f"Deleted symlink for {item.log_string}")
 
             if isinstance(item, (Movie, Episode)):
-                reset_symlinked(item, reset_times=True)
+                item.reset(True)
             elif isinstance(item, Show):
                 for season in item.seasons:
                     for episode in season.episodes:
-                        reset_symlinked(episode, reset_times=True)
-                    reset_symlinked(season, reset_times=True)
-                reset_symlinked(item, reset_times=True)
+                        episode.reset(True)
+                    season.reset(True)
+                item.reset(True)
             elif isinstance(item, Season):
                 for episode in item.episodes:
-                    reset_symlinked(episode, reset_times=True)
-                reset_symlinked(item, reset_times=True)
+                    episode.reset(True)
+                item.reset(True)
 
             item.store_state()
             session.commit()
@@ -450,7 +446,7 @@ def quick_file_check(item: Union[Movie, Episode]) -> bool:
         return True
 
     if item.symlinked_times >= 3:
-        reset_symlinked(item, reset_times=True)
+        item.reset(True)
         logger.log("SYMLINKER", f"Reset item {item.log_string} back to scrapable after 3 failed attempts")
         return False
 
@@ -475,26 +471,3 @@ def search_file(rclone_path: Path, item: Union[Movie, Episode]) -> bool:
     except Exception as e:
         logger.error(f"Error occurred while searching for file {filename} in {rclone_path}: {e}")
         return False
-
-def reset_symlinked(item: MediaItem, reset_times: bool = True) -> None:
-    """Reset item attributes for rescraping."""
-    item.set("file", None)
-    item.set("folder", None)
-    item.set("alternative_folder", None)
-
-    if hasattr(item, "active_stream") and "hash" in item.active_stream:
-        hash_to_blacklist = item.active_stream["hash"]
-        stream: Stream = next((stream for stream in item.streams if stream.infohash == hash_to_blacklist), None)
-        if stream:
-            stream.blacklisted = True
-
-    item.set("active_stream", {})
-    item.set("symlinked", False)
-    item.set("symlinked_at", None)
-    item.set("update_folder", None)
-
-    if reset_times:
-        item.set("symlinked_times", 0)
-        item.set("scraped_times", 0)
-
-    logger.debug(f"Item {item.log_string} reset for rescraping")
diff --git a/src/utils/logger.py b/src/utils/logger.py
index 93c3bd3c..65ff6f5b 100644
--- a/src/utils/logger.py
+++ b/src/utils/logger.py
@@ -1,5 +1,6 @@
 """Logging utils"""
 
+import asyncio
 import os
 import sys
 from datetime import datetime
@@ -8,6 +9,7 @@
 from program.settings.manager import settings_manager
 from rich.console import Console
 from utils import data_dir_path
+from controllers.ws import manager
 
 LOG_ENABLED: bool = settings_manager.settings.log
 
@@ -98,7 +100,16 @@ def get_log_settings(name, default_color, default_icon):
             "backtrace": False,
             "diagnose": True,
             "enqueue": True,
-        }
+        },
+        # maybe later
+        # {
+        #     "sink": manager.send_log_message,
+        #     "level": level.upper() or "INFO",
+        #     "format": log_format,
+        #     "backtrace": False,
+        #     "diagnose": False,
+        #     "enqueue": True,
+        # }
     ])