diff --git a/.gitmodules b/.gitmodules index 7c009e8039..ef171067e6 100644 --- a/.gitmodules +++ b/.gitmodules @@ -16,3 +16,6 @@ [submodule "cpp/vcpkg"] path = cpp/vcpkg url = https://github.com/microsoft/vcpkg.git +[submodule "cpp/third_party/entt"] + path = cpp/third_party/entt + url = https://github.com/skypjack/entt.git diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index 171945aa7f..c8cf7264c5 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -639,6 +639,7 @@ if (WIN32) ) endif() +add_compile_definitions(ENTT_ID_TYPE=std::uint64_t) set (arcticdb_core_libraries pybind11::module # Transitively includes Python::Module or Python::Python as appropriate @@ -657,6 +658,7 @@ set (arcticdb_core_libraries Folly::folly # Transitively includes: double-conversion, gflags, glog, libevent, libssl, libcrypto, libiberty, libsodium Azure::azure-identity Azure::azure-storage-blobs + EnTT::EnTT ) if(${ARCTICDB_PYTHON_EXPLICIT_LINK}) diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp index 6994c89c37..683dd7134a 100644 --- a/cpp/arcticdb/processing/clause.cpp +++ b/cpp/arcticdb/processing/clause.cpp @@ -72,62 +72,30 @@ std::vector> structure_all_together(std::vector component_manager, - std::vector&& entity_ids, - bool include_atom_keys, - bool include_initial_expected_get_calls) { - ProcessingUnit res; - res.set_segments(component_manager->get>(entity_ids)); - res.set_row_ranges(component_manager->get>(entity_ids)); - res.set_col_ranges(component_manager->get>(entity_ids)); - - if (include_atom_keys) { - res.set_atom_keys(component_manager->get>(entity_ids)); - } - if (include_initial_expected_get_calls) { - std::vector segment_initial_expected_get_calls; - segment_initial_expected_get_calls.reserve(entity_ids.size()); - for (auto entity_id: entity_ids) { - segment_initial_expected_get_calls.emplace_back(component_manager->get_initial_expected_get_calls>(entity_id)); - } - res.set_segment_initial_expected_get_calls(std::move(segment_initial_expected_get_calls)); - } - return res; -} - /* * On exit from a clause, we need to push the elements of the newly created processing unit's into the component * manager. These will either be used by the next clause in the pipeline, or to present the output dataframe back to * the user if this is the final clause in the pipeline. - * Elements that share an index in the optional vectors of a ProcessingUnit correspond to the same entity, and so are - * pushed into the component manager with the same ID. */ -std::vector push_entities(std::shared_ptr component_manager, ProcessingUnit&& proc) { - std::optional> res; - if (proc.segments_.has_value()) { - res = std::make_optional>(component_manager->add(std::move(*proc.segments_))); - } - if (proc.row_ranges_.has_value()) { - res = component_manager->add(std::move(*proc.row_ranges_), res); - } - if (proc.col_ranges_.has_value()) { - res = component_manager->add(std::move(*proc.col_ranges_), res); - } - if (proc.atom_keys_.has_value()) { - res = component_manager->add(std::move(*proc.atom_keys_), res); - } - internal::check(res.has_value(), "Unexpected empty result in push_entities"); +std::vector push_entities(ComponentManager& component_manager, ProcessingUnit&& proc, EntityFetchCount entity_fetch_count) { + std::vector entity_fetch_counts(proc.segments_->size(), entity_fetch_count); + std::vector ids; if (proc.bucket_.has_value()) { - component_manager->add(std::vector(res->size(), *proc.bucket_), res); + std::vector bucket_ids(proc.segments_->size(), *proc.bucket_); + ids = component_manager.add_entities( + std::move(*proc.segments_), + std::move(*proc.row_ranges_), + std::move(*proc.col_ranges_), + std::move(entity_fetch_counts), + std::move(bucket_ids)); + } else { + ids = component_manager.add_entities( + std::move(*proc.segments_), + std::move(*proc.row_ranges_), + std::move(*proc.col_ranges_), + std::move(entity_fetch_counts)); } - return *res; + return ids; } std::vector flatten_entities(std::vector>&& entity_ids_vec) { @@ -220,7 +188,7 @@ std::vector FilterClause::process(std::vector&& entity_ids) if (entity_ids.empty()) { return {}; } - auto proc = gather_entities(component_manager_, std::move(entity_ids)); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); proc.set_expression_context(expression_context_); auto variant_data = proc.get(expression_context_->root_node_name_); std::vector output; @@ -228,7 +196,7 @@ std::vector FilterClause::process(std::vector&& entity_ids) [&proc, &output, this](util::BitSet &bitset) { if (bitset.count() > 0) { proc.apply_filter(std::move(bitset), optimisation_); - output = push_entities(component_manager_, std::move(proc)); + output = push_entities(*component_manager_, std::move(proc)); } else { log::version().debug("Filter returned empty result"); } @@ -237,7 +205,7 @@ std::vector FilterClause::process(std::vector&& entity_ids) log::version().debug("Filter returned empty result"); }, [&output, &proc, this](FullResult) { - output = push_entities(component_manager_, std::move(proc)); + output = push_entities(*component_manager_, std::move(proc)); }, [](const auto &) { util::raise_rte("Expected bitset from filter clause"); @@ -253,7 +221,7 @@ std::vector ProjectClause::process(std::vector&& entity_ids) if (entity_ids.empty()) { return {}; } - auto proc = gather_entities(component_manager_, std::move(entity_ids)); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); proc.set_expression_context(expression_context_); auto variant_data = proc.get(expression_context_->root_node_name_); std::vector output; @@ -265,11 +233,11 @@ std::vector ProjectClause::process(std::vector&& entity_ids) proc.segments_->back()->add_column(scalar_field(data_type, name), col.column_); ++proc.col_ranges_->back()->second; - output = push_entities(component_manager_, std::move(proc)); + output = push_entities(*component_manager_, std::move(proc)); }, [&proc, &output, this](const EmptyResult &) { if (expression_context_->dynamic_schema_) - output = push_entities(component_manager_, std::move(proc)); + output = push_entities(*component_manager_, std::move(proc)); else util::raise_rte("Cannot project from empty column with static schema"); }, @@ -320,7 +288,7 @@ std::vector AggregationClause::process(std::vector&& entity_ if (entity_ids.empty()) { return {}; } - auto proc = gather_entities(component_manager_, std::move(entity_ids)); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); auto row_slices = split_by_row_slice(std::move(proc)); // Sort procs following row range descending order, as we are going to iterate through them backwards @@ -502,7 +470,7 @@ std::vector AggregationClause::process(std::vector&& entity_ seg.set_string_pool(string_pool); seg.set_row_id(num_unique - 1); - return push_entities(component_manager_, ProcessingUnit(std::move(seg))); + return push_entities(*component_manager_, ProcessingUnit(std::move(seg))); } [[nodiscard]] std::string AggregationClause::to_string() const { @@ -662,13 +630,13 @@ std::vector ResampleClause::process(std::vector, std::shared_ptr, std::shared_ptr, EntityFetchCount>(*component_manager_, std::move(entity_ids)); auto row_slices = split_by_row_slice(std::move(proc)); - // If the expected get calls for the segments in the first row slice are 2, the first bucket overlapping this row + // If the entity fetch counts for the entities in the first row slice are 2, the first bucket overlapping this row // slice is being computed by the call to process dealing with the row slices above these. Otherwise, this call // should do it const auto& front_slice = row_slices.front(); - bool responsible_for_first_overlapping_bucket = front_slice.segment_initial_expected_get_calls_->at(0) == 1; + bool responsible_for_first_overlapping_bucket = front_slice.entity_fetch_count_->at(0) == 1; // Find the iterators into bucket_boundaries_ of the start of the first and the end of the last bucket this call to process is // responsible for calculating // All segments in a given row slice contain the same index column, so just grab info from the first one @@ -720,7 +688,7 @@ std::vector ResampleClause::process(std::vectortype().data_type(), aggregator.get_output_column_name().value), aggregated_column); } seg.set_row_data(output_index_column->row_count() - 1); - return push_entities(component_manager_, ProcessingUnit(std::move(seg), std::move(output_row_range))); + return push_entities(*component_manager_, ProcessingUnit(std::move(seg), std::move(output_row_range))); } template @@ -825,7 +793,7 @@ template struct ResampleClause; if (entity_ids.empty()) { return {}; } - auto proc = gather_entities(component_manager_, std::move(entity_ids)); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); size_t min_start_row = std::numeric_limits::max(); size_t max_end_row = 0; size_t min_start_col = std::numeric_limits::max(); @@ -845,7 +813,7 @@ template struct ResampleClause; } std::vector output; if (output_seg.has_value()) { - output = push_entities(component_manager_, + output = push_entities(*component_manager_, ProcessingUnit(std::move(*output_seg), RowRange{min_start_row, max_end_row}, ColRange{min_start_col, max_end_col})); @@ -857,7 +825,7 @@ std::vector SplitClause::process(std::vector&& entity_ids) c if (entity_ids.empty()) { return {}; } - auto proc = gather_entities(component_manager_, std::move(entity_ids)); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); std::vector ret; for (auto&& [idx, seg]: folly::enumerate(proc.segments_.value())) { auto split_segs = seg->split(rows_); @@ -865,7 +833,7 @@ std::vector SplitClause::process(std::vector&& entity_ids) c size_t end_row = 0; for (auto&& split_seg : split_segs) { end_row = start_row + split_seg.row_count(); - auto new_entity_ids = push_entities(component_manager_, + auto new_entity_ids = push_entities(*component_manager_, ProcessingUnit(std::move(split_seg), RowRange(start_row, end_row), std::move(*proc.col_ranges_->at(idx)))); @@ -880,13 +848,13 @@ std::vector SortClause::process(std::vector&& entity_ids) co if (entity_ids.empty()) { return {}; } - auto proc = gather_entities(component_manager_, std::move(entity_ids)); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); for (auto& seg: proc.segments_.value()) { // This modifies the segment in place, which goes against the ECS principle of all entities being immutable // Only used by SortMerge right now and so this is fine, although it would not generalise well seg->sort(column_); } - return push_entities(component_manager_, std::move(proc)); + return push_entities(*component_manager_, std::move(proc)); } template @@ -905,7 +873,7 @@ void merge_impl( auto func = [&component_manager, &ret, &col_range, start_row = row_range.first](auto&& segment) mutable { const size_t end_row = start_row + segment.row_count(); - ret.emplace_back(push_entities(component_manager, ProcessingUnit{std::forward(segment), RowRange{start_row, end_row}, col_range})); + ret.emplace_back(push_entities(*component_manager, ProcessingUnit{std::forward(segment), RowRange{start_row, end_row}, col_range})); start_row = end_row; }; @@ -966,7 +934,7 @@ std::optional>> MergeClause::repartition(std:: // first one and can use structure_for_processing. Ideally // merging should be parallel like resampling auto entity_ids = flatten_entities(std::move(entity_ids_vec)); - auto proc = gather_entities(component_manager_, std::move(entity_ids)); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); auto compare = [](const std::unique_ptr &left, @@ -1037,7 +1005,7 @@ std::vector ColumnStatsGenerationClause::process(std::vector internal::check( !entity_ids.empty(), "ColumnStatsGenerationClause::process does not make sense with no processing units"); - auto proc = gather_entities(component_manager_, std::move(entity_ids), true); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); std::vector aggregators_data; internal::check( static_cast(column_stats_aggregators_), @@ -1091,7 +1059,7 @@ std::vector ColumnStatsGenerationClause::process(std::vector seg.concatenate(agg_data->finalize(column_stats_aggregators_->at(agg_data.index).get_output_column_names())); } seg.set_row_id(0); - return push_entities(component_manager_, ProcessingUnit(std::move(seg))); + return push_entities(*component_manager_, ProcessingUnit(std::move(seg))); } std::vector> RowRangeClause::structure_for_processing( @@ -1107,7 +1075,7 @@ std::vector RowRangeClause::process(std::vector&& entity_ids if (entity_ids.empty()) { return {}; } - auto proc = gather_entities(component_manager_, std::move(entity_ids)); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); std::vector output; for (auto&& [idx, row_range]: folly::enumerate(proc.row_ranges_.value())) { if ((start_ > row_range->start() && start_ < row_range->end()) || @@ -1129,7 +1097,7 @@ std::vector RowRangeClause::process(std::vector&& entity_ids proc.segments_->at(idx) = std::make_shared(std::move(truncated_segment)); } // else all rows in this segment are required, do nothing } - return push_entities(component_manager_, std::move(proc)); + return push_entities(*component_manager_, std::move(proc)); } void RowRangeClause::set_processing_config(const ProcessingConfig& processing_config) { @@ -1194,7 +1162,7 @@ std::vector DateRangeClause::process(std::vector &&entity_id if (entity_ids.empty()) { return {}; } - auto proc = gather_entities(component_manager_, std::move(entity_ids), true); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); std::vector output; // We are only interested in the index, which is in every SegmentInMemory in proc.segments_, so just use the first auto row_range = proc.row_ranges_->at(0); @@ -1210,7 +1178,7 @@ std::vector DateRangeClause::process(std::vector &&entity_id } proc.truncate(start_row, end_row); } // else all rows in the processing unit are required, do nothing - return push_entities(component_manager_, std::move(proc)); + return push_entities(*component_manager_, std::move(proc)); } std::string DateRangeClause::to_string() const { diff --git a/cpp/arcticdb/processing/clause.hpp b/cpp/arcticdb/processing/clause.hpp index 01df543966..e5ae002eda 100644 --- a/cpp/arcticdb/processing/clause.hpp +++ b/cpp/arcticdb/processing/clause.hpp @@ -113,12 +113,35 @@ std::vector> structure_by_column_slice(std::vector> structure_all_together(std::vector& ranges_and_keys); -ProcessingUnit gather_entities(std::shared_ptr component_manager, - std::vector&& entity_ids, - bool include_atom_keys = false, - bool include_initial_expected_get_calls = false); +/* + * On entry to a clause, construct ProcessingUnits from the input entity IDs. These will either be provided by the + * structure_for_processing method for the first clause in the pipeline, or by the previous clause for all subsequent + * clauses. + */ +template +ProcessingUnit gather_entities(ComponentManager& component_manager, std::vector&& entity_ids) { + ProcessingUnit res; + auto components = component_manager.get_entities(entity_ids); + ([&]{ + auto component = std::move(std::get>(components)); + if constexpr (std::is_same_v>) { + res.set_segments(std::move(component)); + } else if constexpr (std::is_same_v>) { + res.set_row_ranges(std::move(component)); + } else if constexpr (std::is_same_v>) { + res.set_col_ranges(std::move(component)); + } else if constexpr (std::is_same_v>) { + res.set_atom_keys(std::move(component)); + } else if constexpr (std::is_same_v) { + res.set_entity_fetch_count(std::move(component)); + } else { + static_assert(sizeof(Args) == 0, "Unexpected component type provided in gather_entities"); + } + }(), ...); + return res; +} -std::vector push_entities(std::shared_ptr component_manager, ProcessingUnit&& proc); +std::vector push_entities(ComponentManager& component_manager, ProcessingUnit&& proc, EntityFetchCount entity_fetch_count=1); std::vector flatten_entities(std::vector>&& entity_ids_vec); @@ -271,14 +294,15 @@ struct PartitionClause { if (entity_ids.empty()) { return {}; } - auto proc = gather_entities(component_manager_, std::move(entity_ids)); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); std::vector partitioned_procs = partition_processing_segment( proc, ColumnName(grouping_column_), processing_config_.dynamic_schema_); std::vector output; for (auto &&partitioned_proc: partitioned_procs) { - std::vector proc_entity_ids = push_entities(component_manager_, std::move(partitioned_proc)); + // EntityFetchCount is 2, once for the repartition, and once for AggregationClause::process + std::vector proc_entity_ids = push_entities(*component_manager_, std::move(partitioned_proc), 2); output.insert(output.end(), proc_entity_ids.begin(), proc_entity_ids.end()); } return output; @@ -304,7 +328,7 @@ struct PartitionClause { // Experimentation shows flattening the entities into a single vector and a single call to // component_manager_->get is faster than not flattening and making multiple calls auto entity_ids = flatten_entities(std::move(entity_ids_vec)); - std::vector buckets{component_manager_->get(entity_ids)}; + auto [buckets] = component_manager_->get_entities(entity_ids); for (auto [idx, entity_id]: folly::enumerate(entity_ids)) { res[buckets[idx]].emplace_back(entity_id); } diff --git a/cpp/arcticdb/processing/component_manager.cpp b/cpp/arcticdb/processing/component_manager.cpp index 746b0215a4..5b413b6477 100644 --- a/cpp/arcticdb/processing/component_manager.cpp +++ b/cpp/arcticdb/processing/component_manager.cpp @@ -5,17 +5,27 @@ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. */ + +#include + #include namespace arcticdb { -void ComponentManager::set_next_entity_id(EntityId id) { - next_entity_id_ = id; +std::vector ComponentManager::get_new_entity_ids(size_t count) { + std::vector ids(count); + std::lock_guard lock(mtx_); + registry_.create(ids.begin(), ids.end()); + return ids; } -EntityId ComponentManager::entity_id(const std::optional& id) { - // Do not use value_or as we do not want the side effect of fetch_add when id was provided by the caller - return id.has_value() ? *id : next_entity_id_.fetch_add(1); +void ComponentManager::erase_entity(EntityId id) { + // Ideally would call registry_.destroy(id), or at least registry_.erase>(id) + // at this point. However, they are both slower than this, so just decrement the ref count of the only + // sizeable component, so that when the shared pointer goes out of scope in the calling function, the + // memory is freed + registry_.get>(id).reset(); } + } // namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/processing/component_manager.hpp b/cpp/arcticdb/processing/component_manager.hpp index 6b7d6e22c5..c7410cb53f 100644 --- a/cpp/arcticdb/processing/component_manager.hpp +++ b/cpp/arcticdb/processing/component_manager.hpp @@ -9,212 +9,111 @@ #include +#include + #include #include namespace arcticdb { using namespace pipelines; -using EntityId = size_t; +using EntityId = entt::entity; +using EntityFetchCount = uint64_t; using bucket_id = uint8_t; +using namespace entt::literals; + +constexpr auto remaining_entity_fetch_count_id = "remaining_entity_fetch_count"_hs; + class ComponentManager { public: ComponentManager() = default; ARCTICDB_NO_MOVE_OR_COPY(ComponentManager) - void set_next_entity_id(EntityId id); - - template - std::vector add(std::vector&& components, const std::optional>& ids=std::nullopt) { - std::vector insertion_ids; - if (ids.has_value()) { - insertion_ids = *ids; - } else { - insertion_ids.reserve(components.size()); - for (size_t idx = 0; idx < components.size(); ++idx) { - insertion_ids.emplace_back(next_entity_id_.fetch_add(1)); + std::vector get_new_entity_ids(size_t count); + + // Add a single entity with the components defined by args + template + void add_entity(EntityId id, Args... args) { + std::lock_guard lock(mtx_); + ([&]{ + registry_.emplace(id, args); + // Store the initial entity fetch count component as a "first-class" entity, accessible by + // registry_.get(id), as this is external facing (used by resample) + // The remaining entity fetch count below will be decremented each time an entity is fetched, but is never + // accessed externally, so make this a named component. + if constexpr (std::is_same_v) { + auto&& remaining_entity_fetch_count_registry = registry_.storage(remaining_entity_fetch_count_id); + remaining_entity_fetch_count_registry.emplace(id, args); } - } - - if constexpr(std::is_same_v>) { - segment_map_.add(insertion_ids, std::move(components)); - } else if constexpr(std::is_same_v>) { - row_range_map_.add(insertion_ids, std::move(components)); - } else if constexpr(std::is_same_v>) { - col_range_map_.add(insertion_ids, std::move(components)); - } else if constexpr(std::is_same_v>) { - atom_key_map_.add(insertion_ids, std::move(components)); - } else if constexpr(std::is_same_v) { - bucket_map_.add(insertion_ids, std::move(components)); - } else { - // Hacky workaround for static_assert(false) not being allowed - // See https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2022/p2593r0.html - static_assert(sizeof(T) == 0, "Unsupported component type passed to ComponentManager::add"); - } - return insertion_ids; + }(), ...); } - template - EntityId add(T component, std::optional id=std::nullopt, std::optional expected_get_calls=std::nullopt) { - auto insertion_id = entity_id(id); - if constexpr(std::is_same_v>) { - segment_map_.add(insertion_id, std::move(component), expected_get_calls); - } else if constexpr(std::is_same_v>) { - row_range_map_.add(insertion_id, std::move(component)); - } else if constexpr(std::is_same_v>) { - col_range_map_.add(insertion_id, std::move(component)); - } else if constexpr(std::is_same_v>) { - atom_key_map_.add(insertion_id, std::move(component)); - } else if constexpr(std::is_same_v) { - bucket_map_.add(insertion_id, std::move(component)); - } else { - // Hacky workaround for static_assert(false) not being allowed - // See https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2022/p2593r0.html - static_assert(sizeof(T) == 0, "Unsupported component type passed to ComponentManager::add"); - } - return insertion_id; - } - - template - std::vector get(const std::vector& ids) { - if constexpr(std::is_same_v>) { - return segment_map_.get(ids); - } else if constexpr(std::is_same_v>) { - return row_range_map_.get(ids); - } else if constexpr(std::is_same_v>) { - return col_range_map_.get(ids); - } else if constexpr(std::is_same_v>) { - return atom_key_map_.get(ids); - } else if constexpr(std::is_same_v) { - return bucket_map_.get(ids); - } else { - // Hacky workaround for static_assert(false) not being allowed - // See https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2022/p2593r0.html - static_assert(sizeof(T) == 0, "Unsupported component type passed to ComponentManager::get"); - } - } - - template - uint64_t get_initial_expected_get_calls(EntityId id) { - // Only applies to ComponentMaps tracking expected get calls - if constexpr(std::is_same_v>) { - return segment_map_.get_initial_expected_get_calls(id); - } else { - // Hacky workaround for static_assert(false) not being allowed - // See https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2022/p2593r0.html - static_assert(sizeof(T) == 0, "Unsupported component type passed to ComponentManager::get_initial_expected_get_calls"); - } - } - -private: - template - class ComponentMap { - public: - explicit ComponentMap(std::string&& entity_type, bool track_expected_gets): - entity_type_(std::move(entity_type)), - opt_expected_get_calls_map_(track_expected_gets ? std::make_optional>() : std::nullopt), - opt_expected_get_calls_initial_map_(track_expected_gets ? std::make_optional>() : std::nullopt){ - }; - ARCTICDB_NO_MOVE_OR_COPY(ComponentMap) - - void add(const std::vector& ids, - std::vector&& entities) { - std::lock_guard lock(mtx_); - for (auto [idx, id]: folly::enumerate(ids)) { - ARCTICDB_DEBUG(log::storage(), "Adding {} with id {}", entity_type_, id); - internal::check(map_.try_emplace(id, std::move(entities[idx])).second, - "Failed to insert {} with ID {}, already exists", - entity_type_, id); - if (opt_expected_get_calls_map_.has_value()) { - internal::check( - opt_expected_get_calls_map_->try_emplace(id, 1).second, - "Failed to insert {} with ID {}, already exists", - entity_type_, id); - internal::check( - opt_expected_get_calls_initial_map_->try_emplace(id, 1).second, - "Failed to insert {} with ID {}, already exists", - entity_type_, id); - } + // Add a collection of entities. Each element of args should be a collection of components, all of which have the + // same number of elements + template + std::vector add_entities(Args... args) { + std::vector ids; + size_t entity_count{0}; + std::lock_guard lock(mtx_); + ([&]{ + if (entity_count == 0) { + // Reserve memory for the result on the first pass + entity_count = args.size(); + ids.resize(entity_count); + registry_.create(ids.begin(), ids.end()); + } else { + internal::check( + args.size() == entity_count, + "ComponentManager::add_entities received collections of differing lengths" + ); } - } - void add(EntityId id, T&& entity, std::optional expected_get_calls=std::nullopt) { - std::lock_guard lock(mtx_); - ARCTICDB_DEBUG(log::storage(), "Adding {} with id {}", entity_type_, id); - internal::check(map_.try_emplace(id, std::move(entity)).second, - "Failed to insert {} with ID {}, already exists", - entity_type_, id); - if (opt_expected_get_calls_map_.has_value()) { - internal::check(expected_get_calls.has_value() && *expected_get_calls > 0, - "Failed to insert {} with ID {}, must provide expected gets", - entity_type_, id); - internal::check(opt_expected_get_calls_map_->try_emplace(id, *expected_get_calls).second, - "Failed to insert {} with ID {}, already exists", - entity_type_, id); - internal::check(opt_expected_get_calls_initial_map_->try_emplace(id, *expected_get_calls).second, - "Failed to insert {} with ID {}, already exists", - entity_type_, id); + registry_.insert(ids.cbegin(), ids.cend(), args.begin()); + if constexpr (std::is_same_v) { + auto&& remaining_entity_fetch_count_registry = registry_.storage(remaining_entity_fetch_count_id); + remaining_entity_fetch_count_registry.insert(ids.cbegin(), ids.cend(), args.begin()); } - } - std::vector get(const std::vector& ids) { - std::vector res; - res.reserve(ids.size()); - std::lock_guard lock(mtx_); + }(), ...); + return ids; + } + + // Get a collection of entities. Returns a tuple of vectors, one for each component requested via Args + template + std::tuple...> get_entities(const std::vector& ids) { + std::vector> tuple_res; + tuple_res.reserve(ids.size()); + { + std::lock_guard lock(mtx_); + auto&& remaining_entity_fetch_count_registry = registry_.storage(remaining_entity_fetch_count_id); + // Using view.get theoretically and empirically faster than registry_.get + auto view = registry_.view(); for (auto id: ids) { - ARCTICDB_DEBUG(log::storage(), "Getting {} with id {}", entity_type_, id); - auto entity_it = map_.find(id); - internal::check(entity_it != map_.end(), - "Requested non-existent {} with ID {}", - entity_type_, id); - res.emplace_back(entity_it->second); - if (opt_expected_get_calls_map_.has_value()) { - auto expected_get_calls_it = opt_expected_get_calls_map_->find(id); - internal::check( - expected_get_calls_it != opt_expected_get_calls_map_->end(), - "Requested non-existent {} with ID {}", - entity_type_, id); - if (--expected_get_calls_it->second == 0) { - ARCTICDB_DEBUG(log::storage(), - "{} with id {} has been fetched the expected number of times, erasing from component manager", - entity_type_, id); - map_.erase(entity_it); - opt_expected_get_calls_map_->erase(expected_get_calls_it); - } + tuple_res.emplace_back(std::move(view.get(id))); + auto& remaining_entity_fetch_count = remaining_entity_fetch_count_registry.get(id); + // This entity will never be accessed again + if (--remaining_entity_fetch_count == 0) { + erase_entity(id); } } - return res; } - uint64_t get_initial_expected_get_calls(EntityId id) { - std::lock_guard lock(mtx_); - ARCTICDB_DEBUG(log::storage(), "Getting initial expected get calls of {} with id {}", entity_type_, id); - internal::check(opt_expected_get_calls_initial_map_.has_value(), - "Cannot get initial expected get calls for {} as they are not being tracked", entity_type_); - auto it = opt_expected_get_calls_initial_map_->find(id); - internal::check(it != opt_expected_get_calls_initial_map_->end(), - "Requested non-existent {} with ID {}", - entity_type_, id); - return it->second; + // Convert vector of tuples into tuple of vectors + std::tuple...> res; + ([&]{ + std::get>(res).reserve(ids.size()); + }(), ...); + for (auto&& tuple: tuple_res) { + ([&] { + std::get>(res).emplace_back(std::move(std::get(tuple))); + }(), ...); } - private: - // Just used for logging/exception messages - std::string entity_type_; - ankerl::unordered_dense::map map_; - // If not nullopt, tracks the number of calls to get for each entity id, and erases from maps when it has been - // called this many times - std::optional> opt_expected_get_calls_map_; - std::optional> opt_expected_get_calls_initial_map_; - std::mutex mtx_; - }; - - ComponentMap> segment_map_{"segment", true}; - ComponentMap> row_range_map_{"row range", false}; - ComponentMap> col_range_map_{"col range", false}; - ComponentMap> atom_key_map_{"atom key", false}; - ComponentMap bucket_map_{"bucket", false}; - - // The next ID to use when inserting elements into any of the maps - std::atomic next_entity_id_{0}; - EntityId entity_id(const std::optional& id=std::nullopt); + return res; + } + +private: + void erase_entity(EntityId id); + + entt::registry registry_; + std::mutex mtx_; }; } // namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/processing/processing_unit.cpp b/cpp/arcticdb/processing/processing_unit.cpp index 45ff293e9d..d1eb37a35b 100644 --- a/cpp/arcticdb/processing/processing_unit.cpp +++ b/cpp/arcticdb/processing/processing_unit.cpp @@ -109,7 +109,7 @@ std::vector split_by_row_slice(ProcessingUnit&& proc) { internal::check(input.segments_.has_value(), "split_by_row_slice needs Segments"); internal::check(input.row_ranges_.has_value(), "split_by_row_slice needs RowRanges"); internal::check(input.col_ranges_.has_value(), "split_by_row_slice needs ColRanges"); - auto include_expected_get_calls = input.segment_initial_expected_get_calls_.has_value(); + auto include_entity_fetch_count = input.entity_fetch_count_.has_value(); std::vector output; // Some clauses (e.g. AggregationClause) are lossy about row-ranges. We can assume that if all of the input column @@ -118,8 +118,8 @@ std::vector split_by_row_slice(ProcessingUnit&& proc) { output.reserve(input.segments_->size()); for (size_t idx = 0; idx < input.segments_->size(); ++idx) { ProcessingUnit proc_tmp(std::move(*input.segments_->at(idx)), std::move(*input.row_ranges_->at(idx)), std::move(*input.col_ranges_->at(idx))); - if (include_expected_get_calls) { - proc_tmp.set_segment_initial_expected_get_calls({input.segment_initial_expected_get_calls_->at(idx)}); + if (include_entity_fetch_count) { + proc_tmp.set_entity_fetch_count({input.entity_fetch_count_->at(idx)}); } output.emplace_back(std::move(proc_tmp)); } @@ -130,16 +130,16 @@ std::vector split_by_row_slice(ProcessingUnit&& proc) { it->second.segments_->emplace_back(input.segments_->at(idx)); it->second.row_ranges_->emplace_back(input.row_ranges_->at(idx)); it->second.col_ranges_->emplace_back(input.col_ranges_->at(idx)); - if (include_expected_get_calls) { - it->second.segment_initial_expected_get_calls_->emplace_back(input.segment_initial_expected_get_calls_->at(idx)); + if (include_entity_fetch_count) { + it->second.entity_fetch_count_->emplace_back(input.entity_fetch_count_->at(idx)); } } else { auto [inserted_it, _] = output_map.emplace(*row_range_ptr, ProcessingUnit{}); inserted_it->second.segments_.emplace(1, input.segments_->at(idx)); inserted_it->second.row_ranges_.emplace(1, input.row_ranges_->at(idx)); inserted_it->second.col_ranges_.emplace(1, input.col_ranges_->at(idx)); - if (include_expected_get_calls) { - inserted_it->second.segment_initial_expected_get_calls_.emplace(1, input.segment_initial_expected_get_calls_->at(idx)); + if (include_entity_fetch_count) { + inserted_it->second.entity_fetch_count_.emplace(1, input.entity_fetch_count_->at(idx)); } } } @@ -150,20 +150,20 @@ std::vector split_by_row_slice(ProcessingUnit&& proc) { } internal::check(!output.empty(), "Unexpected empty output in split_by_row_slice"); - if (include_expected_get_calls) { + if (include_entity_fetch_count) { // The expected get counts for all segments in a row slice should be the same // This should always be 1 or 2 for the first/last row slice, and 1 for all of the others for (auto row_slice = output.cbegin(); row_slice != output.cend(); ++row_slice) { - auto expected_get_calls = row_slice->segment_initial_expected_get_calls_->front(); - uint64_t max_expected_get_calls = row_slice == output.cbegin() || row_slice == std::prev(output.cend()) ? 2 : 1; - internal::check(0 < expected_get_calls && expected_get_calls <= max_expected_get_calls, - "expected_get_calls in split_by_row_slice should be 1 or 2, got {}", - expected_get_calls); + auto entity_fetch_count = row_slice->entity_fetch_count_->front(); + uint64_t max_entity_fetch_count = row_slice == output.cbegin() || row_slice == std::prev(output.cend()) ? 2 : 1; + internal::check(0 < entity_fetch_count && entity_fetch_count <= max_entity_fetch_count, + "entity_fetch_count in split_by_row_slice should be 1 or 2, got {}", + entity_fetch_count); internal::check( - std::all_of(row_slice->segment_initial_expected_get_calls_->begin(), - row_slice->segment_initial_expected_get_calls_->end(), - [&expected_get_calls](uint64_t i) { return i == expected_get_calls; }), - "All segments in same row slice should have same expected_get_calls in split_by_row_slice"); + std::all_of(row_slice->entity_fetch_count_->begin(), + row_slice->entity_fetch_count_->end(), + [&entity_fetch_count](uint64_t i) { return i == entity_fetch_count; }), + "All segments in same row slice should have same entity_fetch_count in split_by_row_slice"); } } diff --git a/cpp/arcticdb/processing/processing_unit.hpp b/cpp/arcticdb/processing/processing_unit.hpp index 0491bf7efa..98892afac9 100644 --- a/cpp/arcticdb/processing/processing_unit.hpp +++ b/cpp/arcticdb/processing/processing_unit.hpp @@ -51,7 +51,7 @@ namespace arcticdb { std::optional>> col_ranges_; std::optional>> atom_keys_; std::optional bucket_; - std::optional> segment_initial_expected_get_calls_; + std::optional> entity_fetch_count_; std::shared_ptr expression_context_; std::unordered_map computed_data_; @@ -90,8 +90,8 @@ namespace arcticdb { bucket_.emplace(bucket); } - void set_segment_initial_expected_get_calls(std::vector&& segment_initial_expected_get_calls) { - segment_initial_expected_get_calls_.emplace(segment_initial_expected_get_calls); + void set_entity_fetch_count(std::vector&& entity_fetch_count) { + entity_fetch_count_.emplace(entity_fetch_count); } void apply_filter(util::BitSet&& bitset, PipelineOptimisation optimisation); diff --git a/cpp/arcticdb/processing/test/benchmark_clause.cpp b/cpp/arcticdb/processing/test/benchmark_clause.cpp index ecd78ec98b..748faaf3e7 100644 --- a/cpp/arcticdb/processing/test/benchmark_clause.cpp +++ b/cpp/arcticdb/processing/test/benchmark_clause.cpp @@ -43,7 +43,7 @@ void time_merge_on_segments(const std::vector &segments, benchm std::vector entity_ids; for (auto& segment : segments){ auto proc_unit = ProcessingUnit{segment.clone()}; - entity_ids.push_back(push_entities(component_manager, std::move(proc_unit))[0]); + entity_ids.push_back(push_entities(*component_manager, std::move(proc_unit))[0]); } auto stream_id = StreamId("Merge"); diff --git a/cpp/arcticdb/processing/test/test_clause.cpp b/cpp/arcticdb/processing/test/test_clause.cpp index 442d5821a9..4cfb15cf69 100644 --- a/cpp/arcticdb/processing/test/test_clause.cpp +++ b/cpp/arcticdb/processing/test/test_clause.cpp @@ -52,7 +52,7 @@ TEST(Clause, PartitionEmptyColumn) { partition.set_component_manager(component_manager); auto proc_unit = ProcessingUnit{generate_groupby_testing_empty_segment(100, 10)}; - auto entity_ids = push_entities(component_manager, std::move(proc_unit)); + auto entity_ids = push_entities(*component_manager, std::move(proc_unit)); auto processed = partition.process(std::move(entity_ids)); ASSERT_TRUE(processed.empty()); @@ -73,9 +73,9 @@ TEST(Clause, AggregationEmptyColumn) { size_t num_rows{100}; size_t unique_grouping_values{10}; auto proc_unit = ProcessingUnit{generate_groupby_testing_segment(num_rows, unique_grouping_values)}; - auto entity_ids = push_entities(component_manager, std::move(proc_unit)); + auto entity_ids = push_entities(*component_manager, std::move(proc_unit)); - auto aggregated = gather_entities(component_manager, aggregation.process(std::move(entity_ids))); + auto aggregated = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, aggregation.process(std::move(entity_ids))); ASSERT_TRUE(aggregated.segments_.has_value()); auto segments = aggregated.segments_.value(); ASSERT_EQ(1, segments.size()); @@ -147,9 +147,9 @@ TEST(Clause, AggregationColumn) size_t num_rows{100}; size_t unique_grouping_values{10}; auto proc_unit = ProcessingUnit{generate_groupby_testing_segment(num_rows, unique_grouping_values)}; - auto entity_ids = push_entities(component_manager, std::move(proc_unit)); + auto entity_ids = push_entities(*component_manager, std::move(proc_unit)); - auto aggregated = gather_entities(component_manager, aggregation.process(std::move(entity_ids))); + auto aggregated = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, aggregation.process(std::move(entity_ids))); ASSERT_TRUE(aggregated.segments_.has_value()); auto segments = aggregated.segments_.value(); ASSERT_EQ(1, segments.size()); @@ -178,9 +178,9 @@ TEST(Clause, AggregationSparseColumn) size_t num_rows{100}; size_t unique_grouping_values{10}; auto proc_unit = ProcessingUnit{generate_groupby_testing_sparse_segment(num_rows, unique_grouping_values)}; - auto entity_ids = push_entities(component_manager, std::move(proc_unit)); + auto entity_ids = push_entities(*component_manager, std::move(proc_unit)); - auto aggregated = gather_entities(component_manager, aggregation.process(std::move(entity_ids))); + auto aggregated = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, aggregation.process(std::move(entity_ids))); ASSERT_TRUE(aggregated.segments_.has_value()); auto segments = aggregated.segments_.value(); ASSERT_EQ(1, segments.size()); @@ -242,9 +242,9 @@ TEST(Clause, AggregationSparseGroupby) { // 1 more group because of missing values size_t unique_groups{unique_grouping_values + 1}; auto proc_unit = ProcessingUnit{generate_sparse_groupby_testing_segment(num_rows, unique_grouping_values)}; - auto entity_ids = push_entities(component_manager, std::move(proc_unit)); + auto entity_ids = push_entities(*component_manager, std::move(proc_unit)); - auto aggregated = gather_entities(component_manager, aggregation.process(std::move(entity_ids))); + auto aggregated = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, aggregation.process(std::move(entity_ids))); ASSERT_TRUE(aggregated.segments_.has_value()); auto segments = aggregated.segments_.value(); ASSERT_EQ(1, segments.size()); @@ -300,9 +300,9 @@ TEST(Clause, Passthrough) { auto seg = get_standard_timeseries_segment("passthrough"); auto copied = seg.clone(); auto proc_unit = ProcessingUnit{std::move(seg)};; - auto entity_ids = push_entities(component_manager, std::move(proc_unit)); + auto entity_ids = push_entities(*component_manager, std::move(proc_unit)); - auto ret = gather_entities(component_manager, passthrough.process(std::move(entity_ids))); + auto ret = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, passthrough.process(std::move(entity_ids))); ASSERT_TRUE(ret.segments_.has_value()); ASSERT_EQ(ret.segments_->size(), 1); ASSERT_EQ(*ret.segments_->at(0), copied); @@ -321,9 +321,9 @@ TEST(Clause, Sort) { std::mt19937 urng(rng()); std::shuffle(seg.begin(), seg.end(), urng); auto proc_unit = ProcessingUnit{std::move(seg)}; - auto entity_ids = push_entities(component_manager, std::move(proc_unit)); + auto entity_ids = push_entities(*component_manager, std::move(proc_unit)); - auto res = gather_entities(component_manager, sort_clause.process(std::move(entity_ids))); + auto res = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, sort_clause.process(std::move(entity_ids))); ASSERT_TRUE(res.segments_.has_value()); ASSERT_EQ(*res.segments_->at(0), copied); } @@ -339,9 +339,9 @@ TEST(Clause, Split) { auto seg = get_standard_timeseries_segment(symbol, 100); auto copied = seg.clone(); auto proc_unit = ProcessingUnit{std::move(seg)}; - auto entity_ids = push_entities(component_manager, std::move(proc_unit)); + auto entity_ids = push_entities(*component_manager, std::move(proc_unit)); - auto res = gather_entities(component_manager, split_clause.process(std::move(entity_ids))); + auto res = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, split_clause.process(std::move(entity_ids))); ASSERT_TRUE(res.segments_.has_value()); ASSERT_EQ(res.segments_->size(), 10); @@ -402,14 +402,14 @@ TEST(Clause, Merge) { std::vector entity_ids; for(auto x = 0u; x < num_segs; ++x) { auto proc_unit = ProcessingUnit{std::move(segs[x])}; - entity_ids.push_back(push_entities(component_manager, std::move(proc_unit))[0]); + entity_ids.push_back(push_entities(*component_manager, std::move(proc_unit))[0]); } std::vector processed_ids = merge_clause.process(std::move(entity_ids)); std::vector> vec; vec.emplace_back(std::move(processed_ids)); auto repartitioned = merge_clause.repartition(std::move(vec)); - auto res = gather_entities(component_manager, std::move(repartitioned->at(0))); + auto res = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, std::move(repartitioned->at(0))); ASSERT_TRUE(res.segments_.has_value()); ASSERT_EQ(res.segments_->size(), 1u); ASSERT_EQ(*res.segments_->at(0), seg); diff --git a/cpp/arcticdb/processing/test/test_component_manager.cpp b/cpp/arcticdb/processing/test/test_component_manager.cpp index 9bc4b3ee66..a7c41752df 100644 --- a/cpp/arcticdb/processing/test/test_component_manager.cpp +++ b/cpp/arcticdb/processing/test/test_component_manager.cpp @@ -18,39 +18,33 @@ TEST(ComponentManager, Simple) { auto row_range_0 = std::make_shared(0, 10); auto col_range_0 = std::make_shared(10, 20); auto key_0 = std::make_shared(AtomKeyBuilder().version_id(1).build("symbol_0", KeyType::TABLE_DATA)); - uint64_t expected_get_calls_0{1}; + uint64_t entity_fetch_count_0{1}; auto segment_1 = std::make_shared(); auto row_range_1 = std::make_shared(20, 30); auto col_range_1 = std::make_shared(30, 40); auto key_1 = std::make_shared(AtomKeyBuilder().version_id(2).build("symbol_1", KeyType::TABLE_DATA)); - uint64_t expected_get_calls_1{2}; - - auto id_0 = component_manager.add(segment_0, std::nullopt, expected_get_calls_0); - ASSERT_EQ(component_manager.add(row_range_0, id_0), id_0); - ASSERT_EQ(component_manager.add(col_range_0, id_0), id_0); - ASSERT_EQ(component_manager.add(key_0, id_0), id_0); - - auto id_1 = component_manager.add(segment_1, std::nullopt, expected_get_calls_1); - ASSERT_EQ(component_manager.add(row_range_1, id_1), id_1); - ASSERT_EQ(component_manager.add(col_range_1, id_1), id_1); - ASSERT_EQ(component_manager.add(key_1, id_1), id_1); - - ASSERT_EQ(id_0, 0); - ASSERT_EQ(id_1, 1); - - ASSERT_EQ(component_manager.get>({id_0})[0], segment_0); - ASSERT_EQ(component_manager.get_initial_expected_get_calls>(id_0), expected_get_calls_0); - ASSERT_EQ(component_manager.get>({id_0})[0], row_range_0); - ASSERT_EQ(component_manager.get>({id_0})[0], col_range_0); - ASSERT_EQ(component_manager.get>({id_0})[0], key_0); - EXPECT_THROW(component_manager.get>({id_0}), InternalException); - - ASSERT_EQ(component_manager.get>({id_1})[0], segment_1); - ASSERT_EQ(component_manager.get>({id_1})[0], segment_1); - ASSERT_EQ(component_manager.get_initial_expected_get_calls>(id_1), expected_get_calls_1); - ASSERT_EQ(component_manager.get>({id_1})[0], row_range_1); - ASSERT_EQ(component_manager.get>({id_1})[0], col_range_1); - ASSERT_EQ(component_manager.get>({id_1})[0], key_1); - EXPECT_THROW(component_manager.get>({id_1}), InternalException); + uint64_t entity_fetch_count_1{2}; + + auto ids = component_manager.get_new_entity_ids(2); + component_manager.add_entity(ids[0], segment_0, row_range_0, col_range_0, key_0, entity_fetch_count_0); + + component_manager.add_entity(ids[1], segment_1, row_range_1, col_range_1, key_1, entity_fetch_count_1); + + auto [segments, row_ranges, col_ranges, keys, entity_fetch_counts] = component_manager.get_entities, std::shared_ptr, std::shared_ptr, std::shared_ptr, EntityFetchCount>(ids); + + ASSERT_EQ(segments[0], segment_0); + ASSERT_EQ(row_ranges[0], row_range_0); + ASSERT_EQ(col_ranges[0], col_range_0); + ASSERT_EQ(keys[0], key_0); + ASSERT_EQ(entity_fetch_counts[0], entity_fetch_count_0); + + ASSERT_EQ(segments[1], segment_1); + ASSERT_EQ(row_ranges[1], row_range_1); + ASSERT_EQ(col_ranges[1], col_range_1); + ASSERT_EQ(keys[1], key_1); + ASSERT_EQ(entity_fetch_counts[1], entity_fetch_count_1); + + // EntityFetchCount for entity with id_1 is 2, so can be fetched again without exceptions + component_manager.get_entities, std::shared_ptr, std::shared_ptr, std::shared_ptr, EntityFetchCount>({ids[1]}); } diff --git a/cpp/arcticdb/processing/test/test_resample.cpp b/cpp/arcticdb/processing/test/test_resample.cpp index f21519f7ec..1c8ebc87ed 100644 --- a/cpp/arcticdb/processing/test/test_resample.cpp +++ b/cpp/arcticdb/processing/test/test_resample.cpp @@ -242,9 +242,9 @@ TEST(Resample, ProcessOneSegment) { seg.set_row_id(num_rows - 1); auto proc_unit = ProcessingUnit{std::move(seg)}; - auto entity_ids = push_entities(component_manager, std::move(proc_unit)); + auto entity_ids = push_entities(*component_manager, std::move(proc_unit)); - auto resampled = gather_entities(component_manager, resample.process(std::move(entity_ids))); + auto resampled = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, resample.process(std::move(entity_ids))); ASSERT_TRUE(resampled.segments_.has_value()); auto segments = resampled.segments_.value(); ASSERT_EQ(1, segments.size()); @@ -329,22 +329,16 @@ TEST(Resample, ProcessMultipleSegments) { auto row_range_2 = std::make_shared(5, 6); auto col_range_2 = std::make_shared(1, 2); - auto id_0 = component_manager->add(seg_0, std::nullopt, 1); - component_manager->add(row_range_0, id_0); - component_manager->add(col_range_0, id_0); - auto id_1 = component_manager->add(seg_1, std::nullopt, 2); - component_manager->add(row_range_1, id_1); - component_manager->add(col_range_1, id_1); - auto id_2 = component_manager->add(seg_2, std::nullopt, 1); - component_manager->add(row_range_2, id_2); - component_manager->add(col_range_2, id_2); + auto ids = component_manager->get_new_entity_ids(3); + component_manager->add_entity(ids[0], seg_0, row_range_0, col_range_0, EntityFetchCount(1)); + component_manager->add_entity(ids[1], seg_1, row_range_1, col_range_1, EntityFetchCount(2)); + component_manager->add_entity(ids[2], seg_2, row_range_2, col_range_2, EntityFetchCount(1)); + std::vector ids_0{ids[0], ids[1]}; + std::vector ids_1{ids[1]}; + std::vector ids_2{ids[2]}; - std::vector ids_0{id_0, id_1}; - std::vector ids_1{id_1}; - std::vector ids_2{id_2}; - - auto resampled_0 = gather_entities(component_manager, resample.process(std::move(ids_0))); + auto resampled_0 = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, resample.process(std::move(ids_0))); auto resampled_seg_0 = *resampled_0.segments_.value()[0]; auto& resampled_index_column_0 = resampled_seg_0.column(0); auto& resampled_sum_column_0 = resampled_seg_0.column(1); @@ -353,7 +347,7 @@ TEST(Resample, ProcessMultipleSegments) { ASSERT_EQ(0, resampled_sum_column_0.scalar_at(0)); ASSERT_EQ(30, resampled_sum_column_0.scalar_at(1)); - auto resampled_1 = gather_entities(component_manager, resample.process(std::move(ids_1))); + auto resampled_1 = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, resample.process(std::move(ids_1))); auto resampled_seg_1 = *resampled_1.segments_.value()[0]; auto& resampled_index_column_1 = resampled_seg_1.column(0); auto& resampled_sum_column_1 = resampled_seg_1.column(1); @@ -362,7 +356,7 @@ TEST(Resample, ProcessMultipleSegments) { ASSERT_EQ(30, resampled_sum_column_1.scalar_at(0)); ASSERT_EQ(40, resampled_sum_column_1.scalar_at(1)); - auto resampled_2 = gather_entities(component_manager, resample.process(std::move(ids_2))); + auto resampled_2 = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, resample.process(std::move(ids_2))); auto resampled_seg_2 = *resampled_2.segments_.value()[0]; auto& resampled_index_column_2 = resampled_seg_2.column(0); auto& resampled_sum_column_2 = resampled_seg_2.column(1); diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 4f57d60b7a..7dfa1dd685 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -492,7 +492,7 @@ std::vector process_clauses( } // Map from index in segment_and_slice_future_splitters to the number of processing units that require that segment - auto segment_proc_unit_counts = std::make_shared>(segment_and_slice_futures.size(), 0); + auto segment_proc_unit_counts = std::make_shared>(segment_and_slice_futures.size(), 0); for (const auto& list: processing_unit_indexes) { for (auto idx: list) { internal::check( @@ -504,12 +504,31 @@ std::vector process_clauses( internal::check( std::all_of(segment_proc_unit_counts->begin(), segment_proc_unit_counts->end(), [](const size_t& val) { return val != 0; }), "All segments should be needed by at least one ProcessingUnit"); + // Map from position in segment_and_slice_futures to entity ids + std::vector pos_to_id; + // Map from entity id to position in segment_and_slice_futures + auto id_to_pos = std::make_shared>(); + pos_to_id.reserve(segment_and_slice_futures.size()); + auto ids = component_manager->get_new_entity_ids(segment_and_slice_futures.size()); + for (auto&& [idx, id]: folly::enumerate(ids)) { + pos_to_id.emplace_back(id); + id_to_pos->emplace(id, idx); + } + // Give this a more descriptive name as we modify it between clauses - std::vector> entity_ids_vec{std::move(processing_unit_indexes)}; + std::vector> entity_ids_vec; + entity_ids_vec.reserve(processing_unit_indexes.size()); + for (const auto& indexes: processing_unit_indexes) { + entity_ids_vec.emplace_back(); + entity_ids_vec.back().reserve(indexes.size()); + for (auto index: indexes) { + entity_ids_vec.back().emplace_back(pos_to_id[index]); + } + } // Used to make sure each entity is only added into the component manager once - auto entity_added_mtx = std::make_shared>(segment_and_slice_futures.size()); - auto entity_added = std::make_shared>(segment_and_slice_futures.size(), false); + auto slice_added_mtx = std::make_shared>(segment_and_slice_futures.size()); + auto slice_added = std::make_shared>(segment_and_slice_futures.size(), false); std::vector>> futures; bool first_clause{true}; // Reverse the order of clauses and iterate through them backwards so that the erase is efficient @@ -520,34 +539,32 @@ std::vector process_clauses( std::vector> local_futs; local_futs.reserve(entity_ids.size()); for (auto id: entity_ids) { - local_futs.emplace_back(segment_and_slice_future_splitters[id].getFuture()); + local_futs.emplace_back(segment_and_slice_future_splitters[id_to_pos->at(id)].getFuture()); } futures.emplace_back( folly::collect(local_futs) .via(&async::cpu_executor()) .thenValue([component_manager, segment_proc_unit_counts, - entity_added_mtx, - entity_added, + id_to_pos, + slice_added_mtx, + slice_added, clauses, entity_ids = std::move(entity_ids)](std::vector&& segment_and_slices) mutable { for (auto&& [idx, segment_and_slice]: folly::enumerate(segment_and_slices)) { auto entity_id = entity_ids[idx]; - std::lock_guard lock((*entity_added_mtx)[entity_id]); - if (!(*entity_added)[entity_id]) { - component_manager->add( + auto pos = id_to_pos->at(entity_id); + std::lock_guard lock((*slice_added_mtx)[pos]); + if (!(*slice_added)[pos]) { + component_manager->add_entity( + entity_id, std::make_shared(std::move(segment_and_slice.segment_in_memory_)), - entity_id, (*segment_proc_unit_counts)[entity_id]); - component_manager->add( std::make_shared(std::move(segment_and_slice.ranges_and_key_.row_range_)), - entity_id); - component_manager->add( std::make_shared(std::move(segment_and_slice.ranges_and_key_.col_range_)), - entity_id); - component_manager->add( std::make_shared(std::move(segment_and_slice.ranges_and_key_.key_)), - entity_id); - (*entity_added)[entity_id] = true; + (*segment_proc_unit_counts)[pos] + ); + (*slice_added)[pos] = true; } } return async::MemSegmentProcessingTask(*clauses, std::move(entity_ids))(); @@ -703,7 +720,6 @@ std::vector read_and_process( // i.e. if the first processing unit needs ranges_and_keys[0] and ranges_and_keys[1], and the second needs ranges_and_keys[2] and ranges_and_keys[3] // then the structure will be {{0, 1}, {2, 3}} std::vector> processing_unit_indexes = read_query.clauses_[0]->structure_for_processing(ranges_and_keys, start_from); - component_manager->set_next_entity_id(ranges_and_keys.size()); // Start reading as early as possible auto segment_and_slice_futures = store->batch_read_uncompressed(std::move(ranges_and_keys), columns_to_decode(pipeline_context)); @@ -712,7 +728,7 @@ std::vector read_and_process( std::move(segment_and_slice_futures), std::move(processing_unit_indexes), std::make_shared>>(read_query.clauses_)); - auto proc = gather_entities(component_manager, std::move(processed_entity_ids)); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); if (std::any_of(read_query.clauses_.begin(), read_query.clauses_.end(), [](const std::shared_ptr& clause) { return clause->clause_info().modifies_output_descriptor_; diff --git a/cpp/third_party/CMakeLists.txt b/cpp/third_party/CMakeLists.txt index aaf5cc4e58..9d1d086981 100644 --- a/cpp/third_party/CMakeLists.txt +++ b/cpp/third_party/CMakeLists.txt @@ -6,3 +6,4 @@ if(NOT ${ARCTICDB_USING_CONDA}) add_subdirectory(Remotery) add_subdirectory(lmdbcxx) endif() +add_subdirectory(entt) diff --git a/cpp/third_party/entt b/cpp/third_party/entt new file mode 160000 index 0000000000..7821307565 --- /dev/null +++ b/cpp/third_party/entt @@ -0,0 +1 @@ +Subproject commit 78213075654a688e9da6bc49f7f873d25c26d12c