diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 7446eeb8812..200b4ead567 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -71,6 +71,8 @@ namespace DB M(force_remote_read_for_batch_cop_once) \ M(exception_new_dynamic_thread) \ M(force_wait_index_timeout) \ + M(force_local_index_task_memory_limit_exceeded) \ + M(exception_build_local_index_for_file) \ M(force_not_support_vector_index) \ M(sync_schema_request_failure) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 7610e106732..b05d77d2499 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -39,7 +39,7 @@ #include #include #include -#include +#include #include #include #include @@ -363,7 +363,7 @@ DeltaMergeStorePtr DeltaMergeStore::create( settings_, thread_pool); std::shared_ptr store_shared_ptr(store); - store_shared_ptr->checkAllSegmentsLocalIndex(); + store_shared_ptr->checkAllSegmentsLocalIndex({}); return store_shared_ptr; } @@ -2032,20 +2032,20 @@ void DeltaMergeStore::applySchemaChanges(TiDB::TableInfo & table_info) void DeltaMergeStore::applyLocalIndexChange(const TiDB::TableInfo & new_table_info) { // Get a snapshot on the local_index_infos to check whether any new index is created - auto new_local_index_infos = generateLocalIndexInfos(getLocalIndexInfosSnapshot(), new_table_info, log); + auto changeset = generateLocalIndexInfos(getLocalIndexInfosSnapshot(), new_table_info, log); // no index is created or dropped - if (!new_local_index_infos) + if (!changeset.new_local_index_infos) return; { // new index created, update the info in-memory thread safety between `getLocalIndexInfosSnapshot` std::unique_lock index_write_lock(mtx_local_index_infos); - local_index_infos.swap(new_local_index_infos); + local_index_infos.swap(changeset.new_local_index_infos); } // generate async tasks for building local index for 
all segments - checkAllSegmentsLocalIndex(); + checkAllSegmentsLocalIndex(std::move(changeset.dropped_indexes)); } SortDescription DeltaMergeStore::getPrimarySortDescription() const diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 32f332d4558..341f8efb91f 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -29,7 +29,7 @@ #include #include #include -#include +#include #include #include #include @@ -70,6 +70,7 @@ using NotCompress = std::unordered_set; using SegmentIdSet = std::unordered_set; struct ExternalDTFileInfo; struct GCOptions; +struct LocalIndexBuildInfo; namespace tests { @@ -184,6 +185,9 @@ struct LocalIndexStats UInt64 rows_stable_not_indexed{}; // Total rows UInt64 rows_delta_indexed{}; // Total rows UInt64 rows_delta_not_indexed{}; // Total rows + + // If the index is finally failed to be built, then this is not empty + String error_message{}; }; using LocalIndexesStats = std::vector; @@ -730,11 +734,9 @@ class DeltaMergeStore MergeDeltaReason reason, SegmentSnapshotPtr segment_snap = nullptr); - void segmentEnsureStableIndex( - DMContext & dm_context, - const LocalIndexInfosPtr & index_info, - const DMFiles & dm_files, - const String & source_segment_info); + void segmentEnsureStableIndex(DMContext & dm_context, const LocalIndexBuildInfo & index_build_info); + + void segmentEnsureStableIndexWithErrorReport(DMContext & dm_context, const LocalIndexBuildInfo & index_build_info); /** * Ingest a DMFile into the segment, optionally causing a new segment being created. @@ -877,8 +879,9 @@ class DeltaMergeStore /** * Check whether there are new local indexes should be built for all segments. + * If dropped_indexes is not empty, try to cleanup the dropped_indexes */ - void checkAllSegmentsLocalIndex(); + void checkAllSegmentsLocalIndex(std::vector && dropped_indexes); /** * Ensure the segment has stable index. 
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index f63e7f79788..2a4b1c28965 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -21,6 +22,7 @@ #include #include #include +#include #include @@ -450,7 +452,7 @@ SegmentPtr DeltaMergeStore::segmentMerge( return merged; } -void DeltaMergeStore::checkAllSegmentsLocalIndex() +void DeltaMergeStore::checkAllSegmentsLocalIndex(std::vector && dropped_indexes) { if (!getLocalIndexInfosSnapshot()) return; @@ -512,6 +514,9 @@ void DeltaMergeStore::checkAllSegmentsLocalIndex() for (const auto & [end, segment] : segments) { UNUSED(end); + // cleanup the index error message for dropped indexes + segment->clearIndexBuildError(dropped_indexes); + if (segmentEnsureStableIndexAsync(segment)) ++segments_missing_indexes; } @@ -533,14 +538,15 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment) return false; // No lock is needed, stable meta is immutable. 
- auto dm_files = segment->getStable()->getDMFiles(); - auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, dm_files); - if (!build_info.indexes_to_build || build_info.indexes_to_build->empty()) + const auto build_info + = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles()); + if (!build_info.indexes_to_build || build_info.indexes_to_build->empty() || build_info.dm_files.empty()) return false; auto store_weak_ptr = weak_from_this(); - auto tracing_id = fmt::format("segmentEnsureStableIndex<{}>", log->identifier()); - auto workload = [store_weak_ptr, build_info, dm_files, segment, tracing_id]() -> void { + auto tracing_id + = fmt::format("segmentEnsureStableIndex<{}> source_segment={}", log->identifier(), segment->simpleInfo()); + auto workload = [store_weak_ptr, build_info, tracing_id]() -> void { auto store = store_weak_ptr.lock(); if (store == nullptr) // Store is destroyed before the task is executed. return; @@ -548,20 +554,45 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment) store->global_context, store->global_context.getSettingsRef(), tracing_id); - const auto source_segment_info = segment->simpleInfo(); - store->segmentEnsureStableIndex(*dm_context, build_info.indexes_to_build, dm_files, source_segment_info); + store->segmentEnsureStableIndexWithErrorReport(*dm_context, build_info); }; auto indexer_scheduler = global_context.getGlobalLocalIndexerScheduler(); RUNTIME_CHECK(indexer_scheduler != nullptr); - indexer_scheduler->pushTask(LocalIndexerScheduler::Task{ - .keyspace_id = keyspace_id, - .table_id = physical_table_id, - .file_ids = build_info.file_ids, - .request_memory = build_info.estimated_memory_bytes, - .workload = workload, - }); - return true; + try + { + // new task of these index are generated, clear existing error_message in segment + segment->clearIndexBuildError(build_info.indexesIDs()); + + auto [ok, reason] = 
indexer_scheduler->pushTask(LocalIndexerScheduler::Task{ + .keyspace_id = keyspace_id, + .table_id = physical_table_id, + .file_ids = build_info.filesIDs(), + .request_memory = build_info.estimated_memory_bytes, + .workload = workload, + }); + if (ok) + return true; + + segment->setIndexBuildError(build_info.indexesIDs(), reason); + LOG_ERROR( + log->getChild(tracing_id), + "Failed to generate async segment stable index task, index_ids={} reason={}", + build_info.indexesIDs(), + reason); + return false; + } + catch (...) + { + const auto message = getCurrentExceptionMessage(false, false); + segment->setIndexBuildError(build_info.indexesIDs(), message); + + tryLogCurrentException(log); + + // catch and ignore the exception + // not able to push task to index scheduler + return false; + } } bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) const @@ -574,8 +605,8 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) co // No lock is needed, stable meta is immutable. auto segment_id = segment->segmentId(); - auto dm_files = segment->getStable()->getDMFiles(); - auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, dm_files); + auto build_info + = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles()); if (!build_info.indexes_to_build || build_info.indexes_to_build->empty()) return true; @@ -650,11 +681,7 @@ SegmentPtr DeltaMergeStore::segmentUpdateMeta( return new_segment; } -void DeltaMergeStore::segmentEnsureStableIndex( - DMContext & dm_context, - const LocalIndexInfosPtr & index_info, - const DMFiles & dm_files, - const String & source_segment_info) +void DeltaMergeStore::segmentEnsureStableIndex(DMContext & dm_context, const LocalIndexBuildInfo & index_build_info) { // 1. Acquire a snapshot for PageStorage, and keep the snapshot until index is built. // This helps keep DMFile valid during the index build process. 
@@ -674,8 +701,10 @@ void DeltaMergeStore::segmentEnsureStableIndex( dm_context.tracing_id, /*snapshot_read*/ true); - RUNTIME_CHECK(dm_files.size() == 1); // size > 1 is currently not supported. - const auto & dm_file = dm_files[0]; + auto tracing_logger = log->getChild(getLogTracingId(dm_context)); + + RUNTIME_CHECK(index_build_info.dm_files.size() == 1); // size > 1 is currently not supported. + const auto & dm_file = index_build_info.dm_files[0]; auto is_file_valid = [this, dm_file] { std::shared_lock lock(read_write_mutex); @@ -686,24 +715,20 @@ void DeltaMergeStore::segmentEnsureStableIndex( // 2. Check whether the DMFile has been referenced by any valid segment. if (!is_file_valid()) { - LOG_DEBUG( - log, - "EnsureStableIndex - Give up because no segment to update, source_segment={}", - source_segment_info); + LOG_DEBUG(tracing_logger, "EnsureStableIndex - Give up because no segment to update"); return; } LOG_INFO( - log, - "EnsureStableIndex - Begin building index, dm_files={} source_segment={}", - DMFile::info(dm_files), - source_segment_info); + tracing_logger, + "EnsureStableIndex - Begin building index, dm_files={}", + DMFile::info(index_build_info.dm_files)); // 2. Build the index. 
DMFileIndexWriter iw(DMFileIndexWriter::Options{ .path_pool = path_pool, - .index_infos = index_info, - .dm_files = dm_files, + .index_infos = index_build_info.indexes_to_build, + .dm_files = index_build_info.dm_files, .dm_context = dm_context, }); @@ -719,11 +744,9 @@ void DeltaMergeStore::segmentEnsureStableIndex( if (e.code() == ErrorCodes::ABORTED) { LOG_INFO( - log, - "EnsureStableIndex - Build index aborted because DMFile is no longer valid, dm_files={} " - "source_segment={}", - DMFile::info(dm_files), - source_segment_info); + tracing_logger, + "EnsureStableIndex - Build index aborted because DMFile is no longer valid, dm_files={}", + DMFile::info(index_build_info.dm_files)); return; } throw; @@ -732,10 +755,9 @@ void DeltaMergeStore::segmentEnsureStableIndex( RUNTIME_CHECK(!new_dmfiles.empty()); LOG_INFO( - log, - "EnsureStableIndex - Finish building index, dm_files={} source_segment={}", - DMFile::info(dm_files), - source_segment_info); + tracing_logger, + "EnsureStableIndex - Finish building index, dm_files={}", + DMFile::info(index_build_info.dm_files)); // 3. Update the meta version of the segments to the latest one. // To avoid logical split between step 2 and 3, get lastest segments to update again. @@ -751,8 +773,66 @@ void DeltaMergeStore::segmentEnsureStableIndex( auto segment = id_to_segment[seg_id]; auto new_segment = segmentUpdateMeta(lock, dm_context, segment, new_dmfiles); // Expect update meta always success, because the segment must be valid and bump meta should succeed. 
- RUNTIME_CHECK_MSG(new_segment != nullptr, "Update meta failed for segment {}", segment->simpleInfo()); + RUNTIME_CHECK_MSG( + new_segment != nullptr, + "Update meta failed for segment {} ident={}", + segment->simpleInfo(), + tracing_logger->identifier()); + } + } +} + +// A wrapper of `segmentEnsureStableIndex` +// If any exception thrown, the error message will be recorded to +// the related segment(s) +void DeltaMergeStore::segmentEnsureStableIndexWithErrorReport( + DMContext & dm_context, + const LocalIndexBuildInfo & index_build_info) +{ + auto handle_error = [this, &index_build_info](const std::vector & index_ids) { + const auto message = getCurrentExceptionMessage(false, false); + std::unordered_map segment_to_add_msg; + { + std::unique_lock lock(read_write_mutex); + for (const auto & dmf : index_build_info.dm_files) + { + const auto segment_ids = dmfile_id_to_segment_ids.get(dmf->fileId()); + for (const auto & seg_id : segment_ids) + { + if (segment_to_add_msg.contains(seg_id)) + continue; + segment_to_add_msg.emplace(seg_id, id_to_segment[seg_id]); + } + } + } + + for (const auto & [seg_id, seg] : segment_to_add_msg) + { + UNUSED(seg_id); + seg->setIndexBuildError(index_ids, message); } + }; + + try + { + segmentEnsureStableIndex(dm_context, index_build_info); + } + catch (DB::Exception & e) + { + const auto index_ids = index_build_info.indexesIDs(); + e.addMessage(fmt::format("while building stable index for index_ids={}", index_ids)); + handle_error(index_ids); + + // rethrow + throw; + } + catch (...) 
+ { + const auto index_ids = index_build_info.indexesIDs(); + handle_error(index_ids); + + // rethrow + throw; } } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp index 7a0a553fbc9..29fb04e42cf 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp @@ -16,6 +16,7 @@ #include #include #include +#include namespace DB { @@ -206,7 +207,7 @@ LocalIndexesStats DeltaMergeStore::getLocalIndexStats() LocalIndexStats index_stats; index_stats.column_id = index_info.column_id; index_stats.index_id = index_info.index_id; - index_stats.index_kind = "HNSW"; // TODO: Support more. + index_stats.index_kind = tipb::VectorIndexKind_Name(index_info.index_definition->kind); for (const auto & [handle, segment] : segments) { @@ -241,6 +242,14 @@ LocalIndexesStats DeltaMergeStore::getLocalIndexStats() { index_stats.rows_stable_not_indexed += stable->getRows(); } + + const auto index_build_error = segment->getIndexBuildError(); + // Set error_message to the first error_message we meet among all segments + if (auto err_iter = index_build_error.find(index_info.index_id); + err_iter != index_build_error.end() && index_stats.error_message.empty()) + { + index_stats.error_message = err_iter->second; + } } stats.emplace_back(index_stats); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp index da58ca0696e..809743603b3 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include #include #include @@ -32,11 +32,15 @@ namespace DB::ErrorCodes { extern const int ABORTED; } +namespace DB::FailPoints +{ +extern const char exception_build_local_index_for_file[]; +} // namespace DB::FailPoints namespace DB::DM { 
-DMFileIndexWriter::LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo( +LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo( const LocalIndexInfosSnapshot & index_infos, const DMFiles & dm_files) { @@ -48,7 +52,7 @@ DMFileIndexWriter::LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo // We can support dropping the vector index more quickly later. LocalIndexBuildInfo build; build.indexes_to_build = std::make_shared(); - build.file_ids.reserve(dm_files.size()); + build.dm_files.reserve(dm_files.size()); for (const auto & dmfile : dm_files) { bool any_new_index_build = false; @@ -74,10 +78,10 @@ DMFileIndexWriter::LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo } if (any_new_index_build) - build.file_ids.emplace_back(LocalIndexerScheduler::DMFileID(dmfile->fileId())); + build.dm_files.emplace_back(dmfile); } - build.file_ids.shrink_to_fit(); + build.dm_files.shrink_to_fit(); return build; } @@ -188,6 +192,8 @@ size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable, P } } + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_build_local_index_for_file); + // Write down the index size_t total_built_index_bytes = 0; std::unordered_map> new_indexes_on_cols; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h index 2747a40d260..76727f3eebf 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h @@ -17,7 +17,8 @@ #include #include #include -#include +#include +#include #include #include @@ -27,26 +28,45 @@ class StoragePathPool; using StoragePathPoolPtr = std::shared_ptr; } // namespace DB -namespace DB::DM -{ -class DMFile; -using DMFilePtr = std::shared_ptr; -using DMFiles = std::vector; -} // namespace DB::DM namespace DB::DM { -class DMFileIndexWriter +struct LocalIndexBuildInfo { + DMFiles dm_files; + size_t estimated_memory_bytes = 0; + LocalIndexInfosPtr 
indexes_to_build; + public: - struct LocalIndexBuildInfo + std::vector filesIDs() const { - std::vector file_ids; - size_t estimated_memory_bytes = 0; - LocalIndexInfosPtr indexes_to_build; - }; + std::vector ids; + ids.reserve(dm_files.size()); + for (const auto & dmf : dm_files) + { + ids.emplace_back(LocalIndexerScheduler::DMFileID(dmf->fileId())); + } + return ids; + } + std::vector indexesIDs() const + { + std::vector ids; + if (indexes_to_build) + { + ids.reserve(indexes_to_build->size()); + for (const auto & index : *indexes_to_build) + { + ids.emplace_back(index.index_id); + } + } + return ids; + } +}; +class DMFileIndexWriter +{ +public: static LocalIndexBuildInfo getLocalIndexBuildInfo( const LocalIndexInfosSnapshot & index_infos, const DMFiles & dm_files); diff --git a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp b/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp similarity index 92% rename from dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp rename to dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp index 95e76f6eb04..fe527ac7ad2 100644 --- a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp @@ -13,7 +13,7 @@ // limitations under the License. 
#include -#include +#include #include #include #include @@ -120,10 +120,10 @@ ColumnID getVectorIndxColumnID( LocalIndexInfosPtr initLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger) { // The same as generate local index infos with no existing_indexes - return generateLocalIndexInfos(nullptr, table_info, logger); + return generateLocalIndexInfos(nullptr, table_info, logger).new_local_index_infos; } -LocalIndexInfosPtr generateLocalIndexInfos( +LocalIndexInfosChangeset generateLocalIndexInfos( const LocalIndexInfosSnapshot & existing_indexes, const TiDB::TableInfo & new_table_info, const LoggerPtr & logger) @@ -135,7 +135,9 @@ LocalIndexInfosPtr generateLocalIndexInfos( bool is_storage_format_support = isVectorIndexSupported(logger); fiu_do_on(FailPoints::force_not_support_vector_index, { is_storage_format_support = false; }); if (!is_storage_format_support) - return new_index_infos; + return LocalIndexInfosChangeset{ + .new_local_index_infos = new_index_infos, + }; } // Keep a map of "indexes in existing_indexes" -> "offset in new_index_infos" @@ -253,7 +255,9 @@ LocalIndexInfosPtr generateLocalIndexInfos( return buf.toString(); }; LOG_DEBUG(logger, "Local index info does not changed, {}", get_logging()); - return nullptr; + return LocalIndexInfosChangeset{ + .new_local_index_infos = nullptr, + }; } auto get_changed_logging = [&]() -> String { @@ -295,7 +299,19 @@ LocalIndexInfosPtr generateLocalIndexInfos( return buf.toString(); }; LOG_INFO(logger, "Local index info generated, {}", get_changed_logging()); - return new_index_infos; + + // only return the newly dropped index with index_id > EmptyIndexID + std::vector dropped_indexes; + for (const auto & i : newly_dropped) + { + if (i.index_id <= EmptyIndexID) + continue; + dropped_indexes.emplace_back(i.index_id); + } + return LocalIndexInfosChangeset{ + .new_local_index_infos = new_index_infos, + .dropped_indexes = std::move(dropped_indexes), + }; } } // namespace DB::DM diff --git 
a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h b/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.h similarity index 81% rename from dbms/src/Storages/DeltaMerge/Index/IndexInfo.h rename to dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.h index 711ed349646..68f3fd82111 100644 --- a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h +++ b/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.h @@ -53,7 +53,17 @@ using LocalIndexInfosPtr = std::shared_ptr; using LocalIndexInfosSnapshot = std::shared_ptr; LocalIndexInfosPtr initLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger); -LocalIndexInfosPtr generateLocalIndexInfos( + +struct LocalIndexInfosChangeset +{ + LocalIndexInfosPtr new_local_index_infos; + std::vector dropped_indexes; +}; + +// Generate a changeset according to `existing_indexes` and `new_table_info` +// If there are newly added or dropped indexes according to `new_table_info`, +// return a changeset with changeset.new_local_index_infos != nullptr +LocalIndexInfosChangeset generateLocalIndexInfos( const LocalIndexInfosSnapshot & existing_indexes, const TiDB::TableInfo & new_table_info, const LoggerPtr & logger); diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp index 5abb0d414f2..01866ec5c8e 100644 --- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp @@ -16,6 +16,12 @@ #include #include #include +#include + +namespace DB::FailPoints +{ +extern const char force_local_index_task_memory_limit_exceeded[]; +} // namespace DB::FailPoints namespace DB::DM @@ -99,13 +105,19 @@ void LocalIndexerScheduler::waitForFinish() } } -void LocalIndexerScheduler::pushTask(const Task & task) +std::tuple LocalIndexerScheduler::pushTask(const Task & task) { - if (pool_max_memory_limit > 0 && task.request_memory > pool_max_memory_limit) - throw Exception(fmt::format( - "Requests memory exceeds limit 
(request={} limit={})", - task.request_memory, - pool_max_memory_limit)); + bool memory_limit_exceed = pool_max_memory_limit > 0 && task.request_memory > pool_max_memory_limit; + fiu_do_on(FailPoints::force_local_index_task_memory_limit_exceeded, { memory_limit_exceed = true; }); + + if (unlikely(memory_limit_exceed)) + return { + false, + fmt::format( + "Requests memory to build local index exceeds limit (request={} limit={})", + task.request_memory, + pool_max_memory_limit), + }; std::unique_lock lock(mutex); @@ -123,6 +135,7 @@ void LocalIndexerScheduler::pushTask(const Task & task) scheduler_need_wakeup = true; scheduler_notifier.notify_all(); + return {true, ""}; } size_t LocalIndexerScheduler::dropTasks(KeyspaceID keyspace_id, TableID table_id) diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h index 0f64f68cddb..53349740918 100644 --- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h +++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h @@ -107,10 +107,10 @@ class LocalIndexerScheduler /** * @brief Push a task to the pool. The task may not be scheduled immediately. - * Support adding the same task multiple times, but they are not allowed to execute at the same time. - * If the request_memory of task is larger than the memory_limit, will throw an exception. + * Return {true, ""} if pushing the task is done. + * Return {false, reason} if the task is not valid. */ - void pushTask(const Task & task); + std::tuple pushTask(const Task & task); /** * @brief Drop all tasks matching specified keyspace id and table id. 
diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 498b66635c6..0e3581d7268 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -37,7 +37,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index dcb5151818d..97ec82972d3 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -622,6 +622,30 @@ class Segment last_check_gc_safe_point.store(gc_safe_point, std::memory_order_relaxed); } + void setIndexBuildError(const std::vector & index_ids, const String & err_msg) + { + std::scoped_lock lock(mtx_local_index_message); + for (const auto & id : index_ids) + { + local_indexed_build_error.emplace(id, err_msg); + } + } + + std::unordered_map getIndexBuildError() const + { + std::scoped_lock lock(mtx_local_index_message); + return local_indexed_build_error; + } + + void clearIndexBuildError(const std::vector & index_ids) + { + std::scoped_lock lock(mtx_local_index_message); + for (const auto & id : index_ids) + { + local_indexed_build_error.erase(id); + } + } + #ifndef DBMS_PUBLIC_GTEST private: #else @@ -778,6 +802,9 @@ class Segment // and to avoid doing this check repeatedly, we add this flag to indicate whether the valid data ratio has already been checked. 
std::atomic check_valid_data_ratio = false; + mutable std::mutex mtx_local_index_message; + std::unordered_map local_indexed_build_error; + const LoggerPtr parent_log; // Used when constructing new segments in split const LoggerPtr log; }; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp index f9360b2830a..fe9d6899ad0 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp @@ -13,13 +13,20 @@ // limitations under the License. #include +#include #include -#include +#include +#include #include #include #include #include +namespace DB::FailPoints +{ +extern const char force_local_index_task_memory_limit_exceeded[]; +extern const char exception_build_local_index_for_file[]; +} // namespace DB::FailPoints namespace DB::DM::tests { @@ -687,4 +694,113 @@ try } CATCH +TEST_F(DeltaMergeStoreVectorTest, DDLAddVectorIndexErrorMemoryExceed) +try +{ + { + auto indexes = std::make_shared(); + store = reload(indexes); + ASSERT_EQ(store->getLocalIndexInfosSnapshot(), nullptr); + } + + const size_t num_rows_write = 128; + + // write to store before index built + write(num_rows_write); + // trigger mergeDelta for all segments + triggerMergeDelta(); + + IndexID index_id = 2; + // Add vector index + TiDB::TableInfo new_table_info_with_vector_index; + TiDB::ColumnInfo column_info; + column_info.name = VectorIndexTestUtils::vec_column_name; + column_info.id = VectorIndexTestUtils::vec_column_id; + new_table_info_with_vector_index.columns.emplace_back(column_info); + TiDB::IndexInfo index; + index.id = index_id; + TiDB::IndexColumnInfo index_col_info; + index_col_info.name = VectorIndexTestUtils::vec_column_name; + index_col_info.offset = 0; + index.idx_cols.emplace_back(index_col_info); + index.vector_index = TiDB::VectorIndexDefinitionPtr(new 
TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 1, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + new_table_info_with_vector_index.index_infos.emplace_back(index); + + // enable failpoint to mock fail to build index due to memory limit + FailPointHelper::enableFailPoint(FailPoints::force_local_index_task_memory_limit_exceeded); + store->applyLocalIndexChange(new_table_info_with_vector_index); + ASSERT_EQ(store->local_index_infos->size(), 1); + + { + auto indexes_stat = store->getLocalIndexStats(); + ASSERT_EQ(indexes_stat.size(), 1); + auto index_stat = indexes_stat[0]; + ASSERT_EQ(index_id, index_stat.index_id); + ASSERT_EQ(VectorIndexTestUtils::vec_column_id, index_stat.column_id); + ASSERT_FALSE(index_stat.error_message.empty()) << index_stat.error_message; + ASSERT_NE(index_stat.error_message.find("exceeds limit"), std::string::npos) << index_stat.error_message; + } +} +CATCH + +TEST_F(DeltaMergeStoreVectorTest, DDLAddVectorIndexErrorBuildException) +try +{ + { + auto indexes = std::make_shared(); + store = reload(indexes); + ASSERT_EQ(store->getLocalIndexInfosSnapshot(), nullptr); + } + + const size_t num_rows_write = 128; + + // write to store before index built + write(num_rows_write); + // trigger mergeDelta for all segments + triggerMergeDelta(); + + IndexID index_id = 2; + // Add vector index + TiDB::TableInfo new_table_info_with_vector_index; + TiDB::ColumnInfo column_info; + column_info.name = VectorIndexTestUtils::vec_column_name; + column_info.id = VectorIndexTestUtils::vec_column_id; + new_table_info_with_vector_index.columns.emplace_back(column_info); + TiDB::IndexInfo index; + index.id = index_id; + TiDB::IndexColumnInfo index_col_info; + index_col_info.name = VectorIndexTestUtils::vec_column_name; + index_col_info.offset = 0; + index.idx_cols.emplace_back(index_col_info); + index.vector_index = TiDB::VectorIndexDefinitionPtr(new TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + 
.dimension = 1, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + new_table_info_with_vector_index.index_infos.emplace_back(index); + + // enable failpoint to mock an exception thrown while building the local index for a file + FailPointHelper::enableFailPoint(FailPoints::exception_build_local_index_for_file); + store->applyLocalIndexChange(new_table_info_with_vector_index); + ASSERT_EQ(store->local_index_infos->size(), 1); + + auto scheduler = db_context->getGlobalLocalIndexerScheduler(); + scheduler->waitForFinish(); + + { + auto indexes_stat = store->getLocalIndexStats(); + ASSERT_EQ(indexes_stat.size(), 1); + auto index_stat = indexes_stat[0]; + ASSERT_EQ(index_id, index_stat.index_id); + ASSERT_EQ(VectorIndexTestUtils::vec_column_id, index_stat.column_id); + ASSERT_FALSE(index_stat.error_message.empty()) << index_stat.error_message; + ASSERT_NE(index_stat.error_message.find("Fail point"), std::string::npos) << index_stat.error_message; + } +} +CATCH + } // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp index ab37b728f4c..5dde081769c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp @@ -21,7 +21,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h index f01d08899a8..37c93696c21 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h @@ -16,7 +16,7 @@ #include #include -#include +#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_local_index_info.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_local_index_info.cpp index 33496a5f0c5..4ea8ab1f4e5 100644 --- 
a/dbms/src/Storages/DeltaMerge/tests/gtest_local_index_info.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_local_index_info.cpp @@ -13,7 +13,7 @@ // limitations under the License. #include -#include +#include #include #include #include @@ -42,10 +42,10 @@ try LocalIndexInfosPtr index_info = nullptr; // check the same { - auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger).new_local_index_infos; ASSERT_EQ(new_index_info, nullptr); // check again, nothing changed, return nullptr - ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger).new_local_index_infos); // update index_info = new_index_info; @@ -71,7 +71,7 @@ try FailPointHelper::enableFailPoint(FailPoints::force_not_support_vector_index); // check the result when storage format not support - auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger).new_local_index_infos; ASSERT_NE(new_index_info, nullptr); // always return empty index_info, we need to drop all existing indexes ASSERT_TRUE(new_index_info->empty()); @@ -93,10 +93,10 @@ try LocalIndexInfosPtr index_info = nullptr; // check the same { - auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger).new_local_index_infos; ASSERT_EQ(new_index_info, nullptr); // check again, nothing changed, return nullptr - ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger).new_local_index_infos); // update index_info = new_index_info; @@ -121,7 +121,7 @@ try // check the different { - auto new_index_info = generateLocalIndexInfos(index_info, 
table_info, logger); + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger).new_local_index_infos; ASSERT_NE(new_index_info, nullptr); ASSERT_EQ(new_index_info->size(), 1); const auto & idx = (*new_index_info)[0]; @@ -134,7 +134,7 @@ try ASSERT_EQ(expect_idx.vector_index->distance_metric, idx.index_definition->distance_metric); // check again, nothing changed, return nullptr - ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger).new_local_index_infos); // update index_info = new_index_info; @@ -154,7 +154,7 @@ try } // check the different { - auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger).new_local_index_infos; ASSERT_NE(new_index_info, nullptr); ASSERT_EQ(new_index_info->size(), 2); const auto & idx0 = (*new_index_info)[0]; @@ -175,7 +175,7 @@ try ASSERT_EQ(expect_idx2.vector_index->distance_metric, idx1.index_definition->distance_metric); // check again, nothing changed, return nullptr - ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger).new_local_index_infos); // update index_info = new_index_info; @@ -198,7 +198,7 @@ try } // check the different { - auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger).new_local_index_infos; ASSERT_NE(new_index_info, nullptr); ASSERT_EQ(new_index_info->size(), 2); const auto & idx0 = (*new_index_info)[0]; @@ -219,7 +219,7 @@ try ASSERT_EQ(expect_idx3.vector_index->distance_metric, idx1.index_definition->distance_metric); // check again, nothing changed, return nullptr - ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + ASSERT_EQ(nullptr, 
generateLocalIndexInfos(new_index_info, table_info, logger).new_local_index_infos); } } CATCH @@ -269,7 +269,7 @@ try auto logger = Logger::get(); LocalIndexInfosPtr index_info = nullptr; { - auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger).new_local_index_infos; ASSERT_NE(new_index_info, nullptr); ASSERT_EQ(new_index_info->size(), 2); @@ -292,9 +292,9 @@ try ASSERT_EQ(expect_idx.vector_index->distance_metric, idx1.index_definition->distance_metric); // check again, table_info.index_infos doesn't change and return them LocalIndexInfosPtr empty_index_info = nullptr; - ASSERT_EQ(2, generateLocalIndexInfos(empty_index_info, table_info, logger)->size()); + ASSERT_EQ(2, generateLocalIndexInfos(empty_index_info, table_info, logger).new_local_index_infos->size()); // check again with the same table_info, nothing changed, return nullptr - ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger).new_local_index_infos); // update index_info = new_index_info; @@ -317,7 +317,7 @@ try } // check the different { - auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger).new_local_index_infos; ASSERT_NE(new_index_info, nullptr); ASSERT_EQ(new_index_info->size(), 2); @@ -340,7 +340,7 @@ try ASSERT_EQ(expect_idx2.vector_index->distance_metric, idx1.index_definition->distance_metric); // check again, nothing changed, return nullptr - ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger).new_local_index_infos); } } CATCH @@ -373,7 +373,7 @@ TEST(LocalIndexInfoTest, CheckIndexDropDefinedInColumnInfo) LocalIndexInfosPtr index_info = nullptr; { // check the different 
with nullptr - auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger).new_local_index_infos; ASSERT_NE(nullptr, new_index_info); ASSERT_EQ(new_index_info->size(), 1); const auto & idx0 = (*new_index_info)[0]; @@ -386,7 +386,7 @@ TEST(LocalIndexInfoTest, CheckIndexDropDefinedInColumnInfo) ASSERT_EQ(tipb::VectorDistanceMetric::INNER_PRODUCT, idx0.index_definition->distance_metric); // check again, nothing changed, return nullptr - ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger).new_local_index_infos); // update index_info = new_index_info; @@ -396,7 +396,7 @@ TEST(LocalIndexInfoTest, CheckIndexDropDefinedInColumnInfo) table_info.columns.erase(table_info.columns.begin()); { // check the different with existing index_info - auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger).new_local_index_infos; ASSERT_NE(nullptr, new_index_info); // not null ASSERT_NE(new_index_info, nullptr); @@ -404,7 +404,7 @@ TEST(LocalIndexInfoTest, CheckIndexDropDefinedInColumnInfo) ASSERT_EQ(new_index_info->size(), 0); // check again, nothing changed, return nullptr - ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger).new_local_index_infos); // update index_info = new_index_info; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp index dbd53d3b00a..43a9808e78c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp @@ -225,26 +225,26 @@ try .auto_start = false, }); 
- ASSERT_THROW( - { - scheduler->pushTask({ - .keyspace_id = 1, - .table_id = 1, - .file_ids = {LocalIndexerScheduler::DMFileID(1)}, - .request_memory = 100, - .workload = [&]() { pushResult("foo"); }, - }); - }, - DB::Exception); - ASSERT_NO_THROW({ - scheduler->pushTask({ + { + auto [ok, reason] = scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .file_ids = {LocalIndexerScheduler::DMFileID(1)}, + .request_memory = 100, // exceed memory limit + .workload = [&]() { pushResult("foo"); }, + }); + ASSERT_FALSE(ok); + } + { + auto [ok, reason] = scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, .file_ids = {LocalIndexerScheduler::DMFileID(2)}, .request_memory = 0, .workload = [&]() { pushResult("bar"); }, }); - }); + ASSERT_TRUE(ok); + } scheduler->start(); scheduler->waitForFinish(); @@ -259,24 +259,26 @@ try .memory_limit = 0, }); - ASSERT_NO_THROW({ - scheduler->pushTask({ + { + auto [ok, reason] = scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, .file_ids = {LocalIndexerScheduler::DMFileID(3)}, .request_memory = 100, .workload = [&]() { pushResult("foo"); }, }); - }); - ASSERT_NO_THROW({ - scheduler->pushTask({ + ASSERT_TRUE(ok); + } + { + auto [ok, reason] = scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, .file_ids = {LocalIndexerScheduler::DMFileID(4)}, .request_memory = 0, .workload = [&]() { pushResult("bar"); }, }); - }); + ASSERT_TRUE(ok); + }; scheduler->start(); scheduler->waitForFinish(); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 9188e84f4a6..30a86d897c3 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -44,7 +44,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index a66b9de2572..4d9813a4c44 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -24,7 +24,7 
@@ #include #include #include -#include +#include #include #include #include diff --git a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp index 1a1daf2b447..6da97c76aca 100644 --- a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp +++ b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp @@ -43,7 +43,6 @@ StorageSystemDTLocalIndexes::StorageSystemDTLocalIndexes(const std::string & nam {"table_id", std::make_shared()}, {"belonging_table_id", std::make_shared()}, - {"column_name", std::make_shared()}, {"column_id", std::make_shared()}, {"index_id", std::make_shared()}, {"index_kind", std::make_shared()}, @@ -52,6 +51,10 @@ StorageSystemDTLocalIndexes::StorageSystemDTLocalIndexes(const std::string & nam {"rows_stable_not_indexed", std::make_shared()}, // Total rows {"rows_delta_indexed", std::make_shared()}, // Total rows {"rows_delta_not_indexed", std::make_shared()}, // Total rows + + // Fatal message when building local index + // when this is not an empty string, it means the build job of this local is aborted + {"error_message", std::make_shared()}, })); } @@ -119,7 +122,6 @@ BlockInputStreams StorageSystemDTLocalIndexes::read( res_columns[j++]->insert(table_id); res_columns[j++]->insert(table_info.belonging_table_id); - res_columns[j++]->insert(String("")); // TODO: let tidb set the column_name and index_name by itself res_columns[j++]->insert(stat.column_id); res_columns[j++]->insert(stat.index_id); res_columns[j++]->insert(stat.index_kind); @@ -128,6 +130,8 @@ BlockInputStreams StorageSystemDTLocalIndexes::read( res_columns[j++]->insert(stat.rows_stable_not_indexed); res_columns[j++]->insert(stat.rows_delta_indexed); res_columns[j++]->insert(stat.rows_delta_not_indexed); + + res_columns[j++]->insert(stat.error_message); } } }