Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Telemetry improvements (#1886)
Browse files Browse the repository at this point in the history
* Fix typo

* Support multiple telemetry endpoints and verbosity levels

* Bump substrate-telemetry version

* Telemetrify Aura consensus

* Telemetrify Grandpa

* Fix CI version conflicts

* Implement style remarks

* Fix fixture

* Implement style remarks

* Clone only when necessary

* Get rid of Arc for URL

* Handle connection issues better
  • Loading branch information
cmichi authored and bkchr committed Feb 28, 2019
1 parent 6f4c995 commit f429a0c
Show file tree
Hide file tree
Showing 18 changed files with 287 additions and 68 deletions.
14 changes: 10 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions core/cli/src/informant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use tokio::timer::Interval;
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
use network::{SyncState, SyncProvider};
use client::{backend::Backend, BlockchainEvents};
use substrate_telemetry::telemetry;
use substrate_telemetry::*;
use log::{debug, info, warn};

use runtime_primitives::generic::BlockId;
Expand Down Expand Up @@ -86,6 +86,7 @@ pub fn start<C>(service: &Service<C>, exit: ::exit_future::Exit, handle: TaskExe
} else { (0.0, 0) };

telemetry!(
SUBSTRATE_INFO;
"system.interval";
"status" => format!("{}{}", status, target),
"peers" => num_peers,
Expand Down Expand Up @@ -144,7 +145,7 @@ pub fn start<C>(service: &Service<C>, exit: ::exit_future::Exit, handle: TaskExe
let txpool = service.transaction_pool();
let display_txpool_import = txpool.import_notification_stream().for_each(move |_| {
let status = txpool.status();
telemetry!("txpool.import"; "ready" => status.ready, "future" => status.future);
telemetry!(SUBSTRATE_INFO; "txpool.import"; "ready" => status.ready, "future" => status.future);
Ok(())
});

Expand Down
7 changes: 4 additions & 3 deletions core/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use log::info;
use lazy_static::lazy_static;

use futures::Future;
use substrate_telemetry::TelemetryEndpoints;

const MAX_NODE_NAME_LENGTH: usize = 32;

Expand Down Expand Up @@ -401,9 +402,9 @@ where

// Override telemetry
if cli.no_telemetry {
config.telemetry_url = None;
} else if let Some(url) = cli.telemetry_url {
config.telemetry_url = Some(url);
config.telemetry_endpoints = None;
} else if !cli.telemetry_endpoints.is_empty() {
config.telemetry_endpoints = Some(TelemetryEndpoints::new(cli.telemetry_endpoints));
}

Ok(config)
Expand Down
23 changes: 20 additions & 3 deletions core/cli/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,11 @@ pub struct RunCmd {
#[structopt(long = "no-telemetry")]
pub no_telemetry: bool,

/// The URL of the telemetry server to connect to
#[structopt(long = "telemetry-url", value_name = "TELEMETRY_URL")]
pub telemetry_url: Option<String>,
/// The URL of the telemetry server to connect to. This flag can be passed multiple times
/// as a mean to specify multiple telemetry endpoints. Verbosity levels range from 0-9, with
/// 0 denoting the least verbosity. If no verbosity level is specified the default is 0.
#[structopt(long = "telemetry-url", value_name = "URL VERBOSITY", parse(try_from_str = "parse_telemetry_endpoints"))]
pub telemetry_endpoints: Vec<(String, u8)>,

/// The means of execution used when calling into the runtime while syncing blocks.
#[structopt(
Expand Down Expand Up @@ -239,6 +241,21 @@ pub struct RunCmd {
pub pool_config: TransactionPoolParams,
}

/// Default to verbosity level 0, if none is provided.
fn parse_telemetry_endpoints(s: &str) -> Result<(String, u8), Box<std::error::Error>> {
let pos = s.find(' ');
match pos {
None => {
Ok((s.to_owned(), 0))
},
Some(pos_) => {
let verbosity = s[pos_ + 1..].parse()?;
let url = s[..pos_].parse()?;
Ok((url, verbosity))
}
}
}

impl_augment_clap!(RunCmd);
impl_get_log_filter!(RunCmd);

Expand Down
6 changes: 3 additions & 3 deletions core/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::in_mem;
use crate::block_builder::{self, api::BlockBuilder as BlockBuilderAPI};
use crate::genesis;
use consensus;
use substrate_telemetry::telemetry;
use substrate_telemetry::*;

use log::{info, trace, warn};
use error_chain::bail;
Expand Down Expand Up @@ -729,7 +729,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
fork_choice,
);

telemetry!("block.import";
telemetry!(SUBSTRATE_INFO; "block.import";
"height" => height,
"best" => ?hash,
"origin" => ?origin
Expand Down Expand Up @@ -859,7 +859,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
warn!(" Header {:?}", header);
warn!(" Native result {:?}", native_result);
warn!(" Wasm result {:?}", wasm_result);
telemetry!("block.execute.consensus_failure";
telemetry!(SUBSTRATE_INFO; "block.execute.consensus_failure";
"hash" => ?hash,
"origin" => ?origin,
"header" => ?header
Expand Down
1 change: 1 addition & 0 deletions core/consensus/aura/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ aura_primitives = { package = "substrate-consensus-aura-primitives", path = "pri
inherents = { package = "substrate-inherents", path = "../../inherents" }
srml-consensus = { path = "../../../srml/consensus" }
srml-aura = { path = "../../../srml/aura" }
substrate-telemetry = { path = "../../telemetry" }
futures = "0.1.17"
tokio = "0.1.7"
parking_lot = "0.7.1"
Expand Down
34 changes: 33 additions & 1 deletion core/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use srml_aura::{
InherentType as AuraInherent, AuraInherentData,
timestamp::{TimestampInherentData, InherentType as TimestampInherent, InherentError as TIError}
};
use substrate_telemetry::*;

use aura_slots::{CheckedHeader, SlotWorker, SlotInfo, SlotCompatible};

Expand Down Expand Up @@ -265,12 +266,18 @@ impl<B: Block, C, E, I, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, SO> whe
chain_head.hash(),
e
);
telemetry!(CONSENSUS_WARN; "aura.unable_fetching_authorities";
"slot" => ?chain_head.hash(), "err" => ?e
);
return Box::new(future::ok(()));
}
};

if self.sync_oracle.is_offline() && authorities.len() > 1 {
debug!(target: "aura", "Skipping proposal slot. Waiting for the netork.");
debug!(target: "aura", "Skipping proposal slot. Waiting for the network.");
telemetry!(CONSENSUS_DEBUG; "aura.skipping_proposal_slot";
"authorities_len" => authorities.len()
);
return Box::new(future::ok(()));
}

Expand All @@ -282,12 +289,18 @@ impl<B: Block, C, E, I, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, SO> whe
slot_num,
timestamp
);
telemetry!(CONSENSUS_DEBUG; "aura.starting_authorship";
"slot_num" => slot_num, "timestamp" => timestamp
);

// we are the slot author. make a block and sign it.
let proposer = match env.init(&chain_head, &authorities) {
Ok(p) => p,
Err(e) => {
warn!("Unable to author block in slot {:?}: {:?}", slot_num, e);
telemetry!(CONSENSUS_WARN; "aura.unable_authoring_block";
"slot" => slot_num, "err" => ?e
);
return Box::new(future::ok(()))
}
};
Expand Down Expand Up @@ -315,6 +328,9 @@ impl<B: Block, C, E, I, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, SO> whe
"Discarding proposal for slot {}; block production took too long",
slot_num
);
telemetry!(CONSENSUS_INFO; "aura.discarding_proposal_took_too_long";
"slot" => slot_num
);
return
}

Expand Down Expand Up @@ -348,10 +364,18 @@ impl<B: Block, C, E, I, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, SO> whe
import_block.post_header().hash(),
pre_hash
);
telemetry!(CONSENSUS_INFO; "aura.pre_sealed_block";
"header_num" => ?header_num,
"hash_now" => ?import_block.post_header().hash(),
"hash_previously" => ?pre_hash
);

if let Err(e) = block_import.import_block(import_block, None) {
warn!(target: "aura", "Error with block built on {:?}: {:?}",
parent_hash, e);
telemetry!(CONSENSUS_WARN; "aura.err_with_block_built_on";
"hash" => ?parent_hash, "err" => ?e
);
}
})
.map_err(|e| consensus_common::ErrorKind::ClientImport(format!("{:?}", e)).into())
Expand Down Expand Up @@ -456,6 +480,9 @@ impl<C, E> AuraVerifier<C, E>
"halting for block {} seconds in the future",
diff
);
telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block";
"diff" => ?diff
);
thread::sleep(Duration::from_secs(diff));
Ok(())
},
Expand Down Expand Up @@ -504,6 +531,7 @@ impl<C, E> AuraVerifier<C, E>
"halting for block {} seconds in the future",
diff
);
telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block"; "diff" => ?diff);
thread::sleep(Duration::from_secs(diff));
Ok(())
},
Expand Down Expand Up @@ -589,6 +617,7 @@ impl<B: Block, C, E> Verifier<B> for AuraVerifier<C, E> where
}

trace!(target: "aura", "Checked {:?}; importing.", pre_header);
telemetry!(CONSENSUS_TRACE; "aura.checked_and_importing"; "pre_header" => ?pre_header);

extra_verification.into_future().wait()?;

Expand All @@ -608,6 +637,9 @@ impl<B: Block, C, E> Verifier<B> for AuraVerifier<C, E> where
}
CheckedHeader::Deferred(a, b) => {
debug!(target: "aura", "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
telemetry!(CONSENSUS_DEBUG; "aura.header_too_far_in_future";
"hash" => ?hash, "a" => ?a, "b" => ?b
);
Err(format!("Header {:?} rejected: too far in the future", hash))
}
}
Expand Down
1 change: 1 addition & 0 deletions core/finality-grandpa/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ parity-codec-derive = "3.0"
runtime_primitives = { package = "sr-primitives", path = "../sr-primitives" }
consensus_common = { package = "substrate-consensus-common", path = "../consensus/common" }
substrate-primitives = { path = "../primitives" }
substrate-telemetry = { path = "../telemetry" }
client = { package = "substrate-client", path = "../client" }
network = { package = "substrate-network", path = "../network" }
service = { package = "substrate-service", path = "../service", optional = true }
Expand Down
Loading

0 comments on commit f429a0c

Please sign in to comment.