Skip to content

Commit

Permalink
Add methods for individual checks
Browse files Browse the repository at this point in the history
  • Loading branch information
Spoked authored and Spoked committed Jan 14, 2024
1 parent c57e6a4 commit f1fc80a
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 93 deletions.
2 changes: 1 addition & 1 deletion backend/program/content/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def __init__(self, media_items):
self.initialized = False
self.key = "content"
self.running = False
self.sm = ServiceManager(media_items, False, Mdblist, Overseerr, PlexWatchlist)
self.sm = ServiceManager(media_items, False, Overseerr, Mdblist, PlexWatchlist)
if not self.validate():
logger.error("You have no content services enabled, please enable at least one!")
return
Expand Down
3 changes: 1 addition & 2 deletions backend/program/scrapers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from utils.service_manager import ServiceManager
from utils.settings import settings_manager as settings
from utils.logger import logger
from utils.parser import parser
from .torrentio import Torrentio
from .orionoid import Orionoid
from .jackett import Jackett
Expand All @@ -19,7 +18,7 @@ def __init__(self, _):
self.key = "scraping"
self.initialized = False
self.settings = ScrapingConfig(**settings.get(self.key))
self.sm = ServiceManager(None, False, Torrentio)
self.sm = ServiceManager(None, False, Torrentio, Orionoid, Jackett)
if not any(service.initialized for service in self.sm.services):
logger.error(
"You have no scraping services enabled, please enable at least one!"
Expand Down
59 changes: 30 additions & 29 deletions backend/program/scrapers/jackett.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,34 +68,35 @@ def _scrape_item(self, item):

def api_scrape(self, item):
    """Query Jackett's torznab endpoint for ``item`` and collect its streams.

    The search is built from ``item.type`` ("movie", "season" or "episode")
    and the matching IMDb id. Every result whose title passes
    ``parser.parse`` and that carries an ``infohash`` torznab attribute is
    kept.

    Returns a dict mapping infohash -> {"name": title}, ordered by
    ``parser.sort_streams``. An empty dict is returned when nothing matched,
    the response was not OK, or the request timed out.
    """
    with self.minute_limiter:
        query = ""
        if item.type == "movie":
            query = f"&t=movie&imdbid={item.imdb_id}"
        elif item.type == "season":
            query = f"&t=tv-search&imdbid={item.parent.imdb_id}&season={item.number}"
        elif item.type == "episode":
            query = f"&t=tv-search&imdbid={item.parent.parent.imdb_id}&season={item.parent.number}&ep={item.number}"

        url = (
            f"{self.settings.url}/api/v2.0/indexers/all/results/torznab?apikey={self.api_key}{query}"
        )
        try:
            with self.second_limiter:
                response = get(url=url, retry_if_failed=False, timeout=60)
        except ReadTimeout:
            logger.debug("Jackett timed out for %s", item.log_string)
            return {}

        # Every exit path returns a dict so callers never have to handle None.
        if not response.is_ok:
            return {}

        channel = response.data['rss']['channel']
        # NOTE(review): this uses attribute lookup while the loop below uses
        # item lookup; if `channel` is a plain dict this check is always
        # false and the method always returns {} -- confirm the type.
        if not hasattr(channel, "item"):
            return {}

        data = {}
        for stream in channel['item']:
            title = stream.get('title')
            # Reset per stream: otherwise a result without an infohash would
            # reuse the previous stream's hash (or hit an unbound name on
            # the very first stream).
            infohash = None
            for attr in stream.get('torznab:attr', []):
                if attr.get('@name') == 'infohash':
                    infohash = attr.get('@value')
            if parser.parse(title) and infohash:
                data[infohash] = {"name": title}
        if data:
            return parser.sort_streams(data)
        return {}
8 changes: 3 additions & 5 deletions backend/program/scrapers/orionoid.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,8 @@ def api_scrape(self, item):
data = {}
for stream in response.data.data.streams:
title = stream.file.name
infoHash = stream.file.hash
if parser.parse(title) and infoHash:
data[infoHash] = {"name": title}
# TODO: Sort data using parser and user preferences
if parser.parse(title) and stream.file.hash:
data[stream.file.hash] = {"name": title}
if len(data) > 0:
return data
return parser.sort_streams(data)
return {}
5 changes: 3 additions & 2 deletions backend/program/scrapers/torrentio.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,14 @@ def api_scrape(self, item):
response = get(f"{url}.json", retry_if_failed=False)
if response.is_ok:
data = {}
if len(response.data.streams) == 0:
return data
for stream in response.data.streams:
title = stream.title.split("\n👤")[0]
if parser.parse(title):
data[stream.infoHash] = {
"name": title,
}
# TODO: Sort data using parser and user preferences
if len(data) > 0:
return parser.sort_and_filter_streams(data)
return parser.sort_streams(data)
return {}
131 changes: 77 additions & 54 deletions backend/utils/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,46 +9,48 @@ class ParserConfig(BaseModel):
language: List[str]
include_4k: bool
highest_quality: bool
repack_proper: bool
dual_audio: bool # This sometimes doesnt work depending on if other audio is in the title
av1_audio: bool


class Parser:

def __init__(self):
self.settings = ParserConfig(**settings_manager.get("parser"))
self.language = self.settings.language
self.language = self.settings.language or ["English"]
self.resolution = ["1080p", "720p"]
self.unwanted_codec = ["H.265 Main 10", "H.265", "H.263", "Xvid"] # Bad for transcoding
self.quality = [None, "Blu-ray", "WEB-DL", "WEBRip", "HDRip",
"HDTVRip", "BDRip", "Pay-Per-View Rip"]
self.unwanted_quality = ["Cam", "Telesync", "Telecine", "Screener",
"DVDSCR", "Workprint", "DVD-Rip", "TVRip",
"VODRip", "DVD-R", "DSRip", "BRRip"]
self.audio = [None, "AAC", "AAC 2.0", "FLAC", "Custom"]
self.quality = [None, "Blu-ray", "WEB-DL", "WEBRip", "HDRip",
"HDTVRip", "BDRip", "Pay-Per-View Rip"]
self.audio = [None, "AAC", "AAC 2.0", "FLAC", "AVC", "Custom"]
self.network = ["Apple TV+", "Amazon Studios", "Netflix",
"Nickelodeon", "YouTube Premium", "Disney Plus",
"DisneyNOW", "HBO Max", "HBO", "Hulu Networks",
"DC Universe", "Adult Swim", "Comedy Central",
"Peacock", "AMC", "PBS", "Crunchyroll"] # Will probably be used later in `Versions`
"Peacock", "AMC", "PBS", "Crunchyroll", "Syndication"] # Will probably be used later in `Versions`
self.validate_settings()

def validate_settings(self):
    """Derive accepted resolutions, audio formats and unwanted codecs from settings.

    * ``highest_quality``: accept UHD/2160p/4K on top of 1080p/720p, accept
      the Dolby/DTS audio formats, and stop treating H.265 as unwanted.
    * ``include_4k`` (without ``highest_quality``): accept 2160p/4K on top
      of 1080p/720p.
    * neither: accept 1080p/720p only.
    * ``dual_audio``: accept "Dual" audio.
    * ``av1_audio`` disabled: treat AV1 as an unwanted codec.
    """
    if self.settings.highest_quality:
        self.resolution = ["UHD", "2160p", "4K", "1080p", "720p"]
        # Every entry needs its own trailing comma: adjacent string literals
        # are silently concatenated into one bogus format name.
        self.audio += [
            "Dolby TrueHD", "Dolby Atmos",
            "Dolby Digital EX", "Dolby Digital Plus",
            "Dolby Digital Plus 5.1", "Dolby Digital Plus 7.1",
            "DTS-HD MA", "DTS-HD", "DTS-HD MA 5.1",
            "DTS-EX", "DTS:X", "DTS", "5.1", "7.1",
        ]
        # Lists do not support `-=`; filter the now-acceptable codecs out.
        now_wanted = ("H.265 Main 10", "H.265")
        self.unwanted_codec = [
            codec for codec in self.unwanted_codec if codec not in now_wanted
        ]
    elif self.settings.include_4k:
        self.resolution = ["2160p", "4K", "1080p", "720p"]
    else:
        self.resolution = ["1080p", "720p"]
    if self.settings.dual_audio:
        self.audio += ["Dual"]
    if not self.settings.av1_audio:
        self.unwanted_codec += ["AV1"]  # Not all devices support this codec

def _parse(self, string):
parse = PTN.parse(string)
Expand All @@ -65,11 +67,13 @@ def _parse(self, string):

season = parse.get("season")
audio = parse.get("audio")
codec = parse.get("codec")
resolution = parse.get("resolution")
quality = parse.get("quality")
subtitles = parse.get("subtitles")
language = parse.get("language")
hdr = parse.get("hdr")
upscaled = parse.get("upscaled")
remastered = parse.get("remastered")
proper = parse.get("proper")
repack = parse.get("repack")
Expand All @@ -79,70 +83,89 @@ def _parse(self, string):
extended = parse.get("extended")

return {
"episodes": episodes or [],
"resolution": resolution or [],
"quality": quality or [],
"audio": audio or None,
"hdr": hdr or None,
"remastered": remastered or None,
"proper": proper or None,
"repack": repack or None,
"season": season,
"episodes": episodes or [],
"codec": codec or [],
"audio": audio or [],
"hdr": hdr or False,
"upscaled": upscaled or False,
"remastered": remastered or False,
"proper": proper or False,
"repack": repack or False,
"subtitles": subtitles or [],
"language": language or [],
"remux": remux or None,
"remux": remux or False,
"extended": extended,
"season": season,
}

def episodes(self, string) -> List[int]:
    """Return the ``episodes`` entry that ``_parse`` extracts from ``string``."""
    return self._parse(string)["episodes"]

def episodes_in_season(self, season, string) -> List[int]:
    """Return the parsed episodes of ``string`` if it belongs to ``season``.

    An empty list is returned when the parsed season differs from ``season``.
    """
    parsed = self._parse(string)
    return parsed["episodes"] if parsed["season"] == season else []

def sort_dual_audio(self, string):
"""Check if content has dual audio."""
# TODO: This could use improvement.. untested.
parse = self._parse(string)
if parse["audio"] == "Dual":
return True
elif re.search(r"((dual.audio)|(english|eng)\W+(dub|audio))", string, flags=re.IGNORECASE):
return True
def _is_4k(self, string) -> bool:
"""Check if content is `4k`."""
if self.settings.include_4k:
parsed = self._parse(string)
return parsed.get("resolution", False) in ["4K"]

def _is_highest_quality(self, string) -> bool:
"""Check if content is `highest quality`."""
if self.settings.highest_quality:
parsed = self._parse(string)
return any([
parsed.get("hdr", False),
parsed.get("remux", False),
parsed.get("audio", False) in self.audio,
parsed.get("resolution", False) in ["4K", "UHD"]
])

def _is_repack_or_proper(self, string) -> bool:
"""Check if content is `repack` or `proper`."""
if self.settings.repack_proper:
parsed = self._parse(string)
return any([
parsed.get("proper", False),
parsed.get("repack", False),
])

def _is_dual_audio(self, string) -> bool:
"""Check if content is `dual audio`."""
if self.settings.dual_audio:
parsed = self._parse(string)
return parsed.get("audio") == "Dual" or \
re.search(r"((dual.audio)|(english|eng)\W+(dub|audio))", string, flags=re.IGNORECASE) is not None
else:
return False

def remove_unwanted(self, string):
    """Filter out unwanted content.

    Returns ``True`` when the title should be kept, i.e. neither its parsed
    quality nor its parsed codec is on the unwanted lists.
    """
    # TODO: This could use improvement.. untested.
    parsed = self._parse(string)
    # Read both fields up front so a missing key fails regardless of the
    # outcome of the first check.
    quality, codec = parsed["quality"], parsed["codec"]
    return quality not in self.unwanted_quality and codec not in self.unwanted_codec

def sort_and_filter_streams(self, streams: dict) -> dict:
    """Sorts and filters streams based on user preferences.

    ``streams`` maps info hash -> stream dict with a ``"name"`` title.
    Streams rejected by ``remove_unwanted`` are dropped; the rest are
    returned with dual-audio titles first, otherwise in their incoming
    order (the sort is stable).
    """
    # TODO: Sort scraped data based on user preferences
    # instead of scraping one item at a time.
    kept = []
    for info_hash, stream in streams.items():
        title = stream.get("name", "")
        if self.remove_unwanted(title):
            # `_is_dual_audio` is the dual-audio predicate; the previously
            # referenced `has_dual_audio` is not defined anywhere in view.
            kept.append((info_hash, stream, self._is_dual_audio(title)))
    kept.sort(key=lambda entry: entry[2], reverse=True)
    return {info_hash: stream for info_hash, stream, _ in kept}

def parse(self, string):
def sort_streams(self, streams: dict) -> dict:
    """Sorts streams based on user preferences.

    Streams are ranked, best first, by dual audio, then repack/proper, then
    highest quality, then 4K, each judged from the stream's ``"name"``.
    """
    def rank(entry):
        title = entry[1]['name']
        return (
            self._is_dual_audio(title),
            self._is_repack_or_proper(title),
            self._is_highest_quality(title),
            self._is_4k(title),
        )

    ordered = list(streams.items())
    ordered.sort(key=rank, reverse=True)
    return dict(ordered)

def parse(self, string) -> bool:
    """Return whether ``string`` is an acceptable release title.

    A title is accepted when its parsed resolution and language are in the
    accepted lists and neither its quality nor its codec is unwanted.
    """
    parsed = self._parse(string)
    if parsed["resolution"] not in self.resolution:
        return False
    # NOTE(review): if `_parse` yields a list for "language", this
    # membership test against a list of strings can never succeed --
    # confirm the parsed type.
    if parsed["language"] not in self.language:
        return False
    if parsed["quality"] in self.unwanted_quality:
        return False
    return parsed["codec"] not in self.unwanted_codec

parser = Parser()

0 comments on commit f1fc80a

Please sign in to comment.