Minor: Move ObjectStoreRegistry to datafusion_execution crate #5478

Merged: 4 commits, Mar 9, 2023
19 changes: 11 additions & 8 deletions datafusion-cli/Cargo.lock

51 changes: 50 additions & 1 deletion datafusion/core/src/datasource/listing/mod.rs
@@ -59,7 +59,7 @@ pub struct PartitionedFile {
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
/// An optional field for user defined per object metadata
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}

@@ -103,3 +103,52 @@ impl From<ObjectMeta> for PartitionedFile {
}
}
}

#[cfg(test)]
mod tests {
Contributor Author

These tests exercise ListingTableUrl, which lives in core/src/datasource/listing, so I moved them here.

use datafusion_execution::object_store::ObjectStoreRegistry;
use object_store::local::LocalFileSystem;

use super::*;

#[test]
fn test_object_store_listing_url() {
let listing = ListingTableUrl::parse("file:///").unwrap();
let store = listing.object_store();
assert_eq!(store.as_str(), "file:///");

let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
let store = listing.object_store();
assert_eq!(store.as_str(), "s3://bucket/");
}

#[test]
fn test_get_by_url_hdfs() {
let sut = ObjectStoreRegistry::default();
sut.register_store("hdfs", "localhost:8020", Arc::new(LocalFileSystem::new()));
let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
sut.get_by_url(&url).unwrap();
}

#[test]
fn test_get_by_url_s3() {
let sut = ObjectStoreRegistry::default();
sut.register_store("s3", "bucket", Arc::new(LocalFileSystem::new()));
let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
sut.get_by_url(&url).unwrap();
}

#[test]
fn test_get_by_url_file() {
let sut = ObjectStoreRegistry::default();
let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
sut.get_by_url(&url).unwrap();
}

#[test]
fn test_get_by_url_local() {
let sut = ObjectStoreRegistry::default();
let url = ListingTableUrl::parse("../").unwrap();
sut.get_by_url(&url).unwrap();
}
}
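
The lookup behaviour these tests rely on can be sketched without any DataFusion dependency. This is a minimal model, not the crate's implementation: `SketchRegistry`, `FakeStore`, and the "scheme://host" key format are all hypothetical names and assumptions made for illustration. It only mirrors what the tests show: a default registry resolves `file://` URLs, and other schemes resolve only after `register_store` has been called for that scheme and host.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for an object store; it only carries a label.
#[derive(Debug, PartialEq)]
struct FakeStore(&'static str);

/// Simplified registry keyed by "scheme://host" (an assumption of this sketch).
struct SketchRegistry {
    stores: RwLock<HashMap<String, Arc<FakeStore>>>,
}

impl SketchRegistry {
    /// Start with a local "file://" store, as the default registry in the tests appears to.
    fn new() -> Self {
        let mut stores = HashMap::new();
        stores.insert("file://".to_string(), Arc::new(FakeStore("local")));
        Self { stores: RwLock::new(stores) }
    }

    /// Register a store for a given scheme and host.
    fn register_store(&self, scheme: &str, host: &str, store: Arc<FakeStore>) {
        let key = format!("{scheme}://{host}");
        self.stores.write().unwrap().insert(key, store);
    }

    /// Resolve a URL by its scheme and host only; the path is ignored.
    fn get_by_url(&self, url: &str) -> Option<Arc<FakeStore>> {
        let (scheme, rest) = url.split_once("://")?;
        let host = rest.split('/').next().unwrap_or("");
        let key = format!("{scheme}://{host}");
        self.stores.read().unwrap().get(&key).cloned()
    }
}

fn main() {
    let registry = SketchRegistry::new();
    registry.register_store("s3", "bucket", Arc::new(FakeStore("s3-bucket")));

    // Registered scheme and host resolve regardless of the key path.
    assert!(registry.get_by_url("s3://bucket/key").is_some());
    // The local file store is available without registration.
    assert!(registry.get_by_url("file:///bucket/key").is_some());
    // Nothing was registered for this scheme and host.
    assert!(registry.get_by_url("hdfs://localhost:8020/key").is_none());
}
```

The real tests pass a parsed `ListingTableUrl` rather than a string; the string splitting here is only a shortcut to keep the sketch free of external crates.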
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/mod.rs
@@ -25,10 +25,12 @@ pub mod file_format;
pub mod listing;
pub mod listing_table_factory;
pub mod memory;
pub mod object_store;
pub mod streaming;
pub mod view;

// backwards compatibility
pub use datafusion_execution::object_store;

use futures::Stream;

pub use self::datasource::TableProvider;
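
The backwards-compatibility re-export in this hunk can be illustrated with a single-file sketch. The modules `execution` and `datasource` below are stand-ins for the two crates, and the `name` method is hypothetical, added only so there is something to call. The point is that a `pub use` of the moved module keeps the old import path compiling and makes both paths refer to the same type.

```rust
// Stand-in for the `datafusion_execution` crate, the new home of the type.
mod execution {
    pub mod object_store {
        #[derive(Default)]
        pub struct ObjectStoreRegistry;

        impl ObjectStoreRegistry {
            // Hypothetical method, present only for the demonstration.
            pub fn name(&self) -> &'static str {
                "registry"
            }
        }
    }
}

// Stand-in for `datafusion::datasource`, the old location.
mod datasource {
    // backwards compatibility: re-export the moved module under the old path
    pub use super::execution::object_store;
}

fn main() {
    // Code written against the old path still compiles...
    let old_path = datasource::object_store::ObjectStoreRegistry::default();
    // ...and yields the very same type as the new path.
    let new_path: execution::object_store::ObjectStoreRegistry = old_path;
    println!("{}", new_path.name());
}
```

Because the re-export is an alias rather than a copy, downstream code can migrate to the new path at its own pace without type mismatches.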
6 changes: 4 additions & 2 deletions datafusion/core/src/execution/runtime_env.rs
@@ -23,9 +23,11 @@ use crate::{
execution::disk_manager::{DiskManager, DiskManagerConfig},
};

use crate::datasource::object_store::ObjectStoreRegistry;
Contributor Author

The point of this PR is to remove the dependency on datasource here.

The other references to datasource are removed in #5477.

use crate::execution::memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool};
use datafusion_common::DataFusionError;
use datafusion_execution::{
memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool},
object_store::ObjectStoreRegistry,
};
use object_store::ObjectStore;
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
3 changes: 3 additions & 0 deletions datafusion/execution/Cargo.toml
@@ -34,10 +34,13 @@ path = "src/lib.rs"


[dependencies]
dashmap = "5.4.0"
datafusion-common = { path = "../common", version = "19.0.0" }
datafusion-expr = { path = "../expr", version = "19.0.0" }
hashbrown = { version = "0.13", features = ["raw"] }
log = "^0.4"
object_store = "0.5.4"
parking_lot = "0.12"
rand = "0.8"
tempfile = "3"
url = "2.2"
1 change: 1 addition & 0 deletions datafusion/execution/src/lib.rs
@@ -17,4 +17,5 @@

pub mod disk_manager;
pub mod memory_pool;
pub mod object_store;
pub mod registry;
Original file line number Diff line number Diff line change
@@ -217,22 +217,12 @@ impl ObjectStoreRegistry {
#[cfg(test)]
mod tests {
use super::*;
use crate::datasource::listing::ListingTableUrl;
use std::sync::Arc;

#[test]
fn test_object_store_url() {
let listing = ListingTableUrl::parse("file:///").unwrap();
let store = listing.object_store();
assert_eq!(store.as_str(), "file:///");

let file = ObjectStoreUrl::parse("file://").unwrap();
assert_eq!(file.as_str(), "file:///");

let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
let store = listing.object_store();
assert_eq!(store.as_str(), "s3://bucket/");

let url = ObjectStoreUrl::parse("s3://bucket").unwrap();
assert_eq!(url.as_str(), "s3://bucket/");

@@ -255,34 +245,4 @@ mod tests {
ObjectStoreUrl::parse("s3://username:password@host:123/foo").unwrap_err();
assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo");
}

Contributor Author

Moved these tests into the datasource/listing module.

#[test]
fn test_get_by_url_hdfs() {
let sut = ObjectStoreRegistry::default();
sut.register_store("hdfs", "localhost:8020", Arc::new(LocalFileSystem::new()));
let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
sut.get_by_url(&url).unwrap();
}

#[test]
fn test_get_by_url_s3() {
let sut = ObjectStoreRegistry::default();
sut.register_store("s3", "bucket", Arc::new(LocalFileSystem::new()));
let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
sut.get_by_url(&url).unwrap();
}

#[test]
fn test_get_by_url_file() {
let sut = ObjectStoreRegistry::default();
let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
sut.get_by_url(&url).unwrap();
}

#[test]
fn test_get_by_url_local() {
let sut = ObjectStoreRegistry::default();
let url = ListingTableUrl::parse("../").unwrap();
sut.get_by_url(&url).unwrap();
}
}