Skip to content

Commit

Permalink
feat: make sure table locations are unique (#335)
Browse files Browse the repository at this point in the history
* make sure table locations are unique

* locations never have trailing slash

* remove cache due to issues with deletions, replace stringy locations by typed locations
  • Loading branch information
twuebi authored Sep 17, 2024
1 parent 8e7e825 commit 543db50
Show file tree
Hide file tree
Showing 26 changed files with 704 additions and 420 deletions.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ reqwest = { version = "^0.12", default-features = false, features = [
"json",
"rustls-tls",
] }
iceberg = { git = "https://github.com/hansetag/iceberg-rust.git", rev = "f971738", features = [
iceberg = { git = "https://github.com/hansetag/iceberg-rust.git", rev = "47bfdda", features = [
"storage-all",
] }
typed-builder = "^0.19.1"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- The old text_pattern_ops index on location was not useful for our only query, which looked
-- something like:
--   select 1 from tabular where '123' like location || '%';
-- so we drop it and create a new btree index instead that we'll use for exact match queries.
-- The new index is composite: namespace_id first, then location.
drop index if exists tabular_location_idx;
create index tabular_namespace_id_location_idx on tabular (namespace_id, location);
2 changes: 1 addition & 1 deletion crates/iceberg-catalog/src/catalog/commit_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::service::{ErrorModel, Result};
/// Apply the commits to table metadata.
pub(super) fn apply_commit(
metadata: TableMetadata,
metadata_location: &Option<String>,
metadata_location: &Option<Location>,
requirements: &[TableRequirement],
updates: Vec<TableUpdate>,
) -> Result<TableMetadata> {
Expand Down
36 changes: 33 additions & 3 deletions crates/iceberg-catalog/src/catalog/io.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::api::{ErrorModel, Result};
use crate::service::storage::path_utils;
use futures::stream::BoxStream;
use futures::StreamExt;
use iceberg::io::FileIO;
use iceberg_ext::catalog::rest::IcebergErrorResponse;
use iceberg_ext::configs::Location;
Expand Down Expand Up @@ -91,6 +93,33 @@ pub(crate) async fn remove_all(file_io: &FileIO, location: &Location) -> Result<
Ok(())
}

/// Page size that [`list_location`] falls back to when the caller passes `None` as `page_size`.
pub(crate) const DEFAULT_LIST_LOCATION_PAGE_SIZE: usize = 1000;

/// Lists the entries found under `location`, one page at a time.
///
/// The location string is first passed through `path_utils::reduce_scheme_string` and then
/// normalized to end in exactly one `/` before it is handed to `FileIO::list_paginated`.
/// Every page of the returned stream is converted into a `Vec<String>` holding the `path()`
/// of each listed entry.
///
/// `page_size` is forwarded to `list_paginated`; `None` selects
/// [`DEFAULT_LIST_LOCATION_PAGE_SIZE`].
///
/// # Errors
///
/// Returns [`IoError::List`] if the listing cannot be started. Pages that fail later are
/// yielded from the stream as `Err(IoError::List(..))` items.
pub(crate) async fn list_location<'a>(
    file_io: &'a FileIO,
    location: &'a Location,
    page_size: Option<usize>,
) -> Result<BoxStream<'a, std::result::Result<Vec<String>, IoError>>, IoError> {
    let reduced = path_utils::reduce_scheme_string(location.as_str(), false);
    tracing::debug!("Listing location: {}", reduced);

    // Strip any trailing slashes and append a single one, so the listing prefix always
    // ends in exactly one `/`.
    let prefix = format!("{}/", reduced.trim_end_matches('/'));
    let limit = page_size.unwrap_or(DEFAULT_LIST_LOCATION_PAGE_SIZE);

    // NOTE(review): the `true` flag presumably requests a recursive listing — confirm against
    // the `list_paginated` signature in the iceberg-rust fork.
    let pages = file_io
        .list_paginated(prefix.as_str(), true, limit)
        .await
        .map_err(IoError::List)?;

    // Keep only the path of every entry and wrap per-page failures in `IoError::List`.
    let paths = pages.map(|page| {
        page.map(|entries| {
            entries
                .into_iter()
                .map(|entry| entry.path().to_string())
                .collect::<Vec<String>>()
        })
        .map_err(IoError::List)
    });

    Ok(paths.boxed())
}

#[derive(thiserror::Error, Debug, strum::IntoStaticStr)]
pub enum IoError {
#[error("Failed to create file. Please check the storage credentials.")]
Expand All @@ -113,6 +142,8 @@ pub enum IoError {
FileDelete(#[source] iceberg::Error),
#[error("Failed to remove all files in location. Please check the storage credentials.")]
FileRemoveAll(#[source] iceberg::Error),
#[error("Failed to list files in location. Please check the storage credentials.")]
List(#[source] iceberg::Error),
}

impl IoError {
Expand All @@ -134,9 +165,8 @@ impl From<IoError> for IcebergErrorResponse {
| IoError::FileClose(_)
| IoError::FileWrite(_)
| IoError::FileWriterCreation(_)
| IoError::FileCreation(_) => {
ErrorModel::failed_dependency(message, typ, Some(boxed)).into()
}
| IoError::FileCreation(_)
| IoError::List(_) => ErrorModel::failed_dependency(message, typ, Some(boxed)).into(),

IoError::FileCompression(_) | IoError::Write(_) | IoError::Serialization(_) => {
ErrorModel::internal(message, typ, Some(boxed)).into()
Expand Down
52 changes: 21 additions & 31 deletions crates/iceberg-catalog/src/catalog/s3_signer/sign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use super::super::CatalogServer;
use super::error::SignError;
use crate::catalog::require_warehouse_id;
use crate::request_metadata::RequestMetadata;
use crate::service::caches::LocationCache;
use crate::service::secrets::SecretStore;
use crate::service::storage::{S3Location, S3Profile, StorageCredential};
use crate::service::{auth::AuthZHandler, Catalog, ListFlags, State};
Expand Down Expand Up @@ -61,7 +60,6 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
let include_staged = true;

let parsed_url = s3_utils::parse_s3_url(&request_url)?;
let table_location = &parsed_url.location.to_string();

// Unfortunately there is currently no way to pass information about warehouse_id & table_id
// to this function from a get_table or create_table process without exchanging the token.
Expand All @@ -70,10 +68,10 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
// We are looking for the path in the database, which allows us to also work with AuthN solutions
// that do not support custom data in tokens. In the long term, we should
// try to get per-table signer.uri support in Spark.
let table_id = if let Ok(table_id) = require_table_id(table) {
table_id
let (table_id, table_metadata) = if let Ok(table_id) = require_table_id(table.clone()) {
(table_id, None)
} else {
C::get_table_id_by_s3_location_cached(
let table_metadata = C::get_table_metadata_by_s3_location(
warehouse_id,
parsed_url.location.location(),
ListFlags {
Expand All @@ -94,7 +92,9 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
.r#type("InvalidLocation".to_string())
.source(Some(Box::new(e.error)))
.build()
})?
})?;

(table_metadata.table_id, Some(table_metadata))
};

// First check - fail fast if requested table is not allowed.
Expand All @@ -117,31 +117,21 @@ impl<C: Catalog, A: AuthZHandler, S: SecretStore>
metadata_location: _,
storage_secret_ident,
storage_profile,
} = match C::get_table_metadata_by_id(
warehouse_id,
table_id,
ListFlags {
include_staged,
include_deleted: true,
include_active: true,
},
state.v1_state.catalog.clone(),
)
.await
{
Ok(ok) => ok,
Err(err) => {
// If the table is not found, we can remove the location from the cache
if let 404 = err.error.code {
state
.v1_state
.catalog
.clone()
.remove_location(table_location.as_str())
.await;
}
return Err(err);
}
} = if let Some(table_metadata) = table_metadata {
table_metadata
} else {
C::get_table_metadata_by_id(
warehouse_id,
table_id,
ListFlags {
include_staged,
// we were able to resolve the table to id so we know the table is not deleted
include_deleted: false,
include_active: true,
},
state.v1_state.catalog,
)
.await?
};

let extend_err = |mut e: IcebergErrorResponse| {
Expand Down
Loading

0 comments on commit 543db50

Please sign in to comment.