From ac94a25094b44358efc8d321ac9f528119c43749 Mon Sep 17 00:00:00 2001
From: Gaisberg
Date: Wed, 24 Jul 2024 23:05:37 +0300
Subject: [PATCH] fix: include stream in db, rework blacklisting

---
 src/program/db/db_functions.py        |  4 ----
 src/program/downloaders/__init__.py   |  8 ++++-
 src/program/downloaders/alldebrid.py  | 30 ++++++----------
 src/program/downloaders/realdebrid.py | 30 +++++-----------
 src/program/downloaders/torbox.py     | 19 +++++++----
 src/program/media/item.py             | 49 +++++----------------------
 src/program/media/stream.py           | 34 +++++++++++++++++++
 src/program/scrapers/__init__.py      | 16 +++++----
 src/program/scrapers/shared.py        |  9 +++--
 src/program/state_transition.py       |  7 ++++
 src/program/symlink.py                |  3 +-
 src/utils/request.py                  |  9 +----
 12 files changed, 107 insertions(+), 111 deletions(-)
 create mode 100644 src/program/media/stream.py

diff --git a/src/program/db/db_functions.py b/src/program/db/db_functions.py
index 1bd1a286..a0d3a64d 100644
--- a/src/program/db/db_functions.py
+++ b/src/program/db/db_functions.py
@@ -38,19 +38,15 @@ def _get_item_from_db(session, item: MediaItem):
     match type:
         case "movie":
             r = session.execute(select(Movie).where(MediaItem.imdb_id==item.imdb_id).options(joinedload("*"))).unique().scalar_one()
-            r.set("streams", item.get("streams", {}))
             return r
         case "show":
             r = session.execute(select(Show).where(MediaItem.imdb_id==item.imdb_id).options(joinedload("*"))).unique().scalar_one()
-            r.set("streams", item.get("streams", {}))
             return r
         case "season":
             r = session.execute(select(Season).where(Season._id==item._id).options(joinedload("*"))).unique().scalar_one()
-            r.set("streams", item.get("streams", {}))
             return r
         case "episode":
             r = session.execute(select(Episode).where(Episode._id==item._id).options(joinedload("*"))).unique().scalar_one()
-            r.set("streams", item.get("streams", {}))
             return r
         case _:
             logger.error(f"_get_item_from_db Failed to create item from type: {type}")
diff --git a/src/program/downloaders/__init__.py b/src/program/downloaders/__init__.py
index 089b0994..67c3f809 100644
--- a/src/program/downloaders/__init__.py
+++ b/src/program/downloaders/__init__.py
@@ -27,4 +27,10 @@ def validate(self):
     def run(self, item: MediaItem):
         for service in self.services.values():
             if service.initialized:
-                return service.run(item)
\ No newline at end of file
+                downloaded = service.run(item)
+                if not downloaded:
+                    if item.type == "show":
+                        yield [season for season in item.seasons]
+                    elif item.type == "season":
+                        yield [episode for episode in item.episodes]
+                yield item
\ No newline at end of file
diff --git a/src/program/downloaders/alldebrid.py b/src/program/downloaders/alldebrid.py
index 01d645ca..71f9f1ed 100644
--- a/src/program/downloaders/alldebrid.py
+++ b/src/program/downloaders/alldebrid.py
@@ -98,26 +98,14 @@ def validate(self) -> bool:
             logger.exception(f"Failed to validate All-Debrid settings: {e}")
             return False

-    def run(self, item: MediaItem) -> Generator[MediaItem, None, None]:
+    def run(self, item: MediaItem) -> bool:
         """Download media item from all-debrid.com"""
-        if (item.file and item.folder):
-            yield None
-            return
-        if not self.is_cached(item):
-            if isinstance(item, Season):
-                res = [e for e in item.episodes]
-                yield res
-                return
-            if isinstance(item, Show):
-                res = [s for s in item.seasons]
-                yield res
-                return
-            yield None
-            return
-        if not self._is_downloaded(item):
+        return_value = False
+        if self.is_cached(item) and not self._is_downloaded(item):
             self._download_item(item)
+            return_value = True
         self.log_item(item)
-        yield item
+        return return_value

     @staticmethod
     def log_item(item: MediaItem) -> None:
@@ -165,7 +153,7 @@ def _chunked(lst: List, n: int) -> Generator[List, None, None]:
         logger.log("DEBRID", f"Processing {len(item.streams)} streams for {item.log_string}")

         processed_stream_hashes = set()
-        filtered_streams = [hash for hash in item.streams if hash and hash not in processed_stream_hashes]
+        filtered_streams = [stream.infohash for stream in item.streams if stream.infohash and stream.infohash not in processed_stream_hashes]
         if not filtered_streams:
             logger.log("NOT_FOUND", f"No streams found from filtering: {item.log_string}")
             return False
@@ -182,7 +170,6 @@ def _chunked(lst: List, n: int) -> Generator[List, None, None]:
             except Exception as e:
                 logger.error(f"Error checking cache for streams: {str(e)}", exc_info=True)

-        item.set("streams", {})
         logger.log("NOT_FOUND", f"No wanted cached streams found for {item.log_string} out of {len(filtered_streams)}")
         return False
@@ -203,7 +190,10 @@ def _evaluate_stream_response(self, data, processed_stream_hashes, item):
                 processed_stream_hashes.add(stream_hash)
                 if self._process_providers(item, magnet, stream_hash):
-                    return True
+                    return True
+                else:
+                    stream = next(stream for stream in item.streams if stream.infohash == stream_hash)
+                    stream.blacklisted = True
         return False

     def _process_providers(self, item: MediaItem, magnet: dict, stream_hash: str) -> bool:
diff --git a/src/program/downloaders/realdebrid.py b/src/program/downloaders/realdebrid.py
index b2d140ca..fc4503b4 100644
--- a/src/program/downloaders/realdebrid.py
+++ b/src/program/downloaders/realdebrid.py
@@ -100,27 +100,14 @@ def validate(self) -> bool:
             logger.error("Couldn't parse user data response from Real-Debrid.")
             return False
-
-    def run(self, item: MediaItem) -> Generator[MediaItem, None, None]:
+    def run(self, item: MediaItem) -> bool:
         """Download media item from real-debrid.com"""
-        if (item.file and item.folder):
-            yield None
-            return
-        if not self.is_cached(item):
-            if isinstance(item, Season):
-                res = [e for e in item.episodes]
-                yield res
-                return
-            if isinstance(item, Show):
-                res = [s for s in item.seasons]
-                yield res
-                return
-            yield None
-            return
-        if not self._is_downloaded(item):
+        return_value = False
+        if self.is_cached(item) and not self._is_downloaded(item):
             self._download_item(item)
+            return_value = True
         self.log_item(item)
-        yield item
+        return return_value

     @staticmethod
     def log_item(item: MediaItem) -> None:
@@ -153,7 +140,7 @@ def _chunked(lst: List, n: int) -> Generator[List, None, None]:
         logger.log("DEBRID", f"Processing {len(item.streams)} streams for {item.log_string}")

         processed_stream_hashes = set()
-        filtered_streams = [hash for hash in item.streams if hash and hash not in processed_stream_hashes]
+        filtered_streams = [stream.infohash for stream in item.streams if stream.infohash and stream.infohash not in processed_stream_hashes]
         if not filtered_streams:
             logger.log("NOT_FOUND", f"No streams found from filtering: {item.log_string}")
             return False
@@ -164,13 +151,11 @@ def _chunked(lst: List, n: int) -> Generator[List, None, None]:
                 response = get(f"{RD_BASE_URL}/torrents/instantAvailability/{streams}/", additional_headers=self.auth_headers, proxies=self.proxy, response_type=dict, specific_rate_limiter=self.torrents_rate_limiter, overall_rate_limiter=self.overall_rate_limiter)
                 if response.is_ok and response.data and isinstance(response.data, dict):
                     if self._evaluate_stream_response(response.data, processed_stream_hashes, item):
-                        item.set("streams", {})
                         return True
             except Exception as e:
                 logger.exception(f"Error checking cache for streams: {str(e)}", exc_info=True)
                 continue

-        item.set("streams", {})
         logger.log("NOT_FOUND", f"No wanted cached streams found for {item.log_string} out of {len(filtered_streams)}")
         return False
@@ -185,6 +170,9 @@ def _evaluate_stream_response(self, data, processed_stream_hashes, item):
             if self._process_providers(item, provider_list, stream_hash):
                 logger.debug(f"Finished processing providers - selecting {stream_hash} for downloading")
                 return True
+            else:
+                stream = next(stream for stream in item.streams if stream.infohash == stream_hash)
+                stream.blacklisted = True
         return False

     def _process_providers(self, item: MediaItem, provider_list: dict, stream_hash: str) -> bool:
diff --git a/src/program/downloaders/torbox.py b/src/program/downloaders/torbox.py
index a8ec40e7..6c739954 100644
--- a/src/program/downloaders/torbox.py
+++ b/src/program/downloaders/torbox.py
@@ -67,9 +67,10 @@ def validate(self) -> bool:
             logger.exception(f"Failed to validate Torbox settings: {e}")
             return False

-    def run(self, item: MediaItem) -> Generator[MediaItem, None, None]:
+    def run(self, item: MediaItem) -> bool:
         """Download media item from torbox.app"""
-        cached_hashes = self.get_torrent_cached([hash for hash in item.streams])
+        return_value = False
+        cached_hashes = self.get_torrent_cached([stream.infohash for stream in item.streams])
         if cached_hashes:
             for cache in cached_hashes.values():
                 item.active_stream = cache
@@ -82,15 +83,19 @@ def run(self, item: MediaItem) -> Generator[MediaItem, None, None]:
                         {"hash": cache["hash"], "files": cache["files"], "id": None},
                     )
                     self.download(item)
+                    return_value = True
                     break
+                else:
+                    stream = next(stream for stream in item.streams if stream.infohash == cache["hash"])
+                    stream.blacklisted = True
         else:
             logger.log("DEBRID", f"Item is not cached: {item.log_string}")
-            for hash in item.streams:
+            for stream in item.streams:
                 logger.log(
-                    "DEBUG", f"Blacklisting hash ({hash}) for item: {item.log_string}"
+                    "DEBUG", f"Blacklisting hash ({stream.infohash}) for item: {item.log_string}"
                 )
-            item.streams = {}
-        yield item
+                stream.blacklisted = True
+        return return_value

     def find_required_files(self, item, container):
@@ -267,7 +272,7 @@ def get_torrent_cached(self, hash_list):
         return response.data["data"]

     def create_torrent(self, hash) -> int:
-        magnet_url = f"magnet:?xt=urn:btih:{hash}"
+        magnet_url = f"magnet:?xt=urn:btih:{hash}&dn=&tr="
         response = post(
             f"{self.base_url}/torrents/createtorrent",
             data={"magnet": magnet_url, "seed": 1, "allow_zip": False},
diff --git a/src/program/media/item.py b/src/program/media/item.py
index f696aa61..d1ef41f5 100644
--- a/src/program/media/item.py
+++ b/src/program/media/item.py
@@ -7,7 +7,8 @@
 from program.media.state import States
 from RTN import Torrent, parse
 from sqlalchemy import orm
-from sqlalchemy.orm import Mapped, mapped_column, relationship
+from sqlalchemy.orm import Mapped, mapped_column, relationship, collections
+from .stream import Stream

 # from RTN.patterns import extract_episodes
 from utils.logger import logger
@@ -26,6 +27,7 @@ class MediaItem(db.Model):
     scraped_at: Mapped[Optional[datetime]] = mapped_column(sqlalchemy.DateTime, nullable=True)
     scraped_times: Mapped[Optional[int]] = mapped_column(sqlalchemy.Integer, default=0)
     active_stream: Mapped[Optional[dict[str, str]]] = mapped_column(sqlalchemy.JSON, nullable=True)
+    streams: Mapped[List[Stream]] = relationship("Stream", back_populates="parent", cascade="all, delete-orphan")
     symlinked: Mapped[Optional[bool]] = mapped_column(sqlalchemy.Boolean, default=False)
     symlinked_at: Mapped[Optional[datetime]] = mapped_column(sqlalchemy.DateTime, nullable=True)
     symlinked_times: Mapped[Optional[int]] = mapped_column(sqlalchemy.Integer, default=0)
@@ -54,18 +56,7 @@ class MediaItem(db.Model):
         "polymorphic_on":"type",
         "with_polymorphic":"*",
     }
-    @orm.reconstructor
-    def init_on_load(self):
-        self.streams: Optional[dict[str, Torrent]] = {}

     def __init__(self, item: dict) -> None:
-        # id: Mapped[int] = mapped_column(primary_key=True)
-        # name: Mapped[str] = mapped_column(String(30))
-        # fullname: Mapped[Optional[str]]
-        # addresses: Mapped[List["Address"]] = relationship(lazy=False,
-        #     back_populates="user", cascade="all, delete-orphan"
-        # )
-        # user_id: Mapped[int] = mapped_column(sqlalchemy.ForeignKey("user_account.id"))
-        # user: Mapped["User"] = relationship(lazy=False, back_populates="addresses")
         self.requested_at = item.get("requested_at", datetime.now())
         self.requested_by = item.get("requested_by")
@@ -74,7 +65,7 @@ def __init__(self, item: dict) -> None:
         self.scraped_at = None
         self.scraped_times = 0
         self.active_stream = item.get("active_stream", {})
-        self.streams: Optional[dict[str, Torrent]] = {}
+        self.streams: Optional[list[Stream]] = []

         self.symlinked = False
         self.symlinked_at = None
@@ -168,20 +159,9 @@ def copy_other_media_attr(self, other):
         self.overseerr_id = getattr(other, "overseerr_id", None)

     def is_scraped(self):
-        return len(self.streams) > 0
-
-    def is_checked_for_availability(self):
-        """Check if item has been checked for availability."""
-        if self.streams:
-            return all(
-                stream.get("cached", None) is not None
-                for stream in self.streams.values()
-            )
-        return False
-
-    def has_complete_metadata(self) -> bool:
-        """Check if the item has complete metadata."""
-        return self.title is not None and self.aired_at is not None
+        return (len(self.streams) > 0
+                and all(not stream.blacklisted
+                        for stream in self.streams))

     def to_dict(self):
         """Convert item to dictionary (API response)"""
@@ -289,9 +269,6 @@ class Movie(MediaItem):
         "polymorphic_identity": "movie",
         "polymorphic_load": "inline",
     }
-    @orm.reconstructor
-    def init_on_load(self):
-        self.streams: Optional[dict[str, Torrent]] = {}

     def __init__(self, item):
         self.type = "movie"
@@ -312,10 +289,6 @@ class Season(MediaItem):
     parent_id: Mapped[int] = mapped_column(sqlalchemy.ForeignKey("Show._id"), use_existing_column=True)
     parent: Mapped["Show"] = relationship(lazy=False, back_populates="seasons", foreign_keys="Season.parent_id")
     episodes: Mapped[List["Episode"]] = relationship(lazy=False, back_populates="parent", single_parent=True, cascade="all, delete-orphan", foreign_keys="Episode.parent_id")
-    @orm.reconstructor
-    def init_on_load(self):
-        self.streams: Optional[dict[str, Torrent]] = {}
-
     __mapper_args__ = {
         "polymorphic_identity": "season",
         "polymorphic_load": "inline",
@@ -408,9 +381,6 @@ class Episode(MediaItem):
     _id: Mapped[int] = mapped_column(sqlalchemy.ForeignKey("MediaItem._id"), primary_key=True)
     parent_id: Mapped[int] = mapped_column(sqlalchemy.ForeignKey("Season._id"), use_existing_column=True)
     parent: Mapped["Season"] = relationship(lazy=False, back_populates="episodes", foreign_keys="Episode.parent_id")
-    @orm.reconstructor
-    def init_on_load(self):
-        self.streams: Optional[dict[str, Torrent]] = {}

     __mapper_args__ = {
         "polymorphic_identity": "episode",
@@ -464,10 +434,7 @@ class Show(MediaItem):
     __tablename__ = "Show"
     _id: Mapped[int] = mapped_column(sqlalchemy.ForeignKey("MediaItem._id"), primary_key=True)
     seasons: Mapped[List["Season"]] = relationship(lazy=False, back_populates="parent", single_parent=True, cascade="all, delete-orphan", foreign_keys="Season.parent_id")
Mapped[List["Season"]] = relationship(lazy=False, back_populates="parent", single_parent=True, cascade="all, delete-orphan", foreign_keys="Season.parent_id") - @orm.reconstructor - def init_on_load(self): - self.streams: Optional[dict[str, Torrent]] = {} - + __mapper_args__ = { "polymorphic_identity": "show", "polymorphic_load": "inline", diff --git a/src/program/media/stream.py b/src/program/media/stream.py new file mode 100644 index 00000000..9e61d87d --- /dev/null +++ b/src/program/media/stream.py @@ -0,0 +1,34 @@ +from typing import Optional +from RTN import Torrent +from program.db.db import db +import sqlalchemy +from sqlalchemy.orm import Mapped, mapped_column, relationship + + +class Stream(db.Model): + __tablename__ = "Stream" + _id: Mapped[int] = mapped_column(primary_key=True) + infohash: Mapped[str] = mapped_column(sqlalchemy.String, nullable=False) + raw_title: Mapped[str] = mapped_column(sqlalchemy.String, nullable=False) + parsed_title: Mapped[str] = mapped_column(sqlalchemy.String, nullable=False) + rank: Mapped[int] = mapped_column(sqlalchemy.Integer, nullable=False) + lev_ratio: Mapped[float] = mapped_column(sqlalchemy.Float, nullable=False) + blacklisted: Mapped[bool] = mapped_column(sqlalchemy.Boolean, nullable=False) + + parent_id: Mapped[int] = mapped_column(sqlalchemy.ForeignKey("MediaItem._id")) + parent = relationship("MediaItem", back_populates="streams", cascade="all, delete-orphan", single_parent=True) + + def __init__(self, torrent: Torrent): + self.raw_title = torrent.raw_title + self.infohash = torrent.infohash + self.parsed_title = torrent.data.parsed_title + self.rank = torrent.rank + self.lev_ratio = torrent.lev_ratio + self.blacklisted = False + + def __hash__(self): + return self.infohash + + def __eq__(self, other): + return isinstance(other, Stream) and self.infohash == other.infohash + \ No newline at end of file diff --git a/src/program/scrapers/__init__.py b/src/program/scrapers/__init__.py index 9bcc30aa..e6cdef00 100644 --- a/src/program/scrapers/__init__.py +++ b/src/program/scrapers/__init__.py @@ -5,6 +5,7 @@ from program.media.item import Episode, MediaItem, Movie, Season, Show from program.media.state import States +from program.media.stream import Stream from program.scrapers.annatar import Annatar from program.scrapers.comet import Comet from program.scrapers.jackett import Jackett @@ -82,7 +83,10 @@ def run(self, item: Union[Show, Season, Episode, Movie]) -> Generator[Union[Show sorted_streams = self.scrape(item) # Set the streams and yield the item - item.streams.update(sorted_streams) + + for stream in sorted_streams.values(): + if stream not in item.streams: + item.streams.append(stream) item.set("scraped_at", datetime.now()) item.set("scraped_times", item.scraped_times + 1) @@ -93,7 +97,7 @@ def run(self, item: Union[Show, Season, Episode, Movie]) -> Generator[Union[Show yield item - def scrape(self, item: MediaItem, log = True) -> Dict[str, Torrent]: + def scrape(self, item: MediaItem, log = True) -> Dict[str, Stream]: """Scrape an item.""" threads: List[threading.Thread] = [] results: Dict[str, str] = {} @@ -119,18 +123,18 @@ def run_service(service, item,): if total_results != len(results): logger.debug(f"Scraped {item.log_string} with {total_results} results, removed {total_results - len(results)} duplicate hashes") - sorted_streams: Dict[str, Torrent] = _parse_results(item, results) + sorted_streams: Dict[str, Stream] = _parse_results(item, results) if sorted_streams and (log and settings_manager.settings.debug): item_type = 
             top_results = sorted(sorted_streams.values(), key=lambda x: x.rank, reverse=True)[:10]
             for sorted_tor in top_results:
                 if isinstance(item, (Movie, Show)):
-                    logger.debug(f"[{item_type}] Parsed '{sorted_tor.data.parsed_title}' with rank {sorted_tor.rank} and ratio {sorted_tor.lev_ratio:.2f}: '{sorted_tor.raw_title}'")
+                    logger.debug(f"[{item_type}] Parsed '{sorted_tor.parsed_title}' with rank {sorted_tor.rank} and ratio {sorted_tor.lev_ratio:.2f}: '{sorted_tor.raw_title}'")
                 if isinstance(item, Season):
-                    logger.debug(f"[{item_type} {item.number}] Parsed '{sorted_tor.data.parsed_title}' with rank {sorted_tor.rank} and ratio {sorted_tor.lev_ratio:.2f}: '{sorted_tor.raw_title}'")
+                    logger.debug(f"[{item_type} {item.number}] Parsed '{sorted_tor.parsed_title}' with rank {sorted_tor.rank} and ratio {sorted_tor.lev_ratio:.2f}: '{sorted_tor.raw_title}'")
                 elif isinstance(item, Episode):
-                    logger.debug(f"[{item_type} {item.parent.number}:{item.number}] Parsed '{sorted_tor.data.parsed_title}' with rank {sorted_tor.rank} and ratio {sorted_tor.lev_ratio:.2f}: '{sorted_tor.raw_title}'")
+                    logger.debug(f"[{item_type} {item.parent.number}:{item.number}] Parsed '{sorted_tor.parsed_title}' with rank {sorted_tor.rank} and ratio {sorted_tor.lev_ratio:.2f}: '{sorted_tor.raw_title}'")
         return sorted_streams

     @classmethod
diff --git a/src/program/scrapers/shared.py b/src/program/scrapers/shared.py
index 559bdbfb..116db3b8 100644
--- a/src/program/scrapers/shared.py
+++ b/src/program/scrapers/shared.py
@@ -4,6 +4,7 @@
 from typing import Dict, Set

 from program.media.item import Episode, MediaItem, Movie, Season, Show
+from program.media.stream import Stream
 from program.settings.manager import settings_manager
 from program.settings.versions import models
 from RTN import RTN, Torrent, sort_torrents
@@ -30,7 +31,7 @@ def _get_stremio_identifier(item: MediaItem) -> str:
     return identifier, scrape_type, imdb_id


-def _parse_results(item: MediaItem, results: Dict[str, str]) -> Dict[str, Torrent]:
+def _parse_results(item: MediaItem, results: Dict[str, str]) -> Dict[str, Stream]:
     """Parse the results from the scrapers into Torrent objects."""
     torrents: Set[Torrent] = set()
     processed_infohashes: Set[str] = set()
@@ -113,6 +114,10 @@ def _parse_results(item: MediaItem, results: Dict[str, str]) -> Dict[str, Torren

     if torrents:
         logger.log("SCRAPER", f"Processed {len(torrents)} matches for {item.log_string}")
-        return sort_torrents(torrents)
+        torrents = sort_torrents(torrents)
+        torrents_dict = {}
+        for torrent in torrents.values():
+            torrents_dict[torrent.infohash] = Stream(torrent)
+        return torrents_dict

     return {}
diff --git a/src/program/state_transition.py b/src/program/state_transition.py
index d8d8143a..e4d875c4 100644
--- a/src/program/state_transition.py
+++ b/src/program/state_transition.py
@@ -27,6 +27,13 @@ def process_event(existing_item: MediaItem | None, emitted_by: Service, item: Me
         if existing_item and not TraktIndexer.should_submit(existing_item):
             return no_further_processing
         return None, next_service, [item]
+
+    elif item.state == States.Indexed and len(item.streams) > 0:
+        next_service = Scraping
+        if item.type == "show":
+            items_to_submit = [season for season in item.seasons if Scraping.can_we_scrape(season)]
+        if item.type == "season":
+            items_to_submit = [episode for episode in item.episodes if Scraping.can_we_scrape(episode)]

     elif item.state in (States.Indexed, States.PartiallyCompleted):
         next_service = Scraping
diff --git a/src/program/symlink.py b/src/program/symlink.py
index f79578b0..6bfdc6bc 100644
--- a/src/program/symlink.py
+++ b/src/program/symlink.py
@@ -475,7 +475,8 @@ def reset_item(item: Union[Movie, Show, Season, Episode], reset_times: bool = Tr
     """Reset item attributes for rescraping."""
     item.set("file", None)
     item.set("folder", None)
-    item.set("streams", {})
+    for stream in item.streams:
+        stream.blacklisted = True
     item.set("active_stream", {})
     if reset_times:
         item.set("symlinked_times", 0)
diff --git a/src/utils/request.py b/src/utils/request.py
index b78342f2..1bc8e69d 100644
--- a/src/utils/request.py
+++ b/src/utils/request.py
@@ -95,13 +95,6 @@ def _make_request(
     if retry_if_failed:
         session.mount("http://", _adapter)
         session.mount("https://", _adapter)
-    headers = {
-        "Content-Type": "application/json",
-        "Accept": "application/json",
-        "User-Agent": user_agent_factory.get_random_user_agent()
-    }
-    if additional_headers:
-        headers.update(additional_headers)

     specific_context = specific_rate_limiter if specific_rate_limiter else nullcontext()
     overall_context = overall_rate_limiter if overall_rate_limiter else nullcontext()
@@ -110,7 +103,7 @@ def _make_request(
         with overall_context:
             with specific_context:
                 response = session.request(
-                    method, url, headers=headers, data=data, params=params, timeout=timeout, proxies=proxies, json=json
+                    method, url, headers=additional_headers, data=data, params=params, timeout=timeout, proxies=proxies, json=json
                 )
     except requests.exceptions.RequestException as e:
         logger.error(f"Request failed: {e}", exc_info=True)
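
Reviewer note, not part of the patch: a minimal sketch of the reworked stream
lifecycle, assuming only what the diff above shows — a Stream is built from an
RTN Torrent by the scrapers, appended to MediaItem.streams, and flagged via
blacklisted instead of the old item.set("streams", {}) wipe. FakeTorrent and
FakeData are hypothetical stand-ins carrying just the fields Stream.__init__
reads; they are not part of RTN.

    from dataclasses import dataclass

    from program.media.stream import Stream

    @dataclass
    class FakeData:              # stand-in for torrent.data
        parsed_title: str

    @dataclass
    class FakeTorrent:           # stand-in for RTN's Torrent
        raw_title: str
        infohash: str
        data: FakeData
        rank: int
        lev_ratio: float

    torrent = FakeTorrent(
        raw_title="Show.S01E01.1080p.WEB.x264",
        infohash="c0ffee" * 6 + "beef",   # real infohashes are 40 hex chars
        data=FakeData(parsed_title="Show"),
        rank=150,
        lev_ratio=0.95,
    )

    stream = Stream(torrent)
    assert not stream.blacklisted        # scrapers append streams un-blacklisted

    # On a cache miss a downloader now flags the individual stream instead of
    # clearing item.streams, so the same infohash is never re-queued:
    stream.blacklisted = True

    # MediaItem.is_scraped() treats an item whose streams are all blacklisted
    # as unscraped, which routes it back through Scraping on the next
    # state transition.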
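One hazard worth flagging in review: each service looks up the failed stream
with next(...) and no default, e.g.
next(stream for stream in item.streams if stream.infohash == stream_hash).
If the hash is absent from item.streams (a provider can echo back a hash the
scraper has since dropped), next() raises StopIteration. A defensive variant,
sketched under that assumption:

    stream = next((s for s in item.streams if s.infohash == stream_hash), None)
    if stream is not None:
        stream.blacklisted = True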