Skip to content

Commit

Permalink
Merge pull request #13 from samansmink/v1_spec_support
Browse files Browse the repository at this point in the history
Support for v1 spec
  • Loading branch information
samansmink committed Aug 29, 2023
2 parents b0eaf41 + f33b2e3 commit 90de66a
Show file tree
Hide file tree
Showing 22 changed files with 368 additions and 51 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ target_link_libraries(

# Link dependencies into extension
target_link_libraries(${EXTENSION_NAME} PUBLIC optimized avro_static_release
debug avro_static_release)
debug avro_static_debug)
target_link_libraries(${TARGET_NAME}_loadable_extension optimized
avro_static_release debug avro_static_release)
avro_static_release debug avro_static_debug)

install(
TARGETS ${EXTENSION_NAME} ${TARGET_NAME}_loadable_extension
Expand Down
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ update:
git submodule update --remote --merge

data: data_clean
python3 scripts/test_data_generator/generate_iceberg.py 0.01 data/iceberg/generated_0_01
python3 scripts/test_data_generator/generate_iceberg.py 0.01 data/iceberg/generated_spec1_0_01 1
python3 scripts/test_data_generator/generate_iceberg.py 0.01 data/iceberg/generated_spec2_0_01 2

data_large: data_clean
python3 scripts/test_data_generator/generate_iceberg.py 0.01 data/iceberg/generated_0_01
python3 scripts/test_data_generator/generate_iceberg.py 1 data/iceberg/generated_1
data_large: data data_clean
python3 scripts/test_data_generator/generate_iceberg.py 1 data/iceberg/generated_spec2_1 2

data_clean:
rm -rf data/iceberg/generated_*
19 changes: 14 additions & 5 deletions scripts/test_data_generator/generate_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
from pyspark import SparkContext
from pathlib import Path

if (len(sys.argv) != 3):
print("Usage: generate_iceberg.py <SCALE_FACTOR> <DEST_PATH>")
if (len(sys.argv) != 4):
print("Usage: generate_iceberg.py <SCALE_FACTOR> <DEST_PATH> <ICEBERG_SPEC_VERSION>")
exit(1)

SCALE = sys.argv[1]
DEST_PATH = sys.argv[2]
ICEBERG_SPEC_VERSION = sys.argv[3]

PARQUET_SRC_FILE = f'{DEST_PATH}/base_file/file.parquet'
TABLE_NAME = "iceberg_catalog.pyspark_iceberg_table";
CWD = os.getcwd()
Expand Down Expand Up @@ -43,17 +45,24 @@
### Create Iceberg table from dataset
###
spark.read.parquet(PARQUET_SRC_FILE).createOrReplaceTempView('parquet_file_view');
spark.sql(f"CREATE or REPLACE TABLE {TABLE_NAME} TBLPROPERTIES ('format-version'='2', 'write.update.mode'='merge-on-read') AS SELECT * FROM parquet_file_view");

if ICEBERG_SPEC_VERSION == '1':
spark.sql(f"CREATE or REPLACE TABLE {TABLE_NAME} TBLPROPERTIES ('format-version'='{ICEBERG_SPEC_VERSION}') AS SELECT * FROM parquet_file_view");
elif ICEBERG_SPEC_VERSION == '2':
spark.sql(f"CREATE or REPLACE TABLE {TABLE_NAME} TBLPROPERTIES ('format-version'='{ICEBERG_SPEC_VERSION}', 'write.update.mode'='merge-on-read') AS SELECT * FROM parquet_file_view");
else:
print(f"Are you from the future? Iceberg spec version '{ICEBERG_SPEC_VERSION}' is unbeknownst to me")
exit(1)

###
### Apply modifications to base table generating verification results between each step
###
update_files = [str(path) for path in Path(f'{SCRIPT_DIR}').rglob('*.sql')]
update_files = [str(path) for path in Path(f'{SCRIPT_DIR}/updates_v{ICEBERG_SPEC_VERSION}').rglob('*.sql')]
update_files.sort() # Order matters obviously
last_file = ""

for path in update_files:
full_file_path = f"{SCRIPT_DIR}/updates/{os.path.basename(path)}"
full_file_path = f"{SCRIPT_DIR}/updates_v{ICEBERG_SPEC_VERSION}/{os.path.basename(path)}"
with open(full_file_path, 'r') as file:
file_trimmed = os.path.basename(path)[:-4]
last_file = file_trimmed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ set l_orderkey_bool=NULL,
l_commitdate_timestamp=NULL,
l_commitdate_timestamp_tz=NULL,
l_comment_string=NULL,
uuid=NULL,
l_comment_blob=NULL,
l_shipmode_quantity_struct=NULL,
l_linenumber_quantity_list=NULL,
Expand Down
File renamed without changes.
File renamed without changes.
3 changes: 3 additions & 0 deletions scripts/test_data_generator/updates_v1/q04.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- q04 (spec v1): clear the boolean flag on every fourth row, keyed by the integer part key.
UPDATE iceberg_catalog.pyspark_iceberg_table
SET l_orderkey_bool = false
WHERE l_partkey_int % 4 = 0;
3 changes: 3 additions & 0 deletions scripts/test_data_generator/updates_v1/q05.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- q05 (spec v1): clear the boolean flag on every fifth row, keyed by the integer part key.
UPDATE iceberg_catalog.pyspark_iceberg_table
SET l_orderkey_bool = false
WHERE l_partkey_int % 5 = 0;
16 changes: 16 additions & 0 deletions scripts/test_data_generator/updates_v2/q01.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- q01 (spec v2): NULL out every typed test column on the even-part-key half of the
-- table. With 'write.update.mode'='merge-on-read' this produces delete files that
-- exercise the v2 reader across all supported logical types (bool, int, long, float,
-- double, date, time, timestamp, timestamptz, string, blob, struct, list, map).
update iceberg_catalog.pyspark_iceberg_table
set l_orderkey_bool=NULL,
l_partkey_int=NULL,
l_suppkey_long=NULL,
l_extendedprice_float=NULL,
l_extendedprice_double=NULL,
l_shipdate_date=NULL,
l_partkey_time=NULL,
l_commitdate_timestamp=NULL,
l_commitdate_timestamp_tz=NULL,
l_comment_string=NULL,
l_comment_blob=NULL,
l_shipmode_quantity_struct=NULL,
l_linenumber_quantity_list=NULL,
l_linenumber_quantity_map=NULL
where l_partkey_int % 2 = 0;
3 changes: 3 additions & 0 deletions scripts/test_data_generator/updates_v2/q02.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- q02 (spec v2): duplicate the cheap rows (extended price below 30000) back into the
-- table, growing it so later updates touch a mix of original and appended data files.
INSERT INTO iceberg_catalog.pyspark_iceberg_table
SELECT * FROM iceberg_catalog.pyspark_iceberg_table
WHERE l_extendedprice_double < 30000
2 changes: 2 additions & 0 deletions scripts/test_data_generator/updates_v2/q03.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- q03 (spec v2): invert the boolean flag on every row, forcing a full-table
-- merge-on-read update.
UPDATE iceberg_catalog.pyspark_iceberg_table
SET l_orderkey_bool = NOT l_orderkey_bool;
File renamed without changes.
File renamed without changes.
69 changes: 48 additions & 21 deletions src/common/iceberg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,50 +20,69 @@ IcebergTable IcebergTable::Load(const string &iceberg_path, IcebergSnapshot &sna
auto manifest_list_full_path = allow_moved_paths
? IcebergUtils::GetFullPath(iceberg_path, snapshot.manifest_list, fs)
: snapshot.manifest_list;
auto manifests = ReadManifestListFile(manifest_list_full_path, fs);
auto manifests = ReadManifestListFile(manifest_list_full_path, fs, snapshot.iceberg_format_version);

for (auto &manifest : manifests) {
auto manifest_entry_full_path = allow_moved_paths
? IcebergUtils::GetFullPath(iceberg_path, manifest.manifest_path, fs)
: manifest.manifest_path;
auto manifest_paths = ReadManifestEntries(manifest_entry_full_path, fs);
auto manifest_paths = ReadManifestEntries(manifest_entry_full_path, fs, snapshot.iceberg_format_version);

ret.entries.push_back({std::move(manifest), std::move(manifest_paths)});
}

return ret;
}

vector<IcebergManifest> IcebergTable::ReadManifestListFile(string path, FileSystem &fs) {
vector<IcebergManifest> IcebergTable::ReadManifestListFile(string path, FileSystem &fs, idx_t iceberg_format_version) {
vector<IcebergManifest> ret;

// TODO: make streaming
string file = IcebergUtils::FileToString(path, fs);

auto stream = avro::memoryInputStream((unsigned char *)file.c_str(), file.size());
auto schema = avro::compileJsonSchemaFromString(MANIFEST_SCHEMA);
avro::DataFileReader<c::manifest_file> dfr(std::move(stream), schema);

c::manifest_file manifest_list;
while (dfr.read(manifest_list)) {
ret.emplace_back(IcebergManifest(manifest_list));
avro::ValidSchema schema;

if (iceberg_format_version == 1) {
schema = avro::compileJsonSchemaFromString(MANIFEST_SCHEMA_V1);
avro::DataFileReader<c::manifest_file_v1> dfr(std::move(stream), schema);
c::manifest_file_v1 manifest_list;
while (dfr.read(manifest_list)) {
ret.emplace_back(IcebergManifest(manifest_list));
}
} else {
schema = avro::compileJsonSchemaFromString(MANIFEST_SCHEMA);
avro::DataFileReader<c::manifest_file> dfr(std::move(stream), schema);
c::manifest_file manifest_list;
while (dfr.read(manifest_list)) {
ret.emplace_back(IcebergManifest(manifest_list));
}
}

return ret;
}

vector<IcebergManifestEntry> IcebergTable::ReadManifestEntries(string path, FileSystem &fs) {
vector<IcebergManifestEntry> IcebergTable::ReadManifestEntries(string path, FileSystem &fs, idx_t iceberg_format_version) {
vector<IcebergManifestEntry> ret;

// TODO: make streaming
string file = IcebergUtils::FileToString(path, fs);
auto stream = avro::memoryInputStream((unsigned char *)file.c_str(), file.size());
auto schema = avro::compileJsonSchemaFromString(MANIFEST_ENTRY_SCHEMA);
avro::DataFileReader<c::manifest_entry> dfr(std::move(stream), schema);

c::manifest_entry manifest_entry;
while (dfr.read(manifest_entry)) {
ret.emplace_back(IcebergManifestEntry(manifest_entry));
if (iceberg_format_version == 1) {
auto schema = avro::compileJsonSchemaFromString(MANIFEST_ENTRY_SCHEMA_V1);
avro::DataFileReader<c::manifest_entry_v1> dfr(std::move(stream), schema);
c::manifest_entry_v1 manifest_entry;
while (dfr.read(manifest_entry)) {
ret.emplace_back(IcebergManifestEntry(manifest_entry));
}
} else {
auto schema = avro::compileJsonSchemaFromString(MANIFEST_ENTRY_SCHEMA);
avro::DataFileReader<c::manifest_entry> dfr(std::move(stream), schema);
c::manifest_entry manifest_entry;
while (dfr.read(manifest_entry)) {
ret.emplace_back(IcebergManifestEntry(manifest_entry));
}
}

return ret;
Expand All @@ -73,42 +92,45 @@ IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(string &path, FileSystem &fs)
auto metadata_json = ReadMetaData(path, fs);
auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
auto root = yyjson_doc_get_root(doc);
auto iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
auto snapshots = yyjson_obj_get(root, "snapshots");
auto latest_snapshot = FindLatestSnapshotInternal(snapshots);

if (!latest_snapshot) {
throw IOException("No snapshots found");
}

return ParseSnapShot(latest_snapshot);
return ParseSnapShot(latest_snapshot, iceberg_format_version);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotById(string &path, FileSystem &fs, idx_t snapshot_id) {
auto metadata_json = ReadMetaData(path, fs);
auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
auto root = yyjson_doc_get_root(doc);
auto iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
auto snapshots = yyjson_obj_get(root, "snapshots");
auto snapshot = FindSnapshotByIdInternal(snapshots, snapshot_id);

if (!snapshot) {
throw IOException("Could not find snapshot with id " + to_string(snapshot_id));
}

return ParseSnapShot(snapshot);
return ParseSnapShot(snapshot, iceberg_format_version);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(string &path, FileSystem &fs, timestamp_t timestamp) {
auto metadata_json = ReadMetaData(path, fs);
auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
auto root = yyjson_doc_get_root(doc);
auto iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
auto snapshots = yyjson_obj_get(root, "snapshots");
auto snapshot = FindSnapshotByIdTimestampInternal(snapshots, timestamp);

if (!snapshot) {
throw IOException("Could not find latest snapshots for timestamp " + Timestamp::ToString(timestamp));
}

return ParseSnapShot(snapshot);
return ParseSnapShot(snapshot, iceberg_format_version);
}

string IcebergSnapshot::ReadMetaData(string &path, FileSystem &fs) {
Expand All @@ -120,18 +142,23 @@ string IcebergSnapshot::ReadMetaData(string &path, FileSystem &fs) {
return IcebergUtils::FileToString(metadata_file_path, fs);
}

IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot) {
IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version) {
IcebergSnapshot ret;

auto snapshot_tag = yyjson_get_tag(snapshot);
if (snapshot_tag != YYJSON_TYPE_OBJ) {
throw IOException("Invalid snapshot field found parsing iceberg metadata.json");
}

if (iceberg_format_version == 1) {
ret.sequence_number = 0;
} else if (iceberg_format_version == 2) {
ret.sequence_number = IcebergUtils::TryGetNumFromObject(snapshot, "sequence-number");
}

ret.snapshot_id = IcebergUtils::TryGetNumFromObject(snapshot, "snapshot-id");
ret.sequence_number = IcebergUtils::TryGetNumFromObject(snapshot, "sequence-number");
ret.timestamp_ms = Timestamp::FromEpochMs(IcebergUtils::TryGetNumFromObject(snapshot, "timestamp-ms"));
ret.manifest_list = IcebergUtils::TryGetStrFromObject(snapshot, "manifest-list");
ret.iceberg_format_version = iceberg_format_version;

return ret;
}
Expand Down
5 changes: 4 additions & 1 deletion src/iceberg_functions/iceberg_snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState
global_state->metadata_doc =
yyjson_read(global_state->metadata_file.c_str(), global_state->metadata_file.size(), 0);
auto root = yyjson_doc_get_root(global_state->metadata_doc);
global_state->iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
auto snapshots = yyjson_obj_get(root, "snapshots");
yyjson_arr_iter_init(snapshots, &global_state->snapshot_it);
return global_state;
Expand All @@ -37,6 +38,7 @@ struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState
string metadata_file;
yyjson_doc *metadata_doc;
yyjson_arr_iter snapshot_it;
idx_t iceberg_format_version;
};

static unique_ptr<FunctionData> IcebergSnapshotsBind(ClientContext &context, TableFunctionBindInput &input,
Expand Down Expand Up @@ -69,7 +71,8 @@ static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput
if (i >= STANDARD_VECTOR_SIZE) {
break;
}
auto snapshot = IcebergSnapshot::ParseSnapShot(next_snapshot);

auto snapshot = IcebergSnapshot::ParseSnapShot(next_snapshot, global_state.iceberg_format_version);

FlatVector::GetData<int64_t>(output.data[0])[i] = snapshot.sequence_number;
FlatVector::GetData<int64_t>(output.data[1])[i] = snapshot.snapshot_id;
Expand Down
Loading

0 comments on commit 90de66a

Please sign in to comment.