Skip to content

Commit

Permalink
Better logic for monitored entities (#1074)
Browse files Browse the repository at this point in the history
* chore(services/misc): change order of background jobs

* feat(migrations): add new column to monitored_entity table

* chore(models): add new column to table

* refactor(backend): change name of function

* build(models): add new required dep

* feat(utils): new function to get ids

* feat(backend): do not remove entity from monitored collection if it has had updates

* chore(utils): add logging stmt

* chore(services/misc): change name of function

* chore(models): use correct type for column

* fix(services/misc): change logic for removing monitored metadata

* chore(services): add more deps

* chore(services/cache): emit log after insert

* refactor(backend): move logic to same place

* build(services/cache): add transient dep

* chore(services/cache): add logging

* fix(services/integrations): handle duplicate seen entries being created

* fix(services/integration): use better types

* chore(services/integrations): do not pass extra attributes

* chore(backend): format files

* fix(services/integrations): set correct value for ended on

* chore(services/integration): remove useless newlines
  • Loading branch information
IgnisDa authored Oct 19, 2024
1 parent a6163bf commit cc66f81
Show file tree
Hide file tree
Showing 17 changed files with 125 additions and 46 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/migrations/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ mod m20241004_create_application_cache;
mod m20241006_changes_for_issue_1056;
mod m20241010_changes_for_issue_708;
mod m20241013_changes_for_issue_1052;
mod m20241019_changes_for_issue_929;

pub use m20230410_create_metadata::Metadata as AliasedMetadata;
pub use m20230413_create_person::Person as AliasedPerson;
Expand Down Expand Up @@ -108,6 +109,7 @@ impl MigratorTrait for Migrator {
Box::new(m20241006_changes_for_issue_1056::Migration),
Box::new(m20241010_changes_for_issue_708::Migration),
Box::new(m20241013_changes_for_issue_1052::Migration),
Box::new(m20241019_changes_for_issue_929::Migration),
]
}
}
1 change: 1 addition & 0 deletions crates/migrations/src/m20240904_create_monitored_entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub static MONITORED_ENTITY_VIEW_CREATION_SQL: &str = indoc! { r#"
ute."user_id",
cte."entity_id",
cte."entity_lot",
cte."id" as "collection_to_entity_id",
cte."collection_id" AS "origin_collection_id"
FROM
"collection_to_entity" cte
Expand Down
22 changes: 22 additions & 0 deletions crates/migrations/src/m20241019_changes_for_issue_929.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use sea_orm_migration::prelude::*;

use crate::m20240904_create_monitored_entity::MONITORED_ENTITY_VIEW_CREATION_SQL;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
db.execute_unprepared(r#"DROP VIEW "monitored_entity""#)
.await?;
db.execute_unprepared(MONITORED_ENTITY_VIEW_CREATION_SQL)
.await?;
Ok(())
}

async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
Ok(())
}
}
1 change: 1 addition & 0 deletions crates/models/database/src/monitored_entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub entity_lot: EntityLot,
pub origin_collection_id: String,
pub collection_to_entity_id: Uuid,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
3 changes: 2 additions & 1 deletion crates/models/database/src/seen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use async_graphql::SimpleObject;
use async_trait::async_trait;
use chrono::NaiveDate;
use chrono::{NaiveDate, Utc};
use educe::Educe;
use enums::{EntityLot, SeenState};
use media_models::{
Expand Down Expand Up @@ -102,6 +102,7 @@ impl ActiveModelBehavior for ActiveModel {
let progress = self.progress.clone().unwrap();
if progress == dec!(100) && state == SeenState::InProgress {
self.state = ActiveValue::Set(SeenState::Completed);
self.finished_on = ActiveValue::Set(Some(Utc::now().date_naive()));
}
if insert {
self.id = ActiveValue::Set(format!("see_{}", nanoid!(12)));
Expand Down
4 changes: 2 additions & 2 deletions crates/providers/src/mal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use convert_case::{Case, Casing};
use dependent_models::SearchResults;
use enums::{MediaLot, MediaSource};
use media_models::{
AnimeSpecifics, MangaSpecifics, MetadataDetails, MetadataImageForMediaDetails, MetadataSearchItem,
PartialMetadataWithoutId,
AnimeSpecifics, MangaSpecifics, MetadataDetails, MetadataImageForMediaDetails,
MetadataSearchItem, PartialMetadataWithoutId,
};
use rand::{seq::SliceRandom, thread_rng};
use reqwest::{
Expand Down
5 changes: 5 additions & 0 deletions crates/services/cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ edition = "2021"
async-graphql = { workspace = true }
chrono = { workspace = true }
common-models = { path = "../../models/common" }
common-utils = { path = "../../utils/common" }
database-models = { path = "../../models/database" }
sea-orm = { workspace = true }
sea-query = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }

[package.metadata.cargo-machete]
ignored = ["tracing"]
5 changes: 4 additions & 1 deletion crates/services/cache/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_graphql::Result;
use chrono::{Duration, Utc};
use common_models::ApplicationCacheKey;
use common_utils::ryot_log;
use database_models::{application_cache, prelude::ApplicationCache};
use sea_orm::{ActiveValue, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use sea_query::OnConflict;
Expand Down Expand Up @@ -40,7 +41,9 @@ impl CacheService {
)
.exec(&self.db)
.await?;
Ok(inserted.last_insert_id)
let insert_id = inserted.last_insert_id;
ryot_log!(debug, "Inserted application cache with id = {insert_id:?}");
Ok(insert_id)
}

pub async fn get(&self, key: ApplicationCacheKey) -> Result<Option<()>> {
Expand Down
2 changes: 1 addition & 1 deletion crates/services/importer/src/jellyfin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ pub async fn import(input: DeployUrlAndKeyAndUsernameImportInput) -> Result<Impo
seen_history: vec![seen],
identifier: tmdb_id,
collections,
..Default::default()
..Default::default()
});
} else {
failed_items.push(ImportFailedItem {
Expand Down
1 change: 1 addition & 0 deletions crates/services/integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl IntegrationService {
None => false,
});
media.seen_history.iter_mut().for_each(|update| {
update.ended_on = Some(Utc::now().date_naive());
if let Some(progress) = update.progress {
if progress > integration.maximum_progress.unwrap() {
ryot_log!(
Expand Down
2 changes: 1 addition & 1 deletion crates/services/integration/src/sink/emby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ impl EmbyIntegration {
identifier,
source: MediaSource::Tmdb,
seen_history: vec![ImportOrExportMediaItemSeen {
provider_watched_on: Some("Emby".to_string()),
progress: Some(position / runtime * dec!(100)),
show_season_number: payload.item.season_number,
show_episode_number: payload.item.episode_number,
provider_watched_on: Some("Emby".to_string()),
..Default::default()
}],
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion crates/services/integration/src/sink/kodi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ impl KodiIntegration {
identifier: payload.identifier,
seen_history: vec![ImportOrExportMediaItemSeen {
progress: Some(payload.progress),
provider_watched_on: Some("Kodi".to_string()),
show_season_number: payload.show_season_number,
show_episode_number: payload.show_episode_number,
provider_watched_on: Some("Kodi".to_string()),
..Default::default()
}],
..Default::default()
Expand Down
17 changes: 9 additions & 8 deletions crates/services/integration/src/yank/komga.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl KomgaIntegration {
}
}

fn calculate_percentage(current_page: i32, total_page: i32) -> Decimal {
fn calculate_percentage(&self, current_page: i32, total_page: i32) -> Decimal {
if total_page == 0 {
return dec!(0);
}
Expand Down Expand Up @@ -375,12 +375,11 @@ impl KomgaIntegration {
force_update: None,
},
ImportOrExportMediaItemSeen {
manga_chapter_number: Some(book.metadata.number.parse().unwrap_or_default()),
progress: Some(Self::calculate_percentage(
book.read_progress.page,
book.media.pages_count,
)),
progress: Some(
self.calculate_percentage(book.read_progress.page, book.media.pages_count),
),
provider_watched_on: Some("Komga".to_string()),
manga_chapter_number: Some(book.metadata.number.parse().unwrap_or_default()),
..Default::default()
},
))
Expand Down Expand Up @@ -422,7 +421,9 @@ impl KomgaIntegration {
})
.collect()
.await;
result.metadata.extend(unique_collection_updates.into_values());
result
.metadata
.extend(unique_collection_updates.into_values());
Ok(())
}

Expand Down Expand Up @@ -500,8 +501,8 @@ impl KomgaIntegration {
result.metadata.push(ImportOrExportMediaItem {
lot: commit.lot,
source: commit.source,
identifier: commit.identifier,
seen_history: vec![hist],
identifier: commit.identifier,
..Default::default()
});
});
Expand Down
41 changes: 22 additions & 19 deletions crates/services/miscellaneous/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ use dependent_models::{
use dependent_utils::{
after_media_seen_tasks, commit_metadata, commit_metadata_group_internal,
commit_metadata_internal, commit_person, create_partial_metadata, deploy_background_job,
deploy_update_metadata_job, get_entities_monitored_by, get_metadata_provider,
get_openlibrary_service, get_tmdb_non_media_service, is_metadata_finished_by_user, post_review,
progress_update, queue_media_state_changed_notification_for_user,
queue_notifications_to_user_platforms, update_metadata_and_notify_users,
deploy_update_metadata_job, get_metadata_provider, get_openlibrary_service,
get_tmdb_non_media_service, get_users_and_cte_monitoring_entity, get_users_monitoring_entity,
is_metadata_finished_by_user, post_review, progress_update,
queue_media_state_changed_notification_for_user, queue_notifications_to_user_platforms,
refresh_collection_to_entity_association, update_metadata_and_notify_users,
};
use enums::{
EntityLot, MediaLot, MediaSource, MetadataToMetadataRelation, SeenState, UserToMediaReason,
Expand Down Expand Up @@ -1992,7 +1993,7 @@ ORDER BY RANDOM() LIMIT 10;
Ok(monitored_by)
}

async fn update_watchlist_metadata_and_queue_notifications(&self) -> Result<()> {
async fn update_monitored_metadata_and_queue_notifications(&self) -> Result<()> {
let meta_map = self.get_monitored_entities(EntityLot::Metadata).await?;
ryot_log!(
debug,
Expand Down Expand Up @@ -2650,7 +2651,7 @@ ORDER BY RANDOM() LIMIT 10;
.collect_vec();
for (metadata_id, notification) in notifications.into_iter() {
let users_to_notify =
get_entities_monitored_by(&metadata_id, EntityLot::Metadata, &self.0.db).await?;
get_users_monitoring_entity(&metadata_id, EntityLot::Metadata, &self.0.db).await?;
for user in users_to_notify {
queue_media_state_changed_notification_for_user(&user, &notification, &self.0)
.await?;
Expand Down Expand Up @@ -2749,16 +2750,20 @@ ORDER BY RANDOM() LIMIT 10;
.unwrap_or_default();
if !notifications.is_empty() {
let users_to_notify =
get_entities_monitored_by(&person_id, EntityLot::Person, &self.0.db).await?;
get_users_and_cte_monitoring_entity(&person_id, EntityLot::Person, &self.0.db)
.await?;
for notification in notifications {
for user_id in users_to_notify.iter() {
for (user_id, cte_id) in users_to_notify.iter() {
queue_media_state_changed_notification_for_user(
user_id,
&notification,
&self.0,
)
.await
.trace_ok();
refresh_collection_to_entity_association(cte_id, &self.0.db)
.await
.trace_ok();
}
}
}
Expand All @@ -2782,7 +2787,7 @@ ORDER BY RANDOM() LIMIT 10;

pub async fn handle_review_posted_event(&self, event: ReviewPostedEvent) -> Result<()> {
let monitored_by =
get_entities_monitored_by(&event.obj_id, event.entity_lot, &self.0.db).await?;
get_users_monitoring_entity(&event.obj_id, event.entity_lot, &self.0.db).await?;
let users = User::find()
.select_only()
.column(user::Column::Id)
Expand Down Expand Up @@ -2856,23 +2861,21 @@ ORDER BY RANDOM() LIMIT 10;
struct CustomQueryResponse {
id: Uuid,
created_on: DateTimeUtc,
last_updated_on: Option<DateTimeUtc>,
last_updated_on: DateTimeUtc,
}
let all_cte = CollectionToEntity::find()
.select_only()
.column(collection_to_entity::Column::Id)
.column(collection_to_entity::Column::CreatedOn)
.column(metadata::Column::LastUpdatedOn)
.left_join(Metadata)
.column(collection_to_entity::Column::LastUpdatedOn)
.inner_join(Collection)
.filter(collection::Column::Name.eq(DefaultCollection::Monitoring.to_string()))
.order_by_asc(collection_to_entity::Column::Id)
.into_model::<CustomQueryResponse>()
.all(&self.0.db)
.await?;
let mut to_delete = vec![];
for cte in all_cte {
let delta = cte.last_updated_on.unwrap_or_else(Utc::now) - cte.created_on;
let delta = cte.last_updated_on - cte.created_on;
if delta.num_days().abs() > self.0.config.media.monitoring_remove_after_days {
to_delete.push(cte.id);
}
Expand Down Expand Up @@ -3059,18 +3062,18 @@ ORDER BY RANDOM() LIMIT 10;

ryot_log!(trace, "Invalidating invalid media import jobs");
self.invalidate_import_jobs().await.trace_ok();
ryot_log!(trace, "Removing stale entities from Monitoring collection");
self.remove_old_entities_from_monitoring_collection()
.await
.trace_ok();
ryot_log!(trace, "Checking for updates for media in Watchlist");
self.update_watchlist_metadata_and_queue_notifications()
self.update_monitored_metadata_and_queue_notifications()
.await
.trace_ok();
ryot_log!(trace, "Checking for updates for monitored people");
self.update_monitored_people_and_queue_notifications()
.await
.trace_ok();
ryot_log!(trace, "Removing stale entities from Monitoring collection");
self.remove_old_entities_from_monitoring_collection()
.await
.trace_ok();
ryot_log!(trace, "Checking and queuing any pending reminders");
self.queue_pending_reminders().await.trace_ok();
ryot_log!(trace, "Recalculating calendar events");
Expand Down
1 change: 1 addition & 0 deletions crates/utils/dependent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ supporting-service = { path = "../../services/supporting" }
tracing = { workspace = true }
traits = { path = "../../traits" }
user-models = { path = "../../models/user" }
uuid = { workspace = true }

[package.metadata.cargo-machete]
ignored = ["tracing"]
Loading

0 comments on commit cc66f81

Please sign in to comment.