Skip to content

Commit

Permalink
feat: improve influxdb v2 api compability (#1831)
Browse files Browse the repository at this point in the history
* feat: support influxdb v2 api

* cr
  • Loading branch information
fengys1996 authored Jun 27, 2023
1 parent 313121f commit 99f0479
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
5 changes: 3 additions & 2 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use tower_http::auth::AsyncRequireAuthorizationLayer;
use tower_http::trace::TraceLayer;

use self::authorize::HttpAuth;
use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write};
use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::auth::UserProviderRef;
use crate::configurator::ConfiguratorRef;
use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu};
Expand Down Expand Up @@ -598,7 +598,8 @@ impl HttpServer {

fn route_influxdb<S>(&self, influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {
Router::new()
.route("/write", routing::post(influxdb_write))
.route("/write", routing::post(influxdb_write_v1))
.route("/api/v2/write", routing::post(influxdb_write_v2))
.route("/ping", routing::get(influxdb_ping))
.route("/health", routing::get(influxdb_health))
.with_state(influxdb_handler)
Expand Down
55 changes: 46 additions & 9 deletions src/servers/src/http/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,35 +41,70 @@ pub async fn influxdb_health() -> Result<impl IntoResponse> {
}

#[axum_macros::debug_handler]
pub async fn influxdb_write(
pub async fn influxdb_write_v1(
State(handler): State<InfluxdbLineProtocolHandlerRef>,
Query(mut params): Query<HashMap<String, String>>,
lines: String,
) -> Result<impl IntoResponse> {
let db = params
.remove("db")
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let _timer = timer!(
crate::metrics::METRIC_HTTP_INFLUXDB_WRITE_ELAPSED,
&[(crate::metrics::METRIC_DB_LABEL, db.clone())]
);
let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(&db);
let ctx = Arc::new(QueryContext::with(catalog, schema));

let precision = params
.get("precision")
.map(|val| parse_time_precision(val))
.transpose()?;

influxdb_write(&db, precision, lines, handler).await
}

#[axum_macros::debug_handler]
pub async fn influxdb_write_v2(
State(handler): State<InfluxdbLineProtocolHandlerRef>,
Query(mut params): Query<HashMap<String, String>>,
lines: String,
) -> Result<impl IntoResponse> {
let db = params
.remove("bucket")
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());

let precision = params
.get("precision")
.map(|val| parse_time_precision(val))
.transpose()?;

influxdb_write(&db, precision, lines, handler).await
}

pub async fn influxdb_write(
db: &str,
precision: Option<Precision>,
lines: String,
handler: InfluxdbLineProtocolHandlerRef,
) -> Result<impl IntoResponse> {
let _timer = timer!(
crate::metrics::METRIC_HTTP_INFLUXDB_WRITE_ELAPSED,
&[(crate::metrics::METRIC_DB_LABEL, db.to_string())]
);

let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(db);
let ctx = Arc::new(QueryContext::with(catalog, schema));

let request = InfluxdbRequest { precision, lines };

handler.exec(&request, ctx).await?;

Ok((StatusCode::NO_CONTENT, ()))
}

fn parse_time_precision(value: &str) -> Result<Precision> {
// Precision conversion needs to be compatible with influxdb v1 v2 api.
// For details, see the Influxdb documents.
// https://docs.influxdata.com/influxdb/v1.8/tools/api/#apiv2write-http-endpoint
// https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint
match value {
"n" => Ok(Precision::Nanosecond),
"u" => Ok(Precision::Microsecond),
"n" | "ns" => Ok(Precision::Nanosecond),
"u" | "us" => Ok(Precision::Microsecond),
"ms" => Ok(Precision::Millisecond),
"s" => Ok(Precision::Second),
"m" => Ok(Precision::Minute),
Expand All @@ -90,7 +125,9 @@ mod tests {
#[test]
fn test_parse_time_precision() {
assert_eq!(Precision::Nanosecond, parse_time_precision("n").unwrap());
assert_eq!(Precision::Nanosecond, parse_time_precision("ns").unwrap());
assert_eq!(Precision::Microsecond, parse_time_precision("u").unwrap());
assert_eq!(Precision::Microsecond, parse_time_precision("us").unwrap());
assert_eq!(Precision::Millisecond, parse_time_precision("ms").unwrap());
assert_eq!(Precision::Second, parse_time_precision("s").unwrap());
assert_eq!(Precision::Minute, parse_time_precision("m").unwrap());
Expand Down

0 comments on commit 99f0479

Please sign in to comment.