Skip to content

Commit

Permalink
Change how pipelines are created
Browse files Browse the repository at this point in the history
Also introduces base ClientOptions, which all our other SDKs have.
  • Loading branch information
heaths committed Jun 4, 2021
1 parent 4075f13 commit a6e592d
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 80 deletions.
12 changes: 12 additions & 0 deletions sdk/core/src/client_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use crate::policies::{Policy, TelemetryOptions};
use std::sync::Arc;

/// Options passed clients to customer policies, telemetry, etc.
#[derive(Clone, Debug, Default)]
pub struct ClientOptions {
// TODO: Expose retry options and transport overrides.
pub per_call_policies: Vec<Arc<dyn Policy>>,
pub per_retry_policies: Vec<Arc<dyn Policy>>,

pub telemetry: TelemetryOptions,
}
2 changes: 2 additions & 0 deletions sdk/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ extern crate serde_derive;
mod macros;

mod bytes_stream;
pub mod client_options;
mod constants;
mod context;
mod errors;
Expand All @@ -35,6 +36,7 @@ use std::fmt::Debug;
use uuid::Uuid;

pub use bytes_stream::*;
pub use client_options::ClientOptions;
pub use constants::*;
pub use context::Context;
pub use errors::*;
Expand Down
29 changes: 20 additions & 9 deletions sdk/core/src/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
use crate::policies::{Policy, PolicyResult, TelemetryPolicy};
use crate::{Context, Request, Response};
use crate::{ClientOptions, Context, Request, Response};
use std::sync::Arc;

/// Execution pipeline.
///
/// A pipeline follows a precise flow:
///
/// 1. Per call policies are executed. Per call policies can fail and bail out of the pipeline
/// 1. Client library-specified per-call policies are executed. Per-call policies can fail and bail out of the pipeline
/// immediately.
/// 2. Retry policy. It allows to re-execute the following policies.
/// 3. Per retry policies. Per retry polices are always executed at least once but are re-executed
/// 2. User-specified per-call policies are executed.
/// 3. Telemetry policy.
/// 4. Retry policy. It allows to re-execute the following policies.
/// 5. Client library-specified per-retry policies. Per-retry polices are always executed at least once but are re-executed
/// in case of retries.
/// 4. Transport policy. Transport policy is always the last policy and is the policy that
/// 6. User-specified per-retry policies are executed.
/// 7. Transport policy. Transport policy is always the last policy and is the policy that
/// actually constructs the `Response` to be passed up the pipeline.
///
/// A pipeline is immutable. In other words a policy can either succeed and call the following
Expand All @@ -24,21 +27,29 @@ pub struct Pipeline {
}

impl Pipeline {
/// Creates a new pipeline given the client library crate name and version,
/// alone with user-specified and client library-specified policies.
///
/// Crates can simply pass `option_env!("CARGO_PKG_NAME")` and `option_env!("CARGO_PKG_VERSION")` for the
/// `crate_name` and `crate_version` arguments respectively.
pub fn new(
crate_name: Option<&'static str>,
crate_version: Option<&'static str>,
options: &ClientOptions,
per_call_policies: Vec<Arc<dyn Policy>>,
retry: Arc<dyn Policy>,
per_retry_policies: Vec<Arc<dyn Policy>>,
transport_policy: Arc<dyn Policy>,
) -> Self {
let mut pipeline: Vec<Arc<dyn Policy>> =
Vec::with_capacity(per_call_policies.len() + per_retry_policies.len() + 3);

// TODO: Create pipeline from ClientOptions which should contain user-specified policies + client-added policies.
pipeline.push(Arc::new(TelemetryPolicy::default()));
Vec::with_capacity(options.per_call_policies.len() + per_call_policies.len() + options.per_retry_policies.len() + per_retry_policies.len() + 3);

pipeline.extend_from_slice(&per_call_policies);
pipeline.extend_from_slice(&options.per_call_policies);
pipeline.push(Arc::new(TelemetryPolicy::new(crate_name, crate_version, &options.telemetry)));
pipeline.push(retry);
pipeline.extend_from_slice(&per_retry_policies);
pipeline.extend_from_slice(&options.per_retry_policies);
pipeline.push(transport_policy);

Self { pipeline }
Expand Down
127 changes: 56 additions & 71 deletions sdk/core/src/policies/telemetry_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,52 @@ use std::sync::Arc;

#[derive(Clone, Debug, Default)]
pub struct TelemetryOptions {
application_id: Option<String>,
}

impl TelemetryOptions {
pub fn new(application_id: Option<String>) -> Self {
Self { application_id }
}
pub application_id: Option<String>,
}

#[derive(Clone, Debug)]
pub struct TelemetryPolicy {
header: String,
}

impl TelemetryPolicy {
pub fn new(options: TelemetryOptions) -> Self {
Self::with_environment::<Env>(options)
/// Sets the User-Agent header with useful information in a typical format for Azure SDKs.
///
/// Client libraries should create a `TelemetryPolicy` using `option_env!()` like so:
/// ```
/// use azure_core::policies::{TelemetryOptions, TelemetryPolicy};
/// let policy = TelemetryPolicy::new(option_env!("CARGO_PKG_NAME"), option_env!("CARGO_PKG_VERSION"), &TelemetryOptions::default());
/// ```
impl<'a> TelemetryPolicy {
pub fn new(
crate_name: Option<&'a str>,
crate_version: Option<&'a str>,
options: &TelemetryOptions,
) -> Self {
Self::new_with_rustc_version(
crate_name,
crate_version,
option_env!("AZSDK_RUSTC_VERSION"),
options,
)
}

fn with_environment<T: Environment>(options: TelemetryOptions) -> Self {
fn new_with_rustc_version(
crate_name: Option<&'a str>,
crate_version: Option<&'a str>,
rustc_version: Option<&'static str>,
options: &TelemetryOptions,
) -> Self {
const UNKNOWN: &'static str = "unknown";
let crate_name = T::crate_name().unwrap_or(UNKNOWN);
let crate_version = T::crate_version().unwrap_or(UNKNOWN);
let rustc_version = T::rustc_version().unwrap_or(UNKNOWN);
let mut crate_name = crate_name.unwrap_or(UNKNOWN);
let crate_version = crate_version.unwrap_or(UNKNOWN);
let rustc_version = rustc_version.unwrap_or(UNKNOWN);
let platform_info = format!("({}; {}; {})", rustc_version, OS, ARCH,);
let header = match options.application_id {

if let Some(name) = crate_name.strip_prefix("azure_") {
crate_name = name;
}

let header = match &options.application_id {
Some(application_id) => format!(
"{} azsdk-rust-{}/{} {}",
application_id, crate_name, crate_version, platform_info
Expand All @@ -47,29 +67,6 @@ impl TelemetryPolicy {
}
}

impl Default for TelemetryPolicy {
fn default() -> Self {
TelemetryPolicy::new(TelemetryOptions::default())
}
}

trait Environment {
fn crate_name() -> Option<&'static str> {
option_env!("CARGO_PKG_NAME")
}

fn crate_version() -> Option<&'static str> {
option_env!("CARGO_PKG_VERSION")
}

fn rustc_version() -> Option<&'static str> {
option_env!("AZSDK_RUSTC_VERSION")
}
}

struct Env;
impl Environment for Env {}

#[async_trait::async_trait]
impl Policy for TelemetryPolicy {
async fn send(
Expand All @@ -90,54 +87,42 @@ impl Policy for TelemetryPolicy {
mod test {
use super::*;

// tests assume cargo + rustc
const CRATE_NAME: &'static str = env!("CARGO_PKG_NAME");
const CRATE_VERSION: &'static str = env!("CARGO_PKG_VERSION");
const RUSTC_VERSION: &'static str = env!("AZSDK_RUSTC_VERSION");

struct EmptyEnv;
impl Environment for EmptyEnv {
fn crate_name() -> Option<&'static str> {
None
}

fn crate_version() -> Option<&'static str> {
None
}

fn rustc_version() -> Option<&'static str> {
None
}
}

#[test]
fn test_default() {
let policy = TelemetryPolicy::default();
fn test_without_application_id() {
let policy = TelemetryPolicy::new_with_rustc_version(
Some("azure_test"), // Tests that "azure_" is removed.
Some("1.2.3"),
Some("4.5.6"),
&TelemetryOptions::default(),
);
assert_eq!(
policy.header,
format!(
"azsdk-rust-{}/{} ({}; {}; {})",
CRATE_NAME, CRATE_VERSION, RUSTC_VERSION, OS, ARCH
)
format!("azsdk-rust-test/1.2.3 (4.5.6; {}; {})", OS, ARCH)
);
}

#[test]
fn test_with_application_id() {
let options = TelemetryOptions::new(Some("test".to_string()));
let policy = TelemetryPolicy::new(options);
let options = TelemetryOptions {
application_id: Some("my_app".to_string()),
};
let policy = TelemetryPolicy::new_with_rustc_version(
Some("test"),
Some("1.2.3"),
Some("4.5.6"),
&options,
);
assert_eq!(
policy.header,
format!(
"test azsdk-rust-{}/{} ({}; {}; {})",
CRATE_NAME, CRATE_VERSION, RUSTC_VERSION, OS, ARCH
)
format!("my_app azsdk-rust-test/1.2.3 (4.5.6; {}; {})", OS, ARCH)
);
}

#[test]
fn test_missing_env() {
let policy = TelemetryPolicy::with_environment::<EmptyEnv>(TelemetryOptions::default());
// Would simulate if option_env!("CARGO_PKG_NAME"), for example, returned None.
let policy =
TelemetryPolicy::new_with_rustc_version(None, None, None, &TelemetryOptions::default());
assert_eq!(
policy.header,
format!("azsdk-rust-unknown/unknown (unknown; {}; {})", OS, ARCH)
Expand Down
6 changes: 6 additions & 0 deletions sdk/cosmos/src/clients/cosmos_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::resources::ResourceType;
use crate::{headers::*, CosmosError};
use crate::{requests, ReadonlyString};

use azure_core::client_options::ClientOptions;
use azure_core::pipeline::Pipeline;
use azure_core::policies::{LinearRetryPolicy, Policy, TransportOptions, TransportPolicy};
use azure_core::Context;
Expand Down Expand Up @@ -32,6 +33,7 @@ pub struct CosmosClient {
}
/// TODO
pub struct CosmosOptions {
options: ClientOptions,
retry: Arc<dyn Policy>,
transport: TransportOptions,
}
Expand All @@ -40,6 +42,7 @@ impl CosmosOptions {
/// TODO
pub fn with_client(client: Arc<dyn HttpClient>) -> Self {
Self {
options: ClientOptions::default(),
retry: Arc::new(LinearRetryPolicy::default()), // this defaults to linear backoff
transport: TransportOptions::new(client),
}
Expand All @@ -52,6 +55,9 @@ fn new_pipeline_from_options(options: CosmosOptions) -> Pipeline {
let per_retry_policies = Vec::new();
let transport_policy = TransportPolicy::new(options.transport);
Pipeline::new(
option_env!("CARGO_PKG_NAME"),
option_env!("CARGO_PKG_VERSION"),
&options.options,
per_call_policies,
options.retry,
per_retry_policies,
Expand Down

0 comments on commit a6e592d

Please sign in to comment.