Skip to content

Commit

Permalink
node: Make sure that graphman unassign/reassign send store events
Browse files Browse the repository at this point in the history
Without them, it's possible that subgraphs get indexed by two nodes.
  • Loading branch information
lutter committed Apr 19, 2022
1 parent 13879b5 commit 5fa34f8
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
14 changes: 10 additions & 4 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use graph_node::{
MetricsContext,
};
use graph_store_postgres::{
connection_pool::ConnectionPool, BlockStore, Shard, Store, SubgraphStore, SubscriptionManager,
PRIMARY_SHARD,
connection_pool::ConnectionPool, BlockStore, NotificationSender, Shard, Store, SubgraphStore,
SubscriptionManager, PRIMARY_SHARD,
};

use graph_node::config::{self, Config as Cfg};
Expand Down Expand Up @@ -514,6 +514,10 @@ impl Context {
self.node_id.clone()
}

fn notification_sender(&self) -> Arc<NotificationSender> {
Arc::new(NotificationSender::new(self.registry.clone()))
}

fn primary_pool(self) -> ConnectionPool {
let primary = self.config.primary_store();
let pool = StoreBuilder::main_pool(
Expand Down Expand Up @@ -750,10 +754,12 @@ async fn main() {
Remove { name } => commands::remove::run(ctx.subgraph_store(), name),
Create { name } => commands::create::run(ctx.subgraph_store(), name),
Unassign { deployment } => {
commands::assign::unassign(ctx.primary_pool(), &deployment).await
let sender = ctx.notification_sender();
commands::assign::unassign(ctx.primary_pool(), &sender, &deployment).await
}
Reassign { deployment, node } => {
commands::assign::reassign(ctx.primary_pool(), &deployment, node)
let sender = ctx.notification_sender();
commands::assign::reassign(ctx.primary_pool(), &sender, &deployment, node)
}
Rewind {
force,
Expand Down
26 changes: 18 additions & 8 deletions node/src/manager/commands/assign.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use graph::prelude::{anyhow::anyhow, Error, NodeId};
use graph_store_postgres::{command_support::catalog, connection_pool::ConnectionPool};
use graph::prelude::{anyhow::anyhow, Error, NodeId, StoreEvent};
use graph_store_postgres::{
command_support::catalog, connection_pool::ConnectionPool, NotificationSender,
};

use crate::manager::deployment::DeploymentSearch;

pub async fn unassign(primary: ConnectionPool, search: &DeploymentSearch) -> Result<(), Error> {
pub async fn unassign(
primary: ConnectionPool,
sender: &NotificationSender,
search: &DeploymentSearch,
) -> Result<(), Error> {
let locator = search.locate_unique(&primary)?;

let conn = primary.get()?;
Expand All @@ -14,13 +20,15 @@ pub async fn unassign(primary: ConnectionPool, search: &DeploymentSearch) -> Res
.ok_or_else(|| anyhow!("failed to locate site for {locator}"))?;

println!("unassigning {locator}");
conn.unassign_subgraph(&site)?;
let changes = conn.unassign_subgraph(&site)?;
conn.send_store_event(sender, &StoreEvent::new(changes))?;

Ok(())
}

pub fn reassign(
primary: ConnectionPool,
sender: &NotificationSender,
search: &DeploymentSearch,
node: String,
) -> Result<(), Error> {
Expand All @@ -33,20 +41,22 @@ pub fn reassign(
let site = conn
.locate_site(locator.clone())?
.ok_or_else(|| anyhow!("failed to locate site for {locator}"))?;
match conn.assigned_node(&site)? {
let changes = match conn.assigned_node(&site)? {
Some(cur) => {
if cur == node {
println!("deployment {locator} is already assigned to {cur}");
vec![]
} else {
println!("reassigning {locator} to {node} (was {cur})");
conn.reassign_subgraph(&site, &node)?;
conn.reassign_subgraph(&site, &node)?
}
}
None => {
println!("assigning {locator} to {node}");
conn.assign_subgraph(&site, &node)?;
conn.assign_subgraph(&site, &node)?
}
}
};
conn.send_store_event(sender, &StoreEvent::new(changes))?;

Ok(())
}

0 comments on commit 5fa34f8

Please sign in to comment.