Skip to content

Commit

Permalink
Fix #342: Async support for push_metrics behind feature push-async
Browse files Browse the repository at this point in the history
  • Loading branch information
adamchalmers committed Sep 18, 2020
1 parent b7be575 commit e60486e
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ gen = ["protobuf-codegen-pure"]
nightly = ["libc"]
process = ["libc", "procfs"]
push = ["reqwest", "libc", "protobuf"]
push-async = ["reqwest", "libc", "protobuf"]

[dependencies]
cfg-if = "^0.1"
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ This crate provides several optional components which can be enabled via [Cargo

- `process`: Enable [process metrics](https://prometheus.io/docs/instrumenting/writing_clientlibs/#process-metrics) support.

- `push`: Enable [push metrics](https://prometheus.io/docs/instrumenting/pushing/) support.
- `push`: Enable [push metrics](https://prometheus.io/docs/instrumenting/pushing/) support with blocking network calls. Incompatible with `push-async`.

- `push-async`: Enable [push metrics](https://prometheus.io/docs/instrumenting/pushing/) support with async/await. Incompatible with `push`.

### Static Metric

Expand Down
10 changes: 9 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ This library supports four features:
using the pre-generated client.
* `nightly`: Enable nightly only features.
* `process`: For collecting process info.
* `push`: Enable push support.
* `push`: Enable push support using blocking network calls. Incompatible with `push`.
* `push-async`: Enable push support using blocking network calls. Incompatible with `push-async`.
*/

Expand Down Expand Up @@ -163,6 +164,8 @@ mod histogram;
mod metrics;
#[cfg(feature = "push")]
mod push;
#[cfg(feature = "push-async")]
mod push;
mod registry;
mod value;
mod vec;
Expand Down Expand Up @@ -226,5 +229,10 @@ pub use self::push::{
hostname_grouping_key, push_add_collector, push_add_metrics, push_collector, push_metrics,
BasicAuthentication,
};
#[cfg(feature = "push-async")]
pub use self::push::{
hostname_grouping_key, push_add_collector_async, push_add_metrics_async, push_collector_async,
push_metrics_async, BasicAuthentication,
};
pub use self::registry::Registry;
pub use self::registry::{default_registry, gather, register, unregister};
140 changes: 133 additions & 7 deletions src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ lazy_static! {
.build()
.unwrap();
}
#[cfg(feature = "push-async")]
lazy_static! {
static ref ASYNC_HTTP_CLIENT: reqwest::Client = reqwest::Client::builder()
.timeout(REQWEST_TIMEOUT_SEC)
.build()
.unwrap();
}

/// `BasicAuthentication` holder for supporting `push` to Pushgateway endpoints
/// using Basic access authentication.
Expand All @@ -50,6 +57,7 @@ pub struct BasicAuthentication {
/// Note that all previously pushed metrics with the same job and other grouping
/// labels will be replaced with the metrics pushed by this call. (It uses HTTP
/// method 'PUT' to push to the Pushgateway.)
#[cfg(feature = "push")]
pub fn push_metrics<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
Expand All @@ -60,9 +68,24 @@ pub fn push_metrics<S: BuildHasher>(
push(job, grouping, url, mfs, "PUT", basic_auth)
}

/// Functions just like `push_metrics`, except the metrics are pushed
/// asynchronously.
/// Requires the feature `push-async`.
#[cfg(feature = "push-async")]
pub async fn push_metrics_async<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
push_async(job, grouping, url, mfs, "PUT", basic_auth).await
}

/// `push_add_metrics` works like `push_metrics`, but only previously pushed
/// metrics with the same name (and the same job and other grouping labels) will
/// be replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
#[cfg(feature = "push")]
pub fn push_add_metrics<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
Expand All @@ -73,16 +96,26 @@ pub fn push_add_metrics<S: BuildHasher>(
push(job, grouping, url, mfs, "POST", basic_auth)
}

/// `push_add_metrics_async` works like `push_metrics`, but async.
#[cfg(feature = "push-async")]
pub async fn push_add_metrics_async<'a, S: BuildHasher>(
job: &'a str,
grouping: HashMap<String, String, S>,
url: &'a str,
mfs: Vec<proto::MetricFamily>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
push(job, grouping, url, mfs, "POST", basic_auth)
}

const LABEL_NAME_JOB: &str = "job";

fn push<S: BuildHasher>(
fn configure_push<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
method: &str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
) -> Result<(String, impl Encoder, Vec<u8>)> {
// Suppress clippy warning needless_pass_by_value.
let grouping = grouping;

Expand Down Expand Up @@ -145,7 +178,18 @@ fn push<S: BuildHasher>(
// Ignore error, `no metrics` and `no name`.
let _ = encoder.encode(&[mf], &mut buf);
}
Ok((push_url, encoder, buf))
}

fn push<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
method: &str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs)?;
let mut builder = HTTP_CLIENT
.request(
Method::from_str(method).unwrap(),
Expand All @@ -159,18 +203,49 @@ fn push<S: BuildHasher>(
}

let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?;
handle_push_response(response.status(), push_url)
}

match response.status() {
/// Requires the feature `push-async`.
#[cfg(feature = "push-async")]
async fn push_async<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
method: &str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs)?;
let mut builder = ASYNC_HTTP_CLIENT
.request(
Method::from_str(method).unwrap(),
Url::from_str(&push_url).unwrap(),
)
.header(CONTENT_TYPE, encoder.format_type())
.body(buf);

if let Some(BasicAuthentication { username, password }) = basic_auth {
builder = builder.basic_auth(username, Some(password));
}

let response = builder.send().await.map_err(|e| Error::Msg(format!("{}", e)))?;
handle_push_response(response.status(), push_url)
}

fn handle_push_response(status: StatusCode, push_url: String) -> Result<()> {
match status {
StatusCode::ACCEPTED => Ok(()),
StatusCode::OK => Ok(()),
_ => Err(Error::Msg(format!(
"unexpected status code {} while pushing to {}",
response.status(),
status,
push_url
))),
}
}

#[cfg(feature = "push")]
fn push_from_collector<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
Expand All @@ -188,8 +263,28 @@ fn push_from_collector<S: BuildHasher>(
push(job, grouping, url, mfs, method, basic_auth)
}

/// Requires the feature `push-async`.
#[cfg(feature = "push-async")]
async fn push_from_collector_async<'a, S: BuildHasher>(
job: &'a str,
grouping: HashMap<String, String, S>,
url: &'a str,
collectors: Vec<Box<dyn Collector>>,
method: &'a str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
let registry = Registry::new();
for bc in collectors {
registry.register(bc)?;
}

let mfs = registry.gather();
push_async(job, grouping, url, mfs, method, basic_auth).await
}

/// `push_collector` push metrics collected from the provided collectors. It is
/// a convenient way to push only a few metrics.
#[cfg(feature = "push")]
pub fn push_collector<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
Expand All @@ -200,8 +295,24 @@ pub fn push_collector<S: BuildHasher>(
push_from_collector(job, grouping, url, collectors, "PUT", basic_auth)
}

/// `push_add_collector` works like `push_add_metrics`, it collects from the
/// `push_collector_async` is just an async version of `push_collector`.
/// Pushes metrics collected from the provided collectors. It is
/// a convenient way to push only a few metrics.
/// Requires the feature `push-async`.
#[cfg(feature = "push-async")]
pub async fn push_collector_async<'a, S: BuildHasher>(
job: &'a str,
grouping: HashMap<String, String, S>,
url: &'a str,
collectors: Vec<Box<dyn Collector>>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
push_from_collector_async(job, grouping, url, collectors, "PUT", basic_auth).await
}

/// `push_add_collector` works like `push_add_collector`, it collects from the
/// provided collectors. It is a convenient way to push only a few metrics.
#[cfg(feature = "push")]
pub fn push_add_collector<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
Expand All @@ -212,6 +323,21 @@ pub fn push_add_collector<S: BuildHasher>(
push_from_collector(job, grouping, url, collectors, "POST", basic_auth)
}

/// `push_add_collector_async` works like `push_add_collector`, but async.
/// It collects from the provided collectors. It is a convenient way to push
/// only a few metrics.
/// Requires the feature `push-async`.
#[cfg(feature = "push-async")]
pub async fn push_add_collector_async<'a, S: BuildHasher>(
job: &'a str,
grouping: HashMap<String, String, S>,
url: &'a str,
collectors: Vec<Box<dyn Collector>>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
push_from_collector_async(job, grouping, url, collectors, "POST", basic_auth).await
}

const DEFAULT_GROUP_LABEL_PAIR: (&str, &str) = ("instance", "unknown");

/// `hostname_grouping_key` returns a label map with the only entry
Expand Down

0 comments on commit e60486e

Please sign in to comment.