Enhancement 1895: Fully parallelise processing in read_batch #1950

Merged
1 change: 1 addition & 0 deletions cpp/arcticdb/CMakeLists.txt
@@ -910,6 +910,7 @@ if(${TEST})
processing/test/test_filter_and_project_sparse.cpp
processing/test/test_has_valid_type_promotion.cpp
processing/test/test_operation_dispatch.cpp
processing/test/test_parallel_processing.cpp
processing/test/test_resample.cpp
processing/test/test_set_membership.cpp
processing/test/test_signed_unsigned_comparison.cpp
6 changes: 0 additions & 6 deletions cpp/arcticdb/async/async_store.hpp
@@ -251,12 +251,6 @@ folly::Future<std::pair<VariantKey, TimeseriesDescriptor>> read_timeseries_descr
return read_and_continue(key, library_, opts, DecodeTimeseriesDescriptorTask{});
}

folly::Future<std::pair<VariantKey, TimeseriesDescriptor>> read_timeseries_descriptor_for_incompletes(
const entity::VariantKey &key,
storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) override {
return read_and_continue(key, library_, opts, DecodeTimeseriesDescriptorForIncompletesTask{});
}

folly::Future<bool> key_exists(entity::VariantKey &&key) {
return async::submit_io_task(KeyExistsTask{std::move(key), library_});
}
22 changes: 0 additions & 22 deletions cpp/arcticdb/async/tasks.hpp
@@ -551,28 +551,6 @@ struct DecodeTimeseriesDescriptorTask : BaseTask {
}
};

struct DecodeTimeseriesDescriptorForIncompletesTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(DecodeTimeseriesDescriptorForIncompletesTask)

DecodeTimeseriesDescriptorForIncompletesTask() = default;

std::pair<VariantKey, TimeseriesDescriptor> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(DecodeTimeseriesDescriptorForIncompletesTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(
log::storage(),
"DecodeTimeseriesDescriptorForIncompletesTask decoding segment with key {}",
variant_key_view(key_seg.variant_key()));

auto maybe_desc = decode_timeseries_descriptor_for_incompletes(key_seg.segment());

util::check(static_cast<bool>(maybe_desc), "Failed to decode timeseries descriptor");
return std::make_pair(
std::move(key_seg.variant_key()),
std::move(*maybe_desc));
}
};

struct DecodeMetadataAndDescriptorTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(DecodeMetadataAndDescriptorTask)

12 changes: 6 additions & 6 deletions cpp/arcticdb/column_store/test/ingestion_stress_test.cpp
@@ -125,8 +125,8 @@ TEST_F(IngestionStressStore, ScalarIntAppend) {
ro.allow_sparse_ = true;
ro.set_dynamic_schema(true);
ro.set_incompletes(true);
ReadQuery read_query;
read_query.row_filter = universal_range();
auto read_query = std::make_shared<ReadQuery>();
read_query->row_filter = universal_range();
register_native_handler_data_factory();
auto handler_data = get_type_handler_data();
auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ro, handler_data);
@@ -213,8 +213,8 @@ TEST_F(IngestionStressStore, ScalarIntDynamicSchema) {
read_options.set_dynamic_schema(true);
read_options.set_allow_sparse(true);
read_options.set_incompletes(true);
ReadQuery read_query;
read_query.row_filter = universal_range();
auto read_query = std::make_shared<ReadQuery>();
read_query->row_filter = universal_range();
register_native_handler_data_factory();
auto handler_data = get_type_handler_data();
auto read_result = test_store_->read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, handler_data);
@@ -266,8 +266,8 @@ TEST_F(IngestionStressStore, DynamicSchemaWithStrings) {
read_options.set_dynamic_schema(true);
read_options.set_allow_sparse(true);
read_options.set_incompletes(true);
ReadQuery read_query;
read_query.row_filter = universal_range();
auto read_query = std::make_shared<ReadQuery>();
read_query->row_filter = universal_range();
register_native_handler_data_factory();
auto handler_data = get_type_handler_data();
auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, read_options, handler_data);
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/frame_slice.cpp
@@ -20,7 +20,7 @@ namespace arcticdb::pipelines {

void SliceAndKey::ensure_segment(const std::shared_ptr<Store>& store) const {
if(!segment_)
segment_ = store->read(*key_).get().second;
segment_ = store->read_sync(*key_).second;
}

SegmentInMemory& SliceAndKey::segment(const std::shared_ptr<Store>& store) {
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/pipeline_utils.hpp
@@ -46,7 +46,7 @@ inline ReadResult read_result_from_single_frame(FrameAndDescriptor& frame_and_de
auto descriptor = std::make_shared<StreamDescriptor>(frame_and_desc.frame_.descriptor());
pipeline_context->begin()->set_descriptor(std::move(descriptor));
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data();
reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data);
reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data).get();
apply_type_handlers(frame_and_desc.frame_, handler_data);
return create_python_read_result(VersionedItem{key}, std::move(frame_and_desc));
}
41 changes: 27 additions & 14 deletions cpp/arcticdb/pipeline/read_frame.cpp
@@ -630,30 +630,42 @@ struct ReduceColumnTask : async::BaseTask {
}
};

void reduce_and_fix_columns(
std::shared_ptr<PipelineContext> &context,
SegmentInMemory &frame,
const ReadOptions& read_options,
std::any& handler_data
folly::Future<folly::Unit> reduce_and_fix_columns(
std::shared_ptr<PipelineContext> &context,
SegmentInMemory &frame,
const ReadOptions& read_options,
std::any& handler_data
) {
ARCTICDB_SAMPLE_DEFAULT(ReduceAndFixStringCol)
ARCTICDB_DEBUG(log::version(), "Reduce and fix columns");
if(frame.empty())
return;
return folly::Unit{};

bool dynamic_schema = opt_false(read_options.dynamic_schema_);
auto slice_map = std::make_shared<FrameSliceMap>(context, dynamic_schema);
DecodePathData shared_data;

static const auto batch_size = ConfigsMap::instance()->get_int("ReduceColumns.BatchSize", 100);
folly::collect(folly::window(frame.descriptor().fields().size(), [&] (size_t field) {
return async::submit_cpu_task(ReduceColumnTask(frame, field, slice_map, context, shared_data, handler_data, dynamic_schema));
}, batch_size)).via(&async::io_executor()).get();
// This logic mimics that in ReduceColumnTask operator() to identify whether the task will actually do any work
// This is to avoid scheduling work that is a no-op
std::vector<size_t> fields_to_reduce;
for (size_t idx=0; idx<frame.descriptor().fields().size(); ++idx) {
const auto& frame_field = frame.field(idx);
if (dynamic_schema ||
(slice_map->columns_.contains(frame_field.name()) && is_sequence_type(frame_field.type().data_type()))) {
fields_to_reduce.emplace_back(idx);
}
}

static const auto batch_size = ConfigsMap::instance()->get_int("ReduceColumns.BatchSize", 100);
return folly::collect(
folly::window(std::move(fields_to_reduce),
[context, frame, slice_map, shared_data, dynamic_schema, &handler_data] (size_t field) mutable {
return async::submit_cpu_task(ReduceColumnTask(frame, field, slice_map, context, shared_data, handler_data, dynamic_schema));
}, batch_size)).via(&async::io_executor()).unit();
}

folly::Future<std::vector<VariantKey>> fetch_data(
const SegmentInMemory& frame,
folly::Future<SegmentInMemory> fetch_data(
SegmentInMemory&& frame,
const std::shared_ptr<PipelineContext> &context,
const std::shared_ptr<stream::StreamSource>& ssource,
bool dynamic_schema,
@@ -662,7 +674,7 @@ folly::Future<std::vector<VariantKey>> fetch_data(
) {
ARCTICDB_SAMPLE_DEFAULT(FetchSlices)
if (frame.empty())
return {std::vector<VariantKey>{}};
return frame;

std::vector<std::pair<VariantKey, stream::StreamSource::ReadContinuation>> keys_and_continuations;
keys_and_continuations.reserve(context->slice_and_keys_.size());
@@ -684,7 +696,8 @@ }
}
}
ARCTICDB_SUBSAMPLE_DEFAULT(DoBatchReadCompressed)
return ssource->batch_read_compressed(std::move(keys_and_continuations), BatchReadArgs{});
return ssource->batch_read_compressed(std::move(keys_and_continuations), BatchReadArgs{})
.thenValue([frame](auto&&){ return frame; });
}

} // namespace read
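
Note on the batching above: reduce_and_fix_columns now returns a future instead of blocking, first filtering out fields for which ReduceColumnTask would be a no-op, and folly::window caps how many tasks are in flight at once (ReduceColumns.BatchSize, default 100). A minimal sketch of that window/collect pattern, under the assumption of folly's standard Future API; process_in_batches and the do_work placeholder are hypothetical names, not part of this PR:

// Sketch only: run items.size() tasks with at most batch_size in flight,
// mirroring the folly::window/collect usage in reduce_and_fix_columns.
#include <vector>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/futures/Future.h>

folly::Future<folly::Unit> process_in_batches(std::vector<size_t> items,
                                              folly::CPUThreadPoolExecutor& exec,
                                              size_t batch_size) {
    return folly::collect(
        folly::window(std::move(items),
                      [&exec](size_t item) {
                          // A new task is scheduled as soon as a slot in the window frees up
                          return folly::via(&exec, [item] { /* do_work(item); */ });
                      },
                      batch_size))
        .unit();
}
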
6 changes: 3 additions & 3 deletions cpp/arcticdb/pipeline/read_frame.hpp
@@ -70,8 +70,8 @@ void mark_index_slices(
bool dynamic_schema,
bool column_groups);

folly::Future<std::vector<VariantKey>> fetch_data(
const SegmentInMemory& frame,
folly::Future<SegmentInMemory> fetch_data(
SegmentInMemory&& frame,
const std::shared_ptr<PipelineContext> &context,
const std::shared_ptr<stream::StreamSource>& ssource,
bool dynamic_schema,
@@ -92,7 +92,7 @@ void decode_into_frame_dynamic(
const DecodePathData& shared_data,
std::any& handler_data);

void reduce_and_fix_columns(
folly::Future<folly::Unit> reduce_and_fix_columns(
std::shared_ptr<PipelineContext> &context,
SegmentInMemory &frame,
const ReadOptions& read_options,
25 changes: 25 additions & 0 deletions cpp/arcticdb/processing/clause_utils.cpp
@@ -81,4 +81,29 @@ std::vector<EntityId> flatten_entities(std::vector<std::vector<EntityId>>&& enti
return res;
}

std::vector<folly::FutureSplitter<pipelines::SegmentAndSlice>> split_futures(
std::vector<folly::Future<pipelines::SegmentAndSlice>>&& segment_and_slice_futures) {
std::vector<folly::FutureSplitter<pipelines::SegmentAndSlice>> res;
res.reserve(segment_and_slice_futures.size());
for (auto&& future: segment_and_slice_futures) {
res.emplace_back(folly::splitFuture(std::move(future)));
}
return res;
}

std::shared_ptr<std::vector<EntityFetchCount>> generate_segment_fetch_counts(
const std::vector<std::vector<size_t>>& processing_unit_indexes,
size_t num_segments) {
auto res = std::make_shared<std::vector<EntityFetchCount>>(num_segments, 0);
for (const auto& list: processing_unit_indexes) {
for (auto idx: list) {
res->at(idx)++;
}
}
debug::check<ErrorCode::E_ASSERTION_FAILURE>(
std::all_of(res->begin(), res->end(), [](const size_t& val) { return val != 0; }),
"All segments should be needed by at least one ProcessingUnit");
return res;
}

}
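
The two helpers added above exist because a plain folly::Future can only be consumed once, while a single segment-and-slice result may be needed by more than one processing unit; generate_segment_fetch_counts computes how many. For example, with processing_unit_indexes = {{0, 1}, {1, 2}} over three segments the counts come out as {1, 2, 1}, since segment 1 feeds two units. A minimal sketch of the FutureSplitter fan-out, assuming only folly's FutureSplitter API; the names and int payload are illustrative:

// Sketch: splitting one future so several consumers each obtain their own
// independent copy of the same result.
#include <folly/futures/FutureSplitter.h>

void fan_out(folly::Future<int>&& segment_future) {
    folly::FutureSplitter<int> splitter = folly::splitFuture(std::move(segment_future));
    auto for_first_unit = splitter.getFuture();   // independent future #1
    auto for_second_unit = splitter.getFuture();  // independent future #2
    // Both complete with the same value once the underlying future is fulfilled.
}
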
9 changes: 9 additions & 0 deletions cpp/arcticdb/processing/clause_utils.hpp
@@ -12,6 +12,8 @@
#include <string>
#include <unordered_set>

#include <folly/futures/FutureSplitter.h>

#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/processing/component_manager.hpp>
#include <arcticdb/processing/processing_unit.hpp>
@@ -242,4 +244,11 @@ std::vector<EntityId> push_entities(ComponentManager& component_manager, Process

std::vector<EntityId> flatten_entities(std::vector<std::vector<EntityId>>&& entity_ids_vec);

std::vector<folly::FutureSplitter<pipelines::SegmentAndSlice>> split_futures(
std::vector<folly::Future<pipelines::SegmentAndSlice>>&& segment_and_slice_futures);

std::shared_ptr<std::vector<EntityFetchCount>> generate_segment_fetch_counts(
const std::vector<std::vector<size_t>>& processing_unit_indexes,
size_t num_segments);

}//namespace arcticdb
23 changes: 16 additions & 7 deletions cpp/arcticdb/processing/component_manager.cpp
@@ -14,17 +14,26 @@ namespace arcticdb {

std::vector<EntityId> ComponentManager::get_new_entity_ids(size_t count) {
std::vector<EntityId> ids(count);
std::lock_guard<std::mutex> lock(mtx_);
std::unique_lock lock(mtx_);
registry_.create(ids.begin(), ids.end());
return ids;
}

void ComponentManager::erase_entity(EntityId id) {
// Ideally would call registry_.destroy(id), or at least registry_.erase<std::shared_ptr<SegmentInMemory>>(id)
// at this point. However, they are both slower than this, so just decrement the ref count of the only
// sizeable component, so that when the shared pointer goes out of scope in the calling function, the
// memory is freed
registry_.get<std::shared_ptr<SegmentInMemory>>(id).reset();
void ComponentManager::decrement_entity_fetch_count(EntityId id) {
if (registry_.get<std::atomic<EntityFetchCount>>(id).fetch_sub(1) == 1) {
// This entity will never be accessed again
// Ideally would call registry_.destroy(id), or at least registry_.erase<std::shared_ptr<SegmentInMemory>>(id)
// at this point. However, they are both slower than this, and would require taking a unique_lock on the
// shared_mutex, so just decrement the ref count of the only sizeable component, so that when the shared pointer
// goes out of scope in the calling function, the memory is freed
registry_.get<std::shared_ptr<SegmentInMemory>>(id).reset();
debug::check<ErrorCode::E_ASSERTION_FAILURE>(!registry_.get<std::shared_ptr<SegmentInMemory>>(id),
"SegmentInMemory memory retained in ComponentManager");
}
}

void ComponentManager::update_entity_fetch_count(EntityId id, EntityFetchCount count) {
registry_.get<std::atomic<EntityFetchCount>>(id).store(count);
}


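The decrement_entity_fetch_count change above implements a last-consumer-frees pattern: the fetch count is initialised (via update_entity_fetch_count) to the number of processing units that will read the entity, and whichever consumer decrements it to zero releases the segment without taking a unique lock. A stripped-down, hypothetical sketch of the same idea, not the actual ComponentManager code:

// Sketch only: atomic fetch_sub returns the previous value, so observing 1
// means this caller was the final reader and may free the shared resource.
#include <atomic>
#include <memory>

struct Entity {
    std::atomic<int> fetch_count{0};
    std::shared_ptr<int> segment;  // stands in for std::shared_ptr<SegmentInMemory>
};

void on_entity_fetched(Entity& e) {
    if (e.fetch_count.fetch_sub(1) == 1) {
        e.segment.reset();  // release the only sizeable component exactly once
    }
}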