Skip to content

Commit

Permalink
Handle conn pool initialization gracefully
Browse files Browse the repository at this point in the history
- Duplicated labels no longer allowed on config
- Endpoints get tested/connected to per provider label instead of per endpoint
  • Loading branch information
mangas committed Jul 24, 2023
1 parent ba3a680 commit 9757b0a
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 13 deletions.
30 changes: 20 additions & 10 deletions graph/src/firehose/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
blockchain::BlockPtr,
cheap_clone::CheapClone,
components::store::BlockNumber,
data::value::Word,
endpoint::{ConnectionType, EndpointMetrics, Provider, RequestLabels},
firehose::decode_firehose_block,
prelude::{anyhow, debug, info},
Expand All @@ -16,7 +17,13 @@ use futures03::StreamExt;
use http::uri::{Scheme, Uri};
use itertools::Itertools;
use slog::Logger;
use std::{collections::BTreeMap, fmt::Display, ops::ControlFlow, sync::Arc, time::Duration};
use std::{
collections::{BTreeMap, HashMap},
fmt::Display,
ops::ControlFlow,
sync::Arc,
time::Duration,
};
use tonic::codegen::InterceptedService;
use tonic::{
codegen::CompressionEncoding,
Expand Down Expand Up @@ -494,18 +501,21 @@ impl FirehoseNetworks {
}
}

/// Returns a `Vec` of tuples where the first element of the tuple is
/// the chain's id and the second one is an endpoint for this chain.
/// There can be mulitple tuple with the same chain id but with different
/// endpoint where multiple providers exist for a single chain id.
pub fn flatten(&self) -> Vec<(String, Arc<FirehoseEndpoint>)> {
/// Returns a `HashMap` where the key is the chain's id and the key is an endpoint for this chain.
/// There can be mulitple keys with the same chain id but with different
/// endpoint where multiple providers exist for a single chain id. Providers with the same
/// label do not need to be tested individually, if one is working, every other endpoint in the
/// pool should also work.
pub fn flatten(&self) -> HashMap<(String, Word), Arc<FirehoseEndpoint>> {
self.networks
.iter()
.flat_map(|(chain_id, firehose_endpoints)| {
firehose_endpoints
.0
.iter()
.map(move |endpoint| (chain_id.clone(), endpoint.clone()))
firehose_endpoints.0.iter().map(move |endpoint| {
(
(chain_id.clone(), endpoint.provider.clone()),
endpoint.clone(),
)
})
})
.collect()
}
Expand Down
2 changes: 1 addition & 1 deletion node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ where
.flatten()
.into_iter()
.map(|(chain_id, endpoint)| (chain_id, endpoint, logger.clone()))
.map(|(chain_id, endpoint, logger)| async move {
.map(|((chain_id, _), endpoint, logger)| async move {
let logger = logger.new(o!("provider" => endpoint.provider.to_string()));
info!(
logger, "Connecting to Firehose to get chain identifier";
Expand Down
62 changes: 60 additions & 2 deletions node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use graph::{
anyhow::Error,
blockchain::BlockchainKind,
firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN},
itertools::Itertools,
prelude::{
anyhow::{anyhow, bail, Context, Result},
info,
Expand Down Expand Up @@ -518,8 +519,14 @@ fn default_blockchain_kind() -> BlockchainKind {

impl Chain {
fn validate(&mut self) -> Result<()> {
// `Config` validates that `self.shard` references a configured shard
let mut labels = self.providers.iter().map(|p| &p.label).collect_vec();
labels.sort();
labels.dedup();
if labels.len() != self.providers.len() {
return Err(anyhow!("Provider labels must be unique"));
}

// `Config` validates that `self.shard` references a configured shard
for provider in self.providers.iter_mut() {
provider.validate()?
}
Expand Down Expand Up @@ -1147,7 +1154,7 @@ where
#[cfg(test)]
mod tests {

use crate::config::Web3Rule;
use crate::config::{ChainSection, Web3Rule};

use super::{
Chain, Config, FirehoseProvider, Provider, ProviderDetails, Transport, Web3Provider,
Expand Down Expand Up @@ -1702,4 +1709,55 @@ mod tests {
assert!(SubgraphLimit::Unlimited > SubgraphLimit::Limit(10));
assert!(SubgraphLimit::Limit(10) > SubgraphLimit::Disabled);
}

#[test]
fn duplicated_labels_are_not_allowed_within_chain() {
let mut actual = toml::from_str::<ChainSection>(
r#"
ingestor = "block_ingestor_node"
[mainnet]
shard = "vip"
provider = [
{ label = "mainnet1", url = "http://127.0.0.1", features = [], headers = { Authorization = "Bearer foo" } },
{ label = "mainnet1", url = "http://127.0.0.1", features = [ "archive", "traces" ] }
]
"#,
)
.unwrap();

let err = actual.validate();
assert_eq!(true, err.is_err());
let err = err.unwrap_err();
assert_eq!(
true,
err.to_string().contains("unique"),
"result: {:?}",
err
);
}

#[test]
fn duplicated_labels_are_not_allowed() {
let mut actual = toml::from_str::<ChainSection>(
r#"
ingestor = "block_ingestor_node"
[mainnet]
shard = "vip"
provider = [
{ label = "mainnet1", url = "http://127.0.0.1", features = [], headers = { Authorization = "Bearer foo" } },
{ label = "mainnet2", url = "http://127.0.0.1", features = [ "archive", "traces" ] }
]
[mainnet2]
shard = "vip"
provider = [
{ label = "mainnet1", url = "http://127.0.0.1", features = [], headers = { Authorization = "Bearer foo" } },
{ label = "mainnet2", url = "http://127.0.0.1", features = [ "archive", "traces" ] }
]
"#,
)
.unwrap();

let result = actual.validate();
assert_eq!(true, result.is_ok(), "error: {:?}", result.unwrap_err());
}
}

0 comments on commit 9757b0a

Please sign in to comment.