Skip to content

Commit

Permalink
Merge branch 'main' into fix-detect-region
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Sep 26, 2023
2 parents aff2309 + 2da70d6 commit 2d29a63
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 95 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ OpenDAL is an active open-source project. We are always open to people who want
- [RisingWave](https://github.com/risingwavelabs/risingwave): A Distributed SQL Database for Stream Processing
- [Vector](https://github.com/vectordotdev/vector): A high-performance observability data pipeline.
- [OctoBase](https://github.com/toeverything/OctoBase): the open-source database behind [AFFiNE](https://github.com/toeverything/affine), local-first, yet collaborative.
- [Pants](https://github.com/pantsbuild/pants): A fast, scalable, user-friendly build system for codebases of all sizes.

## License

Expand Down
4 changes: 2 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ services-oss = [
"reqsign?/reqwest_request",
]
services-persy = ["dep:persy"]
services-postgresql = ["dep:tokio-postgres"]
services-postgresql = ["dep:tokio-postgres", "dep:bb8", "dep:bb8-postgres"]
services-redb = ["dep:redb"]
services-redis = ["dep:redis"]
services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
Expand Down Expand Up @@ -275,7 +275,7 @@ tokio-postgres = { version = "0.7.8", optional = true }
tracing = { version = "0.1", optional = true }
uuid = { version = "1", features = ["serde", "v4"] }
mysql_async = { version = "0.32.2", optional = true }

bb8-postgres = { version = "0.8.1", optional = true }

[dev-dependencies]
criterion = { version = "0.4", features = ["async", "async_tokio"] }
Expand Down
17 changes: 11 additions & 6 deletions core/src/services/azdls/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use super::error::parse_error;
use super::pager::AzdlsPager;
use super::writer::AzdlsWriter;
use crate::raw::*;
use crate::services::azdls::writer::AzdlsWriters;
use crate::*;

/// Known endpoint suffix Azure Data Lake Storage Gen2 URI syntax.
Expand Down Expand Up @@ -170,7 +171,7 @@ impl Builder for AzdlsBuilder {
.with_operation("Builder::build")
.with_context("service", Scheme::Azdls)),
}?;
debug!("backend use endpoint {}", &filesystem);
debug!("backend use endpoint {}", &endpoint);

let client = if let Some(client) = self.http_client.take() {
client
Expand Down Expand Up @@ -230,7 +231,7 @@ pub struct AzdlsBackend {
impl Accessor for AzdlsBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
type Writer = oio::OneShotWriter<AzdlsWriter>;
type Writer = AzdlsWriters;
type BlockingWriter = ();
type Pager = AzdlsPager;
type BlockingPager = ();
Expand All @@ -248,6 +249,7 @@ impl Accessor for AzdlsBackend {
read_with_range: true,

write: true,
write_can_append: true,
create_dir: true,
delete: true,
rename: true,
Expand Down Expand Up @@ -299,10 +301,13 @@ impl Accessor for AzdlsBackend {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
Ok((
RpWrite::default(),
oio::OneShotWriter::new(AzdlsWriter::new(self.core.clone(), args, path.to_string())),
))
let w = AzdlsWriter::new(self.core.clone(), args.clone(), path.to_string());
let w = if args.append() {
AzdlsWriters::Two(oio::AppendObjectWriter::new(w))
} else {
AzdlsWriters::One(oio::OneShotWriter::new(w))
};
Ok((RpWrite::default(), w))
}

async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
Expand Down
8 changes: 5 additions & 3 deletions core/src/services/azdls/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,18 +203,20 @@ impl AzdlsCore {
pub fn azdls_update_request(
&self,
path: &str,
size: Option<usize>,
size: Option<u64>,
position: u64,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);

// - close: Make this is the final action to this file.
// - flush: Flush the file directly.
let url = format!(
"{}/{}/{}?action=append&close=true&flush=true&position=0",
"{}/{}/{}?action=append&close=true&flush=true&position={}",
self.endpoint,
self.filesystem,
percent_encode_path(&p)
percent_encode_path(&p),
position
);

let mut req = Request::patch(&url);
Expand Down
43 changes: 42 additions & 1 deletion core/src/services/azdls/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;

pub type AzdlsWriters =
oio::TwoWaysWriter<oio::OneShotWriter<AzdlsWriter>, oio::AppendObjectWriter<AzdlsWriter>>;

pub struct AzdlsWriter {
core: Arc<AzdlsCore>,

Expand Down Expand Up @@ -65,7 +68,8 @@ impl oio::OneShotWrite for AzdlsWriter {
let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
let mut req = self.core.azdls_update_request(
&self.path,
Some(bs.len()),
Some(bs.len() as u64),
0,
AsyncBody::ChunkedBytes(bs),
)?;

Expand All @@ -85,3 +89,40 @@ impl oio::OneShotWrite for AzdlsWriter {
}
}
}

#[async_trait]
impl oio::AppendObjectWrite for AzdlsWriter {
async fn offset(&self) -> Result<u64> {
let resp = self.core.azdls_get_properties(&self.path).await?;

let status = resp.status();
let headers = resp.headers();

match status {
StatusCode::OK => Ok(parse_content_length(headers)?.unwrap_or_default()),
StatusCode::NOT_FOUND => Ok(0),
_ => Err(parse_error(resp).await?),
}
}

async fn append(&self, offset: u64, size: u64, body: AsyncBody) -> Result<()> {
let mut req = self
.core
.azdls_update_request(&self.path, Some(size), offset, body)?;

self.core.sign(&mut req).await?;

let resp = self.core.send(req).await?;

let status = resp.status();
match status {
StatusCode::OK | StatusCode::ACCEPTED => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp)
.await?
.with_operation("Backend::azdls_update_request")),
}
}
}
73 changes: 55 additions & 18 deletions core/src/services/ghac/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,26 @@ const GITHUB_REPOSITORY: &str = "GITHUB_REPOSITORY";
/// The github API version that used by OpenDAL.
const GITHUB_API_VERSION: &str = "2022-11-28";

fn value_or_env(
explicit_value: Option<String>,
env_var_name: &str,
operation: &'static str,
) -> Result<String> {
if let Some(value) = explicit_value {
return Ok(value);
}

env::var(env_var_name).map_err(|err| {
let text = format!(
"{} not found, maybe not in github action environment?",
env_var_name
);
Error::new(ErrorKind::ConfigInvalid, &text)
.with_operation(operation)
.set_source(err)
})
}

/// GitHub Action Cache Services support.
///
/// # Capabilities
Expand All @@ -83,8 +103,8 @@ const GITHUB_API_VERSION: &str = "2022-11-28";
///
/// Refer to [Caching dependencies to speed up workflows](https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows) for more information.
///
/// To make this service work as expected, please make sure the following
/// environment has been setup correctly:
/// To make this service work as expected, please make sure to either call `endpoint` and `token` to
/// configure the URL and credentials, or that the following environment has been setup correctly:
///
/// - `ACTIONS_CACHE_URL`
/// - `ACTIONS_RUNTIME_TOKEN`
Expand Down Expand Up @@ -151,6 +171,8 @@ pub struct GhacBuilder {
root: Option<String>,
version: Option<String>,
enable_create_simulation: bool,
endpoint: Option<String>,
runtime_token: Option<String>,

http_client: Option<HttpClient>,
}
Expand Down Expand Up @@ -191,6 +213,31 @@ impl GhacBuilder {
self
}

/// Set the endpoint for ghac service.
///
/// For example, this is provided as the `ACTIONS_CACHE_URL` environment variable by the GHA runner.
///
/// Default: the value of the `ACTIONS_CACHE_URL` environment variable.
pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
if !endpoint.is_empty() {
self.endpoint = Some(endpoint.to_string())
}
self
}

/// Set the runtime token for ghac service.
///
/// For example, this is provided as the `ACTIONS_RUNTIME_TOKEN` environment variable by the GHA
/// runner.
///
/// Default: the value of the `ACTIONS_RUNTIME_TOKEN` environment variable.
pub fn runtime_token(&mut self, runtime_token: &str) -> &mut Self {
if !runtime_token.is_empty() {
self.runtime_token = Some(runtime_token.to_string())
}
self
}

/// Specify the http client that used by this service.
///
/// # Notes
Expand Down Expand Up @@ -238,22 +285,12 @@ impl Builder for GhacBuilder {
root,
enable_create_simulation: self.enable_create_simulation,

cache_url: env::var(ACTIONS_CACHE_URL).map_err(|err| {
Error::new(
ErrorKind::ConfigInvalid,
"ACTIONS_CACHE_URL not found, maybe not in github action environment?",
)
.with_operation("Builder::build")
.set_source(err)
})?,
catch_token: env::var(ACTIONS_RUNTIME_TOKEN).map_err(|err| {
Error::new(
ErrorKind::ConfigInvalid,
"ACTIONS_RUNTIME_TOKEN not found, maybe not in github action environment?",
)
.with_operation("Builder::build")
.set_source(err)
})?,
cache_url: value_or_env(self.endpoint.take(), ACTIONS_CACHE_URL, "Builder::build")?,
catch_token: value_or_env(
self.runtime_token.take(),
ACTIONS_RUNTIME_TOKEN,
"Builder::build",
)?,
version: self
.version
.clone()
Expand Down
Loading

0 comments on commit 2d29a63

Please sign in to comment.