Skip to content

Commit

Permalink
feat: impl basic auth (#1531)
Browse files Browse the repository at this point in the history
## Rationale
Close #929

## Detailed Changes
- Added file authentication
- Modify the query and write paths, and add authentication

## Test Plan
- Existed tests
- Manual tests

---------

Co-authored-by: jiacai2050 <[email protected]>
  • Loading branch information
baojinri and jiacai2050 authored May 15, 2024
1 parent cf5ec10 commit 71d261d
Show file tree
Hide file tree
Showing 20 changed files with 324 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ workspace = true
arrow = { workspace = true }
arrow_ext = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
catalog = { workspace = true }
clru = { workspace = true }
Expand Down
37 changes: 37 additions & 0 deletions src/proxy/src/auth/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use serde::{Deserialize, Serialize};

pub mod with_file;

/// Header of authorization
pub const AUTHORIZATION: &str = "authorization";

#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub enum AuthType {
#[default]
#[serde(rename = "file")]
File,
}

#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct Config {
pub enable: bool,
pub auth_type: AuthType,
pub source: String,
}
136 changes: 136 additions & 0 deletions src/proxy/src/auth/with_file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{
collections::HashMap,
fs::File,
io::{self, BufRead},
path::Path,
};

use generic_error::BoxError;
use snafu::{OptionExt, ResultExt};
use tonic::service::Interceptor;

use crate::{
auth::AUTHORIZATION,
error::{Internal, InternalNoCause, Result},
};

#[derive(Debug, Clone, Default)]
pub struct AuthWithFile {
enable: bool,
file_path: String,
// name -> password
users: HashMap<String, String>,
}

impl AuthWithFile {
pub fn new(enable: bool, file_path: String) -> Self {
Self {
enable,
file_path,
users: HashMap::new(),
}
}

// Load a csv format config
pub fn load_credential(&mut self) -> Result<()> {
if !self.enable {
return Ok(());
}

let path = Path::new(&self.file_path);
if !path.exists() {
return InternalNoCause {
msg: format!("file not existed: {:?}", path),
}
.fail();
}

let file = File::open(path).box_err().context(Internal {
msg: "failed to open file",
})?;
let reader = io::BufReader::new(file);

for line in reader.lines() {
let line = line.box_err().context(Internal {
msg: "failed to read line",
})?;
let (username, password) = line.split_once(',').with_context(|| InternalNoCause {
msg: format!("invalid line: {:?}", line),
})?;
self.users
.insert(username.to_string(), password.to_string());
}

Ok(())
}

// TODO: currently we only support basic auth
// This function should return Result
pub fn identify(&self, input: Option<String>) -> bool {
if !self.enable {
return true;
}

let input = match input {
Some(v) => v,
None => return false,
};
let input = match input.split_once("Basic ") {
Some((_, encoded)) => match base64::decode(encoded) {
Ok(v) => v,
Err(_e) => return false,
},
None => return false,
};
let input = match std::str::from_utf8(&input) {
Ok(v) => v,
Err(_e) => return false,
};
match input.split_once(':') {
Some((user, pass)) => self
.users
.get(user)
.map(|expected| expected == pass)
.unwrap_or_default(),
None => false,
}
}
}

pub fn get_authorization<T>(req: &tonic::Request<T>) -> Option<String> {
req.metadata()
.get(AUTHORIZATION)
.and_then(|value| value.to_str().ok().map(String::from))
}

impl Interceptor for AuthWithFile {
fn call(
&mut self,
request: tonic::Request<()>,
) -> std::result::Result<tonic::Request<()>, tonic::Status> {
// TODO: extract username from request
let authorization = get_authorization(&request);
if self.identify(authorization) {
Ok(request)
} else {
Err(tonic::Status::unauthenticated("unauthenticated"))
}
}
}
9 changes: 9 additions & 0 deletions src/proxy/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub struct RequestContext {
pub timeout: Option<Duration>,
/// Request id
pub request_id: RequestId,
/// authorization
pub authorization: Option<String>,
}

impl RequestContext {
Expand All @@ -69,6 +71,7 @@ pub struct Builder {
catalog: String,
schema: String,
timeout: Option<Duration>,
authorization: Option<String>,
}

impl Builder {
Expand All @@ -87,6 +90,11 @@ impl Builder {
self
}

pub fn authorization(mut self, authorization: Option<String>) -> Self {
self.authorization = authorization;
self
}

pub fn build(self) -> Result<RequestContext> {
ensure!(!self.catalog.is_empty(), MissingCatalog);
ensure!(!self.schema.is_empty(), MissingSchema);
Expand All @@ -96,6 +104,7 @@ impl Builder {
schema: self.schema,
timeout: self.timeout,
request_id: RequestId::next_id(),
authorization: self.authorization,
})
}
}
13 changes: 11 additions & 2 deletions src/proxy/src/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use tonic::{
transport::{self, Channel},
};

use crate::FORWARDED_FROM;
use crate::{auth::AUTHORIZATION, FORWARDED_FROM};

#[derive(Debug, Snafu)]
pub enum Error {
Expand Down Expand Up @@ -206,6 +206,7 @@ pub struct ForwardRequest<Req> {
pub table: String,
pub req: tonic::Request<Req>,
pub forwarded_from: Option<String>,
pub authorization: Option<String>,
}

impl Forwarder<DefaultClientBuilder> {
Expand Down Expand Up @@ -283,6 +284,7 @@ impl<B: ClientBuilder> Forwarder<B> {
table,
req,
forwarded_from,
authorization,
} = forward_req;

let req_pb = RouteRequestPb {
Expand All @@ -309,7 +311,7 @@ impl<B: ClientBuilder> Forwarder<B> {
}
};

self.forward_with_endpoint(endpoint, req, forwarded_from, do_rpc)
self.forward_with_endpoint(endpoint, req, forwarded_from, authorization, do_rpc)
.await
}

Expand All @@ -318,6 +320,7 @@ impl<B: ClientBuilder> Forwarder<B> {
endpoint: Endpoint,
mut req: tonic::Request<Req>,
forwarded_from: Option<String>,
authorization: Option<String>,
do_rpc: F,
) -> Result<ForwardResult<Resp, Err>>
where
Expand Down Expand Up @@ -351,6 +354,11 @@ impl<B: ClientBuilder> Forwarder<B> {
self.local_endpoint.to_string().parse().unwrap(),
);

if let Some(authorization) = authorization {
req.metadata_mut()
.insert(AUTHORIZATION, authorization.parse().unwrap());
}

let client = self.get_or_create_client(&endpoint).await?;
match do_rpc(client, req, &endpoint).await {
Err(e) => {
Expand Down Expand Up @@ -503,6 +511,7 @@ mod tests {
table: table.to_string(),
req: query_request.into_request(),
forwarded_from: None,
authorization: None,
}
};

Expand Down
1 change: 1 addition & 0 deletions src/proxy/src/grpc/prom_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl Proxy {
msg: "Missing context",
code: StatusCode::BAD_REQUEST,
})?;

let schema = req_ctx.database;
let catalog = self.instance.catalog_manager.default_catalog_name();

Expand Down
1 change: 1 addition & 0 deletions src/proxy/src/grpc/sql_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ impl Proxy {
table: req.tables[0].clone(),
req: req.clone().into_request(),
forwarded_from: ctx.forwarded_from.clone(),
authorization: ctx.authorization.clone(),
};
let do_query = |mut client: StorageServiceClient<Channel>,
request: tonic::Request<SqlQueryRequest>,
Expand Down
19 changes: 8 additions & 11 deletions src/proxy/src/http/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@
//! It converts write request to gRPC write request, and
//! translates query request to SQL for execution.

use std::{
collections::HashMap,
result::Result as StdResult,
time::{Duration, Instant},
};
use std::{collections::HashMap, result::Result as StdResult, time::Instant};

use async_trait::async_trait;
use catalog::consts::DEFAULT_CATALOG;
Expand Down Expand Up @@ -83,7 +79,7 @@ impl Proxy {
}),
table_requests: write_table_requests,
};
let ctx = ProxyContext::new(ctx.timeout, None);
let ctx = ProxyContext::new(ctx.timeout, None, ctx.authorization);

match self.handle_write_internal(ctx, table_request).await {
Ok(result) => {
Expand Down Expand Up @@ -178,22 +174,23 @@ impl Proxy {
/// another HoraeDB instance.
pub async fn handle_prom_grpc_query(
&self,
timeout: Option<Duration>,
ctx: ProxyContext,
req: PrometheusRemoteQueryRequest,
) -> Result<PrometheusRemoteQueryResponse> {
let ctx = req.context.context(ErrNoCause {
let req_ctx = req.context.context(ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: "request context is missing",
})?;
let database = ctx.database.to_string();
let database = req_ctx.database.to_string();
let query = Query::decode(req.query.as_ref())
.box_err()
.context(Internal {
msg: "decode query failed",
})?;
let metric = find_metric(&query.matchers)?;
let builder = RequestContext::builder()
.timeout(timeout)
.timeout(ctx.timeout)
.authorization(ctx.authorization)
.schema(database)
// TODO: support different catalog
.catalog(DEFAULT_CATALOG.to_string());
Expand Down Expand Up @@ -235,7 +232,7 @@ impl RemoteStorage for Proxy {
query: query.encode_to_vec(),
};
if let Some(resp) = self
.maybe_forward_prom_remote_query(metric.clone(), remote_req)
.maybe_forward_prom_remote_query(ctx, metric.clone(), remote_req)
.await
.map_err(|e| {
error!("Forward prom remote query failed, err:{e}");
Expand Down
2 changes: 1 addition & 1 deletion src/proxy/src/http/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl Proxy {
req: Request,
) -> Result<Output> {
let schema = &ctx.schema;
let ctx = Context::new(ctx.timeout, None);
let ctx = Context::new(ctx.timeout, None, ctx.authorization.clone());

let query_res = self
.handle_sql(
Expand Down
2 changes: 1 addition & 1 deletion src/proxy/src/influxdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl Proxy {
}),
table_requests: write_table_requests,
};
let proxy_context = Context::new(ctx.timeout, None);
let proxy_context = Context::new(ctx.timeout, None, ctx.authorization);

match self
.handle_write_internal(proxy_context, table_request)
Expand Down
Loading

0 comments on commit 71d261d

Please sign in to comment.