From f33b2e35e4d532125db8d6b22133fdc302c162d9 Mon Sep 17 00:00:00 2001
From: Sam Ansmink
Date: Fri, 25 Aug 2023 17:28:15 +0200
Subject: [PATCH] adds support for v1 spec, including test

---
 CMakeLists.txt                                |   4 +-
 Makefile                                      |   8 +-
 .../test_data_generator/generate_iceberg.py   |  19 ++-
 .../{updates => updates_v1}/q01.sql           |   1 -
 .../{updates => updates_v1}/q02.sql           |   0
 .../{updates => updates_v1}/q03.sql           |   0
 .../test_data_generator/updates_v1/q04.sql    |   3 +
 .../test_data_generator/updates_v1/q05.sql    |   3 +
 .../test_data_generator/updates_v2/q01.sql    |  16 +++
 .../test_data_generator/updates_v2/q02.sql    |   3 +
 .../test_data_generator/updates_v2/q03.sql    |   2 +
 .../{updates => updates_v2}/q04.sql           |   0
 .../{updates => updates_v2}/q05.sql           |   0
 src/common/iceberg.cpp                        |  69 +++++++----
 src/iceberg_functions/iceberg_snapshots.cpp   |   5 +-
 .../iceberg_manifest_entry_partial_v1.hpp     | 117 ++++++++++++++++++
 .../iceberg_manifest_file_partial_v1.hpp      |  64 ++++++++++
 src/include/iceberg_metadata.hpp              |   8 +-
 src/include/iceberg_types.hpp                 |  51 +++++++-
 ...iceberg_scan_generated_data_0_01.test_slow |  33 ++++-
 ..._scan_generated_data_0_01_remote.test_slow |   4 +-
 vcpkg.json                                    |   9 +-
 22 files changed, 368 insertions(+), 51 deletions(-)
 rename scripts/test_data_generator/{updates => updates_v1}/q01.sql (97%)
 rename scripts/test_data_generator/{updates => updates_v1}/q02.sql (100%)
 rename scripts/test_data_generator/{updates => updates_v1}/q03.sql (100%)
 create mode 100644 scripts/test_data_generator/updates_v1/q04.sql
 create mode 100644 scripts/test_data_generator/updates_v1/q05.sql
 create mode 100644 scripts/test_data_generator/updates_v2/q01.sql
 create mode 100644 scripts/test_data_generator/updates_v2/q02.sql
 create mode 100644 scripts/test_data_generator/updates_v2/q03.sql
 rename scripts/test_data_generator/{updates => updates_v2}/q04.sql (100%)
 rename scripts/test_data_generator/{updates => updates_v2}/q05.sql (100%)
 create mode 100644 src/include/avro_codegen/iceberg_manifest_entry_partial_v1.hpp
 create mode 100644 src/include/avro_codegen/iceberg_manifest_file_partial_v1.hpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3960159..2419e5c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -75,9 +75,9 @@ target_link_libraries(
 # Link dependencies into extension
 target_link_libraries(${EXTENSION_NAME} PUBLIC optimized avro_static_release
-                      debug avro_static_release)
+                      debug avro_static_debug)
 target_link_libraries(${TARGET_NAME}_loadable_extension optimized
-                      avro_static_release debug avro_static_release)
+                      avro_static_release debug avro_static_debug)
 
 install(
   TARGETS ${EXTENSION_NAME} ${TARGET_NAME}_loadable_extension
diff --git a/Makefile b/Makefile
index bf258b9..8ce5142 100644
--- a/Makefile
+++ b/Makefile
@@ -109,11 +109,11 @@ update:
 	git submodule update --remote --merge
 
 data: data_clean
-	python3 scripts/test_data_generator/generate_iceberg.py 0.01 data/iceberg/generated_0_01
+	python3 scripts/test_data_generator/generate_iceberg.py 0.01 data/iceberg/generated_spec1_0_01 1
+	python3 scripts/test_data_generator/generate_iceberg.py 0.01 data/iceberg/generated_spec2_0_01 2
 
-data_large: data_clean
-	python3 scripts/test_data_generator/generate_iceberg.py 0.01 data/iceberg/generated_0_01
-	python3 scripts/test_data_generator/generate_iceberg.py 1 data/iceberg/generated_1
+data_large: data
+	python3 scripts/test_data_generator/generate_iceberg.py 1 data/iceberg/generated_spec2_1 2
 
 data_clean:
 	rm -rf data/iceberg/generated_*
\ No newline at end of file
diff --git a/scripts/test_data_generator/generate_iceberg.py b/scripts/test_data_generator/generate_iceberg.py
index 60edb8e..81fd177 100755
--- a/scripts/test_data_generator/generate_iceberg.py
+++ b/scripts/test_data_generator/generate_iceberg.py
@@ -7,12 +7,14 @@ from pyspark import SparkContext
 from pathlib import Path
 
-if (len(sys.argv) != 3):
-    print("Usage: generate_iceberg.py <SCALE_FACTOR> <DEST_PATH>")
+if (len(sys.argv) != 4):
+    print("Usage: generate_iceberg.py <SCALE_FACTOR> <DEST_PATH> <ICEBERG_SPEC_VERSION>")
     exit(1)
 
 SCALE = sys.argv[1]
 DEST_PATH = sys.argv[2]
+ICEBERG_SPEC_VERSION = sys.argv[3]
+
 PARQUET_SRC_FILE = f'{DEST_PATH}/base_file/file.parquet'
 TABLE_NAME = "iceberg_catalog.pyspark_iceberg_table";
 CWD = os.getcwd()
@@ -43,17 +45,24 @@
 ###
 ### Create Iceberg table from dataset
 ###
 spark.read.parquet(PARQUET_SRC_FILE).createOrReplaceTempView('parquet_file_view');
-spark.sql(f"CREATE or REPLACE TABLE {TABLE_NAME} TBLPROPERTIES ('format-version'='2', 'write.update.mode'='merge-on-read') AS SELECT * FROM parquet_file_view");
+
+if ICEBERG_SPEC_VERSION == '1':
+    spark.sql(f"CREATE or REPLACE TABLE {TABLE_NAME} TBLPROPERTIES ('format-version'='{ICEBERG_SPEC_VERSION}') AS SELECT * FROM parquet_file_view");
+elif ICEBERG_SPEC_VERSION == '2':
+    spark.sql(f"CREATE or REPLACE TABLE {TABLE_NAME} TBLPROPERTIES ('format-version'='{ICEBERG_SPEC_VERSION}', 'write.update.mode'='merge-on-read') AS SELECT * FROM parquet_file_view");
+else:
+    print(f"Are you from the future? Iceberg spec version '{ICEBERG_SPEC_VERSION}' is unknown to me")
+    exit(1)
 
 ###
 ### Apply modifications to base table generating verification results between each step
 ###
-update_files = [str(path) for path in Path(f'{SCRIPT_DIR}').rglob('*.sql')]
+update_files = [str(path) for path in Path(f'{SCRIPT_DIR}/updates_v{ICEBERG_SPEC_VERSION}').rglob('*.sql')]
 update_files.sort() # Order matters obviously
 last_file = ""
 for path in update_files:
-    full_file_path = f"{SCRIPT_DIR}/updates/{os.path.basename(path)}"
+    full_file_path = f"{SCRIPT_DIR}/updates_v{ICEBERG_SPEC_VERSION}/{os.path.basename(path)}"
     with open(full_file_path, 'r') as file:
         file_trimmed = os.path.basename(path)[:-4]
         last_file = file_trimmed
diff --git a/scripts/test_data_generator/updates/q01.sql b/scripts/test_data_generator/updates_v1/q01.sql
similarity index 97%
rename from scripts/test_data_generator/updates/q01.sql
rename to scripts/test_data_generator/updates_v1/q01.sql
index e64db80..cb49fe2 100644
--- a/scripts/test_data_generator/updates/q01.sql
+++ b/scripts/test_data_generator/updates_v1/q01.sql
@@ -9,7 +9,6 @@ set l_orderkey_bool=NULL,
     l_commitdate_timestamp=NULL,
     l_commitdate_timestamp_tz=NULL,
     l_comment_string=NULL,
-    uuid=NULL,
     l_comment_blob=NULL,
     l_shipmode_quantity_struct=NULL,
     l_linenumber_quantity_list=NULL,
diff --git a/scripts/test_data_generator/updates/q02.sql b/scripts/test_data_generator/updates_v1/q02.sql
similarity index 100%
rename from scripts/test_data_generator/updates/q02.sql
rename to scripts/test_data_generator/updates_v1/q02.sql
diff --git a/scripts/test_data_generator/updates/q03.sql b/scripts/test_data_generator/updates_v1/q03.sql
similarity index 100%
rename from scripts/test_data_generator/updates/q03.sql
rename to scripts/test_data_generator/updates_v1/q03.sql
diff --git a/scripts/test_data_generator/updates_v1/q04.sql b/scripts/test_data_generator/updates_v1/q04.sql
new file mode 100644
index 0000000..e64ea11
--- /dev/null
+++ b/scripts/test_data_generator/updates_v1/q04.sql
@@ -0,0 +1,3 @@
+update iceberg_catalog.pyspark_iceberg_table
+set l_orderkey_bool = false
+where l_partkey_int % 4 = 0;
\ No newline at end of file
diff --git a/scripts/test_data_generator/updates_v1/q05.sql b/scripts/test_data_generator/updates_v1/q05.sql
new file mode 100644
index 0000000..c5eb8cb
--- /dev/null
+++ b/scripts/test_data_generator/updates_v1/q05.sql
@@ -0,0 +1,3 @@
+update iceberg_catalog.pyspark_iceberg_table
+set l_orderkey_bool = false
+where l_partkey_int % 5 = 0;
\ No newline at end of file
diff --git a/scripts/test_data_generator/updates_v2/q01.sql b/scripts/test_data_generator/updates_v2/q01.sql
new file mode 100644
index 0000000..cb49fe2
--- /dev/null
+++ b/scripts/test_data_generator/updates_v2/q01.sql
@@ -0,0 +1,16 @@
+update iceberg_catalog.pyspark_iceberg_table
+set l_orderkey_bool=NULL,
+    l_partkey_int=NULL,
+    l_suppkey_long=NULL,
+    l_extendedprice_float=NULL,
+    l_extendedprice_double=NULL,
+    l_shipdate_date=NULL,
+    l_partkey_time=NULL,
+    l_commitdate_timestamp=NULL,
+    l_commitdate_timestamp_tz=NULL,
+    l_comment_string=NULL,
+    l_comment_blob=NULL,
+    l_shipmode_quantity_struct=NULL,
+    l_linenumber_quantity_list=NULL,
+    l_linenumber_quantity_map=NULL
+where l_partkey_int % 2 = 0;
\ No newline at end of file
diff --git a/scripts/test_data_generator/updates_v2/q02.sql b/scripts/test_data_generator/updates_v2/q02.sql
new file mode 100644
index 0000000..63a7b39
--- /dev/null
+++ b/scripts/test_data_generator/updates_v2/q02.sql
@@ -0,0 +1,3 @@
+insert into iceberg_catalog.pyspark_iceberg_table
+select * FROM iceberg_catalog.pyspark_iceberg_table
+where l_extendedprice_double < 30000
\ No newline at end of file
diff --git a/scripts/test_data_generator/updates_v2/q03.sql b/scripts/test_data_generator/updates_v2/q03.sql
new file mode 100644
index 0000000..952e135
--- /dev/null
+++ b/scripts/test_data_generator/updates_v2/q03.sql
@@ -0,0 +1,2 @@
+update iceberg_catalog.pyspark_iceberg_table
+set l_orderkey_bool = not l_orderkey_bool;
\ No newline at end of file
diff --git a/scripts/test_data_generator/updates/q04.sql b/scripts/test_data_generator/updates_v2/q04.sql
similarity index 100%
rename from scripts/test_data_generator/updates/q04.sql
rename to scripts/test_data_generator/updates_v2/q04.sql
diff --git a/scripts/test_data_generator/updates/q05.sql b/scripts/test_data_generator/updates_v2/q05.sql
similarity index 100%
rename from scripts/test_data_generator/updates/q05.sql
rename to scripts/test_data_generator/updates_v2/q05.sql
diff --git a/src/common/iceberg.cpp b/src/common/iceberg.cpp
index 9f43afd..a075029 100644
--- a/src/common/iceberg.cpp
+++ b/src/common/iceberg.cpp
@@ -20,13 +20,13 @@ IcebergTable IcebergTable::Load(const string &iceberg_path, IcebergSnapshot &sna
 	auto manifest_list_full_path = allow_moved_paths ?
 	    IcebergUtils::GetFullPath(iceberg_path, snapshot.manifest_list, fs) : snapshot.manifest_list;
-	auto manifests = ReadManifestListFile(manifest_list_full_path, fs);
+	auto manifests = ReadManifestListFile(manifest_list_full_path, fs, snapshot.iceberg_format_version);
 
 	for (auto &manifest : manifests) {
 		auto manifest_entry_full_path = allow_moved_paths ?
 		    IcebergUtils::GetFullPath(iceberg_path, manifest.manifest_path, fs) : manifest.manifest_path;
-		auto manifest_paths = ReadManifestEntries(manifest_entry_full_path, fs);
+		auto manifest_paths = ReadManifestEntries(manifest_entry_full_path, fs, snapshot.iceberg_format_version);
 
 		ret.entries.push_back({std::move(manifest), std::move(manifest_paths)});
 	}
 
@@ -34,36 +34,55 @@ IcebergTable IcebergTable::Load(const string &iceberg_path, IcebergSnapshot &sna
 	return ret;
 }
 
-vector<IcebergManifest> IcebergTable::ReadManifestListFile(string path, FileSystem &fs) {
+vector<IcebergManifest> IcebergTable::ReadManifestListFile(string path, FileSystem &fs, idx_t iceberg_format_version) {
 	vector<IcebergManifest> ret;
 
 	// TODO: make streaming
 	string file = IcebergUtils::FileToString(path, fs);
 
 	auto stream = avro::memoryInputStream((unsigned char *)file.c_str(), file.size());
-	auto schema = avro::compileJsonSchemaFromString(MANIFEST_SCHEMA);
-	avro::DataFileReader<c::manifest_file> dfr(std::move(stream), schema);
-
-	c::manifest_file manifest_list;
-	while (dfr.read(manifest_list)) {
-		ret.emplace_back(IcebergManifest(manifest_list));
+	avro::ValidSchema schema;
+
+	if (iceberg_format_version == 1) {
+		schema = avro::compileJsonSchemaFromString(MANIFEST_SCHEMA_V1);
+		avro::DataFileReader<c::manifest_file_v1> dfr(std::move(stream), schema);
+		c::manifest_file_v1 manifest_list;
+		while (dfr.read(manifest_list)) {
+			ret.emplace_back(IcebergManifest(manifest_list));
+		}
+	} else {
+		schema = avro::compileJsonSchemaFromString(MANIFEST_SCHEMA);
+		avro::DataFileReader<c::manifest_file> dfr(std::move(stream), schema);
+		c::manifest_file manifest_list;
+		while (dfr.read(manifest_list)) {
+			ret.emplace_back(IcebergManifest(manifest_list));
+		}
 	}
 
 	return ret;
 }
 
-vector<IcebergManifestEntry> IcebergTable::ReadManifestEntries(string path, FileSystem &fs) {
+vector<IcebergManifestEntry> IcebergTable::ReadManifestEntries(string path, FileSystem &fs, idx_t iceberg_format_version) {
 	vector<IcebergManifestEntry> ret;
 
 	// TODO: make streaming
 	string file = IcebergUtils::FileToString(path, fs);
 	auto stream = avro::memoryInputStream((unsigned char *)file.c_str(), file.size());
 
-	auto schema = avro::compileJsonSchemaFromString(MANIFEST_ENTRY_SCHEMA);
-	avro::DataFileReader<c::manifest_entry> dfr(std::move(stream), schema);
-
-	c::manifest_entry manifest_entry;
-	while (dfr.read(manifest_entry)) {
-		ret.emplace_back(IcebergManifestEntry(manifest_entry));
+	if (iceberg_format_version == 1) {
+		auto schema = avro::compileJsonSchemaFromString(MANIFEST_ENTRY_SCHEMA_V1);
+		avro::DataFileReader<c::manifest_entry_v1> dfr(std::move(stream), schema);
+		c::manifest_entry_v1 manifest_entry;
+		while (dfr.read(manifest_entry)) {
+			ret.emplace_back(IcebergManifestEntry(manifest_entry));
+		}
+	} else {
+		auto schema = avro::compileJsonSchemaFromString(MANIFEST_ENTRY_SCHEMA);
+		avro::DataFileReader<c::manifest_entry> dfr(std::move(stream), schema);
+		c::manifest_entry manifest_entry;
+		while (dfr.read(manifest_entry)) {
+			ret.emplace_back(IcebergManifestEntry(manifest_entry));
+		}
 	}
 
 	return ret;
 }
@@ -73,6 +92,7 @@ IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(string &path, FileSystem &fs)
 	auto metadata_json = ReadMetaData(path, fs);
 	auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
 	auto root = yyjson_doc_get_root(doc);
+	auto iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
 	auto snapshots = yyjson_obj_get(root, "snapshots");
 	auto latest_snapshot = FindLatestSnapshotInternal(snapshots);
 
@@ -80,13 +100,14 @@ IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(string &path, FileSystem &fs)
 		throw IOException("No snapshots found");
 	}
 
-	return ParseSnapShot(latest_snapshot);
+	return ParseSnapShot(latest_snapshot, iceberg_format_version);
 }
 
 IcebergSnapshot IcebergSnapshot::GetSnapshotById(string &path, FileSystem &fs, idx_t snapshot_id) {
 	auto metadata_json = ReadMetaData(path, fs);
 	auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
 	auto root = yyjson_doc_get_root(doc);
+	auto iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
 	auto snapshots = yyjson_obj_get(root, "snapshots");
 	auto snapshot = FindSnapshotByIdInternal(snapshots, snapshot_id);
 
@@ -94,13 +115,14 @@ IcebergSnapshot IcebergSnapshot::GetSnapshotById(string &path, FileSystem &fs, i
 	if (!snapshot) {
 		throw IOException("Could not find snapshot with id " + to_string(snapshot_id));
 	}
 
-	return ParseSnapShot(snapshot);
+	return ParseSnapShot(snapshot, iceberg_format_version);
 }
 
 IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(string &path, FileSystem &fs, timestamp_t timestamp) {
 	auto metadata_json = ReadMetaData(path, fs);
 	auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
 	auto root = yyjson_doc_get_root(doc);
+	auto iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
 	auto snapshots = yyjson_obj_get(root, "snapshots");
 	auto snapshot = FindSnapshotByIdTimestampInternal(snapshots, timestamp);
 
@@ -108,7 +130,7 @@ IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(string &path, FileSystem
 	if (!snapshot) {
 		throw IOException("Could not find latest snapshots for timestamp " + Timestamp::ToString(timestamp));
 	}
 
-	return ParseSnapShot(snapshot);
+	return ParseSnapShot(snapshot, iceberg_format_version);
 }
 
 string IcebergSnapshot::ReadMetaData(string &path, FileSystem &fs) {
@@ -120,18 +142,23 @@ string IcebergSnapshot::ReadMetaData(string &path, FileSystem &fs) {
 	return IcebergUtils::FileToString(metadata_file_path, fs);
 }
 
-IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot) {
+IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version) {
 	IcebergSnapshot ret;
-
 	auto snapshot_tag = yyjson_get_tag(snapshot);
 	if (snapshot_tag != YYJSON_TYPE_OBJ) {
 		throw IOException("Invalid snapshot field found parsing iceberg metadata.json");
 	}
 
+	if (iceberg_format_version == 1) {
+		ret.sequence_number = 0;
+	} else if (iceberg_format_version == 2) {
+		ret.sequence_number = IcebergUtils::TryGetNumFromObject(snapshot, "sequence-number");
+	}
+
 	ret.snapshot_id = IcebergUtils::TryGetNumFromObject(snapshot, "snapshot-id");
-	ret.sequence_number = IcebergUtils::TryGetNumFromObject(snapshot, "sequence-number");
 	ret.timestamp_ms = Timestamp::FromEpochMs(IcebergUtils::TryGetNumFromObject(snapshot, "timestamp-ms"));
 	ret.manifest_list = IcebergUtils::TryGetStrFromObject(snapshot, "manifest-list");
+	ret.iceberg_format_version = iceberg_format_version;
 
 	return ret;
 }
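A side note on the two readers above: the v1 and v2 branches differ only in the compiled JSON schema and the generated record type, so the dispatch could in principle be factored into one templated helper. A minimal sketch, not part of the patch; ReadAvroRecords is a hypothetical name, and it assumes avro-cpp with codec_traits specializations for T (as generated in the headers added below):

    #include <string>
    #include <vector>
    #include "avro/Compiler.hh"
    #include "avro/DataFile.hh"
    #include "avro/Stream.hh"

    // Hypothetical helper: decode every record of type T from an in-memory Avro file.
    template <class T>
    std::vector<T> ReadAvroRecords(const std::string &file, const std::string &json_schema) {
        auto stream = avro::memoryInputStream((const uint8_t *)file.c_str(), file.size());
        auto schema = avro::compileJsonSchemaFromString(json_schema);
        avro::DataFileReader<T> dfr(std::move(stream), schema);
        std::vector<T> records;
        T record;
        while (dfr.read(record)) {
            records.push_back(record);
        }
        return records;
    }

With such a helper, ReadManifestListFile would reduce to converting either ReadAvroRecords<c::manifest_file_v1>(file, MANIFEST_SCHEMA_V1) or ReadAvroRecords<c::manifest_file>(file, MANIFEST_SCHEMA) into IcebergManifest values; the patch keeps the two explicit branches instead.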
diff --git a/src/iceberg_functions/iceberg_snapshots.cpp b/src/iceberg_functions/iceberg_snapshots.cpp
index 384cc28..966f93a 100644
--- a/src/iceberg_functions/iceberg_snapshots.cpp
+++ b/src/iceberg_functions/iceberg_snapshots.cpp
@@ -29,6 +29,7 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState
 		global_state->metadata_doc =
 		    yyjson_read(global_state->metadata_file.c_str(), global_state->metadata_file.size(), 0);
 		auto root = yyjson_doc_get_root(global_state->metadata_doc);
+		global_state->iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
 		auto snapshots = yyjson_obj_get(root, "snapshots");
 		yyjson_arr_iter_init(snapshots, &global_state->snapshot_it);
 		return global_state;
@@ -37,6 +38,7 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState
 	string metadata_file;
 	yyjson_doc *metadata_doc;
 	yyjson_arr_iter snapshot_it;
+	idx_t iceberg_format_version;
 };
 
 static unique_ptr<FunctionData> IcebergSnapshotsBind(ClientContext &context, TableFunctionBindInput &input,
@@ -69,7 +71,8 @@ static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput
 		if (i >= STANDARD_VECTOR_SIZE) {
 			break;
 		}
-		auto snapshot = IcebergSnapshot::ParseSnapShot(next_snapshot);
+
+		auto snapshot = IcebergSnapshot::ParseSnapShot(next_snapshot, global_state.iceberg_format_version);
 
 		FlatVector::GetData<int64_t>(output.data[0])[i] = snapshot.sequence_number;
 		FlatVector::GetData<int64_t>(output.data[1])[i] = snapshot.snapshot_id;
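For reference, the format-version lookup that now happens both in iceberg.cpp and in the global state above is a plain yyjson field access; the patch routes it through IcebergUtils::TryGetNumFromObject, which throws on a missing field. A standalone sketch of the same access, assuming only yyjson (GetFormatVersion is a hypothetical name, and returning 0 here signals a malformed document instead of throwing):

    #include <cstdint>
    #include <string>
    #include "yyjson.hpp" // as bundled with the extension; adjust to your vendoring

    static uint64_t GetFormatVersion(const std::string &metadata_json) {
        yyjson_doc *doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
        if (!doc) {
            return 0; // unparsable metadata.json
        }
        yyjson_val *root = yyjson_doc_get_root(doc);
        // "format-version" is a required integer field in the Iceberg table spec
        yyjson_val *version = yyjson_obj_get(root, "format-version");
        uint64_t result = yyjson_is_uint(version) ? yyjson_get_uint(version) : 0;
        yyjson_doc_free(doc);
        return result;
    }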
diff --git a/src/include/avro_codegen/iceberg_manifest_entry_partial_v1.hpp b/src/include/avro_codegen/iceberg_manifest_entry_partial_v1.hpp
new file mode 100644
index 0000000..fb37323
--- /dev/null
+++ b/src/include/avro_codegen/iceberg_manifest_entry_partial_v1.hpp
@@ -0,0 +1,117 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* https://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+
+#ifndef CPX2_HH_2561633724_v1__H_
+#define CPX2_HH_2561633724_v1__H_
+
+
+#include <sstream>
+#include "boost/any.hpp"
+#include "avro/Specific.hh"
+#include "avro/Encoder.hh"
+#include "avro/Decoder.hh"
+
+namespace c {
+struct data_file_v1 {
+    std::string file_path;
+    std::string file_format;
+    int64_t record_count;
+    data_file_v1() :
+        file_path(std::string()),
+        file_format(std::string()),
+        record_count(int64_t())
+        { }
+};
+
+struct manifest_entry_v1 {
+    int32_t status;
+    data_file_v1 data_file_; // NOTE: as generated, this is called data_file, but this causes issues with GCC
+    manifest_entry_v1() :
+        status(int32_t()),
+        data_file_()
+        { }
+};
+
+}
+namespace avro {
+template<> struct codec_traits<c::data_file_v1> {
+    static void encode(Encoder& e, const c::data_file_v1& v) {
+        avro::encode(e, v.file_path);
+        avro::encode(e, v.file_format);
+        avro::encode(e, v.record_count);
+    }
+    static void decode(Decoder& d, c::data_file_v1& v) {
+        if (avro::ResolvingDecoder *rd =
+            dynamic_cast<avro::ResolvingDecoder *>(&d)) {
+            const std::vector<size_t> fo = rd->fieldOrder();
+            for (std::vector<size_t>::const_iterator it = fo.begin();
+                it != fo.end(); ++it) {
+                switch (*it) {
+                case 0:
+                    avro::decode(d, v.file_path);
+                    break;
+                case 1:
+                    avro::decode(d, v.file_format);
+                    break;
+                case 2:
+                    avro::decode(d, v.record_count);
+                    break;
+                default:
+                    break;
+                }
+            }
+        } else {
+            avro::decode(d, v.file_path);
+            avro::decode(d, v.file_format);
+            avro::decode(d, v.record_count);
+        }
+    }
+};
+
+template<> struct codec_traits<c::manifest_entry_v1> {
+    static void encode(Encoder& e, const c::manifest_entry_v1& v) {
+        avro::encode(e, v.status);
+        avro::encode(e, v.data_file_);
+    }
+    static void decode(Decoder& d, c::manifest_entry_v1& v) {
+        if (avro::ResolvingDecoder *rd =
+            dynamic_cast<avro::ResolvingDecoder *>(&d)) {
+            const std::vector<size_t> fo = rd->fieldOrder();
+            for (std::vector<size_t>::const_iterator it = fo.begin();
+                it != fo.end(); ++it) {
+                switch (*it) {
+                case 0:
+                    avro::decode(d, v.status);
+                    break;
+                case 1:
+                    avro::decode(d, v.data_file_);
+                    break;
+                default:
+                    break;
+                }
+            }
+        } else {
+            avro::decode(d, v.status);
+            avro::decode(d, v.data_file_);
+        }
+    }
+};
+
+}
+#endif
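A note on why this three-field projection can read real manifest files, which physically contain many more columns: DataFileReader resolves the writer schema embedded in the Avro file against the reader schema compiled from MANIFEST_ENTRY_SCHEMA_V1, and the generated decode() above asks the ResolvingDecoder for its fieldOrder() so non-projected fields are skipped. A sketch of that resolution done by hand, assuming both schemas are available as JSON strings (DecodeProjected is a hypothetical name; inside DataFileReader the writer schema comes from the file header instead):

    #include <memory>
    #include <string>
    #include "avro/Compiler.hh"
    #include "avro/Decoder.hh"
    #include "avro/Specific.hh"
    #include "avro/Stream.hh"
    #include "avro_codegen/iceberg_manifest_entry_partial_v1.hpp"

    void DecodeProjected(const std::string &writer_json, const std::string &reader_json,
                         std::unique_ptr<avro::InputStream> in) {
        auto writer = avro::compileJsonSchemaFromString(writer_json);
        auto reader = avro::compileJsonSchemaFromString(reader_json); // e.g. MANIFEST_ENTRY_SCHEMA_V1
        avro::DecoderPtr resolving = avro::resolvingDecoder(writer, reader, avro::binaryDecoder());
        resolving->init(*in);
        c::manifest_entry_v1 entry;
        avro::decode(*resolving, entry); // dispatches to the codec_traits specialization above
    }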
diff --git a/src/include/avro_codegen/iceberg_manifest_file_partial_v1.hpp b/src/include/avro_codegen/iceberg_manifest_file_partial_v1.hpp
new file mode 100644
index 0000000..1121156
--- /dev/null
+++ b/src/include/avro_codegen/iceberg_manifest_file_partial_v1.hpp
@@ -0,0 +1,64 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* https://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+
+#ifndef CPX_HH_1629567502_v1__H_
+#define CPX_HH_1629567502_v1__H_
+
+#include <sstream>
+#include "boost/any.hpp"
+#include "avro/Specific.hh"
+#include "avro/Encoder.hh"
+#include "avro/Decoder.hh"
+
+namespace c {
+struct manifest_file_v1 {
+    std::string manifest_path;
+    manifest_file_v1() :
+        manifest_path(std::string())
+        { }
+};
+
+}
+namespace avro {
+template<> struct codec_traits<c::manifest_file_v1> {
+    static void encode(Encoder& e, const c::manifest_file_v1& v) {
+        avro::encode(e, v.manifest_path);
+    }
+    static void decode(Decoder& d, c::manifest_file_v1& v) {
+        if (avro::ResolvingDecoder *rd =
+            dynamic_cast<avro::ResolvingDecoder *>(&d)) {
+            const std::vector<size_t> fo = rd->fieldOrder();
+            for (std::vector<size_t>::const_iterator it = fo.begin();
+                it != fo.end(); ++it) {
+                switch (*it) {
+                case 0:
+                    avro::decode(d, v.manifest_path);
+                    break;
+                default:
+                    break;
+                }
+            }
+        } else {
+            avro::decode(d, v.manifest_path);
+        }
+    }
+};
+
+}
+#endif
diff --git a/src/include/iceberg_metadata.hpp b/src/include/iceberg_metadata.hpp
index 8df4626..2a6929b 100644
--- a/src/include/iceberg_metadata.hpp
+++ b/src/include/iceberg_metadata.hpp
@@ -22,12 +22,13 @@ class IcebergSnapshot {
 	uint64_t sequence_number;
 	string manifest_list;
 	timestamp_t timestamp_ms;
+	idx_t iceberg_format_version;
 
 	static IcebergSnapshot GetLatestSnapshot(string &path, FileSystem &fs);
 	static IcebergSnapshot GetSnapshotById(string &path, FileSystem &fs, idx_t snapshot_id);
 	static IcebergSnapshot GetSnapshotByTimestamp(string &path, FileSystem &fs, timestamp_t timestamp);
 
-	static IcebergSnapshot ParseSnapShot(yyjson_val *snapshot);
+	static IcebergSnapshot ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version);
 	static string ReadMetaData(string &path, FileSystem &fs);
 
 protected:
@@ -76,9 +77,8 @@ struct IcebergTable {
 	vector<IcebergTableEntry> entries;
 
 protected:
-	static vector<IcebergManifest> ReadManifestListFile(string path, FileSystem &fs);
-	static vector<IcebergManifestEntry> ReadManifestEntries(string path, FileSystem &fs);
-
+	static vector<IcebergManifest> ReadManifestListFile(string path, FileSystem &fs, idx_t iceberg_format_version);
+	static vector<IcebergManifestEntry> ReadManifestEntries(string path, FileSystem &fs, idx_t iceberg_format_version);
 	string path;
 };
diff --git a/src/include/iceberg_types.hpp b/src/include/iceberg_types.hpp
index a8de7d7..16ec1ca 100644
--- a/src/include/iceberg_types.hpp
+++ b/src/include/iceberg_types.hpp
@@ -8,13 +8,10 @@
 
 #pragma once
 
-// Note: The `_full` headers here provide the c++ classed for the full iceberg schema, I'm predicting this will
-// inevitably be necessary when the iceberg tables snowflake spits out have slightly different schema's.
-// TODO: can be removed when snowflake tests have succeeded
 #include "avro_codegen/iceberg_manifest_entry_partial.hpp"
+#include "avro_codegen/iceberg_manifest_entry_partial_v1.hpp"
 #include "avro_codegen/iceberg_manifest_file_partial.hpp"
-//#include "avro_codegen/iceberg_manifest_entry_full.hpp"
-//#include "avro_codegen/iceberg_manifest_file_full.hpp"
+#include "avro_codegen/iceberg_manifest_file_partial_v1.hpp"
 
 namespace duckdb {
 
@@ -70,13 +67,31 @@ static string MANIFEST_SCHEMA = "{\n"
      "     ]\n"
      "  }";
 
+// Schema for v1; the sequence_number and content fields are not present in v1 manifest lists
+static string MANIFEST_SCHEMA_V1 = "{\n"
+     "     \"type\": \"record\",\n"
+     "     \"name\": \"manifest_file\",\n"
+     "     \"fields\" : [\n"
+     "         {\"name\": \"manifest_path\", \"type\": \"string\"}\n"
+     "     ]\n"
+     "  }";
+
 //! An entry in the manifest list file (top level AVRO file)
 struct IcebergManifest {
 	//! Constructor from iceberg v2 spec manifest file
 	explicit IcebergManifest(const c::manifest_file &schema) {
 		manifest_path = schema.manifest_path;
 		sequence_number = schema.sequence_number;
 		content = (IcebergManifestContentType)schema.content;
 	}
+
+	//! Constructor from iceberg v1 spec manifest file
+	explicit IcebergManifest(const c::manifest_file_v1 &schema) {
+		manifest_path = schema.manifest_path;
+		sequence_number = 0;
+		content = IcebergManifestContentType::DATA;
+	}
+
 	//! Path to the manifest AVRO file
 	string manifest_path;
 	//! sequence_number when manifest was added to table (0 for Iceberg v1)
@@ -122,6 +137,24 @@ static string MANIFEST_ENTRY_SCHEMA = "{\n"
      "     ]\n"
      "  }";
 
+static string MANIFEST_ENTRY_SCHEMA_V1 = "{\n"
+     "     \"type\": \"record\",\n"
+     "     \"name\": \"manifest_entry\",\n"
+     "     \"fields\" : [\n"
+     "         {\"name\": \"status\", \"type\" : \"int\"},\n"
+     "         {\"name\": \"data_file\", \"type\": {\n"
+     "             \"type\": \"record\",\n"
+     "             \"name\": \"r2\",\n"
+     "             \"fields\" : [\n"
+     "                 {\"name\": \"file_path\", \"type\": \"string\"},\n"
+     "                 {\"name\": \"file_format\", \"type\": \"string\"},\n"
+     "                 {\"name\": \"record_count\", \"type\" : \"long\"}\n"
+     "             ]}\n"
+     "         }\n"
+     "     ]\n"
+     "  }";
+
+
 //! An entry in a manifest file
 struct IcebergManifestEntry {
 	explicit IcebergManifestEntry(const c::manifest_entry &schema) {
@@ -132,6 +165,14 @@ struct IcebergManifestEntry {
 		record_count = schema.data_file_.record_count;
 	}
 
+	explicit IcebergManifestEntry(const c::manifest_entry_v1 &schema) {
+		status = (IcebergManifestEntryStatusType)schema.status;
+		content = IcebergManifestEntryContentType::DATA;
+		file_path = schema.data_file_.file_path;
+		file_format = schema.data_file_.file_format;
+		record_count = schema.data_file_.record_count;
+	}
+
 	IcebergManifestEntryStatusType status;
 	//! ----- Data File Struct ------
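With both constructors in place, v1 and v2 manifests converge on the same in-memory structs, so everything downstream of the readers stays version-agnostic. A small illustration, not part of the patch (the path and counts are made up):

    #include "iceberg_types.hpp"

    using namespace duckdb;

    // A v1 record carries no `content` field, so the v1 constructor above pins it
    // to DATA; delete files can never originate from a v1 manifest.
    IcebergManifestEntry MakeExampleEntry() {
        c::manifest_entry_v1 raw;
        raw.status = 1; // 1 = ADDED, same meaning in both spec versions
        raw.data_file_.file_path = "data/part-00000.parquet";
        raw.data_file_.file_format = "PARQUET";
        raw.data_file_.record_count = 1000;
        return IcebergManifestEntry(raw); // content == IcebergManifestEntryContentType::DATA
    }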
diff --git a/test/sql/iceberg_scan_generated_data_0_01.test_slow b/test/sql/iceberg_scan_generated_data_0_01.test_slow
index 27eb7bb..6654dde 100644
--- a/test/sql/iceberg_scan_generated_data_0_01.test_slow
+++ b/test/sql/iceberg_scan_generated_data_0_01.test_slow
@@ -8,17 +8,37 @@ require iceberg
 
 require-env DUCKDB_ICEBERG_HAVE_TEST_DATA
 
+### Iceberg spec v1
+
 # Check count matches
 query I
-SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/generated_0_01/pyspark_iceberg_table');
+SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/generated_spec1_0_01/pyspark_iceberg_table');
 ----
-:data/iceberg/generated_0_01/expected_results/last/count.csv
+:data/iceberg/generated_spec1_0_01/expected_results/last/count.csv
 
 # Check data is identical, sorting by uuid to guarantee unique order
 query I nosort q1
-SELECT * FROM ICEBERG_SCAN('data/iceberg/generated_0_01/pyspark_iceberg_table') ORDER BY uuid NULLS LAST;
+SELECT * FROM ICEBERG_SCAN('data/iceberg/generated_spec1_0_01/pyspark_iceberg_table') ORDER BY uuid;
 ----
 
 query I nosort q1
-SELECT * FROM PARQUET_SCAN('data/iceberg/generated_0_01/expected_results/q05/data/*.parquet') ORDER BY uuid NULLS LAST;
-----
\ No newline at end of file
+SELECT * FROM PARQUET_SCAN('data/iceberg/generated_spec1_0_01/expected_results/q05/data/*.parquet') ORDER BY uuid;
+----
+
+### Iceberg spec v2
+
+# Check count matches
+query I
+SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/generated_spec2_0_01/pyspark_iceberg_table');
+----
+:data/iceberg/generated_spec2_0_01/expected_results/last/count.csv
+
+# Check data is identical, sorting by uuid to guarantee unique order
+query I nosort q2
+SELECT * FROM ICEBERG_SCAN('data/iceberg/generated_spec2_0_01/pyspark_iceberg_table') ORDER BY uuid;
+----
+
+query I nosort q2
+SELECT * FROM PARQUET_SCAN('data/iceberg/generated_spec2_0_01/expected_results/q05/data/*.parquet') ORDER BY uuid;
+----
\ No newline at end of file
diff --git a/test/sql/iceberg_scan_generated_data_0_01_remote.test_slow b/test/sql/iceberg_scan_generated_data_0_01_remote.test_slow
index 05f7c88..96aaf53 100644
--- a/test/sql/iceberg_scan_generated_data_0_01_remote.test_slow
+++ b/test/sql/iceberg_scan_generated_data_0_01_remote.test_slow
@@ -30,9 +30,9 @@ SELECT count(*) FROM ICEBERG_SCAN('s3://test-bucket-public/iceberg_0_01/pyspark_
 
 # Check data is identical, sorting by uuid to guarantee unique order
 query I nosort q1
-SELECT * FROM ICEBERG_SCAN('s3://test-bucket-public/iceberg_0_01/pyspark_iceberg_table', ALLOW_MOVED_PATHS=TRUE) ORDER BY uuid NULLS LAST;
+SELECT * FROM ICEBERG_SCAN('s3://test-bucket-public/iceberg_0_01/pyspark_iceberg_table', ALLOW_MOVED_PATHS=TRUE) ORDER BY uuid;
 ----
 
 query I nosort q1
-SELECT * FROM PARQUET_SCAN('data/iceberg/generated_0_01/expected_results/last/data/*.parquet') ORDER BY uuid NULLS LAST;
+SELECT * FROM PARQUET_SCAN('data/iceberg/generated_0_01/expected_results/last/data/*.parquet') ORDER BY uuid;
 ----
\ No newline at end of file
diff --git a/vcpkg.json b/vcpkg.json
index f2c49f9..801e5a2 100644
--- a/vcpkg.json
+++ b/vcpkg.json
@@ -10,5 +10,12 @@
     "overlay-ports": [
       "./vcpkg_ports"
     ]
-  }
+  },
+  "builtin-baseline": "501db0f17ef6df184fcdbfbe0f87cde2313b6ab1",
+  "overrides": [
+    {
+      "name": "openssl",
+      "version": "3.0.8"
+    }
+  ]
 }
\ No newline at end of file
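End to end, the new v1 path can be exercised the same way the tests above do, from DuckDB's C++ API. A sketch, assuming a build with the iceberg extension statically linked and the generated v1 data present (the path mirrors the test file):

    #include "duckdb.hpp"
    #include <iostream>

    int main() {
        duckdb::DuckDB db(nullptr);
        duckdb::Connection con(db);
        // ICEBERG_SCAN is the table function registered by this extension
        auto result = con.Query(
            "SELECT count(*) FROM ICEBERG_SCAN("
            "'data/iceberg/generated_spec1_0_01/pyspark_iceberg_table')");
        if (result->HasError()) {
            std::cerr << result->GetError() << std::endl;
            return 1;
        }
        result->Print();
        return 0;
    }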