Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage: Add error message when fail to build local index #9476

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ namespace DB
M(force_remote_read_for_batch_cop_once) \
M(exception_new_dynamic_thread) \
M(force_wait_index_timeout) \
M(force_local_index_task_memory_limit_exceeded) \
M(exception_build_local_index_for_file) \
M(force_not_support_vector_index) \
M(sync_schema_request_failure)

Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/Index/IndexInfo.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
#include <Storages/DeltaMerge/ReadThread/UnorderedInputStream.h>
Expand Down Expand Up @@ -363,7 +363,7 @@ DeltaMergeStorePtr DeltaMergeStore::create(
settings_,
thread_pool);
std::shared_ptr<DeltaMergeStore> store_shared_ptr(store);
store_shared_ptr->checkAllSegmentsLocalIndex();
store_shared_ptr->checkAllSegmentsLocalIndex({});
return store_shared_ptr;
}

Expand Down Expand Up @@ -2032,20 +2032,20 @@ void DeltaMergeStore::applySchemaChanges(TiDB::TableInfo & table_info)
void DeltaMergeStore::applyLocalIndexChange(const TiDB::TableInfo & new_table_info)
{
// Get a snapshot on the local_index_infos to check whether any new index is created
auto new_local_index_infos = generateLocalIndexInfos(getLocalIndexInfosSnapshot(), new_table_info, log);
auto changeset = generateLocalIndexInfos(getLocalIndexInfosSnapshot(), new_table_info, log);

// no index is created or dropped
if (!new_local_index_infos)
if (!changeset.new_local_index_infos)
return;

{
// New index created: update the in-memory info under the write lock so that it stays thread-safe with `getLocalIndexInfosSnapshot`
std::unique_lock index_write_lock(mtx_local_index_infos);
local_index_infos.swap(new_local_index_infos);
local_index_infos.swap(changeset.new_local_index_infos);
}

// generate async tasks for building local index for all segments
checkAllSegmentsLocalIndex();
checkAllSegmentsLocalIndex(std::move(changeset.dropped_indexes));
}

SortDescription DeltaMergeStore::getPrimarySortDescription() const
Expand Down
17 changes: 10 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include <Storages/DeltaMerge/DeltaMergeInterfaces.h>
#include <Storages/DeltaMerge/File/DMFile_fwd.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Index/IndexInfo.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot_fwd.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
Expand Down Expand Up @@ -70,6 +70,7 @@ using NotCompress = std::unordered_set<ColId>;
using SegmentIdSet = std::unordered_set<UInt64>;
struct ExternalDTFileInfo;
struct GCOptions;
struct LocalIndexBuildInfo;

namespace tests
{
Expand Down Expand Up @@ -184,6 +185,9 @@ struct LocalIndexStats
UInt64 rows_stable_not_indexed{}; // Total rows
UInt64 rows_delta_indexed{}; // Total rows
UInt64 rows_delta_not_indexed{}; // Total rows

// Not empty if the index ultimately failed to build
String error_message{};
};
using LocalIndexesStats = std::vector<LocalIndexStats>;

Expand Down Expand Up @@ -730,11 +734,9 @@ class DeltaMergeStore
MergeDeltaReason reason,
SegmentSnapshotPtr segment_snap = nullptr);

void segmentEnsureStableIndex(
DMContext & dm_context,
const LocalIndexInfosPtr & index_info,
const DMFiles & dm_files,
const String & source_segment_info);
void segmentEnsureStableIndex(DMContext & dm_context, const LocalIndexBuildInfo & index_build_info);

void segmentEnsureStableIndexWithErrorReport(DMContext & dm_context, const LocalIndexBuildInfo & index_build_info);

/**
* Ingest a DMFile into the segment, optionally causing a new segment being created.
Expand Down Expand Up @@ -877,8 +879,9 @@ class DeltaMergeStore

/**
* Check whether there are new local indexes should be built for all segments.
* If dropped_indexes is not empty, try to cleanup the dropped_indexes
*/
void checkAllSegmentsLocalIndex();
void checkAllSegmentsLocalIndex(std::vector<IndexID> && dropped_indexes);

/**
* Ensure the segment has stable index.
Expand Down
170 changes: 125 additions & 45 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Common/SyncPoint/SyncPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Interpreters/Context.h>
Expand All @@ -21,6 +22,7 @@
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/WriteBatchesImpl.h>
#include <common/logger_useful.h>

#include <magic_enum.hpp>

Expand Down Expand Up @@ -450,7 +452,7 @@ SegmentPtr DeltaMergeStore::segmentMerge(
return merged;
}

void DeltaMergeStore::checkAllSegmentsLocalIndex()
void DeltaMergeStore::checkAllSegmentsLocalIndex(std::vector<IndexID> && dropped_indexes)
{
if (!getLocalIndexInfosSnapshot())
return;
Expand Down Expand Up @@ -512,6 +514,9 @@ void DeltaMergeStore::checkAllSegmentsLocalIndex()
for (const auto & [end, segment] : segments)
{
UNUSED(end);
// Clean up the index error messages for dropped indexes
segment->clearIndexBuildError(dropped_indexes);

if (segmentEnsureStableIndexAsync(segment))
++segments_missing_indexes;
}
Expand All @@ -533,35 +538,61 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment)
return false;

// No lock is needed, stable meta is immutable.
auto dm_files = segment->getStable()->getDMFiles();
auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, dm_files);
if (!build_info.indexes_to_build || build_info.indexes_to_build->empty())
const auto build_info
= DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles());
if (!build_info.indexes_to_build || build_info.indexes_to_build->empty() || build_info.dm_files.empty())
return false;

auto store_weak_ptr = weak_from_this();
auto tracing_id = fmt::format("segmentEnsureStableIndex<{}>", log->identifier());
auto workload = [store_weak_ptr, build_info, dm_files, segment, tracing_id]() -> void {
auto tracing_id
= fmt::format("segmentEnsureStableIndex<{}> source_segment={}", log->identifier(), segment->simpleInfo());
auto workload = [store_weak_ptr, build_info, tracing_id]() -> void {
auto store = store_weak_ptr.lock();
if (store == nullptr) // Store is destroyed before the task is executed.
return;
auto dm_context = store->newDMContext( //
store->global_context,
store->global_context.getSettingsRef(),
tracing_id);
const auto source_segment_info = segment->simpleInfo();
store->segmentEnsureStableIndex(*dm_context, build_info.indexes_to_build, dm_files, source_segment_info);
store->segmentEnsureStableIndexWithErrorReport(*dm_context, build_info);
};

auto indexer_scheduler = global_context.getGlobalLocalIndexerScheduler();
RUNTIME_CHECK(indexer_scheduler != nullptr);
indexer_scheduler->pushTask(LocalIndexerScheduler::Task{
.keyspace_id = keyspace_id,
.table_id = physical_table_id,
.file_ids = build_info.file_ids,
.request_memory = build_info.estimated_memory_bytes,
.workload = workload,
});
return true;
try
{
// A new task is being generated for these indexes; clear any existing error_message in the segment
segment->clearIndexBuildError(build_info.indexesIDs());

auto [ok, reason] = indexer_scheduler->pushTask(LocalIndexerScheduler::Task{
.keyspace_id = keyspace_id,
.table_id = physical_table_id,
.file_ids = build_info.filesIDs(),
.request_memory = build_info.estimated_memory_bytes,
.workload = workload,
});
if (ok)
return true;

segment->setIndexBuildError(build_info.indexesIDs(), reason);
LOG_ERROR(
log->getChild(tracing_id),
"Failed to generate async segment stable index task, index_ids={} reason={}",
build_info.indexesIDs(),
reason);
return false;
}
catch (...)
{
const auto message = getCurrentExceptionMessage(false, false);
segment->setIndexBuildError(build_info.indexesIDs(), message);

tryLogCurrentException(log);

// catch and ignore the exception
// not able to push task to index scheduler
return false;
}
}

bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) const
Expand All @@ -574,8 +605,8 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) co

// No lock is needed, stable meta is immutable.
auto segment_id = segment->segmentId();
auto dm_files = segment->getStable()->getDMFiles();
auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, dm_files);
auto build_info
= DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles());
if (!build_info.indexes_to_build || build_info.indexes_to_build->empty())
return true;

Expand Down Expand Up @@ -650,11 +681,7 @@ SegmentPtr DeltaMergeStore::segmentUpdateMeta(
return new_segment;
}

void DeltaMergeStore::segmentEnsureStableIndex(
DMContext & dm_context,
const LocalIndexInfosPtr & index_info,
const DMFiles & dm_files,
const String & source_segment_info)
void DeltaMergeStore::segmentEnsureStableIndex(DMContext & dm_context, const LocalIndexBuildInfo & index_build_info)
{
// 1. Acquire a snapshot for PageStorage, and keep the snapshot until index is built.
// This helps keep DMFile valid during the index build process.
Expand All @@ -674,8 +701,10 @@ void DeltaMergeStore::segmentEnsureStableIndex(
dm_context.tracing_id,
/*snapshot_read*/ true);

RUNTIME_CHECK(dm_files.size() == 1); // size > 1 is currently not supported.
const auto & dm_file = dm_files[0];
auto tracing_logger = log->getChild(getLogTracingId(dm_context));

RUNTIME_CHECK(index_build_info.dm_files.size() == 1); // size > 1 is currently not supported.
const auto & dm_file = index_build_info.dm_files[0];

auto is_file_valid = [this, dm_file] {
std::shared_lock lock(read_write_mutex);
Expand All @@ -686,24 +715,20 @@ void DeltaMergeStore::segmentEnsureStableIndex(
// 2. Check whether the DMFile has been referenced by any valid segment.
if (!is_file_valid())
{
LOG_DEBUG(
log,
"EnsureStableIndex - Give up because no segment to update, source_segment={}",
source_segment_info);
LOG_DEBUG(tracing_logger, "EnsureStableIndex - Give up because no segment to update");
return;
}

LOG_INFO(
log,
"EnsureStableIndex - Begin building index, dm_files={} source_segment={}",
DMFile::info(dm_files),
source_segment_info);
tracing_logger,
"EnsureStableIndex - Begin building index, dm_files={}",
DMFile::info(index_build_info.dm_files));

// 2. Build the index.
DMFileIndexWriter iw(DMFileIndexWriter::Options{
.path_pool = path_pool,
.index_infos = index_info,
.dm_files = dm_files,
.index_infos = index_build_info.indexes_to_build,
.dm_files = index_build_info.dm_files,
.dm_context = dm_context,
});

Expand All @@ -719,11 +744,9 @@ void DeltaMergeStore::segmentEnsureStableIndex(
if (e.code() == ErrorCodes::ABORTED)
{
LOG_INFO(
log,
"EnsureStableIndex - Build index aborted because DMFile is no longer valid, dm_files={} "
"source_segment={}",
DMFile::info(dm_files),
source_segment_info);
tracing_logger,
"EnsureStableIndex - Build index aborted because DMFile is no longer valid, dm_files={}",
DMFile::info(index_build_info.dm_files));
return;
}
throw;
Expand All @@ -732,10 +755,9 @@ void DeltaMergeStore::segmentEnsureStableIndex(
RUNTIME_CHECK(!new_dmfiles.empty());

LOG_INFO(
log,
"EnsureStableIndex - Finish building index, dm_files={} source_segment={}",
DMFile::info(dm_files),
source_segment_info);
tracing_logger,
"EnsureStableIndex - Finish building index, dm_files={}",
DMFile::info(index_build_info.dm_files));

// 3. Update the meta version of the segments to the latest one.
// To avoid a logical split between steps 2 and 3, get the latest segments to update again.
Expand All @@ -751,8 +773,66 @@ void DeltaMergeStore::segmentEnsureStableIndex(
auto segment = id_to_segment[seg_id];
auto new_segment = segmentUpdateMeta(lock, dm_context, segment, new_dmfiles);
// Expect update meta always success, because the segment must be valid and bump meta should succeed.
RUNTIME_CHECK_MSG(new_segment != nullptr, "Update meta failed for segment {}", segment->simpleInfo());
RUNTIME_CHECK_MSG(
new_segment != nullptr,
"Update meta failed for segment {} ident={}",
segment->simpleInfo(),
tracing_logger->identifier());
}
}
}

// Error-reporting wrapper around `segmentEnsureStableIndex`.
//
// Runs the index build for `index_build_info`. If the build throws, the
// message of the in-flight exception is stored (via `setIndexBuildError`) on
// every segment that `dmfile_id_to_segment_ids` maps to the DMFiles of this
// build, and the exception is then rethrown to the caller unchanged in type.
// For a `DB::Exception`, the index ids are appended to the exception message
// before it is recorded and rethrown.
void DeltaMergeStore::segmentEnsureStableIndexWithErrorReport(
    DMContext & dm_context,
    const LocalIndexBuildInfo & index_build_info)
{
    // Only meaningful inside a catch handler: it reads the exception that is
    // currently being handled through `getCurrentExceptionMessage`.
    auto record_failure = [this, &index_build_info](const std::vector<IndexID> & failed_index_ids) {
        const auto error_text = getCurrentExceptionMessage(false, false);

        // Resolve the affected segments while holding `read_write_mutex`;
        // the errors are written to the segments after the lock is released.
        std::unordered_map<PageIdU64, SegmentPtr> affected_segments;
        {
            std::unique_lock guard(read_write_mutex);
            for (const auto & file : index_build_info.dm_files)
            {
                const auto owner_ids = dmfile_id_to_segment_ids.get(file->fileId());
                for (const auto & owner_id : owner_ids)
                {
                    // A segment may be reached through more than one file; keep it once.
                    if (!affected_segments.contains(owner_id))
                        affected_segments.emplace(owner_id, id_to_segment[owner_id]);
                }
            }
        }

        for (const auto & entry : affected_segments)
            entry.second->setIndexBuildError(failed_index_ids, error_text);
    };

    try
    {
        segmentEnsureStableIndex(dm_context, index_build_info);
    }
    catch (DB::Exception & e)
    {
        const auto failed_ids = index_build_info.indexesIDs();
        // Enrich the exception first so that the recorded message carries the index ids too.
        e.addMessage(fmt::format("while building stable index for index_ids={}", failed_ids));
        record_failure(failed_ids);

        // Propagate the (now annotated) exception to the caller.
        throw;
    }
    catch (...)
    {
        const auto failed_ids = index_build_info.indexesIDs();
        record_failure(failed_ids);

        // Propagate the original exception to the caller.
        throw;
    }
}

Expand Down
Loading