diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index 014ac483179..1a935b1956c 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -4,6 +4,7 @@ use crate::{ blockchain::BlockPtr, cheap_clone::CheapClone, components::store::BlockNumber, + data::value::Word, endpoint::{ConnectionType, EndpointMetrics, Provider, RequestLabels}, firehose::decode_firehose_block, prelude::{anyhow, debug, info}, @@ -16,7 +17,13 @@ use futures03::StreamExt; use http::uri::{Scheme, Uri}; use itertools::Itertools; use slog::Logger; -use std::{collections::BTreeMap, fmt::Display, ops::ControlFlow, sync::Arc, time::Duration}; +use std::{ + collections::{BTreeMap, HashMap}, + fmt::Display, + ops::ControlFlow, + sync::Arc, + time::Duration, +}; use tonic::codegen::InterceptedService; use tonic::{ codegen::CompressionEncoding, @@ -494,18 +501,21 @@ impl FirehoseNetworks { } } - /// Returns a `Vec` of tuples where the first element of the tuple is - /// the chain's id and the second one is an endpoint for this chain. - /// There can be mulitple tuple with the same chain id but with different - /// endpoint where multiple providers exist for a single chain id. - pub fn flatten(&self) -> Vec<(String, Arc)> { + /// Returns a `HashMap` where the key is the chain's id and the key is an endpoint for this chain. + /// There can be mulitple keys with the same chain id but with different + /// endpoint where multiple providers exist for a single chain id. Providers with the same + /// label do not need to be tested individually, if one is working, every other endpoint in the + /// pool should also work. + pub fn flatten(&self) -> HashMap<(String, Word), Arc> { self.networks .iter() .flat_map(|(chain_id, firehose_endpoints)| { - firehose_endpoints - .0 - .iter() - .map(move |endpoint| (chain_id.clone(), endpoint.clone())) + firehose_endpoints.0.iter().map(move |endpoint| { + ( + (chain_id.clone(), endpoint.provider.clone()), + endpoint.clone(), + ) + }) }) .collect() } diff --git a/node/src/chain.rs b/node/src/chain.rs index 105669a7fe5..6c9946bb66b 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -324,7 +324,7 @@ where .flatten() .into_iter() .map(|(chain_id, endpoint)| (chain_id, endpoint, logger.clone())) - .map(|(chain_id, endpoint, logger)| async move { + .map(|((chain_id, _), endpoint, logger)| async move { let logger = logger.new(o!("provider" => endpoint.provider.to_string())); info!( logger, "Connecting to Firehose to get chain identifier"; diff --git a/node/src/config.rs b/node/src/config.rs index f23ad6fe843..6007a289ca5 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -2,6 +2,7 @@ use graph::{ anyhow::Error, blockchain::BlockchainKind, firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN}, + itertools::Itertools, prelude::{ anyhow::{anyhow, bail, Context, Result}, info, @@ -518,8 +519,14 @@ fn default_blockchain_kind() -> BlockchainKind { impl Chain { fn validate(&mut self) -> Result<()> { - // `Config` validates that `self.shard` references a configured shard + let mut labels = self.providers.iter().map(|p| &p.label).collect_vec(); + labels.sort(); + labels.dedup(); + if labels.len() != self.providers.len() { + return Err(anyhow!("Provider labels must be unique")); + } + // `Config` validates that `self.shard` references a configured shard for provider in self.providers.iter_mut() { provider.validate()? } @@ -1147,7 +1154,7 @@ where #[cfg(test)] mod tests { - use crate::config::Web3Rule; + use crate::config::{ChainSection, Web3Rule}; use super::{ Chain, Config, FirehoseProvider, Provider, ProviderDetails, Transport, Web3Provider, @@ -1702,4 +1709,55 @@ mod tests { assert!(SubgraphLimit::Unlimited > SubgraphLimit::Limit(10)); assert!(SubgraphLimit::Limit(10) > SubgraphLimit::Disabled); } + + #[test] + fn duplicated_labels_are_not_allowed_within_chain() { + let mut actual = toml::from_str::( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "vip" + provider = [ + { label = "mainnet1", url = "http://127.0.0.1", features = [], headers = { Authorization = "Bearer foo" } }, + { label = "mainnet1", url = "http://127.0.0.1", features = [ "archive", "traces" ] } + ] + "#, + ) + .unwrap(); + + let err = actual.validate(); + assert_eq!(true, err.is_err()); + let err = err.unwrap_err(); + assert_eq!( + true, + err.to_string().contains("unique"), + "result: {:?}", + err + ); + } + + #[test] + fn duplicated_labels_are_not_allowed() { + let mut actual = toml::from_str::( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "vip" + provider = [ + { label = "mainnet1", url = "http://127.0.0.1", features = [], headers = { Authorization = "Bearer foo" } }, + { label = "mainnet2", url = "http://127.0.0.1", features = [ "archive", "traces" ] } + ] + [mainnet2] + shard = "vip" + provider = [ + { label = "mainnet1", url = "http://127.0.0.1", features = [], headers = { Authorization = "Bearer foo" } }, + { label = "mainnet2", url = "http://127.0.0.1", features = [ "archive", "traces" ] } + ] + "#, + ) + .unwrap(); + + let result = actual.validate(); + assert_eq!(true, result.is_ok(), "error: {:?}", result.unwrap_err()); + } }