Commit

add sync writer
mmooyyii committed Dec 25, 2023
1 parent a36ec39 commit afd3b05
Showing 3 changed files with 215 additions and 17 deletions.
44 changes: 44 additions & 0 deletions include/parquet4seastar/column_chunk_writer.hh
@@ -25,6 +25,7 @@
#include <parquet4seastar/bytes.hh>
#include <parquet4seastar/column_chunk_reader.hh>
#include <parquet4seastar/encoding.hh>
#include <ranges>
#include <unordered_map>
#include <unordered_set>
#include <vector>
@@ -208,6 +209,49 @@ class column_chunk_writer
});
}

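// Synchronous counterpart of the futures-based chunk flush: drains any
// partially filled page, writes the optional dictionary page and every
// buffered data page through `sink`, and returns per-chunk metadata whose
// page offsets are relative to the start of this chunk (the caller rebases
// them against the absolute file offset).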
template <typename SINK>
seastar::lw_shared_ptr<format::ColumnMetaData> sync_flush_chunk(SINK& sink) {
if (_levels_in_current_page > 0) {
flush_page();
}
auto metadata = seastar::make_lw_shared<format::ColumnMetaData>();
metadata->__set_type(ParquetType);
metadata->__set_encodings(std::vector<format::Encoding::type>(_used_encodings.begin(), _used_encodings.end()));
metadata->__set_codec(_compressor->type());
metadata->__set_num_values(0);
metadata->__set_total_compressed_size(0);
metadata->__set_total_uncompressed_size(0);
auto write_page = [this, metadata, &sink](const format::PageHeader& header, bytes_view contents) -> void {
bytes_view serialized_header = _thrift_serializer.serialize(header);
metadata->total_uncompressed_size += serialized_header.size();
metadata->total_uncompressed_size += header.uncompressed_page_size;
metadata->total_compressed_size += serialized_header.size();
metadata->total_compressed_size += header.compressed_page_size;
{
const char* data = reinterpret_cast<const char*>(serialized_header.data());
sink.write(data, serialized_header.size());
}
{
const char* data = reinterpret_cast<const char*>(contents.data());
sink.write(data, contents.size());
}
};
if (_val_encoder->view_dict()) {
fill_dictionary_page();
metadata->__set_dictionary_page_offset(metadata->total_compressed_size);
write_page(_dict_page_header, _dict_page);
}
metadata->__set_data_page_offset(metadata->total_compressed_size);
for (size_t i : std::ranges::iota_view(0U, _page_headers.size())) {
metadata->num_values += _page_headers[i].data_page_header.num_values;
write_page(_page_headers[i], _pages[i]);
}
_pages.clear();
_page_headers.clear();
_estimated_chunk_size = 0;
return metadata;
}

size_t rows_written() const { return _rows_written; }
size_t estimated_chunk_size() const { return _estimated_chunk_size; }

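The new sync_flush_chunk mirrors the futures-based flush_chunk, but pushes bytes through a caller-supplied synchronous sink. Below is a minimal sketch of driving it directly, assuming a vector-backed sink shaped like the test's MemorySink; VecSink, sketch(), and the option values are illustrative, not part of this commit:

#include <parquet4seastar/column_chunk_writer.hh>
#include <vector>

using namespace parquet4seastar;

struct VecSink {  // illustrative sink; same shape as the test's MemorySink
    std::vector<char> data;
    void write(const char* p, size_t n) { data.insert(data.end(), p, p + n); }
    void flush() {}
    void close() {}
};

void sketch() {
    // Field order as used by init_writers() below:
    // {definition level, repetition level, encoding, compression}.
    writer_options opts = {1, 0, format::Encoding::PLAIN, format::CompressionCodec::UNCOMPRESSED};
    auto col = make_column_chunk_writer<format::Type::INT32>(opts);
    col.put(1, 0, 42);  // (def, rep, value)
    VecSink sink;
    auto md = col.sync_flush_chunk(sink);
    // md->data_page_offset is relative to the start of this chunk;
    // sync_writer::flush_row_group rebases it against the absolute file offset.
}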
173 changes: 164 additions & 9 deletions include/parquet4seastar/file_writer.hh
@@ -25,6 +25,7 @@
#include <parquet4seastar/column_chunk_writer.hh>
#include <parquet4seastar/writer_schema.hh>
#include <parquet4seastar/y_combinator.hh>
#include <ranges>
#include <seastar/core/seastar.hh>
#include <utility>

@@ -121,15 +122,16 @@ class writer
return std::get<column_chunk_writer<ParquetType>>(_writers[i]);
}


auto flush_page(int idx, uint64_t limit_size)->bool {
return std::visit([&limit_size](auto& col) -> bool {
if (col.current_page_max_size() > limit_size) {
col.flush_page();
return true;
}
return false;
}, _writers[idx]);
auto flush_page(int idx, uint64_t limit_size) -> bool {
return std::visit(
[&limit_size](auto& col) -> bool {
if (col.current_page_max_size() > limit_size) {
col.flush_page();
return true;
}
return false;
},
_writers[idx]);
}

size_t estimated_row_group_size() const {
@@ -192,4 +194,157 @@
}
};

template <typename Class>
concept is_sync_sink_v = requires(Class obj, const char* str, size_t len) {
{ obj.write(str, len) } -> std::same_as<void>;
{ obj.flush() } -> std::same_as<void>;
{ obj.close() } -> std::same_as<void>;
};
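// Any type exposing these three void member functions models the concept; no
// seastar futures are involved. For illustration only (not part of this
// commit), a stdio-backed sink could look like:
//
//   class StdioSink {
//       std::FILE* _f = nullptr;  // assumes <cstdio> is included
//   public:
//       explicit StdioSink(const char* path) : _f(std::fopen(path, "wb")) {}
//       void write(const char* str, size_t len) { std::fwrite(str, 1, len, _f); }
//       void flush() { std::fflush(_f); }
//       void close() { if (_f) { std::fclose(_f); _f = nullptr; } }
//   };
//   static_assert(is_sync_sink_v<StdioSink>);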

template <typename SINK>
requires is_sync_sink_v<SINK>
class sync_writer
{
public:
using column_chunk_writer_variant =
std::variant<column_chunk_writer<format::Type::BOOLEAN>, column_chunk_writer<format::Type::INT32>,
column_chunk_writer<format::Type::INT64>, column_chunk_writer<format::Type::FLOAT>,
column_chunk_writer<format::Type::DOUBLE>, column_chunk_writer<format::Type::BYTE_ARRAY>,
column_chunk_writer<format::Type::FIXED_LEN_BYTE_ARRAY>>;

private:
bool _closed = false;
SINK _sink;
std::vector<column_chunk_writer_variant> _writers;
format::FileMetaData _metadata;
std::vector<std::vector<std::string>> _leaf_paths;
thrift_serializer _thrift_serializer;
size_t _file_offset = 0;

private:
void init_writers(const writer_schema::schema& root) {
using namespace writer_schema;
auto convert = y_combinator{[&](auto&& convert, const node& node_variant, uint32_t def, uint32_t rep) -> void {
std::visit(
overloaded{[&](const list_node& x) { convert(*x.element, def + 1 + x.optional, rep + 1); },
[&](const map_node& x) {
convert(*x.key, def + 1 + x.optional, rep + 1);
convert(*x.value, def + 1 + x.optional, rep + 1);
},
[&](const struct_node& x) {
for (const node& child : x.fields) {
convert(child, def + x.optional, rep);
}
},
[&](const primitive_node& x) {
std::visit(
overloaded{
[&](logical_type::INT96 logical_type) {
throw parquet_exception("INT96 is deprecated. Writing INT96 is unsupported.");
},
[&](auto logical_type) {
constexpr format::Type::type parquet_type = decltype(logical_type)::physical_type;
writer_options options = {def + x.optional, rep, x.encoding, x.compression};
_writers.push_back(make_column_chunk_writer<parquet_type>(options));
}},
x.logical_type);
}},
node_variant);
}};
for (const node& field : root.fields) {
convert(field, 0, 0);
}
}

public:
explicit sync_writer(SINK&& sink) : _sink(std::move(sink)) {}

auto fetch_sink() -> SINK {
assert(_closed);
return std::move(_sink);
}

static std::unique_ptr<sync_writer> open_and_write_par1(SINK&& sink, const writer_schema::schema& schema) {
auto fw = std::make_unique<sync_writer>(std::move(sink));
writer_schema::write_schema_result wsr = writer_schema::write_schema(schema);
fw->_metadata.schema = std::move(wsr.elements);
fw->_leaf_paths = std::move(wsr.leaf_paths);
fw->init_writers(schema);
fw->_file_offset = 4;
fw->_sink.write("PAR1", 4);
return fw;
}

static std::unique_ptr<sync_writer> open(SINK&& sink, const writer_schema::schema& schema) {
return open_and_write_par1(std::move(sink), schema);
}

template <format::Type::type ParquetType>
column_chunk_writer<ParquetType>& column(int i) {
return std::get<column_chunk_writer<ParquetType>>(_writers[i]);
}

auto flush_page(int idx, uint64_t limit_size) -> bool {
return std::visit(
[&limit_size](auto& col) -> bool {
if (col.current_page_max_size() > limit_size) {
col.flush_page();
return true;
}
return false;
},
_writers[idx]);
}

size_t estimated_row_group_size() const {
size_t size = 0;
for (const auto& writer : _writers) {
std::visit([&](const auto& x) { size += x.estimated_chunk_size(); }, writer);
}
return size;
}

auto flush_row_group() -> void {
_metadata.row_groups.push_back(format::RowGroup{});
size_t rows_written = 0;
if (_writers.size() > 0) {
rows_written = std::visit([&](auto& x) { return x.rows_written(); }, _writers[0]);
}
_metadata.row_groups.rbegin()->__set_num_rows(rows_written);
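// sync_flush_chunk writes this column's pages straight to the sink and
// returns chunk-relative offsets; rebase them against the file offset at
// which the chunk began, then append the serialized ColumnMetaData footer.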
for (size_t i : std::ranges::iota_view(0U, _writers.size())) {
auto cmd = std::visit([&](auto& x) { return x.sync_flush_chunk(_sink); }, _writers[i]);
cmd->dictionary_page_offset += _file_offset;
cmd->data_page_offset += _file_offset;
cmd->__set_path_in_schema(_leaf_paths[i]);
bytes_view footer = _thrift_serializer.serialize(*cmd);

_file_offset += cmd->total_compressed_size;
format::ColumnChunk cc;
cc.__set_file_offset(_file_offset);
cc.__set_meta_data(*cmd);
_metadata.row_groups.rbegin()->columns.push_back(cc);
_metadata.row_groups.rbegin()->__set_total_byte_size(_metadata.row_groups.rbegin()->total_byte_size +
cmd->total_compressed_size + footer.size());
_file_offset += footer.size();
_sink.write(reinterpret_cast<const char*>(footer.data()), footer.size());
}
}

auto close() -> void {
_closed = true;
flush_row_group();
for (const format::RowGroup& rg : _metadata.row_groups) {
_metadata.num_rows += rg.num_rows;
}
_metadata.__set_version(1); // Parquet 2.0 == 1
const bytes_view footer = _thrift_serializer.serialize(_metadata);
_sink.write(reinterpret_cast<const char*>(footer.data()), footer.size());
const uint32_t footer_size = footer.size();
_sink.write(reinterpret_cast<const char*>(&footer_size), 4);
_sink.write("PAR1", 4);
_sink.flush();
_sink.close();
}
};

} // namespace parquet4seastar
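Taken together, the synchronous path needs no seastar reactor. A minimal usage sketch, assuming a sink such as the MemorySink from the updated test below and an already-built writer_schema::schema named schema whose first leaf column is INT32:

using namespace parquet4seastar;

auto fw = sync_writer<MemorySink>::open(MemorySink{}, schema);
auto& col = fw->column<format::Type::INT32>(0);
col.put(0, 0, 1337);    // (definition level, repetition level, value)
fw->flush_row_group();  // writes pages plus per-column metadata to the sink
fw->close();            // writes the Thrift footer, its 4-byte length, and "PAR1"
MemorySink out = fw->fetch_sink();  // allowed only after close(); it asserts _closed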
15 changes: 7 additions & 8 deletions tests/file_writer_test.cc
@@ -63,17 +63,16 @@ class MemorySink
public:
std::vector<char> data;

auto write(const char* str, size_t len) -> seastar::future<> {
auto write(const char* str, size_t len) -> void {
for (size_t idx = 0; idx < len; ++idx) {
data.push_back(str[idx]);
}
co_return;
}
auto flush() -> seastar::future<> { co_return; }
auto close() -> seastar::future<> { co_return; }
auto flush() -> void {}
auto close() -> void {}
};

static_assert(parquet4seastar::is_sink_v<MemorySink>);
static_assert(parquet4seastar::is_sync_sink_v<MemorySink>);

SEASTAR_TEST_CASE(full_roundtrip) {
using namespace parquet4seastar;
@@ -110,7 +109,7 @@ SEASTAR_TEST_CASE(full_roundtrip) {
auto file = open_file_dma(test_file_name, flags).get0();
auto sink = make_file_output_stream(file).get0();
auto fw = writer<seastar::output_stream<char>>::open(std::move(sink), writer_schema).get0();
auto memory_fw = writer<MemorySink>::open(MemorySink(), writer_schema).get0();
auto memory_fw = sync_writer<MemorySink>::open(MemorySink(), writer_schema);
{
auto& map_key = fw->column<format::Type::BYTE_ARRAY>(0);
auto& map_value = fw->column<format::Type::INT32>(1);
@@ -144,7 +143,7 @@
struct_field_1.put(0, 0, 1337);
struct_field_2.put(0, 0, 1337);

memory_fw->flush_row_group().get0();
memory_fw->flush_row_group();

map_key.put(2, 0, "key1"_bv);
map_value.put(2, 0, 1);
@@ -157,7 +156,7 @@
}

fw->close().get0();
memory_fw->close().get();
memory_fw->close();

auto parquet_file = seastar::open_file_dma(test_file_name, seastar::open_flags::ro).get0();
auto size = parquet_file.size().get();
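For reference, close() leaves the standard Parquet tail in the sink: the serialized FileMetaData, a 4-byte footer length (written in native byte order, which matches the spec's little-endian on the usual targets), then the "PAR1" magic. A hedged extra check one could append to the test; BOOST_REQUIRE, std::string_view, and std::memcpy are assumed available through the existing test includes:

auto tail = memory_fw->fetch_sink().data;  // fetch_sink() asserts the writer was closed
BOOST_REQUIRE(tail.size() >= 8);
BOOST_REQUIRE(std::string_view(tail.data() + tail.size() - 4, 4) == "PAR1");
uint32_t footer_len;
std::memcpy(&footer_len, tail.data() + tail.size() - 8, 4);
BOOST_REQUIRE(footer_len + 8 <= tail.size());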
