Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addresses #29: Support missing version-hint.txt and provide additional options #63

Merged
merged 8 commits into from
Aug 13, 2024
54 changes: 37 additions & 17 deletions src/common/iceberg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,30 +167,51 @@ IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, File
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec, skip_schema_inference);
}

// Function to generate a metadata file url
string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, const string &table_version, const string &metadata_compression_codec) {
if (metadata_compression_codec != "gzip") {
return fs.JoinPath(meta_path, "v" + table_version + ".metadata.json");
// Function to generate a metadata file URL from a version and a format string.
// For example, the format "v%s%s.metadata.json" yields "v00###-xxxxxxxxx-.gz.metadata.json"
string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, string &table_version, string &metadata_compression_codec, string &version_format = DEFAULT_TABLE_VERSION_FORMAT) {
// TODO: Need to URL Encode table_version
string compression_suffix = "";
string url;
if (metadata_compression_codec == "gzip") {
compression_suffix = ".gz";
}
for(auto try_format : StringUtil::Split(version_format, ',')) {
url = fs.JoinPath(meta_path, StringUtil::Format(try_format, table_version, compression_suffix));
if(fs.FileExists(url)) {
return url;
}
}
return fs.JoinPath(meta_path, "v" + table_version + ".gz.metadata.json");

throw IOException(
"Iceberg metadata file not found for table version '%s' using '%s' compression and format(s): '%s'", table_version, metadata_compression_codec, version_format);
}

string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) {
string metadata_file_path;

string IcebergSnapshot::GetMetaDataPath(const string &path, FileSystem &fs, string metadata_compression_codec, string table_version = DEFAULT_VERSION_HINT_FILE, string version_format = DEFAULT_TABLE_VERSION_FORMAT) {
if (StringUtil::EndsWith(path, ".json")) {
metadata_file_path = path;
return path;
}

auto meta_path = fs.JoinPath(path, "metadata");
string version_hint;
if(StringUtil::EndsWith(table_version, ".text")||StringUtil::EndsWith(table_version, ".txt")) {
version_hint = GetTableVersion(meta_path, fs, table_version);
} else {
auto table_version = GetTableVersion(path, fs);
auto meta_path = fs.JoinPath(path, "metadata");
metadata_file_path = GenerateMetaDataUrl(fs, meta_path, table_version, metadata_compression_codec);
version_hint = table_version;
}
return GenerateMetaDataUrl(fs, meta_path, version_hint, metadata_compression_codec, version_format);
}


string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) {
if (metadata_compression_codec == "gzip") {
return IcebergUtils::GzFileToString(metadata_file_path, fs);
return IcebergUtils::GzFileToString(path, fs);
}
return IcebergUtils::FileToString(metadata_file_path, fs);
return IcebergUtils::FileToString(path, fs);
}


IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id,
vector<yyjson_val *> &schemas, string metadata_compression_codec,
bool skip_schema_inference) {
Expand All @@ -217,9 +238,8 @@ IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t icebe
return ret;
}

string IcebergSnapshot::GetTableVersion(const string &path, FileSystem &fs) {
auto meta_path = fs.JoinPath(path, "metadata");
auto version_file_path = fs.JoinPath(meta_path, "version-hint.text");
string IcebergSnapshot::GetTableVersion(const string &meta_path, FileSystem &fs, string version_file = DEFAULT_VERSION_HINT_FILE) {
auto version_file_path = fs.JoinPath(meta_path, version_file);
auto version_file_content = IcebergUtils::FileToString(version_file_path, fs);

try {
Expand Down Expand Up @@ -288,4 +308,4 @@ yyjson_val *IcebergSnapshot::IcebergSnapshot::FindSnapshotByIdTimestampInternal(
return max_snapshot;
}

} // namespace duckdb
} // namespace duckdb
26 changes: 20 additions & 6 deletions src/iceberg_functions/iceberg_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ static unique_ptr<FunctionData> IcebergMetaDataBind(ClientContext &context, Tabl
bool allow_moved_paths = false;
string metadata_compression_codec = "none";
bool skip_schema_inference = false;
string table_version = DEFAULT_VERSION_HINT_FILE;
string version_name_format = DEFAULT_TABLE_VERSION_FORMAT;

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
Expand All @@ -66,20 +68,26 @@ static unique_ptr<FunctionData> IcebergMetaDataBind(ClientContext &context, Tabl
metadata_compression_codec = StringValue::Get(kv.second);
} else if (loption == "skip_schema_inference") {
skip_schema_inference = BooleanValue::Get(kv.second);
} else if (loption == "version") {
table_version = StringValue::Get(kv.second);
} else if (loption == "version_name_format") {
version_name_format = StringValue::Get(kv.second);
}
}

auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(iceberg_path, fs, metadata_compression_codec, table_version, version_name_format);
IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_meta_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_meta_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_meta_path, fs, metadata_compression_codec, skip_schema_inference);
}

ret->iceberg_table =
Expand Down Expand Up @@ -143,23 +151,29 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() {

auto fun = TableFunction({LogicalType::VARCHAR}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
fun.named_parameters["version"] = LogicalType::VARCHAR;
fun.named_parameters["version_name_format"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
fun.named_parameters["version"] = LogicalType::VARCHAR;
fun.named_parameters["version_name_format"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
fun.named_parameters["version"] = LogicalType::VARCHAR;
fun.named_parameters["version_name_format"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

return function_set;
Expand Down
19 changes: 16 additions & 3 deletions src/iceberg_functions/iceberg_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
bool skip_schema_inference = false;
string mode = "default";
string metadata_compression_codec = "none";
string table_version = DEFAULT_VERSION_HINT_FILE;
string version_name_format = DEFAULT_TABLE_VERSION_FORMAT;

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
Expand All @@ -229,20 +231,25 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
metadata_compression_codec = StringValue::Get(kv.second);
} else if (loption == "skip_schema_inference") {
skip_schema_inference = BooleanValue::Get(kv.second);
} else if (loption == "version") {
table_version = StringValue::Get(kv.second);
} else if (loption == "version_name_format") {
version_name_format = StringValue::Get(kv.second);
}
}
auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(iceberg_path, fs, metadata_compression_codec, table_version, version_name_format);
IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_meta_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_meta_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_meta_path, fs, metadata_compression_codec, skip_schema_inference);
}

IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec);
Expand Down Expand Up @@ -277,6 +284,8 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() {
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
fun.named_parameters["version"] = LogicalType::VARCHAR;
fun.named_parameters["version_name_format"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, nullptr, nullptr,
Expand All @@ -286,6 +295,8 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() {
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
fun.named_parameters["version"] = LogicalType::VARCHAR;
fun.named_parameters["version_name_format"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, nullptr, nullptr,
Expand All @@ -295,6 +306,8 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() {
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
fun.named_parameters["version"] = LogicalType::VARCHAR;
fun.named_parameters["version_name_format"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

return function_set;
Expand Down
17 changes: 16 additions & 1 deletion src/iceberg_functions/iceberg_snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ struct IcebergSnaphotsBindData : public TableFunctionData {
IcebergSnaphotsBindData() {};
string filename;
string metadata_compression_codec;
string table_version;
string version_name_format;
bool skip_schema_inference = false;
};

Expand All @@ -29,7 +31,10 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState
auto global_state = make_uniq<IcebergSnapshotGlobalTableFunctionState>();

FileSystem &fs = FileSystem::GetFileSystem(context);
global_state->metadata_file = IcebergSnapshot::ReadMetaData(bind_data.filename, fs, bind_data.metadata_compression_codec);

auto iceberg_meta_path = IcebergSnapshot::GetMetaDataPath(
bind_data.filename, fs, bind_data.metadata_compression_codec, bind_data.table_version, bind_data.version_name_format);
global_state->metadata_file = IcebergSnapshot::ReadMetaData(iceberg_meta_path, fs, bind_data.metadata_compression_codec);
global_state->metadata_doc =
yyjson_read(global_state->metadata_file.c_str(), global_state->metadata_file.size(), 0);
auto root = yyjson_doc_get_root(global_state->metadata_doc);
Expand All @@ -50,19 +55,27 @@ static unique_ptr<FunctionData> IcebergSnapshotsBind(ClientContext &context, Tab
auto bind_data = make_uniq<IcebergSnaphotsBindData>();

string metadata_compression_codec = "none";
string table_version = DEFAULT_VERSION_HINT_FILE;
string version_name_format = DEFAULT_TABLE_VERSION_FORMAT;
bool skip_schema_inference = false;

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
} else if (loption == "version") {
table_version = StringValue::Get(kv.second);
} else if (loption == "version_name_format") {
version_name_format = StringValue::Get(kv.second);
} else if (loption == "skip_schema_inference") {
skip_schema_inference = BooleanValue::Get(kv.second);
}
}
bind_data->filename = input.inputs[0].ToString();
bind_data->metadata_compression_codec = metadata_compression_codec;
bind_data->skip_schema_inference = skip_schema_inference;
bind_data->table_version = table_version;
bind_data->version_name_format = version_name_format;

names.emplace_back("sequence_number");
return_types.emplace_back(LogicalType::UBIGINT);
Expand Down Expand Up @@ -115,6 +128,8 @@ TableFunctionSet IcebergFunctions::GetIcebergSnapshotsFunction() {
TableFunction table_function({LogicalType::VARCHAR}, IcebergSnapshotsFunction, IcebergSnapshotsBind,
IcebergSnapshotGlobalTableFunctionState::Init);
table_function.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
table_function.named_parameters["version"] = LogicalType::VARCHAR;
table_function.named_parameters["version_name_format"] = LogicalType::VARCHAR;
table_function.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
function_set.AddFunction(table_function);
return function_set;
Expand Down
19 changes: 13 additions & 6 deletions src/include/iceberg_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ using namespace duckdb_yyjson;

namespace duckdb {

// First arg is the version string; the second arg is either empty or ".gz" if gzip.
// Comma-separated list of formats, tried in order; allows for both
// "v###.gz.metadata.json" and "###.metadata.json" styles.
static string DEFAULT_TABLE_VERSION_FORMAT = "v%s%s.metadata.json,%s%s.metadata.json";

static string DEFAULT_VERSION_HINT_FILE = "version-hint.text";

struct IcebergColumnDefinition {
public:
static IcebergColumnDefinition ParseFromJson(yyjson_val *val);
Expand Down Expand Up @@ -61,19 +67,20 @@ class IcebergSnapshot {
vector<IcebergColumnDefinition> schema;
string metadata_compression_codec = "none";

static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs, string GetSnapshotByTimestamp, bool skip_schema_inference);
static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string GetSnapshotByTimestamp, bool skip_schema_inference);
static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string GetSnapshotByTimestamp, bool skip_schema_inference);
static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs, string metadata_compression_codec, bool skip_schema_inference);
static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string metadata_compression_codec, bool skip_schema_inference);
static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string metadata_compression_codec, bool skip_schema_inference);

static IcebergSnapshot ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id,
vector<yyjson_val *> &schemas, string metadata_compression_codec, bool skip_schema_inference);
static string ReadMetaData(const string &path, FileSystem &fs, string GetSnapshotByTimestamp);
static string GetMetaDataPath(const string &path, FileSystem &fs, string metadata_compression_codec, string table_version, string version_format);
static string ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec);
static yyjson_val *GetSnapshots(const string &path, FileSystem &fs, string GetSnapshotByTimestamp);
static unique_ptr<SnapshotParseInfo> GetParseInfo(yyjson_doc &metadata_json);

protected:
//! Internal JSON parsing functions
static string GetTableVersion(const string &path, FileSystem &fs);
static string GetTableVersion(const string &path, FileSystem &fs, string version_format);
static yyjson_val *FindLatestSnapshotInternal(yyjson_val *snapshots);
static yyjson_val *FindSnapshotByIdInternal(yyjson_val *snapshots, idx_t target_id);
static yyjson_val *FindSnapshotByIdTimestampInternal(yyjson_val *snapshots, timestamp_t timestamp);
Expand Down Expand Up @@ -124,4 +131,4 @@ struct IcebergTable {
string path;
};

} // namespace duckdb
} // namespace duckdb
Loading
Loading