Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: start refactor on id handling #787

Merged
merged 51 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
a38677a
feat: start refactor on id handling
dreulavelle Oct 13, 2024
0ee9e0a
Merge branch 'main' into refactor/id_handling
dreulavelle Oct 13, 2024
3ecca79
fix: more fixes towards handling ids
dreulavelle Oct 13, 2024
ed3cec0
fix: more work done
dreulavelle Oct 14, 2024
fc20717
fix: improved retry lib for logging
dreulavelle Oct 14, 2024
a2d2eb8
fix: update deps and add psutil
dreulavelle Oct 14, 2024
24119af
fix: more work on getting new content added
dreulavelle Oct 14, 2024
8ee6565
fix: fix retry/reset endpoints
dreulavelle Oct 14, 2024
89d0101
fix: use event instead of add_item on retry endpoint
dreulavelle Oct 14, 2024
e4accec
fix: issue with duplicate symlink detection on db init
davidemarcoli Oct 14, 2024
2380635
fix: more work.. need to
dreulavelle Oct 15, 2024
76ddb8a
Merge branch 'main' into refactor/id_handling
dreulavelle Oct 15, 2024
aa9487d
fix: fixed session management when processing threads
dreulavelle Oct 16, 2024
cca83aa
fix: updated add_item
dreulavelle Oct 16, 2024
1679f31
fix: fix event processing with item_id handling
Oct 16, 2024
1758f7b
fix: attempt #1 at duplicates
dreulavelle Oct 17, 2024
3a7bc06
fix: attempt #2. use same removal process as remove endpoint
dreulavelle Oct 17, 2024
3a371bd
fix: check type first and delete duplicate shows appropriately
dreulavelle Oct 17, 2024
c4287b3
fix: repurpose deletions to work on any item
dreulavelle Oct 17, 2024
f04b697
fix: revert duplicate changes. change retry lib back to normal, set t…
dreulavelle Oct 17, 2024
a44c3ec
refactor: rename private methods to public, remove filtering items in…
Oct 17, 2024
b79c0c6
chore: rename store_item and add prefixes to threads to identify
Oct 17, 2024
9fa042e
chore: Events to hold service, not service name, fixes multiple notif…
Oct 17, 2024
a24f511
fix: stopiter error fix
dreulavelle Oct 17, 2024
dba1689
fix: add logging...
dreulavelle Oct 17, 2024
553579e
fix: whoops
dreulavelle Oct 17, 2024
c213680
feat: start refactor on id handling
dreulavelle Oct 13, 2024
49d4533
fix: more fixes towards handling ids
dreulavelle Oct 13, 2024
47d37e6
fix: more work done
dreulavelle Oct 14, 2024
d16a71b
fix: improved retry lib for logging
dreulavelle Oct 14, 2024
e2a5774
fix: update deps and add psutil
dreulavelle Oct 14, 2024
3784795
fix: more work on getting new content added
dreulavelle Oct 14, 2024
163c9b4
fix: fix retry/reset endpoints
dreulavelle Oct 14, 2024
1b98548
fix: use event instead of add_item on retry endpoint
dreulavelle Oct 14, 2024
9d81885
fix: issue with duplicate symlink detection on db init
davidemarcoli Oct 14, 2024
304fc6d
fix: more work.. need to
dreulavelle Oct 15, 2024
649920c
fix: fixed session management when processing threads
dreulavelle Oct 16, 2024
e624ca1
fix: updated add_item
dreulavelle Oct 16, 2024
04e7c80
fix: fix event processing with item_id handling
Oct 16, 2024
069fad2
fix: attempt #1 at duplicates
dreulavelle Oct 17, 2024
1e4e34f
fix: attempt #2. use same removal process as remove endpoint
dreulavelle Oct 17, 2024
ebd3f8e
fix: check type first and delete duplicate shows appropriately
dreulavelle Oct 17, 2024
907be80
fix: repurpose deletions to work on any item
dreulavelle Oct 17, 2024
309167d
fix: revert duplicate changes. change retry lib back to normal, set t…
dreulavelle Oct 17, 2024
42bbd8b
refactor: rename private methods to public, remove filtering items in…
Oct 17, 2024
3ac18cf
chore: rename store_item and add prefixes to threads to identify
Oct 17, 2024
f27577d
chore: Events to hold service, not service name, fixes multiple notif…
Oct 17, 2024
8f18ff1
fix: stopiter error fix
dreulavelle Oct 17, 2024
2044752
fix: add logging...
dreulavelle Oct 17, 2024
c01cc2f
fix: whoops
dreulavelle Oct 17, 2024
ac8b537
merged main into latest
dreulavelle Oct 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,050 changes: 541 additions & 509 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ subliminal = "^2.2.1"
rank-torrent-name = "^1.0.2"
jsonschema = "^4.23.0"
scalar-fastapi = "^1.0.3"
psutil = "^6.0.0"

[tool.poetry.group.dev.dependencies]
pyright = "^1.1.352"
Expand Down
34 changes: 18 additions & 16 deletions src/controllers/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ async def get_items_by_imdb_ids(request: Request, imdb_ids: str) -> list[dict]:

class ResetResponse(BaseModel):
message: str
ids: list[str]
ids: list[int]


@router.post(
Expand All @@ -284,7 +284,7 @@ async def reset_items(request: Request, ids: str) -> ResetResponse:
media_items_generator = get_media_items_by_ids(ids)
for media_item in media_items_generator:
try:
request.app.program.em.cancel_job(media_item)
request.app.program.em.cancel_job(media_item._id)
clear_streams(media_item)
reset_media_item(media_item)
except ValueError as e:
Expand All @@ -300,7 +300,7 @@ async def reset_items(request: Request, ids: str) -> ResetResponse:

class RetryResponse(BaseModel):
message: str
ids: list[str]
ids: list[int]


@router.post(
Expand All @@ -310,13 +310,15 @@ class RetryResponse(BaseModel):
operation_id="retry_items",
)
async def retry_items(request: Request, ids: str) -> RetryResponse:
"""Re-add items to the queue"""
ids = handle_ids(ids)
try:
media_items_generator = get_media_items_by_ids(ids)
for media_item in media_items_generator:
request.app.program.em.cancel_job(media_item)
request.app.program.em.cancel_job(media_item._id)
await asyncio.sleep(0.1) # Ensure cancellation is processed
request.app.program.em.add_item(media_item)
# request.app.program.em.add_item(media_item)
request.app.program.em.add_event(Event("RetryItem", media_item._id))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))

Expand All @@ -340,26 +342,26 @@ async def remove_item(request: Request, ids: str) -> RemoveResponse:
media_items: list[int] = get_parent_ids(ids)
if not media_items:
return HTTPException(status_code=404, detail="Item(s) not found")

for media_item in media_items:
logger.debug(f"Removing item with ID {media_item}")
request.app.program.em.cancel_job_by_id(media_item)
for item_id in media_items:
logger.debug(f"Removing item with ID {item_id}")
request.app.program.em.cancel_job(item_id)
await asyncio.sleep(0.2) # Ensure cancellation is processed
clear_streams_by_id(media_item)
clear_streams_by_id(item_id)

symlink_service = request.app.program.services.get(Symlinker)
if symlink_service:
symlink_service.delete_item_symlinks_by_id(media_item)
symlink_service.delete_item_symlinks_by_id(item_id)

with db.Session() as session:
requested_id = session.execute(select(MediaItem.requested_id).where(MediaItem._id == media_item)).scalar_one()
requested_id = session.execute(select(MediaItem.requested_id).where(MediaItem._id == item_id)).scalar_one()
if requested_id:
logger.debug(f"Deleting request from Overseerr with ID {requested_id}")
Overseerr.delete_request(requested_id)

logger.debug(f"Deleting item from database with ID {media_item}")
delete_media_item_by_id(media_item)
logger.info(f"Successfully removed item with ID {media_item}")
logger.debug(f"Deleting item from database with ID {item_id}")
delete_media_item_by_id(item_id)
logger.info(f"Successfully removed item with ID {item_id}")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))

Expand Down Expand Up @@ -489,7 +491,7 @@ def set_torrent_rd(request: Request, id: int, torrent_id: str) -> SetTorrentRDRe

session.commit()

request.app.program.em.add_event(Event("Symlinker", item))
request.app.program.em.add_event(Event("Symlinker", item._id))

return {
"success": True,
Expand Down
18 changes: 4 additions & 14 deletions src/controllers/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pydantic
from fastapi import APIRouter, Request
from program.content.overseerr import Overseerr
from program.db.db_functions import _ensure_item_exists_in_db
from program.db.db_functions import ensure_item_exists_in_db
from program.indexers.trakt import get_imdbid_from_tmdb, get_imdbid_from_tvdb
from program.media.item import MediaItem
from requests import RequestException
Expand Down Expand Up @@ -40,22 +40,12 @@ async def overseerr(request: Request) -> Dict[str, Any]:
logger.error("Overseerr not initialized")
return {"success": False, "message": "Overseerr not initialized"}

try:
new_item = MediaItem({"imdb_id": imdb_id, "requested_by": "overseerr", "requested_id": req.request.request_id})
except Exception as e:
logger.error(f"Failed to create item for {imdb_id}: {e}")
return {"success": False, "message": str(e)}

if _ensure_item_exists_in_db(new_item) or imdb_id in overseerr.recurring_items:
new_item = MediaItem({"imdb_id": imdb_id, "requested_by": "overseerr", "requested_id": req.request.request_id})
if ensure_item_exists_in_db(new_item):
logger.log("API", "Request already in queue or already exists in the database")
return {"success": True}
else:
overseerr.recurring_items.add(imdb_id)

try:
request.app.program.em.add_item(new_item)
except Exception as e:
logger.error(f"Failed to add item for {imdb_id}: {e}")
request.app.program.em.add_item(new_item, service="Overseerr")

return {"success": True}

Expand Down
27 changes: 26 additions & 1 deletion src/program/content/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
# from typing import Generator
# from program.media.item import MediaItem

from .listrr import Listrr
from .mdblist import Mdblist
from .overseerr import Overseerr
from .plex_watchlist import PlexWatchlist
from .trakt import TraktContent

__all__ = ["Listrr", "Mdblist", "Overseerr", "PlexWatchlist", "TraktContent"]
__all__ = ["Listrr", "Mdblist", "Overseerr", "PlexWatchlist", "TraktContent"]

# class Requester:
# def __init__(self):
# self.key = "content"
# self.initialized = False
# self.services = {
# Listrr: Listrr(),
# Mdblist: Mdblist(),
# Overseerr: Overseerr(),
# PlexWatchlist: PlexWatchlist(),
# TraktContent: TraktContent()
# }
# self.initialized = self.validate()
# if not self.initialized:
# return

# def validate(self):
# return any(service.initialized for service in self.services.values())

# def run(self, item: MediaItem) -> Generator[MediaItem, None, None]:
# """Index newly requested items."""
# yield item
12 changes: 2 additions & 10 deletions src/program/content/listrr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from requests.exceptions import HTTPError

from program.db.db_functions import _filter_existing_items
from program.indexers.trakt import get_imdbid_from_tmdb
from program.media.item import MediaItem
from program.settings.manager import settings_manager
Expand All @@ -22,7 +21,6 @@ def __init__(self):
self.initialized = self.validate()
if not self.initialized:
return
self.recurring_items: set[str] = set()
logger.success("Listrr initialized!")

def validate(self) -> bool:
Expand Down Expand Up @@ -67,14 +65,8 @@ def run(self) -> Generator[MediaItem, None, None]:
return

listrr_items = movie_items + show_items
non_existing_items = _filter_existing_items(listrr_items)
new_non_recurring_items = [item for item in non_existing_items if item.imdb_id not in self.recurring_items]
self.recurring_items.update([item.imdb_id for item in new_non_recurring_items])

if new_non_recurring_items:
logger.info(f"Fetched {len(new_non_recurring_items)} new items from Listrr")

yield new_non_recurring_items
logger.info(f"Fetched {len(listrr_items)} items from Listrr")
yield listrr_items

def _get_items_from_Listrr(self, content_type, content_lists) -> list[MediaItem]: # noqa: C901, PLR0912
"""Fetch unique IMDb IDs from Listrr for a given type and list of content."""
Expand Down
11 changes: 2 additions & 9 deletions src/program/content/mdblist.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from typing import Generator

from program.db.db_functions import _filter_existing_items
from program.media.item import MediaItem
from program.settings.manager import settings_manager
from utils.logger import logger
Expand All @@ -21,7 +20,6 @@ def __init__(self):
return
self.requests_per_2_minutes = self._calculate_request_time()
self.rate_limiter = RateLimiter(self.requests_per_2_minutes, 120, True)
self.recurring_items = set()
logger.success("mdblist initialized")

def validate(self):
Expand Down Expand Up @@ -63,13 +61,8 @@ def run(self) -> Generator[MediaItem, None, None]:
except RateLimitExceeded:
pass

non_existing_items = _filter_existing_items(items_to_yield)
new_non_recurring_items = [item for item in non_existing_items if item.imdb_id not in self.recurring_items and isinstance(item, MediaItem)]
self.recurring_items.update([item.imdb_id for item in new_non_recurring_items])

if new_non_recurring_items:
logger.info(f"Found {len(new_non_recurring_items)} new items to fetch")
yield new_non_recurring_items
logger.info(f"Fetched {len(items_to_yield)} items from mdblist.com")
yield items_to_yield

def _calculate_request_time(self):
limits = my_limits(self.settings.api_key).limits
Expand Down
10 changes: 2 additions & 8 deletions src/program/content/overseerr.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from requests.exceptions import ConnectionError, RetryError
from urllib3.exceptions import MaxRetryError, NewConnectionError

from program.db.db_functions import _filter_existing_items
from program.indexers.trakt import get_imdbid_from_tmdb
from program.media.item import MediaItem
from program.settings.manager import settings_manager
Expand All @@ -24,7 +23,6 @@ def __init__(self):
self.run_once = False
if not self.initialized:
return
self.recurring_items: set[str] = set()
logger.success("Overseerr initialized!")

def validate(self) -> bool:
Expand Down Expand Up @@ -58,18 +56,14 @@ def run(self):
return

overseerr_items: list[MediaItem] = self.get_media_requests()
non_existing_items = _filter_existing_items(overseerr_items)
new_non_recurring_items = [item for item in non_existing_items if item.imdb_id not in self.recurring_items and isinstance(item, MediaItem)]
self.recurring_items.update([item.imdb_id for item in new_non_recurring_items])

if self.settings.use_webhook:
logger.debug("Webhook is enabled. Running Overseerr once before switching to webhook only mode")
self.run_once = True

if new_non_recurring_items:
logger.info(f"Fetched {len(new_non_recurring_items)} new items from Overseerr")
logger.info(f"Fetched {len(overseerr_items)} items from overseerr")

yield new_non_recurring_items
yield overseerr_items

def get_media_requests(self) -> list[MediaItem]:
"""Get media requests from `Overseerr`"""
Expand Down
10 changes: 2 additions & 8 deletions src/program/content/plex_watchlist.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from requests import HTTPError, Session
from urllib3 import HTTPConnectionPool

from program.db.db_functions import _filter_existing_items
from program.media.item import Episode, MediaItem, Movie, Season, Show
from program.settings.manager import settings_manager
from utils.logger import logger
Expand All @@ -25,7 +24,6 @@ def __init__(self):
self.initialized = self.validate()
if not self.initialized:
return
self.recurring_items: set[str] = set() # set of imdb ids
logger.success("Plex Watchlist initialized!")

def validate(self):
Expand Down Expand Up @@ -70,14 +68,10 @@ def run(self) -> Generator[MediaItem, None, None]:

plex_items: set[str] = set(watchlist_items) | set(rss_items)
items_to_yield: list[MediaItem] = [MediaItem({"imdb_id": imdb_id, "requested_by": self.key}) for imdb_id in plex_items if imdb_id and imdb_id.startswith("tt")]
non_existing_items = _filter_existing_items(items_to_yield)
new_non_recurring_items = [item for item in non_existing_items if item.imdb_id not in self.recurring_items and isinstance(item, MediaItem)]
self.recurring_items.update([item.imdb_id for item in new_non_recurring_items])

if new_non_recurring_items:
logger.info(f"Found {len(new_non_recurring_items)} new items to fetch")
logger.info(f"Fetched {len(items_to_yield)} items from plex watchlist")
yield items_to_yield

yield new_non_recurring_items

def _get_items_from_rss(self) -> list[str]:
"""Fetch media from Plex RSS Feeds."""
Expand Down
18 changes: 2 additions & 16 deletions src/program/content/trakt.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from requests import RequestException

from program.db.db_functions import _filter_existing_items
from program.media.item import MediaItem
from program.settings.manager import settings_manager
from utils.logger import logger
Expand All @@ -30,8 +29,6 @@ def __init__(self):
if not self.initialized:
return
self.next_run_time = 0
self.recurring_items = set()
self.items_already_seen = set()
self.missing()
logger.success("Trakt initialized!")

Expand Down Expand Up @@ -93,19 +90,8 @@ def run(self):
if not items_to_yield:
return

non_existing_items = _filter_existing_items(items_to_yield)
new_non_recurring_items = [
item
for item in non_existing_items
if item.imdb_id not in self.recurring_items
and isinstance(item, MediaItem)
]
self.recurring_items.update(item.imdb_id for item in new_non_recurring_items)

if new_non_recurring_items:
logger.log("TRAKT", f"Found {len(new_non_recurring_items)} new items to fetch")

yield new_non_recurring_items
logger.info(f"Fetched {len(items_to_yield)} items from trakt")
yield items_to_yield

def _get_watchlist(self, watchlist_users: list) -> list:
"""Get IMDb IDs from Trakt watchlist"""
Expand Down
2 changes: 1 addition & 1 deletion src/program/db/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from utils import data_dir_path
from utils.logger import logger

engine_options={
engine_options = {
"pool_size": 25, # Prom: Set to 1 when debugging sql queries
"max_overflow": 25, # Prom: Set to 0 when debugging sql queries
"pool_pre_ping": True, # Prom: Set to False when debugging sql queries
Expand Down
Loading
Loading