diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 7354e83de51..a6c3276a537 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -19,8 +19,8 @@ use graph_node::{ MetricsContext, }; use graph_store_postgres::{ - connection_pool::ConnectionPool, BlockStore, Shard, Store, SubgraphStore, SubscriptionManager, - PRIMARY_SHARD, + connection_pool::ConnectionPool, BlockStore, NotificationSender, Shard, Store, SubgraphStore, + SubscriptionManager, PRIMARY_SHARD, }; use graph_node::config::{self, Config as Cfg}; @@ -514,6 +514,10 @@ impl Context { self.node_id.clone() } + fn notification_sender(&self) -> Arc { + Arc::new(NotificationSender::new(self.registry.clone())) + } + fn primary_pool(self) -> ConnectionPool { let primary = self.config.primary_store(); let pool = StoreBuilder::main_pool( @@ -750,10 +754,12 @@ async fn main() { Remove { name } => commands::remove::run(ctx.subgraph_store(), name), Create { name } => commands::create::run(ctx.subgraph_store(), name), Unassign { deployment } => { - commands::assign::unassign(ctx.primary_pool(), &deployment).await + let sender = ctx.notification_sender(); + commands::assign::unassign(ctx.primary_pool(), &sender, &deployment).await } Reassign { deployment, node } => { - commands::assign::reassign(ctx.primary_pool(), &deployment, node) + let sender = ctx.notification_sender(); + commands::assign::reassign(ctx.primary_pool(), &sender, &deployment, node) } Rewind { force, diff --git a/node/src/manager/commands/assign.rs b/node/src/manager/commands/assign.rs index 26baccd1466..aa045a1357f 100644 --- a/node/src/manager/commands/assign.rs +++ b/node/src/manager/commands/assign.rs @@ -1,9 +1,15 @@ -use graph::prelude::{anyhow::anyhow, Error, NodeId}; -use graph_store_postgres::{command_support::catalog, connection_pool::ConnectionPool}; +use graph::prelude::{anyhow::anyhow, Error, NodeId, StoreEvent}; +use graph_store_postgres::{ + command_support::catalog, connection_pool::ConnectionPool, NotificationSender, +}; use crate::manager::deployment::DeploymentSearch; -pub async fn unassign(primary: ConnectionPool, search: &DeploymentSearch) -> Result<(), Error> { +pub async fn unassign( + primary: ConnectionPool, + sender: &NotificationSender, + search: &DeploymentSearch, +) -> Result<(), Error> { let locator = search.locate_unique(&primary)?; let conn = primary.get()?; @@ -14,13 +20,15 @@ pub async fn unassign(primary: ConnectionPool, search: &DeploymentSearch) -> Res .ok_or_else(|| anyhow!("failed to locate site for {locator}"))?; println!("unassigning {locator}"); - conn.unassign_subgraph(&site)?; + let changes = conn.unassign_subgraph(&site)?; + conn.send_store_event(sender, &StoreEvent::new(changes))?; Ok(()) } pub fn reassign( primary: ConnectionPool, + sender: &NotificationSender, search: &DeploymentSearch, node: String, ) -> Result<(), Error> { @@ -33,20 +41,22 @@ pub fn reassign( let site = conn .locate_site(locator.clone())? .ok_or_else(|| anyhow!("failed to locate site for {locator}"))?; - match conn.assigned_node(&site)? { + let changes = match conn.assigned_node(&site)? { Some(cur) => { if cur == node { println!("deployment {locator} is already assigned to {cur}"); + vec![] } else { println!("reassigning {locator} to {node} (was {cur})"); - conn.reassign_subgraph(&site, &node)?; + conn.reassign_subgraph(&site, &node)? } } None => { println!("assigning {locator} to {node}"); - conn.assign_subgraph(&site, &node)?; + conn.assign_subgraph(&site, &node)? } - } + }; + conn.send_store_event(sender, &StoreEvent::new(changes))?; Ok(()) }