Skip to content

Commit

Permalink
chore: remove workspace and collab listener (#893)
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy authored Oct 17, 2024
1 parent 3623d9f commit ffee47d
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 65 deletions.
20 changes: 9 additions & 11 deletions services/appflowy-collaborate/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use std::time::Duration;

use access_control::casbin::collab::{CollabAccessControlImpl, RealtimeCollabAccessControlImpl};
use access_control::casbin::notification::spawn_listen_on_workspace_member_change;

use access_control::casbin::workspace::WorkspaceAccessControlImpl;
use actix::Supervisor;
use actix_web::dev::Server;
Expand All @@ -23,7 +23,6 @@ use appflowy_ai_client::client::AppFlowyAIClient;

use crate::api::{collab_scope, ws_scope};

use crate::collab::notification::spawn_listen_on_collab_member_change;
use crate::collab::storage::CollabStorageImpl;
use crate::command::{CLCommandReceiver, CLCommandSender};
use crate::config::{Config, DatabaseSetting};
Expand Down Expand Up @@ -119,15 +118,14 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
let pg_listeners = Arc::new(PgListeners::new(&pg_pool).await?);
let access_control =
AccessControl::new(pg_pool.clone(), metrics.access_control_metrics.clone()).await?;
let collab_member_listener = pg_listeners.subscribe_collab_member_change();
let workspace_member_listener = pg_listeners.subscribe_workspace_member_change();

spawn_listen_on_workspace_member_change(workspace_member_listener, access_control.clone());
spawn_listen_on_collab_member_change(
pg_pool.clone(),
collab_member_listener,
access_control.clone(),
);
// let collab_member_listener = pg_listeners.subscribe_collab_member_change();
// let workspace_member_listener = pg_listeners.subscribe_workspace_member_change();
// spawn_listen_on_workspace_member_change(workspace_member_listener, access_control.clone());
// spawn_listen_on_collab_member_change(
// pg_pool.clone(),
// collab_member_listener,
// access_control.clone(),
// );

let collab_access_control = CollabAccessControlImpl::new(access_control.clone());
let workspace_access_control = WorkspaceAccessControlImpl::new(access_control.clone());
Expand Down
32 changes: 3 additions & 29 deletions services/appflowy-collaborate/src/pg_listener.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,16 @@
use crate::collab::notification::CollabMemberNotification;
use access_control::casbin::notification::WorkspaceMemberNotification;
use anyhow::Error;
use database::listener::PostgresDBListener;
use database::pg_row::AFUserNotification;
use sqlx::PgPool;
use tokio::sync::broadcast;

pub struct PgListeners {
user_listener: UserListener,
workspace_member_listener: WorkspaceMemberListener,
collab_member_listener: CollabMemberListener,
}

impl PgListeners {
pub async fn new(pg_pool: &PgPool) -> Result<Self, Error> {
let user_listener = UserListener::new(pg_pool, "af_user_channel").await?;

let workspace_member_listener =
WorkspaceMemberListener::new(pg_pool, "af_workspace_member_channel").await?;

let collab_member_listener =
CollabMemberListener::new(pg_pool, "af_collab_member_channel").await?;

Ok(Self {
user_listener,
workspace_member_listener,
collab_member_listener,
})
}

pub fn subscribe_workspace_member_change(
&self,
) -> broadcast::Receiver<WorkspaceMemberNotification> {
self.workspace_member_listener.notify.subscribe()
}

pub fn subscribe_collab_member_change(&self) -> broadcast::Receiver<CollabMemberNotification> {
self.collab_member_listener.notify.subscribe()
Ok(Self { user_listener })
}

pub fn subscribe_user_change(&self, uid: i64) -> tokio::sync::mpsc::Receiver<AFUserNotification> {
Expand All @@ -55,6 +29,6 @@ impl PgListeners {
}
}

pub type CollabMemberListener = PostgresDBListener<CollabMemberNotification>;
// pub type CollabMemberListener = PostgresDBListener<CollabMemberNotification>;
// pub type WorkspaceMemberListener = PostgresDBListener<WorkspaceMemberNotification>;
pub type UserListener = PostgresDBListener<AFUserNotification>;
pub type WorkspaceMemberListener = PostgresDBListener<WorkspaceMemberNotification>;
26 changes: 1 addition & 25 deletions src/biz/pg_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,15 @@ use appflowy_collaborate::collab::notification::CollabMemberNotification;
use database::listener::PostgresDBListener;
use database::pg_row::AFUserNotification;
use sqlx::PgPool;
use tokio::sync::broadcast;

pub struct PgListeners {
user_listener: UserListener,
workspace_member_listener: WorkspaceMemberListener,
collab_member_listener: CollabMemberListener,
}

impl PgListeners {
pub async fn new(pg_pool: &PgPool) -> Result<Self, Error> {
let user_listener = UserListener::new(pg_pool, "af_user_channel").await?;

let workspace_member_listener =
WorkspaceMemberListener::new(pg_pool, "af_workspace_member_channel").await?;

let collab_member_listener =
CollabMemberListener::new(pg_pool, "af_collab_member_channel").await?;

Ok(Self {
user_listener,
workspace_member_listener,
collab_member_listener,
})
}

pub fn subscribe_workspace_member_change(
&self,
) -> broadcast::Receiver<WorkspaceMemberNotification> {
self.workspace_member_listener.notify.subscribe()
}

pub fn subscribe_collab_member_change(&self) -> broadcast::Receiver<CollabMemberNotification> {
self.collab_member_listener.notify.subscribe()
Ok(Self { user_listener })
}

pub fn subscribe_user_change(&self, uid: i64) -> tokio::sync::mpsc::Receiver<AFUserNotification> {
Expand Down

0 comments on commit ffee47d

Please sign in to comment.