diff --git a/include/parquet4seastar/column_chunk_writer.hh b/include/parquet4seastar/column_chunk_writer.hh
index fc7b731..ca5adc0 100644
--- a/include/parquet4seastar/column_chunk_writer.hh
+++ b/include/parquet4seastar/column_chunk_writer.hh
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -208,6 +209,49 @@ class column_chunk_writer
         });
     }
 
+    template <typename SINK>
+    seastar::lw_shared_ptr<format::ColumnMetaData> sync_flush_chunk(SINK& sink) {
+        if (_levels_in_current_page > 0) {
+            flush_page();
+        }
+        auto metadata = seastar::make_lw_shared<format::ColumnMetaData>();
+        metadata->__set_type(ParquetType);
+        metadata->__set_encodings(std::vector<format::Encoding::type>(_used_encodings.begin(), _used_encodings.end()));
+        metadata->__set_codec(_compressor->type());
+        metadata->__set_num_values(0);
+        metadata->__set_total_compressed_size(0);
+        metadata->__set_total_uncompressed_size(0);
+        auto write_page = [this, metadata, &sink](const format::PageHeader& header, bytes_view contents) -> void {
+            bytes_view serialized_header = _thrift_serializer.serialize(header);
+            metadata->total_uncompressed_size += serialized_header.size();
+            metadata->total_uncompressed_size += header.uncompressed_page_size;
+            metadata->total_compressed_size += serialized_header.size();
+            metadata->total_compressed_size += header.compressed_page_size;
+            {
+                const char* data = reinterpret_cast<const char*>(serialized_header.data());
+                sink.write(data, serialized_header.size());
+            }
+            {
+                const char* data = reinterpret_cast<const char*>(contents.data());
+                sink.write(data, contents.size());
+            }
+        };
+        if (_val_encoder->view_dict()) {
+            fill_dictionary_page();
+            metadata->__set_dictionary_page_offset(metadata->total_compressed_size);
+            write_page(_dict_page_header, _dict_page);
+        }
+        metadata->__set_data_page_offset(metadata->total_compressed_size);
+        for (size_t i : std::ranges::iota_view(0U, _page_headers.size())) {
+            metadata->num_values += _page_headers[i].data_page_header.num_values;
+            write_page(_page_headers[i], _pages[i]);
+        }
+        _pages.clear();
+        _page_headers.clear();
+        _estimated_chunk_size = 0;
+        return metadata;
+    }
+
     size_t rows_written() const { return _rows_written; }
 
     size_t estimated_chunk_size() const { return _estimated_chunk_size; }
diff --git a/include/parquet4seastar/file_writer.hh b/include/parquet4seastar/file_writer.hh
index 34e4c43..fa7d2ec 100644
--- a/include/parquet4seastar/file_writer.hh
+++ b/include/parquet4seastar/file_writer.hh
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -121,15 +122,16 @@ class writer
         return std::get<column_chunk_writer<T>>(_writers[i]);
     }
 
-
-    auto flush_page(int idx, uint64_t limit_size)->bool {
-        return std::visit([&limit_size](auto& col) -> bool {
-            if (col.current_page_max_size() > limit_size) {
-                col.flush_page();
-                return true;
-            }
-            return false;
-        }, _writers[idx]);
+    auto flush_page(int idx, uint64_t limit_size) -> bool {
+        return std::visit(
+            [&limit_size](auto& col) -> bool {
+                if (col.current_page_max_size() > limit_size) {
+                    col.flush_page();
+                    return true;
+                }
+                return false;
+            },
+            _writers[idx]);
     }
 
     size_t estimated_row_group_size() const {
@@ -192,4 +194,157 @@ class writer
     }
 };
 
+template <typename Class>
+concept is_sync_sink_v = requires(Class obj, const char* str, size_t len) {
+    { obj.write(str, len) } -> std::same_as<void>;
+    { obj.flush() } -> std::same_as<void>;
+    { obj.close() } -> std::same_as<void>;
+};
+
+template <typename SINK>
+requires is_sync_sink_v<SINK>
+class sync_writer
+{
+  public:
+    using column_chunk_writer_variant =
+        std::variant<column_chunk_writer<format::Type::BOOLEAN>, column_chunk_writer<format::Type::INT32>,
+                     column_chunk_writer<format::Type::INT64>, column_chunk_writer<format::Type::FLOAT>,
+                     column_chunk_writer<format::Type::DOUBLE>, column_chunk_writer<format::Type::BYTE_ARRAY>,
+                     column_chunk_writer<format::Type::FIXED_LEN_BYTE_ARRAY>>;
+
+  private:
+    bool _closed = false;
+    SINK _sink;
+    std::vector<column_chunk_writer_variant> _writers;
+    format::FileMetaData _metadata;
+    std::vector<std::vector<std::string>> _leaf_paths;
+    thrift_serializer _thrift_serializer;
+    size_t _file_offset = 0;
+
+  private:
+    void init_writers(const writer_schema::schema& root) {
+        using namespace writer_schema;
+        auto convert = y_combinator{[&](auto&& convert, const node& node_variant, uint32_t def, uint32_t rep) -> void {
+            std::visit(
+                overloaded{[&](const list_node& x) { convert(*x.element, def + 1 + x.optional, rep + 1); },
+                           [&](const map_node& x) {
+                               convert(*x.key, def + 1 + x.optional, rep + 1);
+                               convert(*x.value, def + 1 + x.optional, rep + 1);
+                           },
+                           [&](const struct_node& x) {
+                               for (const node& child : x.fields) {
+                                   convert(child, def + x.optional, rep);
+                               }
+                           },
+                           [&](const primitive_node& x) {
+                               std::visit(
+                                   overloaded{
+                                       [&](logical_type::INT96 logical_type) {
+                                           throw parquet_exception("INT96 is deprecated. Writing INT96 is unsupported.");
+                                       },
+                                       [&](auto logical_type) {
+                                           constexpr format::Type::type parquet_type = decltype(logical_type)::physical_type;
+                                           writer_options options = {def + x.optional, rep, x.encoding, x.compression};
+                                           _writers.push_back(make_column_chunk_writer<parquet_type>(options));
+                                       }},
+                                   x.logical_type);
+                           }},
+                node_variant);
+        }};
+        for (const node& field : root.fields) {
+            convert(field, 0, 0);
+        }
+    }
+
+  public:
+    explicit sync_writer(SINK&& sink) : _sink(std::move(sink)) {}
+
+    auto fetch_sink() -> SINK {
+        assert(_closed);
+        return std::move(_sink);
+    }
+
+    static std::unique_ptr<sync_writer> open_and_write_par1(SINK&& sink, const writer_schema::schema& schema) {
+        auto fw = std::make_unique<sync_writer>(std::move(sink));
+        writer_schema::write_schema_result wsr = writer_schema::write_schema(schema);
+        fw->_metadata.schema = std::move(wsr.elements);
+        fw->_leaf_paths = std::move(wsr.leaf_paths);
+        fw->init_writers(schema);
+        fw->_file_offset = 4;
+        fw->_sink.write("PAR1", 4);
+        return fw;
+    }
+
+    static std::unique_ptr<sync_writer> open(SINK&& sink, const writer_schema::schema& schema) {
+        return open_and_write_par1(std::move(sink), schema);
+    }
+
+    template <format::Type::type T>
+    column_chunk_writer<T>& column(int i) {
+        return std::get<column_chunk_writer<T>>(_writers[i]);
+    }
+
+    auto flush_page(int idx, uint64_t limit_size) -> bool {
+        return std::visit(
+            [&limit_size](auto& col) -> bool {
+                if (col.current_page_max_size() > limit_size) {
+                    col.flush_page();
+                    return true;
+                }
+                return false;
+            },
+            _writers[idx]);
+    }
+
+    size_t estimated_row_group_size() const {
+        size_t size = 0;
+        for (const auto& writer : _writers) {
+            std::visit([&](const auto& x) { size += x.estimated_chunk_size(); }, writer);
+        }
+        return size;
+    }
+
+    auto flush_row_group() -> void {
+        _metadata.row_groups.push_back(format::RowGroup{});
+        size_t rows_written = 0;
+        if (_writers.size() > 0) {
+            rows_written = std::visit([&](auto& x) { return x.rows_written(); }, _writers[0]);
+        }
+        _metadata.row_groups.rbegin()->__set_num_rows(rows_written);
+        for (size_t i : std::ranges::iota_view(0U, _writers.size())) {
+            auto cmd = std::visit([&](auto& x) { return x.sync_flush_chunk(_sink); }, _writers[i]);
+            cmd->dictionary_page_offset += _file_offset;
+            cmd->data_page_offset += _file_offset;
+            cmd->__set_path_in_schema(_leaf_paths[i]);
+            bytes_view footer = _thrift_serializer.serialize(*cmd);
+
+            _file_offset += cmd->total_compressed_size;
+            format::ColumnChunk cc;
+            cc.__set_file_offset(_file_offset);
+            cc.__set_meta_data(*cmd);
+            _metadata.row_groups.rbegin()->columns.push_back(cc);
+            _metadata.row_groups.rbegin()->__set_total_byte_size(_metadata.row_groups.rbegin()->total_byte_size +
+                                                                 cmd->total_compressed_size + footer.size());
+            _file_offset += footer.size();
+            _sink.write(reinterpret_cast<const char*>(footer.data()), footer.size());
+        }
+    }
+
+    auto close() -> void {
+        _closed = true;
+        flush_row_group();
+        for (const format::RowGroup& rg : _metadata.row_groups) {
+            _metadata.num_rows += rg.num_rows;
+        }
+        _metadata.__set_version(1);  // Parquet 2.0 == 1
+        const bytes_view footer = _thrift_serializer.serialize(_metadata);
+        _sink.write(reinterpret_cast<const char*>(footer.data()), footer.size());
+        const uint32_t footer_size = footer.size();
+        _sink.write(reinterpret_cast<const char*>(&footer_size), 4);
+        _sink.write("PAR1", 4);
+        _sink.flush();
+        _sink.close();
+    }
+};
+
 }  // namespace parquet4seastar
diff --git a/tests/file_writer_test.cc b/tests/file_writer_test.cc
index d338e10..d830b49 100644
--- a/tests/file_writer_test.cc
+++ b/tests/file_writer_test.cc
@@ -63,17 +63,16 @@ class MemorySink
   public:
     std::vector<char> data;
 
-    auto write(const char* str, size_t len) -> seastar::future<> {
+    auto write(const char* str, size_t len) -> void {
         for (auto idx = 0; idx < len; ++idx) {
            data.push_back(str[idx]);
         }
-        co_return;
     }
-    auto flush() -> seastar::future<> { co_return; }
-    auto close() -> seastar::future<> { co_return; }
+    auto flush() -> void {}
+    auto close() -> void {}
 };
 
-static_assert(parquet4seastar::is_sink_v<MemorySink>);
+static_assert(parquet4seastar::is_sync_sink_v<MemorySink>);
 
 SEASTAR_TEST_CASE(full_roundtrip) {
     using namespace parquet4seastar;
@@ -110,7 +109,7 @@ SEASTAR_TEST_CASE(full_roundtrip) {
     auto file = open_file_dma(test_file_name, flags).get0();
     auto sink = make_file_output_stream(file).get0();
     auto fw = writer<seastar::output_stream<char>>::open(std::move(sink), writer_schema).get0();
-    auto memory_fw = writer<MemorySink>::open(MemorySink(), writer_schema).get0();
+    auto memory_fw = sync_writer<MemorySink>::open(MemorySink(), writer_schema);
     {
         auto& map_key = fw->column(0);
         auto& map_value = fw->column(1);
@@ -144,7 +143,7 @@ SEASTAR_TEST_CASE(full_roundtrip) {
         struct_field_1.put(0, 0, 1337);
         struct_field_2.put(0, 0, 1337);
 
-        memory_fw->flush_row_group().get0();
+        memory_fw->flush_row_group();
 
         map_key.put(2, 0, "key1"_bv);
         map_value.put(2, 0, 1);
@@ -157,7 +156,7 @@ SEASTAR_TEST_CASE(full_roundtrip) {
     }
 
     fw->close().get0();
-    memory_fw->close().get();
+    memory_fw->close();
 
     auto parquet_file = seastar::open_file_dma(test_file_name, seastar::open_flags::ro).get0();
     auto size = parquet_file.size().get();
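
Usage sketch (not part of the patch): a minimal, self-contained example of how the new synchronous path might be driven end to end, mirroring the test's MemorySink. It assumes a pre-built writer_schema::schema whose first leaf column is INT32; VectorSink, write_example, and the column index are illustrative names only.

// Sketch, assuming the headers above are on the include path.
#include <cstddef>
#include <vector>
#include <parquet4seastar/file_writer.hh>

// Minimal blocking sink satisfying is_sync_sink_v (same shape as the test's MemorySink).
struct VectorSink {
    std::vector<char> data;
    void write(const char* str, size_t len) { data.insert(data.end(), str, str + len); }
    void flush() {}
    void close() {}
};

std::vector<char> write_example(const parquet4seastar::writer_schema::schema& schema) {
    using namespace parquet4seastar;
    // open() writes the leading "PAR1" magic and sets up one column_chunk_writer per leaf.
    auto fw = sync_writer<VectorSink>::open(VectorSink{}, schema);
    // put(def, rep, value), as in the existing tests; index 0 is assumed to be an INT32 leaf.
    fw->column<format::Type::INT32>(0).put(0, 0, 42);
    fw->flush_row_group();            // serializes the row group's pages and chunk metadata through the sink
    fw->close();                      // appends the file footer and the trailing "PAR1", then closes the sink
    return fw->fetch_sink().data;     // buffered bytes of the complete Parquet file
}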