Skip to content

Commit

Permalink
Remove has_common_valid_type from merge function
Browse files Browse the repository at this point in the history
  • Loading branch information
Vasil Pashov committed Sep 11, 2024
1 parent 495ec6d commit 3586eac
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 90 deletions.
8 changes: 6 additions & 2 deletions cpp/arcticdb/column_store/column.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,12 @@ class Column {
template<typename T>
requires std::integral<T> || std::floating_point<T>
void set_scalar(ssize_t row_offset, T val) {
util::check(sizeof(T) == get_type_size(type_.data_type()), "Type mismatch in set_scalar, expected {}",
get_type_size(type_.data_type()));
util::check(
sizeof(T) == get_type_size(type_.data_type()),
"Type mismatch in set_scalar, expected {} byte scalar got {} byte scalar",
get_type_size(type_.data_type()),
sizeof(T)
);

auto prev_logical_row = last_logical_row_;
last_logical_row_ = row_offset;
Expand Down
48 changes: 25 additions & 23 deletions cpp/arcticdb/column_store/memory_segment_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,31 +67,31 @@ class SegmentInMemoryImpl {

template<class Callable>
auto visit(Callable &&c) const {
return entity::visit_field(parent_->descriptor().field(column_id_), [that=this, c=std::forward<Callable>(c)](auto type_desc_tag) {
return entity::visit_field(parent_->descriptor().field(column_id_), [this, c=std::forward<Callable>(c)](auto type_desc_tag) {
using RawType = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag::raw_type;
return c(that->parent_->scalar_at<RawType>(that->row_id_, that->column_id_));
return c(parent_->scalar_at<RawType>(row_id_, column_id_));
});
}

template<class Callable>
auto visit_string(Callable &&c) const {
return entity::visit_field(parent_->descriptor().field(column_id_), [that=this, c = std::forward<Callable>(c)](auto type_desc_tag) {
return entity::visit_field(parent_->descriptor().field(column_id_), [this, c = std::forward<Callable>(c)](auto type_desc_tag) {
using DTT = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag;
if constexpr(is_sequence_type(DTT::data_type))
return c(that->parent_->string_at(that->row_id_, position_t(that->column_id_)));
return c(parent_->string_at(row_id_, position_t(column_id_)));
});
}

template<class Callable>
auto visit_field(Callable &&c) const {
const auto& field = parent_->descriptor().field(column_id_);
return entity::visit_field(parent_->descriptor().field(column_id_), [&field, that=this, c = std::forward<Callable>(c)](auto type_desc_tag) {
return entity::visit_field(field, [&field, this, c = std::forward<Callable>(c)](auto type_desc_tag) {
using DataTypeTag = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag;
using RawType = typename DataTypeTag::raw_type;
if constexpr (is_sequence_type(DataTypeTag::data_type))
return c(that->parent_->string_at(that->row_id_, position_t(that->column_id_)), std::string_view{field.name()}, field.type());
return c(parent_->string_at(row_id_, position_t(column_id_)), std::string_view{field.name()}, type_desc_tag);
else if constexpr (is_numeric_type(DataTypeTag::data_type) || is_bool_type(DataTypeTag::data_type))
return c(that->parent_->scalar_at<RawType>(that->row_id_, that->column_id_), std::string_view{field.name()}, field.type());
return c(parent_->scalar_at<RawType>(row_id_, column_id_), std::string_view{field.name()}, type_desc_tag);
else if constexpr(is_empty_type(DataTypeTag::data_type))
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("visit_field does not support empty-type columns");
else
Expand All @@ -101,9 +101,9 @@ class SegmentInMemoryImpl {

template<class Callable>
auto visit(Callable &&c) {
return entity::visit_field(parent_->descriptor().field(column_id_), [that=this, c=std::forward<Callable>(c)](auto type_desc_tag) {
return entity::visit_field(parent_->descriptor().field(column_id_), [this, c=std::forward<Callable>(c)](auto type_desc_tag) {
using RawType = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag::raw_type;
return c(that->parent_->reference_at<RawType>(that->row_id_, that->column_id_));
return c(parent_->reference_at<RawType>(row_id_, column_id_));
});
}

Expand Down Expand Up @@ -454,18 +454,21 @@ class SegmentInMemoryImpl {
});
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
void set_scalar(position_t idx, T val) {
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_scalar(position_t idx, T val) {
ARCTICDB_TRACE(log::version(), "Segment setting scalar {} at row {} column {}", val, row_id_ + 1, idx);
column(idx).set_scalar(row_id_ + 1, val);
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
void set_external_block(position_t idx, T *val, size_t size) {
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_external_block(position_t idx, T *val, size_t size) {
column_unchecked(idx).set_external_block(row_id_ + 1, val, size);
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_sparse_block(position_t idx, T *val, size_t rows_to_write) {
column_unchecked(idx).set_sparse_block(row_id_ + 1, val, rows_to_write);
}
Expand All @@ -478,23 +481,22 @@ class SegmentInMemoryImpl {
column_unchecked(idx).set_sparse_block(std::move(buffer), std::move(shapes), std::move(bitset));
}

template<class T, std::enable_if_t<std::is_same_v<std::decay_t<T>, std::string>, int> = 0>
void set_scalar(position_t idx, T val) {
template<class T>
requires std::same_as<std::decay_t<T>, std::string>
void set_scalar(position_t idx, const T& val) {
set_string(idx, val);
}

template<class T, template<class> class Tensor, std::enable_if_t<
std::is_integral_v<T> || std::is_floating_point_v<T>,
int> = 0>
void set_array(position_t pos, Tensor<T> &val) {
template<class T, template<class> class Tensor>
requires std::integral<T> || std::floating_point<T>
void set_array(position_t pos, Tensor<T> &val) {
magic_.check();
ARCTICDB_SAMPLE(MemorySegmentSetArray, 0)
column_unchecked(pos).set_array(row_id_ + 1, val);
}

template<class T, std::enable_if_t<
std::is_integral_v<T> || std::is_floating_point_v<T>,
int> = 0>
template<class T>
requires std::integral<T> || std::floating_point<T>
void set_array(position_t pos, py::array_t<T>& val) {
magic_.check();
ARCTICDB_SAMPLE(MemorySegmentSetArray, 0)
Expand Down
76 changes: 60 additions & 16 deletions cpp/arcticdb/processing/clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ std::vector<EntityId> SortClause::process(std::vector<EntityId>&& entity_ids) co
return push_entities(component_manager_, std::move(proc));
}

template<typename IndexType, typename DensityPolicy, typename QueueType>
template<typename IndexType, typename DensityPolicy, typename QueueType, bool dynamic_schema>
void merge_impl(
std::shared_ptr<ComponentManager> component_manager,
std::vector<std::vector<EntityId>>& ret,
Expand All @@ -907,18 +907,48 @@ void merge_impl(
ret.emplace_back(push_entities(component_manager, ProcessingUnit{std::forward<SegmentInMemory>(segment), row_range, col_range}));
};

using AggregatorType = stream::Aggregator<IndexType, stream::DynamicSchema, SegmentationPolicy, DensityPolicy>;
using Schema = std::conditional_t<dynamic_schema, stream::DynamicSchema, stream::FixedSchema>;
using AggregatorType = stream::Aggregator<IndexType, Schema, SegmentationPolicy, DensityPolicy>;

AggregatorType agg{
stream::DynamicSchema{stream_descriptor, index},
Schema{stream_descriptor, index},
std::move(func),
std::move(segmentation_policy),
stream_descriptor,
std::nullopt
};

stream::do_merge<IndexType, AggregatorType, decltype(input_streams)>(
input_streams, agg, add_symbol_column);
stream::do_merge<IndexType, AggregatorType, decltype(input_streams)>(input_streams, agg, add_symbol_column);
}

// Constructs a clause that merges several input streams into a single one
// (see the note below: MergeClause receives a list of DataFrames and merges
// them). `dynamic_schema` selects between DynamicSchema and FixedSchema
// aggregation when merge_impl is instantiated during repartition.
MergeClause::MergeClause(
        stream::Index index,
        const stream::VariantColumnPolicy &density_policy,
        const StreamId& stream_id,
        const StreamDescriptor& stream_descriptor,
        bool dynamic_schema) :
    index_(std::move(index)),
    density_policy_(density_policy),
    stream_id_(stream_id),
    stream_descriptor_(stream_descriptor),
    dynamic_schema_(dynamic_schema) {
    // Merge output must be repartitioned downstream; flag it on clause_info_.
    clause_info_.requires_repartition_ = true;
}

// MergeClause keeps no state derived from ProcessingConfig; intentionally a no-op.
void MergeClause::set_processing_config(const ProcessingConfig&) {}

// Takes (shared) ownership of the component manager that repartition later
// hands to merge_impl for pushing merged entities.
void MergeClause::set_component_manager(std::shared_ptr<ComponentManager> component_manager) {
    component_manager_.swap(component_manager);
}

// Read-only access to this clause's metadata (requires_repartition_ is set
// to true in the constructor).
const ClauseInfo& MergeClause::clause_info() const {
    return clause_info_;
}

// A merge needs every input segment at once, so all ranges/keys are grouped
// into a single processing unit. The unnamed size_t parameter is unused here.
std::vector<std::vector<size_t>> MergeClause::structure_for_processing(
        std::vector<RangesAndKey>& ranges_and_keys,
        size_t) const {
    return structure_all_together(ranges_and_keys);
}

// MergeClause receives a list of DataFrames as input and merge them into a single one where all
Expand Down Expand Up @@ -972,17 +1002,31 @@ std::optional<std::vector<std::vector<EntityId>>> MergeClause::repartition(std::
const RowRange row_range{min_start_row, max_end_row};
const ColRange col_range{min_start_col, max_end_col};
std::vector<std::vector<EntityId>> ret;
std::visit(
[this, &ret, &input_streams, &comp=compare, stream_id=stream_id_, &row_range, &col_range](auto idx, auto density) {
merge_impl<decltype(idx), decltype(density), decltype(input_streams)>(component_manager_,
ret,
input_streams,
add_symbol_column_,
row_range,
col_range,
idx,
stream_descriptor_);
}, index_, density_policy_);
std::visit([this, &ret, &input_streams, &comp=compare, stream_id=stream_id_, &row_range, &col_range](auto idx, auto density) {
if (dynamic_schema_) {
merge_impl<decltype(idx), decltype(density), decltype(input_streams), true>(
component_manager_,
ret,
input_streams,
add_symbol_column_,
row_range,
col_range,
idx,
stream_descriptor_
);
} else {
merge_impl<decltype(idx), decltype(density), decltype(input_streams), false>(
component_manager_,
ret,
input_streams,
add_symbol_column_,
row_range,
col_range,
idx,
stream_descriptor_
);
}
}, index_, density_policy_);
return ret;
}

Expand Down
34 changes: 12 additions & 22 deletions cpp/arcticdb/processing/clause.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,42 +572,32 @@ struct MergeClause {
stream::Index index_;
stream::VariantColumnPolicy density_policy_;
StreamId stream_id_;
bool add_symbol_column_ = false;
StreamId target_id_;
StreamDescriptor stream_descriptor_;
bool add_symbol_column_ = false;
bool dynamic_schema_;

MergeClause(
stream::Index index,
const stream::VariantColumnPolicy &density_policy,
const StreamId& stream_id,
const StreamDescriptor& stream_descriptor) :
index_(std::move(index)),
density_policy_(density_policy),
stream_id_(stream_id),
stream_descriptor_(stream_descriptor) {
clause_info_.requires_repartition_ = true;
}
stream::Index index,
const stream::VariantColumnPolicy& density_policy,
const StreamId& stream_id,
const StreamDescriptor& stream_descriptor,
bool dynamic_schema
);

[[nodiscard]] std::vector<std::vector<size_t>> structure_for_processing(
std::vector<RangesAndKey>& ranges_and_keys,
size_t) {
return structure_all_together(ranges_and_keys);
}
size_t) const;

[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] std::optional<std::vector<std::vector<EntityId>>> repartition(std::vector<std::vector<EntityId>>&& entity_ids_vec) const;

[[nodiscard]] const ClauseInfo& clause_info() const {
return clause_info_;
}
[[nodiscard]] const ClauseInfo& clause_info() const;

void set_processing_config(ARCTICDB_UNUSED const ProcessingConfig& processing_config) {
}
void set_processing_config(const ProcessingConfig& processing_config);

void set_component_manager(std::shared_ptr<ComponentManager> component_manager) {
component_manager_ = component_manager;
}
void set_component_manager(std::shared_ptr<ComponentManager> component_manager);
};

struct ColumnStatsGenerationClause {
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/processing/test/benchmark_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void time_merge_on_segments(const std::vector<SegmentInMemory> &segments, benchm
auto stream_id = StreamId("Merge");
StreamDescriptor descriptor{};
descriptor.add_field(FieldRef{make_scalar_type(DataType::NANOSECONDS_UTC64),"time"});
MergeClause merge_clause{TimeseriesIndex{"time"}, DenseColumnPolicy{}, stream_id, descriptor};
MergeClause merge_clause{TimeseriesIndex{"time"}, DenseColumnPolicy{}, stream_id, descriptor, false};
merge_clause.set_component_manager(component_manager);
state.ResumeTiming();

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/processing/test/test_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ TEST(Clause, Merge) {
StreamDescriptor descriptor{};
descriptor.set_index(static_cast<IndexDescriptorImpl>(TimeseriesIndex::default_index()));
descriptor.add_field(FieldRef{make_scalar_type(DataType::NANOSECONDS_UTC64),"time"});
MergeClause merge_clause{TimeseriesIndex{"time"}, SparseColumnPolicy{}, stream_id, descriptor};
MergeClause merge_clause{TimeseriesIndex{"time"}, SparseColumnPolicy{}, stream_id, descriptor, false};
merge_clause.set_component_manager(component_manager);

auto seg = get_standard_timeseries_segment(std::get<StringId>(stream_id), num_rows);
Expand Down
1 change: 0 additions & 1 deletion cpp/arcticdb/storage/library_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <arcticdb/storage/library.hpp>
#include <arcticdb/storage/library_path.hpp>
#include <arcticdb/storage/storage_override.hpp>
#include <arcticdb/pipeline/index_segment_reader.hpp>


namespace arcticdb::storage {
Expand Down
Loading

0 comments on commit 3586eac

Please sign in to comment.