Skip to content

Commit

Permalink
fix: plex watchlist updated to work with new api changes. added db gu…
Browse files Browse the repository at this point in the history
…ards. Improved Trakt ID detection. Changed RD blacklisting to only blacklist on movie/episode items or when the RD cache is empty.
  • Loading branch information
dreulavelle committed Jul 27, 2024
1 parent 76fdd89 commit ce074b3
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 77 deletions.
12 changes: 6 additions & 6 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 12 additions & 7 deletions src/controllers/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pydantic
from fastapi import APIRouter, Request
from program.content.overseerr import Overseerr
from program.indexers.trakt import get_imdbid_from_tmdb
from program.indexers.trakt import get_imdbid_from_tmdb, get_imdbid_from_tvdb
from program.media.item import MediaItem
from requests import RequestException
from utils.logger import logger
Expand Down Expand Up @@ -37,13 +37,18 @@ async def overseerr(request: Request) -> Dict[str, Any]:
imdb_id = req.media.imdbId
if not imdb_id:
try:
imdb_id = get_imdbid_from_tmdb(req.media.tmdbId)
_type = req.media.media_type
if _type == "tv":
_type = "show"
imdb_id = get_imdbid_from_tmdb(req.media.tmdbId, type=_type)
if not imdb_id or not imdb_id.startswith("tt"):
imdb_id = get_imdbid_from_tvdb(req.media.tvdbId, type=_type)
if not imdb_id or not imdb_id.startswith("tt"):
logger.error(f"Failed to get imdb_id from Overseerr: {req.media.tmdbId}")
return {"success": False, "message": "Failed to get imdb_id from Overseerr", "title": req.subject}
except RequestException:
logger.error(f"Failed to get imdb_id from TMDB: {req.media.tmdbId}")
return {"success": False, "message": "Failed to get imdb_id from TMDB", "title": req.subject}
if not imdb_id:
logger.error(f"Failed to get imdb_id from TMDB: {req.media.tmdbId}")
return {"success": False, "message": "Failed to get imdb_id from TMDB", "title": req.subject}
logger.error(f"Failed to get imdb_id from Overseerr: {req.media.tmdbId}")
return {"success": False, "message": "Failed to get imdb_id from Overseerr", "title": req.subject}

overseerr: Overseerr = request.app.program.services[Overseerr]
if not overseerr.initialized:
Expand Down
7 changes: 5 additions & 2 deletions src/program/content/overseerr.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def run(self):

try:
response = get(
self.settings.url + f"/api/v1/request?take={10000}&filter=approved",
self.settings.url + f"/api/v1/request?take={10000}&filter=approved&sort=added",
additional_headers=self.headers,
)
except (ConnectionError, RetryError, MaxRetryError) as e:
Expand Down Expand Up @@ -134,8 +134,11 @@ def get_imdb_id(self, data) -> str:
for id_attr, fetcher in alternate_ids:
external_id_value = getattr(response.data.externalIds, id_attr, None)
if external_id_value:
_type = data.media_type
if _type == "tv":
_type = "show"
try:
new_imdb_id: Union[str, None] = fetcher(external_id_value)
new_imdb_id: Union[str, None] = fetcher(external_id_value, type=_type)
if not new_imdb_id:
continue
return new_imdb_id
Expand Down
76 changes: 50 additions & 26 deletions src/program/content/plex_watchlist.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
"""Plex Watchlist Module"""
from types import SimpleNamespace
import xml.etree.ElementTree as ET

from typing import Generator, Union

from program.media.item import Episode, MediaItem, Movie, Season, Show
from program.settings.manager import settings_manager
from program.indexers.trakt import get_imdbid_from_tvdb
from requests import HTTPError
from utils.logger import logger
from utils.request import get, ping
from plexapi.myplex import MyPlexAccount
from requests import Session
import xmltodict


class PlexWatchlist:
Expand All @@ -17,6 +23,8 @@ def __init__(self):
self.rss_enabled = False
self.settings = settings_manager.settings.content.plex_watchlist
self.token = settings_manager.settings.updaters.plex.token
self.account = None
self.session = Session()
self.initialized = self.validate()
if not self.initialized:
return
Expand All @@ -30,24 +38,29 @@ def validate(self):
if not self.token:
logger.error("Plex token is not set!")
return False
try:
self.account = MyPlexAccount(self.session, token=self.token)
except Exception as e:
logger.error(f"Unable to authenticate Plex account: {e}")
return False
if self.settings.rss:
for rss_url in self.settings.rss:
try:
response = ping(rss_url)
response.response.raise_for_status()
self.rss_enabled = True
return True
except HTTPError as e:
if e.response.status_code == 404:
logger.warning(f"Plex RSS URL {rss_url} is Not Found. Please check your RSS URL in settings.")
return False
else:
logger.warning(
f"Plex RSS URL {rss_url} is not reachable (HTTP status code: {e.response.status_code})."
)
return False
except Exception as e:
logger.error(f"Failed to validate Plex RSS URL {rss_url}: {e}", exc_info=True)
logger.warning("None of the provided RSS URLs are reachable. Falling back to using user Watchlist.")
return False
return False
return True

def run(self) -> Generator[Union[Movie, Show, Season, Episode], None, None]:
Expand Down Expand Up @@ -76,38 +89,49 @@ def _get_items_from_rss(self) -> Generator[MediaItem, None, None]:
"""Fetch media from Plex RSS Feeds."""
for rss_url in self.settings.rss:
try:
response = get(rss_url, timeout=60)
if not response.is_ok:
response = self.session.get(rss_url, timeout=60)
if not response.ok:
logger.error(f"Failed to fetch Plex RSS feed from {rss_url}: HTTP {response.status_code}")
continue
if not hasattr(response.data, "items"):
logger.error("Invalid response or missing items attribute in response data.")
continue
results = response.data.items
for item in results:
yield self._extract_imdb_ids(item.guids)

xmltodict_data = xmltodict.parse(response.content)
items = xmltodict_data.get("rss", {}).get("channel", {}).get("item", [])
for item in items:
guid_text = item.get("guid", {}).get("#text", "")
guid_id = guid_text.split("//")[-1] if guid_text else ""
if not guid_id or guid_id in self.recurring_items:
continue
if guid_id.startswith("tt") and guid_id not in self.recurring_items:
yield guid_id
elif guid_id:
imdb_id: str = get_imdbid_from_tvdb(guid_id)
if not imdb_id or not imdb_id.startswith("tt") or imdb_id in self.recurring_items:
continue
yield imdb_id
else:
logger.log("NOT_FOUND", f"Failed to extract IMDb ID from {item['title']}")
continue
except Exception as e:
logger.error(f"An unexpected error occurred while fetching Plex RSS feed from {rss_url}: {e}")
continue

def _get_items_from_watchlist(self) -> Generator[MediaItem, None, None]:
"""Fetch media from Plex watchlist"""
filter_params = "includeFields=title,year,ratingkey&includeElements=Guid&sort=watchlistedAt:desc"
url = f"https://metadata.provider.plex.tv/library/sections/watchlist/all?X-Plex-Token={self.token}&{filter_params}"
response = get(url)
if not response.is_ok or not hasattr(response.data, "MediaContainer"):
logger.error("Invalid response or missing MediaContainer in response data.")
return
media_container = getattr(response.data, "MediaContainer", None)
if not media_container or not hasattr(media_container, "Metadata"):
logger.log("NOT_FOUND", "MediaContainer is missing Metadata attribute.")
return
for item in media_container.Metadata:
if hasattr(item, "ratingKey") and item.ratingKey:
imdb_id = self._ratingkey_to_imdbid(item.ratingKey)
if imdb_id:
# filter_params = "includeFields=title,year,ratingkey&includeElements=Guid&sort=watchlistedAt:desc"
# url = f"https://metadata.provider.plex.tv/library/sections/watchlist/all?X-Plex-Token={self.token}&{filter_params}"
# response = get(url)
items = self.account.watchlist()
for item in items:
if hasattr(item, "guids") and item.guids:
imdb_id = next((guid.id.split("//")[-1] for guid in item.guids if guid.id.startswith("imdb://")), None)
if imdb_id and imdb_id in self.recurring_items:
continue
elif imdb_id.startswith("tt"):
yield imdb_id
else:
logger.log("NOT_FOUND", f"Unable to extract IMDb ID from {item.title} ({item.year}) with data id: {imdb_id}")
else:
logger.log("NOT_FOUND", f"{item.title} ({item.year}) is missing ratingKey attribute from Plex")
logger.log("NOT_FOUND", f"{item.title} ({item.year}) is missing guids attribute from Plex")

@staticmethod
def _ratingkey_to_imdbid(ratingKey: str) -> str:
Expand Down
38 changes: 30 additions & 8 deletions src/program/db/db_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,50 @@
from program.types import Event
from sqlalchemy import func, select
from sqlalchemy.orm import joinedload
from sqlalchemy.exc import NoResultFound, IntegrityError, InvalidRequestError
from utils.logger import logger

from .db import db


def _ensure_item_exists_in_db(item: MediaItem) -> bool:
    """Return True if *item* already exists in the database.

    Movies and Shows are looked up by IMDb id; any other media type
    (e.g. Season/Episode) is considered present when it already carries
    a database id (``_id``).
    """
    if isinstance(item, (Movie, Show)):
        with db.Session() as session:
            # A non-zero count of rows with this IMDb id means it exists.
            return session.execute(
                select(func.count(MediaItem._id)).where(MediaItem.imdb_id == item.imdb_id)
            ).scalar_one() != 0
    return item._id is not None

def _get_item_type_from_db(item: MediaItem) -> str:
    """Return the stored ``type`` of *item* (e.g. "movie" or "show").

    Looks the row up by database id when the item is already persisted,
    otherwise falls back to an IMDb-id lookup restricted to top-level
    types (movie/show).  On a missing row or any query failure the error
    is logged and the function implicitly returns None.
    # NOTE(review): returning None despite the ``str`` annotation is
    # preserved from the original — callers must handle a missing result.
    """
    with db.Session() as session:
        try:
            if item._id is None:
                # Item not yet persisted: match by IMDb id, but only
                # top-level types so Seasons/Episodes are never returned.
                return session.execute(
                    select(MediaItem.type).where(
                        (MediaItem.imdb_id == item.imdb_id)
                        & ((MediaItem.type == "show") | (MediaItem.type == "movie"))
                    )
                ).scalar_one()
            return session.execute(
                select(MediaItem.type).where(MediaItem._id == item._id)
            ).scalar_one()
        except NoResultFound as e:
            logger.exception(f"No Result Found in db for {item.log_string} with id {item._id}: {e}")
        except Exception as e:
            logger.exception(f"Failed to get item type from db for item: {item.log_string} with id {item._id} - {e}")

def _store_item(item: MediaItem):
if isinstance(item, (Movie, Show, Season, Episode)) and item._id is not None:
with db.Session() as session:
session.merge(item)
session.commit()
try:
session.merge(item)
session.commit()
except IntegrityError as e:
logger.exception(f"IntegrityError: {e}. Attempting to update existing item.")
logger.warning(f"Attempting rollback of session for item: {item.log_string}")
session.rollback()
existing_item = session.query(MediaItem).filter_by(_id=item._id).one()
for key, value in item.__dict__.items():
if key != '_sa_instance_state' and key != '_id':
setattr(existing_item, key, value)
logger.warning(f"Committing changes to existing item: {item.log_string}")
session.commit()
except InvalidRequestError as e:
logger.exception(f"InvalidRequestError: {e}. Could not update existing item.")
session.rollback()
except Exception as e:
logger.exception(f"Failed to update existing item: {item.log_string} - {e}")
else:
with db.Session() as session:
_check_for_and_run_insertion_required(session, item)
Expand Down
31 changes: 22 additions & 9 deletions src/program/downloaders/realdebrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ def validate(self) -> bool:
def run(self, item: MediaItem) -> bool:
"""Download media item from real-debrid.com"""
return_value = False
if not item:
return return_value
if self.is_cached(item) and not self._is_downloaded(item):
self._download_item(item)
return_value = True
Expand All @@ -129,7 +131,7 @@ def log_item(item: MediaItem) -> None:

def is_cached(self, item: MediaItem) -> bool:
"""Check if item is cached on real-debrid.com"""
if not item.get("streams", {}):
if not item.get("streams", []):
return False

def _chunked(lst: List, n: int) -> Generator[List, None, None]:
Expand All @@ -140,9 +142,13 @@ def _chunked(lst: List, n: int) -> Generator[List, None, None]:
logger.log("DEBRID", f"Processing {len(item.streams)} streams for {item.log_string}")

processed_stream_hashes = set()
filtered_streams = [stream.infohash for stream in item.streams if stream.infohash and stream.infohash not in processed_stream_hashes]
filtered_streams = [
stream.infohash for stream in item.streams
if stream.infohash and stream.infohash not in processed_stream_hashes
and not stream.blacklisted
]
if not filtered_streams:
logger.log("NOT_FOUND", f"No streams found from filtering: {item.log_string}")
logger.log("NOT_FOUND", f"No streams found from filtering out processed and blacklisted hashes for: {item.log_string}")
return False

for stream_chunk in _chunked(filtered_streams, 5):
Expand All @@ -152,27 +158,34 @@ def _chunked(lst: List, n: int) -> Generator[List, None, None]:
if response.is_ok and response.data and isinstance(response.data, dict):
if self._evaluate_stream_response(response.data, processed_stream_hashes, item):
return True
processed_stream_hashes.update(stream_chunk)
except Exception as e:
logger.exception(f"Error checking cache for streams: {str(e)}", exc_info=True)
continue

if item.type == "movie" or item.type == "episode":
for hash in filtered_streams:
stream = next((stream for stream in item.streams if stream.infohash == hash), None)
if stream and not stream.blacklisted:
stream.blacklisted = True

logger.log("NOT_FOUND", f"No wanted cached streams found for {item.log_string} out of {len(filtered_streams)}")
return False

def _evaluate_stream_response(self, data, processed_stream_hashes, item):
def _evaluate_stream_response(self, data: dict, processed_stream_hashes: set, item: MediaItem) -> bool:
"""Evaluate the response data from the stream availability check."""
for stream_hash, provider_list in data.items():
if stream_hash in processed_stream_hashes:
stream = next((stream for stream in item.streams if stream.infohash == stream_hash), None)
if not stream or stream.blacklisted:
continue

if not provider_list or not provider_list.get("rd"):
stream.blacklisted = True
continue
processed_stream_hashes.add(stream_hash)

if self._process_providers(item, provider_list, stream_hash):
logger.debug(f"Finished processing providers - selecting {stream_hash} for downloading")
return True
else:
stream = next(stream for stream in item.streams if stream.infohash == stream_hash)
stream.blacklisted = True
return False

def _process_providers(self, item: MediaItem, provider_list: dict, stream_hash: str) -> bool:
Expand Down
Loading

0 comments on commit ce074b3

Please sign in to comment.