More sort and finalize fixes #1799

Open: wants to merge 72 commits into base: master
Changes from 60 commits
65d151a
Fix having empty dataframe in staged writes
Aug 14, 2024
ddf9a96
Throw exception when there is an empty staging area
Aug 14, 2024
2bede7c
Fix failing test
Aug 14, 2024
07c2dd1
Fix compilation errors
Aug 14, 2024
19250a1
Apply fixes for empty dfs in staged writes with vanila finalize
Aug 15, 2024
3f85d61
Address review comments
Aug 16, 2024
10cf515
Unify finalize methods (#1767)
vasil-pashov Aug 20, 2024
8db2b78
Add comments for unreachable code as per review request
Aug 20, 2024
ff546ee
Add comments as per review request
Aug 21, 2024
39baaf2
Throw when trying to compact unordered incomplete segments with final…
Aug 21, 2024
583ef39
More fixes for sort_and_finalize
Aug 22, 2024
546a860
Fixes for sort and finalize
Aug 29, 2024
585cb07
Add tests for schema mismatch in finalize_staged_data
Aug 29, 2024
4da043a
Merge branch 'master' into sort-and-finalize-sorting
Aug 29, 2024
e5fdb1c
Fix errors from merge commit
Aug 29, 2024
e621b9f
Fix CI compilation error
Aug 29, 2024
2aa8497
Fix compilation errors in tests
Aug 29, 2024
9ec7fca
Fix failing C++ tests
Aug 30, 2024
f672906
Fix compilation errors
Aug 30, 2024
4f0d36b
Merge branch 'master' into sort-and-finalize-sorting
Aug 30, 2024
6b09f3c
Fix failing c++ tests
Aug 30, 2024
91cca50
Add fixes for dynamic schema and staged writes
Aug 30, 2024
3405b3a
Fixing failing tests
Sep 1, 2024
32bfcc7
* Make it possible to have columns in the staged segments with common…
Sep 2, 2024
2462d1f
Fix C++ tests
Sep 3, 2024
176a102
Fix index end when there are no rows
Sep 3, 2024
60cd544
Strict static schema unit tests for sort and finalize
Sep 4, 2024
7dec6c8
Fix static schema hypothesis test
Sep 4, 2024
57005f7
Add dynamic schema testing
Sep 4, 2024
a6f0618
Merge branch 'master' into more-sort-and-finalize-fixes
vasil-pashov Sep 4, 2024
befbe67
Fix tests
Sep 4, 2024
e2f57a9
Fix tests
Sep 5, 2024
ca12dc7
Fix int checking in hypothesis
Sep 5, 2024
e7f86de
Fix a dynamic schema bug
Sep 9, 2024
d18d9cb
Bring back validate_index for incompletes
Sep 9, 2024
5c11843
Add assertion for null value reducer when a column cannot be found in…
Sep 9, 2024
a21c8b3
Use std::adjacent_find to compare all staged stream descriptors are t…
Sep 9, 2024
7a1d888
Earlier assert that staged stream descriptor is the same as the one o…
Sep 9, 2024
ba58f6e
Comment the return value of read_incompletes_to_pipeline
Sep 9, 2024
c159946
Remove check_incomplete_descriptor_match which is redundant
Sep 9, 2024
bd02dcc
Use internal::check instead of util::check
Sep 9, 2024
1db047e
Rework incomplete keys RAII
Sep 9, 2024
9ffc788
Remove checks which are no longer needed
Sep 9, 2024
fc39ccc
Merge branch 'master' into sort-and-finalize-sorting
Sep 9, 2024
9ebe329
Bring back the check if !segments.empty()
Sep 10, 2024
b8420ad
Move stream descriptor far out of for loop in merge.hpp
Sep 10, 2024
2cd017b
Generate 4 cols in test_write_parallel_sort_merge
Sep 10, 2024
2a09829
Check values in test_repeating_index_values
Sep 10, 2024
14ab952
Fix duplicated col_1 in test_appending_reordered_column_set_throws
Sep 10, 2024
819d2a5
Fix typo in test case name test_type_mismatch_in_staged_segments_thro…
Sep 10, 2024
ddb83a6
Fix duplicated col_1 in test_staged_segments_cant_be_reordered
Sep 10, 2024
f7c7746
Explicit dtypes for all DFs in TestStreamDescriptorMismatchOnFinalize…
Sep 10, 2024
0336091
Move test_two_columns_with_different_dtypes to nonreg tests
Sep 10, 2024
bff0fdd
More test cases for NaT values
Sep 10, 2024
7b4ae50
Type promotion checks for sort merge and dynamic schema
Sep 10, 2024
1d7e76d
Move v1 TestFinalizeStagedDataStaticSchemaMismatch to test_parallel.py
Sep 10, 2024
495ec6d
Move TestFinalizeWithEmptySegments for v1 API in test_parallel.py
Sep 10, 2024
aae39b2
Remove has_common_valid_type from merge function
Sep 11, 2024
215cd3b
Simplify merge types checks ad fix date_and_time
Sep 11, 2024
98aef2b
Rework hypothesis tests for sort and finalize. TBD: dynnamic schema a…
Sep 12, 2024
5b98440
Rework dynamic schema sort and finalize append tests
Sep 13, 2024
5ef9514
Fix hypothesis tests
Sep 13, 2024
beac0bb
Fix merge clause test
Sep 13, 2024
d30124c
Fix a typo
Sep 13, 2024
575c07e
Allow for UNKNOWN sorted value in do_compact
Sep 16, 2024
5edea0e
Move no append keys assertion in the funcitons that assert errors to …
Sep 17, 2024
8cd0ac3
Use ScalarTypeInfo
Sep 17, 2024
d76e0e3
Fix test name
Sep 17, 2024
82cc9e9
Fix compilation errors on linux
Sep 17, 2024
006f78a
Remove empty columns from staged segments instead of filling them with
Sep 18, 2024
cef88e4
Do not try dropping empty colummns when static schema is used in merg…
Sep 19, 2024
8999834
Merge branch 'master' into more-sort-and-finalize-fixes
vasil-pashov Sep 19, 2024
1 change: 1 addition & 0 deletions cpp/arcticdb/CMakeLists.txt
@@ -491,6 +491,7 @@ set(arcticdb_srcs
util/offset_string.cpp
util/sparse_utils.cpp
util/string_utils.cpp
util/timer.cpp
util/trace.cpp
util/type_handler.cpp
version/local_versioned_engine.cpp
11 changes: 8 additions & 3 deletions cpp/arcticdb/column_store/column.hpp
@@ -277,10 +277,15 @@ class Column {
return TypedColumnIterator<TagType, const RawType>(*this, false);
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template<typename T>
requires std::integral<T> || std::floating_point<T>
void set_scalar(ssize_t row_offset, T val) {
util::check(sizeof(T) == get_type_size(type_.data_type()), "Type mismatch in set_scalar, expected {}",
get_type_size(type_.data_type()));
util::check(
sizeof(T) == get_type_size(type_.data_type()),
"Type mismatch in set_scalar, expected {} byte scalar got {} byte scalar",
get_type_size(type_.data_type()),
sizeof(T)
);

auto prev_logical_row = last_logical_row_;
last_logical_row_ = row_offset;
8 changes: 5 additions & 3 deletions cpp/arcticdb/column_store/memory_segment.hpp
@@ -74,13 +74,15 @@ class SegmentInMemory {
impl_->end_row();
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template<typename T>
requires std::integral<T> || std::floating_point<T>
void set_scalar(position_t idx, T val) {
impl_->set_scalar(idx, val);
}

template<class T, std::enable_if_t<std::is_same_v<std::decay_t<T>, std::string>, int> = 0>
void set_scalar(position_t idx, T val) {
template<typename T>
requires std::same_as<std::decay_t<T>, std::string>
void set_scalar(position_t idx, const T& val) {
impl_->set_string(idx, val);
}

48 changes: 25 additions & 23 deletions cpp/arcticdb/column_store/memory_segment_impl.hpp
@@ -67,31 +67,31 @@ class SegmentInMemoryImpl {

template<class Callable>
auto visit(Callable &&c) const {
return entity::visit_field(parent_->descriptor().field(column_id_), [that=this, c=std::forward<Callable>(c)](auto type_desc_tag) {
return entity::visit_field(parent_->descriptor().field(column_id_), [this, c=std::forward<Callable>(c)](auto type_desc_tag) {
using RawType = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag::raw_type;
return c(that->parent_->scalar_at<RawType>(that->row_id_, that->column_id_));
return c(parent_->scalar_at<RawType>(row_id_, column_id_));
});
}

template<class Callable>
auto visit_string(Callable &&c) const {
return entity::visit_field(parent_->descriptor().field(column_id_), [that=this, c = std::forward<Callable>(c)](auto type_desc_tag) {
return entity::visit_field(parent_->descriptor().field(column_id_), [this, c = std::forward<Callable>(c)](auto type_desc_tag) {
using DTT = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag;
if constexpr(is_sequence_type(DTT::data_type))
return c(that->parent_->string_at(that->row_id_, position_t(that->column_id_)));
return c(parent_->string_at(row_id_, position_t(column_id_)));
});
}

template<class Callable>
auto visit_field(Callable &&c) const {
const auto& field = parent_->descriptor().field(column_id_);
return entity::visit_field(parent_->descriptor().field(column_id_), [&field, that=this, c = std::forward<Callable>(c)](auto type_desc_tag) {
return entity::visit_field(field, [&field, this, c = std::forward<Callable>(c)](auto type_desc_tag) {
using DataTypeTag = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag;
using RawType = typename DataTypeTag::raw_type;
if constexpr (is_sequence_type(DataTypeTag::data_type))
return c(that->parent_->string_at(that->row_id_, position_t(that->column_id_)), std::string_view{field.name()}, field.type());
return c(parent_->string_at(row_id_, position_t(column_id_)), std::string_view{field.name()}, type_desc_tag);
else if constexpr (is_numeric_type(DataTypeTag::data_type) || is_bool_type(DataTypeTag::data_type))
return c(that->parent_->scalar_at<RawType>(that->row_id_, that->column_id_), std::string_view{field.name()}, field.type());
return c(parent_->scalar_at<RawType>(row_id_, column_id_), std::string_view{field.name()}, type_desc_tag);
else if constexpr(is_empty_type(DataTypeTag::data_type))
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("visit_field does not support empty-type columns");
else
@@ -101,9 +101,9 @@ class SegmentInMemoryImpl {

template<class Callable>
auto visit(Callable &&c) {
return entity::visit_field(parent_->descriptor().field(column_id_), [that=this, c=std::forward<Callable>(c)](auto type_desc_tag) {
return entity::visit_field(parent_->descriptor().field(column_id_), [this, c=std::forward<Callable>(c)](auto type_desc_tag) {
using RawType = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag::raw_type;
return c(that->parent_->reference_at<RawType>(that->row_id_, that->column_id_));
return c(parent_->reference_at<RawType>(row_id_, column_id_));
});
}

@@ -454,18 +454,21 @@ class SegmentInMemoryImpl {
});
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
void set_scalar(position_t idx, T val) {
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_scalar(position_t idx, T val) {
ARCTICDB_TRACE(log::version(), "Segment setting scalar {} at row {} column {}", val, row_id_ + 1, idx);
column(idx).set_scalar(row_id_ + 1, val);
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
void set_external_block(position_t idx, T *val, size_t size) {
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_external_block(position_t idx, T *val, size_t size) {
column_unchecked(idx).set_external_block(row_id_ + 1, val, size);
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_sparse_block(position_t idx, T *val, size_t rows_to_write) {
column_unchecked(idx).set_sparse_block(row_id_ + 1, val, rows_to_write);
}
@@ -478,23 +478,22 @@ class SegmentInMemoryImpl {
column_unchecked(idx).set_sparse_block(std::move(buffer), std::move(shapes), std::move(bitset));
}

template<class T, std::enable_if_t<std::is_same_v<std::decay_t<T>, std::string>, int> = 0>
void set_scalar(position_t idx, T val) {
template<class T>
requires std::same_as<std::decay_t<T>, std::string>
void set_scalar(position_t idx, const T& val) {
set_string(idx, val);
}

template<class T, template<class> class Tensor, std::enable_if_t<
std::is_integral_v<T> || std::is_floating_point_v<T>,
int> = 0>
void set_array(position_t pos, Tensor<T> &val) {
template<class T, template<class> class Tensor>
requires std::integral<T> || std::floating_point<T>
void set_array(position_t pos, Tensor<T> &val) {
magic_.check();
ARCTICDB_SAMPLE(MemorySegmentSetArray, 0)
column_unchecked(pos).set_array(row_id_ + 1, val);
}

template<class T, std::enable_if_t<
std::is_integral_v<T> || std::is_floating_point_v<T>,
int> = 0>
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_array(position_t pos, py::array_t<T>& val) {
magic_.check();
ARCTICDB_SAMPLE(MemorySegmentSetArray, 0)
21 changes: 19 additions & 2 deletions cpp/arcticdb/entity/merge_descriptors.cpp
@@ -13,7 +13,7 @@
namespace arcticdb {
StreamDescriptor merge_descriptors(
const StreamDescriptor &original,
const std::vector<std::shared_ptr<FieldCollection>> &entries,
std::span<const std::shared_ptr<FieldCollection>> entries,
const std::unordered_set<std::string_view> &filtered_set,
const std::optional<IndexDescriptorImpl>& default_index) {
using namespace arcticdb::stream;
@@ -34,6 +34,7 @@ StreamDescriptor merge_descriptors(
merged_fields.emplace_back(idx.name());
merged_fields_map.try_emplace(idx.name(), TypeDescriptor{typename IndexType::TypeDescTag{}});
});
index = default_index_type_from_descriptor(*default_index);
} else {
util::raise_rte("Descriptor has uninitialized index and no default supplied");
}
@@ -71,7 +72,12 @@ StreamDescriptor merge_descriptors(
if(new_descriptor) {
merged_fields_map[field.name()] = *new_descriptor;
} else {
util::raise_rte("No valid common type between {} and {} for column {}", existing_type_desc, type_desc, field.name());
schema::raise<ErrorCode::E_DESCRIPTOR_MISMATCH>(
"No valid common type between {} and {} for column {}",
existing_type_desc,
type_desc,
field.name()
);
}
}
} else {
@@ -99,6 +105,17 @@ StreamDescriptor merge_descriptors(
return merge_descriptors(original, entries, filtered_set, default_index);
}

StreamDescriptor merge_descriptors(
const StreamDescriptor& original,
std::span<const std::shared_ptr<FieldCollection>> entries,
const std::optional<std::vector<std::string>>& filtered_columns,
const std::optional<IndexDescriptorImpl>& default_index) {
std::unordered_set<std::string_view> filtered_set = filtered_columns.has_value()
? std::unordered_set<std::string_view>(filtered_columns->begin(), filtered_columns->end())
: std::unordered_set<std::string_view>{};
return merge_descriptors(original, entries, filtered_set, default_index);
}

StreamDescriptor merge_descriptors(
const StreamDescriptor &original,
const std::vector<pipelines::SliceAndKey> &entries,
9 changes: 8 additions & 1 deletion cpp/arcticdb/entity/merge_descriptors.hpp
@@ -7,11 +7,12 @@

#include <arcticdb/entity/stream_descriptor.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <span>

namespace arcticdb {
StreamDescriptor merge_descriptors(
const StreamDescriptor &original,
const std::vector<std::shared_ptr<FieldCollection>> &entries,
std::span<const std::shared_ptr<FieldCollection>> entries,
const std::unordered_set<std::string_view> &filtered_set,
const std::optional<IndexDescriptorImpl>& default_index);

@@ -21,6 +22,12 @@ entity::StreamDescriptor merge_descriptors(
const std::optional<std::vector<std::string>> &filtered_columns,
const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt);

entity::StreamDescriptor merge_descriptors(
const entity::StreamDescriptor& original,
std::span<const std::shared_ptr<FieldCollection>> entries,
const std::optional<std::vector<std::string>>& filtered_columns,
const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt);

entity::StreamDescriptor merge_descriptors(
const entity::StreamDescriptor &original,
const std::vector<pipelines::SliceAndKey> &entries,
4 changes: 2 additions & 2 deletions cpp/arcticdb/entity/types.hpp
@@ -441,11 +441,11 @@ struct TypeDescriptor {
template<typename Callable>
constexpr auto visit_tag(Callable &&callable) const;

bool operator==(const TypeDescriptor &o) const {
[[nodiscard]] constexpr bool operator==(const TypeDescriptor& o) const {
return data_type_ == o.data_type_ && dimension_ == o.dimension_;
}

bool operator!=(const TypeDescriptor &o) const {
[[nodiscard]] constexpr bool operator!=(const TypeDescriptor& o) const {
return !(*this == o);
}

3 changes: 2 additions & 1 deletion cpp/arcticdb/pipeline/index_utils.cpp
@@ -92,9 +92,10 @@ TimeseriesDescriptor get_merged_tsd(
}
else if (dynamic_schema) {
// In case of dynamic schema
const std::array fields_ptr = {new_frame->desc.fields_ptr()};
merged_descriptor = merge_descriptors(
existing_descriptor,
std::vector<std::shared_ptr<FieldCollection>>{new_frame->desc.fields_ptr()},
fields_ptr,
{}
);
} else {
4 changes: 4 additions & 0 deletions cpp/arcticdb/pipeline/pipeline_context.hpp
@@ -108,6 +108,9 @@ struct PipelineContext : public std::enable_shared_from_this<PipelineContext> {
// written in, desc_ will be modified such that the return matches what's requested, and this'll be set to the
// original value. It's only set in this edge case.
std::optional<StreamDescriptor> orig_desc_;
// When there are staged segments this holds the combined stream descriptor for all of them.
// It can differ from desc_ when dynamic schema is used; otherwise the two must be the same.
std::optional<StreamDescriptor> staged_descriptor_;
StreamId stream_id_;
VersionId version_id_ = 0;
size_t total_rows_ = 0;
@@ -200,6 +203,7 @@ struct PipelineContext : public std::enable_shared_from_this<PipelineContext> {
swap(left.segment_descriptors_, right.segment_descriptors_);
swap(left.filter_columns_set_, right.filter_columns_set_);
swap(left.compacted_, right.compacted_);
swap(left.staged_descriptor_, right.staged_descriptor_);
}

using iterator = PipelineContextIterator<PipelineContextRow>;
4 changes: 3 additions & 1 deletion cpp/arcticdb/pipeline/read_frame.cpp
@@ -601,7 +601,7 @@ struct ReduceColumnTask : async::BaseTask {
} else {
column.default_initialize_rows(0, frame_.row_count(), false);
}
} else {
} else if (column_data != slice_map_->columns_.end()) {
if(dynamic_schema_) {
NullValueReducer null_reducer{column, context_, frame_, shared_data_, handler_data_};
for (const auto &row : column_data->second) {
@@ -623,6 +623,8 @@ struct ReduceColumnTask : async::BaseTask {

column.set_inflated(frame_.row_count());
}
} else if (!dynamic_schema_ && column_data == slice_map_->columns_.end() && is_sequence_type(column.type().data_type())) {
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Column with index {} is not in static schema slice map.", column_index_);
}
return folly::Unit{};
}