More sort and finalize fixes #1799

Merged
merged 72 commits
Sep 24, 2024
Changes from 31 commits
Commits
72 commits
65d151a
Fix having empty dataframe in staged writes
Aug 14, 2024
ddf9a96
Throw exception when there is an empty staging area
Aug 14, 2024
2bede7c
Fix failing test
Aug 14, 2024
07c2dd1
Fix compilation errors
Aug 14, 2024
19250a1
Apply fixes for empty dfs in staged writes with vanila finalize
Aug 15, 2024
3f85d61
Address review comments
Aug 16, 2024
10cf515
Unify finalize methods (#1767)
vasil-pashov Aug 20, 2024
8db2b78
Add comments for unreachable code as per review request
Aug 20, 2024
ff546ee
Add comments as per review request
Aug 21, 2024
39baaf2
Throw when trying to compact unordered incomplete segments with final…
Aug 21, 2024
583ef39
More fixes for sort_and_finalize
Aug 22, 2024
546a860
Fixes for sort and finalize
Aug 29, 2024
585cb07
Add tests for schema mismatch in finalize_staged_data
Aug 29, 2024
4da043a
Merge branch 'master' into sort-and-finalize-sorting
Aug 29, 2024
e5fdb1c
Fix errors from merge commit
Aug 29, 2024
e621b9f
Fix CI compilation error
Aug 29, 2024
2aa8497
Fix compilation errors in tests
Aug 29, 2024
9ec7fca
Fix failing C++ tests
Aug 30, 2024
f672906
Fix compilation errors
Aug 30, 2024
4f0d36b
Merge branch 'master' into sort-and-finalize-sorting
Aug 30, 2024
6b09f3c
Fix failing c++ tests
Aug 30, 2024
91cca50
Add fixes for dynamic schema and staged writes
Aug 30, 2024
3405b3a
Fixing failing tests
Sep 1, 2024
32bfcc7
* Make it possible to have columns in the staged segments with common…
Sep 2, 2024
2462d1f
Fix C++ tests
Sep 3, 2024
176a102
Fix index end when there are no rows
Sep 3, 2024
60cd544
Strict static schema unit tests for sort and finalize
Sep 4, 2024
7dec6c8
Fix static schema hypothesis test
Sep 4, 2024
57005f7
Add dynamic schema testing
Sep 4, 2024
a6f0618
Merge branch 'master' into more-sort-and-finalize-fixes
vasil-pashov Sep 4, 2024
befbe67
Fix tests
Sep 4, 2024
e2f57a9
Fix tests
Sep 5, 2024
ca12dc7
Fix int checking in hypothesis
Sep 5, 2024
e7f86de
Fix a dynamic schema bug
Sep 9, 2024
d18d9cb
Bring back validate_index for incompletes
Sep 9, 2024
5c11843
Add assertion for null value reducer when a column cannot be found in…
Sep 9, 2024
a21c8b3
Use std::adjacent_find to compare all staged stream descriptors are t…
Sep 9, 2024
7a1d888
Earlier assert that staged stream descriptor is the same as the one o…
Sep 9, 2024
ba58f6e
Comment the return value of read_incompletes_to_pipeline
Sep 9, 2024
c159946
Remove check_incomplete_descriptor_match which is redundant
Sep 9, 2024
bd02dcc
Use internal::check instead of util::check
Sep 9, 2024
1db047e
Rework incomplete keys RAII
Sep 9, 2024
9ffc788
Remove checks which are no longer needed
Sep 9, 2024
fc39ccc
Merge branch 'master' into sort-and-finalize-sorting
Sep 9, 2024
9ebe329
Bring back the check if !segments.empty()
Sep 10, 2024
b8420ad
Move stream descriptor far out of for loop in merge.hpp
Sep 10, 2024
2cd017b
Generate 4 cols in test_write_parallel_sort_merge
Sep 10, 2024
2a09829
Check values in test_repeating_index_values
Sep 10, 2024
14ab952
Fix duplicated col_1 in test_appending_reordered_column_set_throws
Sep 10, 2024
819d2a5
Fix typo in test case name test_type_mismatch_in_staged_segments_thro…
Sep 10, 2024
ddb83a6
Fix duplicated col_1 in test_staged_segments_cant_be_reordered
Sep 10, 2024
f7c7746
Explicit dtypes for all DFs in TestStreamDescriptorMismatchOnFinalize…
Sep 10, 2024
0336091
Move test_two_columns_with_different_dtypes to nonreg tests
Sep 10, 2024
bff0fdd
More test cases for NaT values
Sep 10, 2024
7b4ae50
Type promotion checks for sort merge and dynamic schema
Sep 10, 2024
1d7e76d
Move v1 TestFinalizeStagedDataStaticSchemaMismatch to test_parallel.py
Sep 10, 2024
495ec6d
Move TestFinalizeWithEmptySegments for v1 API in test_parallel.py
Sep 10, 2024
aae39b2
Remove has_common_valid_type from merge function
Sep 11, 2024
215cd3b
Simplify merge types checks ad fix date_and_time
Sep 11, 2024
98aef2b
Rework hypothesis tests for sort and finalize. TBD: dynnamic schema a…
Sep 12, 2024
5b98440
Rework dynamic schema sort and finalize append tests
Sep 13, 2024
5ef9514
Fix hypothesis tests
Sep 13, 2024
beac0bb
Fix merge clause test
Sep 13, 2024
d30124c
Fix a typo
Sep 13, 2024
575c07e
Allow for UNKNOWN sorted value in do_compact
Sep 16, 2024
5edea0e
Move no append keys assertion in the funcitons that assert errors to …
Sep 17, 2024
8cd0ac3
Use ScalarTypeInfo
Sep 17, 2024
d76e0e3
Fix test name
Sep 17, 2024
82cc9e9
Fix compilation errors on linux
Sep 17, 2024
006f78a
Remove empty columns from staged segments instead of filling them with
Sep 18, 2024
cef88e4
Do not try dropping empty colummns when static schema is used in merg…
Sep 19, 2024
8999834
Merge branch 'master' into more-sort-and-finalize-fixes
vasil-pashov Sep 19, 2024
3 changes: 2 additions & 1 deletion cpp/arcticdb/column_store/column.hpp
@@ -277,7 +277,8 @@ class Column {
return TypedColumnIterator<TagType, const RawType>(*this, false);
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template<typename T>
requires std::integral<T> || std::floating_point<T>
void set_scalar(ssize_t row_offset, T val) {
util::check(sizeof(T) == get_type_size(type_.data_type()), "Type mismatch in set_scalar, expected {}",
get_type_size(type_.data_type()));
8 changes: 5 additions & 3 deletions cpp/arcticdb/column_store/memory_segment.hpp
@@ -74,13 +74,15 @@ class SegmentInMemory {
impl_->end_row();
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template<typename T>
requires std::integral<T> || std::floating_point<T>
void set_scalar(position_t idx, T val) {
impl_->set_scalar(idx, val);
}

template<class T, std::enable_if_t<std::is_same_v<std::decay_t<T>, std::string>, int> = 0>
void set_scalar(position_t idx, T val) {
template<typename T>
requires std::same_as<std::decay_t<T>, std::string>
void set_scalar(position_t idx, const T& val) {
impl_->set_string(idx, val);
}

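The `set_scalar` overloads above swap `std::enable_if_t`-based SFINAE for C++20 `requires` clauses, which keep the constraints readable and produce clearer overload-resolution errors. A minimal standalone sketch of the same pattern (illustrative only, not ArcticDB code; `set_scalar` here is a free function rather than the member functions in the diff):

```cpp
#include <concepts>
#include <cstddef>
#include <iostream>
#include <string>
#include <type_traits>

// Numeric overload: accepts any integral or floating-point scalar.
template <typename T>
    requires std::integral<T> || std::floating_point<T>
void set_scalar(std::size_t idx, T val) {
    std::cout << "numeric value " << val << " at row " << idx << '\n';
}

// String overload: chosen only when T decays to std::string.
template <typename T>
    requires std::same_as<std::decay_t<T>, std::string>
void set_scalar(std::size_t idx, const T& val) {
    std::cout << "string value \"" << val << "\" at row " << idx << '\n';
}

int main() {
    set_scalar(0, 42);                    // integral -> first overload
    set_scalar(1, 2.5);                   // floating point -> first overload
    set_scalar(2, std::string{"hello"});  // std::string -> second overload
}
```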
8 changes: 7 additions & 1 deletion cpp/arcticdb/entity/merge_descriptors.cpp
@@ -34,6 +34,7 @@ StreamDescriptor merge_descriptors(
merged_fields.emplace_back(idx.name());
merged_fields_map.try_emplace(idx.name(), TypeDescriptor{typename IndexType::TypeDescTag{}});
});
index = default_index_type_from_descriptor(*default_index);
} else {
util::raise_rte("Descriptor has uninitialized index and no default supplied");
}
@@ -71,7 +72,12 @@ StreamDescriptor merge_descriptors(
if(new_descriptor) {
merged_fields_map[field.name()] = *new_descriptor;
} else {
util::raise_rte("No valid common type between {} and {} for column {}", existing_type_desc, type_desc, field.name());
schema::raise<ErrorCode::E_DESCRIPTOR_MISMATCH>(
"No valid common type between {} and {} for column {}",
existing_type_desc,
type_desc,
field.name()
);
}
}
} else {
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/read_frame.cpp
@@ -601,7 +601,7 @@ struct ReduceColumnTask : async::BaseTask {
} else {
column.default_initialize_rows(0, frame_.row_count(), false);
}
} else {
} else if (column_data != slice_map_->columns_.end()) {
if(dynamic_schema_) {
NullValueReducer null_reducer{column, context_, frame_, shared_data_, handler_data_};
for (const auto &row : column_data->second) {
36 changes: 16 additions & 20 deletions cpp/arcticdb/processing/clause.cpp
@willdealtry can you review the changes in this file? You have more context than I do

sure

@@ -13,6 +13,7 @@
#include <arcticdb/processing/processing_unit.hpp>
#include <arcticdb/column_store/string_pool.hpp>
#include <arcticdb/util/offset_string.hpp>
#include <arcticdb/stream/merge.hpp>

#include <arcticdb/processing/clause.hpp>
#include <arcticdb/pipeline/column_stats.hpp>
@@ -888,13 +889,12 @@ std::vector<EntityId> SortClause::process(std::vector<EntityId>&& entity_ids) co
return push_entities(component_manager_, std::move(proc));
}

template<typename IndexType, typename DensityPolicy, typename QueueType, typename Comparator, typename StreamId>
template<typename IndexType, typename DensityPolicy, typename QueueType>
void merge_impl(
std::shared_ptr<ComponentManager> component_manager,
std::vector<std::vector<EntityId>>& ret,
QueueType &input_streams,
bool add_symbol_column,
StreamId stream_id,
const RowRange& row_range,
const ColRange& col_range,
IndexType index,
@@ -908,19 +908,16 @@ void merge_impl(
};

using AggregatorType = stream::Aggregator<IndexType, stream::DynamicSchema, SegmentationPolicy, DensityPolicy>;
const auto& fields = stream_descriptor.fields();
FieldCollection new_fields{};
(void)new_fields.add(fields[0].ref());

auto index_desc = index_descriptor_from_range(stream_id, index, new_fields);
auto desc = StreamDescriptor{index_desc};

AggregatorType agg{
stream::DynamicSchema{desc, index},
std::move(func), std::move(segmentation_policy), desc, std::nullopt
stream::DynamicSchema{stream_descriptor, index},
std::move(func),
std::move(segmentation_policy),
stream_descriptor,
std::nullopt
};

stream::do_merge<IndexType, SegmentWrapper, AggregatorType, decltype(input_streams)>(
stream::do_merge<IndexType, AggregatorType, decltype(input_streams)>(
input_streams, agg, add_symbol_column);
}

@@ -977,15 +974,14 @@ std::optional<std::vector<std::vector<EntityId>>> MergeClause::repartition(std::
std::vector<std::vector<EntityId>> ret;
std::visit(
[this, &ret, &input_streams, &comp=compare, stream_id=stream_id_, &row_range, &col_range](auto idx, auto density) {
merge_impl<decltype(idx), decltype(density), decltype(input_streams), decltype(comp), decltype(stream_id)>(component_manager_,
ret,
input_streams,
add_symbol_column_,
stream_id,
row_range,
col_range,
idx,
stream_descriptor_);
merge_impl<decltype(idx), decltype(density), decltype(input_streams)>(component_manager_,
ret,
input_streams,
add_symbol_column_,
row_range,
col_range,
idx,
stream_descriptor_);
}, index_, density_policy_);
return ret;
}
1 change: 0 additions & 1 deletion cpp/arcticdb/processing/clause.hpp
@@ -21,7 +21,6 @@
#include <arcticdb/processing/grouper.hpp>
#include <arcticdb/stream/aggregator.hpp>
#include <arcticdb/util/movable_priority_queue.hpp>
#include <arcticdb/stream/merge.hpp>
#include <arcticdb/pipeline/index_utils.hpp>

#include <folly/Poly.h>
1 change: 1 addition & 0 deletions cpp/arcticdb/processing/test/test_clause.cpp
@@ -373,6 +373,7 @@ TEST(Clause, Merge) {

auto stream_id = StreamId("Merge");
StreamDescriptor descriptor{};
descriptor.set_index(static_cast<IndexDescriptorImpl>(TimeseriesIndex::default_index()));
descriptor.add_field(FieldRef{make_scalar_type(DataType::NANOSECONDS_UTC64),"time"});
MergeClause merge_clause{TimeseriesIndex{"time"}, SparseColumnPolicy{}, stream_id, descriptor};
merge_clause.set_component_manager(component_manager);
6 changes: 4 additions & 2 deletions cpp/arcticdb/stream/aggregator.hpp
@@ -275,12 +275,14 @@ class Aggregator {
void commit_impl(bool final);

private:
template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template <typename T>
requires std::integral<T> || std::floating_point<T>
void set_scalar(std::size_t pos, T val) {
segment_.set_scalar(pos, val);
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template<typename T>
requires std::integral<T> || std::floating_point<T>
void set_scalar_by_name(std::string_view name, T val, DataType data_type) {
position_t pos = schema_policy_.get_column_idx_by_name(segment_, name, make_scalar_type(data_type), segmenting_policy_.expected_row_size(), segment_.row_count());
set_scalar(pos, val);
15 changes: 4 additions & 11 deletions cpp/arcticdb/stream/append_map.cpp
@@ -212,14 +212,9 @@ folly::Future<arcticdb::entity::VariantKey> write_incomplete_frame(
const std::shared_ptr<Store>& store,
const StreamId& stream_id,
const std::shared_ptr<InputTensorFrame>& frame,
bool validate_index,
std::optional<AtomKey>&& next_key) {
using namespace arcticdb::pipelines;

sorting::check<ErrorCode::E_UNSORTED_DATA>(
!validate_index || index_is_not_timeseries_or_is_sorted_ascending(*frame),
"When writing/appending staged data in parallel, input data must be sorted.");

auto index_range = frame->index_range;
auto segment = incomplete_segment_from_frame(frame, 0, std::move(next_key), false);
return store->write(
@@ -234,10 +229,9 @@ folly::Future<arcticdb::entity::VariantKey> write_incomplete_frame(
void write_parallel(
const std::shared_ptr<Store>& store,
const StreamId& stream_id,
const std::shared_ptr<InputTensorFrame>& frame,
bool validate_index) {
const std::shared_ptr<InputTensorFrame>& frame) {
// TODO: dynamic bucketize doesn't work with incompletes
(void)write_incomplete_frame(store, stream_id, frame, validate_index, std::nullopt).get();
(void)write_incomplete_frame(store, stream_id, frame, std::nullopt).get();
}

std::vector<SliceAndKey> get_incomplete(
@@ -376,8 +370,7 @@ AppendMapEntry entry_from_key(const std::shared_ptr<StreamSource>& store, const
void append_incomplete(
const std::shared_ptr<Store>& store,
const StreamId& stream_id,
const std::shared_ptr<InputTensorFrame>& frame,
bool validate_index) {
const std::shared_ptr<InputTensorFrame>& frame) {
using namespace arcticdb::proto::descriptors;
using namespace arcticdb::stream;
ARCTICDB_SAMPLE_DEFAULT(AppendIncomplete)
@@ -387,7 +380,7 @@ void append_incomplete(
const auto num_rows = frame->num_rows;
total_rows += num_rows;
auto desc = frame->desc.clone();
auto new_key = write_incomplete_frame(store, stream_id, frame, validate_index, std::move(next_key)).get();
auto new_key = write_incomplete_frame(store, stream_id, frame, std::move(next_key)).get();


ARCTICDB_DEBUG(log::version(), "Wrote incomplete frame for stream {}, {} rows, total rows {}", stream_id, num_rows, total_rows);
6 changes: 2 additions & 4 deletions cpp/arcticdb/stream/append_map.hpp
@@ -53,8 +53,7 @@ void remove_incomplete_segments(
void write_parallel(
const std::shared_ptr<Store>& store,
const StreamId& stream_id,
const std::shared_ptr<pipelines::InputTensorFrame>& frame,
bool validate_index);
const std::shared_ptr<pipelines::InputTensorFrame>& frame);

void write_head(
const std::shared_ptr<Store>& store,
@@ -69,8 +68,7 @@ void append_incomplete_segment(
void append_incomplete(
const std::shared_ptr<Store>& store,
const StreamId& stream_id,
const std::shared_ptr<pipelines::InputTensorFrame>& frame,
bool validate_index);
const std::shared_ptr<pipelines::InputTensorFrame>& frame);

std::optional<int64_t> latest_incomplete_timestamp(
const std::shared_ptr<Store>& store,
8 changes: 4 additions & 4 deletions cpp/arcticdb/stream/index.cpp
@@ -69,22 +69,22 @@ void TimeseriesIndex::check(const FieldCollection& fields) const {

IndexValue TimeseriesIndex::start_value_for_segment(const SegmentInMemory& segment) {
if (segment.row_count() == 0)
return {NumericIndex{0}};
return {NumericIndex{min_index_value()}};
auto first_ts = segment.template scalar_at<timestamp>(0, 0).value();
return {first_ts};
}

IndexValue TimeseriesIndex::end_value_for_segment(const SegmentInMemory& segment) {
auto row_count = segment.row_count();
if (row_count == 0)
return {NumericIndex{0}};
return {NumericIndex{min_index_value()}};
auto last_ts = segment.template scalar_at<timestamp>(row_count - 1, 0).value();
return {last_ts};
}

IndexValue TimeseriesIndex::start_value_for_keys_segment(const SegmentInMemory& segment) {
if (segment.row_count() == 0)
return {NumericIndex{0}};
return {NumericIndex{min_index_value()}};
auto start_index_id = int(pipelines::index::Fields::start_index);
auto first_ts = segment.template scalar_at<timestamp>(0, start_index_id).value();
return {first_ts};
@@ -93,7 +93,7 @@ IndexValue TimeseriesIndex::start_value_for_keys_segment(const SegmentInMemory&
IndexValue TimeseriesIndex::end_value_for_keys_segment(const SegmentInMemory& segment) {
auto row_count = segment.row_count();
if (row_count == 0)
return {NumericIndex{0}};
return {NumericIndex{min_index_value()}};
auto end_index_id = int(pipelines::index::Fields::end_index);
auto last_ts = segment.template scalar_at<timestamp>(row_count - 1, end_index_id).value();
return {last_ts};
8 changes: 7 additions & 1 deletion cpp/arcticdb/stream/index.hpp
@@ -52,6 +52,12 @@ class TimeseriesIndex : public BaseIndex<TimeseriesIndex> {
static constexpr IndexDescriptorImpl::Type type() {
return IndexDescriptorImpl::Type::TIMESTAMP;
}

static constexpr timestamp min_index_value() {
// std::numeric_limits<timestamp>::min() is reserved for NaT
return std::numeric_limits<timestamp>::min() + 1;
}

TimeseriesIndex(const std::string& name);
static TimeseriesIndex default_index();
void check(const FieldCollection& fields) const;
@@ -76,7 +82,7 @@ class TimeseriesIndex : public BaseIndex<TimeseriesIndex> {

private:
std::string name_;
timestamp ts_ = 0;
timestamp ts_ = min_index_value();
};

class TableIndex : public BaseIndex<TableIndex> {
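The new `min_index_value()` exists because the smallest `int64_t` is used as the NaT sentinel, so it can no longer double as the default index value for empty segments. A small sketch of that convention (standalone assumptions for illustration, not the actual ArcticDB `timestamp`/NaT declarations):

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

using timestamp = std::int64_t;  // assumption: nanoseconds since the Unix epoch

// NaT ("not a time") follows the NumPy/pandas convention of the minimum int64.
constexpr timestamp NaT = std::numeric_limits<timestamp>::min();

// The smallest value a real index entry may take: one above the NaT sentinel.
constexpr timestamp min_index_value() {
    return std::numeric_limits<timestamp>::min() + 1;
}

int main() {
    static_assert(NaT < min_index_value());
    // Empty segments can report min_index_value() without it being read back as NaT.
    assert(min_index_value() != NaT);
    return 0;
}
```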
49 changes: 44 additions & 5 deletions cpp/arcticdb/stream/merge.hpp
@@ -7,19 +7,33 @@

#pragma once

#include <type_traits>
#include <arcticdb/pipeline/index_utils.hpp>
#include <arcticdb/util/constants.hpp>
#include <arcticdb/entity/type_utils.hpp>

namespace arcticdb::stream {
template<typename IndexType, typename WrapperType, typename AggregatorType, typename QueueType>
template<typename IndexType, typename AggregatorType, typename QueueType>
void do_merge(
QueueType& input_streams,
AggregatorType& agg,
bool add_symbol_column
) {
while (!input_streams.empty() && input_streams.top()->seg_.row_count() == 0) {
input_streams.pop_top();
}

// NaT is definied as std::numeric_limits<int64_t>::min(), if there are any NaT values they will be on the top of the queue
if (!input_streams.empty()) {
const auto& next = input_streams.top();
const auto index_value =
std::get<timestamp>(*pipelines::index::index_value_from_row(next->row(), IndexDescriptorImpl::Type::TIMESTAMP, 0));
sorting::check<ErrorCode::E_UNSORTED_DATA>(index_value != NaT, "NaT values are not allowed in the index");
}

while (!input_streams.empty()) {
auto next = input_streams.pop_top();

if (next->row().parent_->row_count() == 0) {
if (next->seg_.row_count() == 0) {
continue;
}
agg.start_row(pipelines::index::index_value_from_row(next->row(), IndexDescriptorImpl::Type::TIMESTAMP, 0).value()) ([&next, add_symbol_column](auto &rb) {
@@ -30,8 +44,33 @@
std::advance(val, IndexType::field_count());
for(; val != next->row().end(); ++val) {
val->visit_field([&rb] (const auto& opt_v, std::string_view name, const TypeDescriptor& type_desc) {
if(opt_v)
rb.set_scalar_by_name(name, opt_v.value(), type_desc.data_type());
if (opt_v) {
const StreamDescriptor& descriptor = rb.descriptor();
const std::optional<size_t> field_idx = descriptor.find_field(name);
if (!field_idx || type_desc == descriptor.field(*field_idx).type()) {
rb.set_scalar_by_name(name, *opt_v, type_desc.data_type());
} else {
const auto common_type = has_valid_common_type(type_desc, descriptor.field(*field_idx).type());
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
common_type,
"No valid common type between staged segments for column {}. Mismatched types are {} "
"and {}",
name,
type_desc,
descriptor.field(*field_idx).type()
);
common_type->visit_tag([&](auto type_desc_tag) {
using RawType = decltype(type_desc_tag)::DataTypeTag::raw_type;
if constexpr (std::is_convertible_v<RawType, std::decay_t<decltype(*opt_v)>>) {
using RawType = decltype(type_desc_tag)::DataTypeTag::raw_type;
const auto cast_value = static_cast<RawType>(*opt_v);
rb.set_scalar_by_name(name, cast_value, type_desc.data_type());
} else {
rb.set_scalar_by_name(name, *opt_v, type_desc.data_type());
}
});
}
}
});
}
});
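Beyond the NaT guard, the reworked `do_merge` promotes a staged value to the common type of the two descriptors before writing it, and raises `E_DESCRIPTOR_MISMATCH` when no common type exists. A simplified, self-contained sketch of that promotion idea (the `Dtype`, `common_type`, and `promote` names are invented for illustration and do not mirror ArcticDB's type system):

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <variant>

// Toy stand-in for a column type descriptor.
enum class Dtype { Int32, Int64, Float64 };

// Simplified promotion rule: identical types match, any float forces Float64,
// otherwise widen to Int64. Returns nullopt when the types are irreconcilable.
std::optional<Dtype> common_type(Dtype a, Dtype b) {
    if (a == b) return a;
    if (a == Dtype::Float64 || b == Dtype::Float64) return Dtype::Float64;
    return Dtype::Int64;
}

// Value as it would be handed to the aggregator after promotion.
using Scalar = std::variant<std::int32_t, std::int64_t, double>;

Scalar promote(std::int32_t staged_value, Dtype staged, Dtype existing) {
    const auto target = common_type(staged, existing);
    if (!target) {
        // Mirrors the schema::check<E_DESCRIPTOR_MISMATCH> failure in the diff.
        throw std::runtime_error("No valid common type between staged segments");
    }
    switch (*target) {
        case Dtype::Int32:   return staged_value;
        case Dtype::Int64:   return static_cast<std::int64_t>(staged_value);
        case Dtype::Float64: return static_cast<double>(staged_value);
    }
    throw std::logic_error("unreachable");
}

int main() {
    const Scalar s = promote(7, Dtype::Int32, Dtype::Float64);
    std::cout << std::get<double>(s) << '\n';  // prints 7
}
```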
2 changes: 1 addition & 1 deletion cpp/arcticdb/stream/test/test_append_map.cpp
@@ -23,7 +23,7 @@ TEST(Append, Simple) {
auto wrapper = get_test_simple_frame(stream_id, 10, 0);
auto& frame = wrapper.frame_;
auto desc = frame->desc.clone();
append_incomplete(store, stream_id, frame, true);
append_incomplete(store, stream_id, frame);
pipelines::FilterRange range;
auto pipeline_context = std::make_shared<PipelineContext>(desc);
pipeline_context->selected_columns_ = util::BitSet(2);