Skip to content

Commit

Permalink
[FEAT] Native globbing for other backends (#1460)
Browse files Browse the repository at this point in the history
Adds and tests globbing capabilities for:

1. Local
2. GCS
3. HTTP

Azure support is also added, but it has not been tested yet.

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Oct 4, 2023
1 parent 1d4248a commit cdbcec4
Show file tree
Hide file tree
Showing 17 changed files with 865 additions and 818 deletions.
65 changes: 42 additions & 23 deletions src/daft-io/src/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ use crate::{
};
use common_io_config::AzureConfig;

/// Path separator used when trimming and building Azure blob prefixes, and
/// passed to `list_blobs` as the delimiter for posix-style listing.
const AZURE_DELIMITER: &str = "/";
/// Fanout limit applied to `glob` when the caller does not supply one.
const DEFAULT_GLOB_FANOUT_LIMIT: usize = 1024;

#[derive(Debug, Snafu)]
enum Error {
// Input errors.
Expand Down Expand Up @@ -170,23 +173,25 @@ impl AzureBlobSource {
protocol: &str,
container_name: &str,
prefix: &str,
delimiter: &str,
posix: bool,
) -> BoxStream<super::Result<FileMetadata>> {
let container_client = self.blob_client.container_client(container_name);

// Clone and own some references that we need for the lifetime of the stream.
let protocol = protocol.to_string();
let container_name = container_name.to_string();
let prefix = prefix.to_string();
let delimiter = delimiter.to_string();

// Blob stores expose listing by prefix and delimiter,
// but this is not the exact same as a unix-like LS behaviour
// (e.g. /somef is a prefix of /somefile, but you cannot ls /somef)
// To use prefix listing as LS, we need to ensure the path given is exactly a directory or a file, not a prefix.

// It turns out Azure list_blobs("path/") will match both a file at "path" and a folder at "path/", which is exactly what we need.
let prefix_with_delimiter = format!("{}{delimiter}", prefix.trim_end_matches(&delimiter));
let prefix_with_delimiter = format!(
"{}{AZURE_DELIMITER}",
prefix.trim_end_matches(&AZURE_DELIMITER)
);
let full_path = format!("{}://{}{}", protocol, container_name, prefix);
let full_path_with_trailing_delimiter = format!(
"{}://{}{}",
Expand All @@ -199,7 +204,7 @@ impl AzureBlobSource {
&protocol,
&container_name,
&prefix_with_delimiter,
&delimiter,
&posix,
)
.await;

Expand Down Expand Up @@ -241,15 +246,15 @@ impl AzureBlobSource {
// To check whether the prefix actually exists, check whether it exists as a result one directory above.
// (Azure does not return marker files for empty directories.)
let upper_dir = prefix // "/upper/blah/"
.trim_end_matches(&delimiter) // "/upper/blah"
.trim_end_matches(|c: char| c.to_string() != delimiter); // "/upper/"
.trim_end_matches(&AZURE_DELIMITER) // "/upper/blah"
.trim_end_matches(|c: char| c.to_string() != AZURE_DELIMITER); // "/upper/"

let upper_results_stream = self._list_directory_delimiter_stream(
&container_client,
&protocol,
&container_name,
upper_dir,
&delimiter,
&posix,
).await;

// At this point, we have a stream of Result<FileMetadata>.
Expand Down Expand Up @@ -296,7 +301,7 @@ impl AzureBlobSource {
protocol: &str,
container_name: &str,
prefix: &str,
delimiter: &str,
posix: &bool,
) -> BoxStream<super::Result<FileMetadata>> {
// Calls Azure list_blobs with the prefix
// and returns the result flattened and standardized into FileMetadata.
Expand All @@ -307,11 +312,14 @@ impl AzureBlobSource {
let prefix = prefix.to_string();

// Paginated response stream from Azure API.
let responses_stream = container_client
.list_blobs()
.delimiter(delimiter.to_string())
.prefix(prefix.clone())
.into_stream();
let mut responses_stream = container_client.list_blobs().prefix(prefix.clone());

// Setting delimiter will trigger "directory-mode" which is a posix-like ls for the current directory
if *posix {
responses_stream = responses_stream.delimiter(AZURE_DELIMITER.to_string());
}

let responses_stream = responses_stream.into_stream();

// Map each page of results to a page of standardized FileMetadata.
responses_stream
Expand Down Expand Up @@ -373,6 +381,10 @@ impl AzureBlobSource {

#[async_trait]
impl ObjectSource for AzureBlobSource {
/// Returns the path delimiter used by this source (`AZURE_DELIMITER`).
fn delimiter(&self) -> &'static str {
AZURE_DELIMITER
}

async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult> {
let parsed = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;
let container = match parsed.host_str() {
Expand Down Expand Up @@ -427,20 +439,28 @@ impl ObjectSource for AzureBlobSource {
Ok(metadata.blob.properties.content_length as usize)
}

/// Resolves `glob_path` into a stream of `FileMetadata` results by delegating
/// to `crate::object_store_glob::glob`.
///
/// When `fanout_limit` is `None`, `DEFAULT_GLOB_FANOUT_LIMIT` is used instead.
/// When `page_size` is `None`, a page size of 1000 is used instead.
async fn glob(
    self: Arc<Self>,
    glob_path: &str,
    fanout_limit: Option<usize>,
    page_size: Option<i32>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
    use crate::object_store_glob::glob;

    // Always hand the globber a concrete fanout cap to prevent runaway concurrency.
    let bounded_fanout = Some(fanout_limit.unwrap_or(DEFAULT_GLOB_FANOUT_LIMIT));
    // Substitute the default page size when the caller leaves it unspecified.
    let effective_page_size = Some(page_size.unwrap_or(1000));

    glob(self, glob_path, bounded_fanout, effective_page_size).await
}

async fn iter_dir(
&self,
uri: &str,
delimiter: &str,
posix: bool,
_page_size: Option<i32>,
_limit: Option<usize>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;

if !posix {
todo!("Prefix-listing is not yet implemented for Azure");
}

// path can be root (buckets) or path prefix within a bucket.
let container = {
// "Container" is Azure's name for Bucket.
Expand Down Expand Up @@ -469,7 +489,7 @@ impl ObjectSource for AzureBlobSource {
Some(container_name) => {
let prefix = uri.path();
Ok(self
.list_directory_stream(protocol, container_name, prefix, delimiter)
.list_directory_stream(protocol, container_name, prefix, posix)
.await)
}
}
Expand All @@ -478,10 +498,9 @@ impl ObjectSource for AzureBlobSource {
async fn ls(
&self,
path: &str,
delimiter: &str,
posix: bool,
continuation_token: Option<&str>,
page_size: Option<i32>,
_page_size: Option<i32>,
) -> super::Result<LSResult> {
// It looks like the azure rust library API
// does not currently allow using the continuation token:
Expand All @@ -496,7 +515,7 @@ impl ObjectSource for AzureBlobSource {
}?;

let files = self
.iter_dir(path, delimiter, posix, page_size, None)
.iter_dir(path, posix, None)
.await?
.try_collect::<Vec<_>>()
.await?;
Expand Down
196 changes: 0 additions & 196 deletions src/daft-io/src/glob.rs

This file was deleted.

Loading

0 comments on commit cdbcec4

Please sign in to comment.