Skip to content

Commit

Permalink
fix race on scan start with indexation cleaning blobs (ydb-platform#7968)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Aug 19, 2024
1 parent 274bfdd commit 5522a96
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 170 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include "constructor.h"
#include "resolver.h"
#include "read_metadata.h"
#include "resolver.h"

#include <ydb/core/tx/columnshard/columnshard_impl.h>

namespace NKikimr::NOlap::NReader::NPlain {

NKikimr::TConclusionStatus TIndexScannerConstructor::ParseProgram(const TVersionedIndex* vIndex, const NKikimrTxDataShard::TEvKqpScan& proto, TReadDescription& read) const {
NKikimr::TConclusionStatus TIndexScannerConstructor::ParseProgram(
const TVersionedIndex* vIndex, const NKikimrTxDataShard::TEvKqpScan& proto, TReadDescription& read) const {
AFL_VERIFY(vIndex);
auto& indexInfo = vIndex->GetSchema(Snapshot)->GetIndexInfo();
TIndexColumnResolver columnResolver(indexInfo);
Expand All @@ -17,15 +19,17 @@ std::vector<TNameTypeInfo> TIndexScannerConstructor::GetPrimaryKeyScheme(const N
return indexInfo.GetPrimaryKeyColumns();
}

NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructor::DoBuildReadMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const {
NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructor::DoBuildReadMetadata(
const NColumnShard::TColumnShard* self, const TReadDescription& read) const {
auto& insertTable = self->InsertTable;
auto& index = self->TablesManager.GetPrimaryIndex();
if (!insertTable || !index) {
return std::shared_ptr<TReadMetadataBase>();
}

if (read.GetSnapshot().GetPlanInstant() < self->GetMinReadSnapshot().GetPlanInstant()) {
return TConclusionStatus::Fail(TStringBuilder() << "Snapshot too old: " << read.GetSnapshot());
return TConclusionStatus::Fail(TStringBuilder() << "Snapshot too old: " << read.GetSnapshot() << ". CS min read snapshot: "
<< self->GetMinReadSnapshot() << ". now: " << TInstant::Now());
}

TDataStorageAccessor dataAccessor(insertTable, index);
Expand All @@ -39,4 +43,4 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructo
return static_pointer_cast<TReadMetadataBase>(readMetadata);
}

}
} // namespace NKikimr::NOlap::NReader::NPlain
Original file line number Diff line number Diff line change
@@ -1,94 +1,86 @@
#include "tx_internal_scan.h"
#include <ydb/core/tx/columnshard/engines/reader/actor/actor.h>
#include <ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h>
#include <ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.h>

#include <ydb/core/formats/arrow/arrow_batch_builder.h>
#include <ydb/core/sys_view/common/schema.h>
#include <ydb/core/tx/columnshard/engines/reader/actor/actor.h>
#include <ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.h>
#include <ydb/core/tx/columnshard/engines/reader/sys_view/abstract/policy.h>
#include <ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h>

namespace NKikimr::NOlap::NReader {

bool TTxInternalScan::Execute(TTransactionContext& /*txc*/, const TActorContext& /*ctx*/) {
TMemoryProfileGuard mpg("TTxInternalScan::Execute");
void TTxInternalScan::SendError(const TString& problem, const TString& details, const TActorContext& ctx) const {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("problem", problem)("details", details);
auto& request = *InternalScanEvent->Get();
const TSnapshot snapshot = request.ReadToSnapshot.value_or(NOlap::TSnapshot(Self->LastPlannedStep, Self->LastPlannedTxId));
auto scanComputeActor = InternalScanEvent->Sender;

TReadDescription read(snapshot, request.GetReverse());
read.PathId = request.GetPathId();
read.ReadNothing = !Self->TablesManager.HasTable(read.PathId);
std::unique_ptr<IScannerConstructor> scannerConstructor(new NPlain::TIndexScannerConstructor(snapshot, request.GetItemsLimit(), request.GetReverse()));
read.ColumnIds = request.GetColumnIds();
read.ColumnNames = request.GetColumnNames();
if (request.RangesFilter) {
read.PKRangesFilter = std::move(*request.RangesFilter);
}
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(ScanGen, Self->TabletID());
ev->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_BAD_REQUEST,
TStringBuilder() << "Table " << request.GetPathId() << " (shard " << Self->TabletID() << ") scan failed, reason: " << problem << "/"
<< details);
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add());

const TVersionedIndex* vIndex = Self->GetIndexOptional() ? &Self->GetIndexOptional()->GetVersionedIndex() : nullptr;
AFL_VERIFY(vIndex);
{
TProgramContainer pContainer;
pContainer.OverrideProcessingColumns(read.ColumnNames);
read.SetProgram(std::move(pContainer));
}
ctx.Send(scanComputeActor, ev.Release());
}

{
auto newRange = scannerConstructor->BuildReadMetadata(Self, read);
if (!newRange) {
ErrorDescription = newRange.GetErrorMessage();
ReadMetadataRange = nullptr;
return true;
}
ReadMetadataRange = newRange.DetachResult();
}
AFL_VERIFY(ReadMetadataRange);
// Execute phase of the internal scan transaction: performs no work and always reports success.
// Both parameters are intentionally unused; the read metadata is built and the scan actor is
// registered in Complete().
bool TTxInternalScan::Execute(TTransactionContext& /*txc*/, const TActorContext& /*ctx*/) {
return true;
}

void TTxInternalScan::Complete(const TActorContext& ctx) {
TMemoryProfileGuard mpg("TTxInternalScan::Complete");

auto& request = *InternalScanEvent->Get();
auto scanComputeActor = InternalScanEvent->Sender;
const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build()("tablet", Self->TabletID());

if (!ReadMetadataRange) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("reason", "no metadata")("error", ErrorDescription);
const TSnapshot snapshot = request.ReadToSnapshot.value_or(NOlap::TSnapshot(Self->LastPlannedStep, Self->LastPlannedTxId));
const NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build()("tablet", Self->TabletID())("snapshot", snapshot.DebugString());
TReadMetadataPtr readMetadataRange;
{
TReadDescription read(snapshot, request.GetReverse());
read.PathId = request.GetPathId();
read.ReadNothing = !Self->TablesManager.HasTable(read.PathId);
std::unique_ptr<IScannerConstructor> scannerConstructor(
new NPlain::TIndexScannerConstructor(snapshot, request.GetItemsLimit(), request.GetReverse()));
read.ColumnIds = request.GetColumnIds();
read.ColumnNames = request.GetColumnNames();
if (request.RangesFilter) {
read.PKRangesFilter = std::move(*request.RangesFilter);
}

auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(ScanGen, Self->TabletID());
ev->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_BAD_REQUEST, TStringBuilder()
<< "Table " << request.GetPathId() << " (shard " << Self->TabletID() << ") scan failed, reason: " << ErrorDescription ? ErrorDescription : "no metadata ranges");
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add());
const TVersionedIndex* vIndex = Self->GetIndexOptional() ? &Self->GetIndexOptional()->GetVersionedIndex() : nullptr;
AFL_VERIFY(vIndex);
{
TProgramContainer pContainer;
pContainer.OverrideProcessingColumns(read.ColumnNames);
read.SetProgram(std::move(pContainer));
}

ctx.Send(scanComputeActor, ev.Release());
return;
{
auto newRange = scannerConstructor->BuildReadMetadata(Self, read);
if (!newRange) {
return SendError("cannot create read metadata", newRange.GetErrorMessage(), ctx);
}
readMetadataRange = TValidator::CheckNotNull(newRange.DetachResult());
}
}

TStringBuilder detailedInfo;
if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) {
detailedInfo << " read metadata: (" << *ReadMetadataRange << ")";
detailedInfo << " read metadata: (" << *readMetadataRange << ")";
}

const TVersionedIndex* index = nullptr;
if (Self->HasIndex()) {
index = &Self->GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex();
}
const TConclusion<ui64> requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(ReadMetadataRange, index);
if (!requestCookie) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("reason", requestCookie.GetErrorMessage())("trace_details", detailedInfo);
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(ScanGen, Self->TabletID());

ev->Record.SetStatus(Ydb::StatusIds::INTERNAL_ERROR);
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder()
<< "Table " << request.GetPathId() << " (shard " << Self->TabletID() << ") scan failed, reason: " << requestCookie.GetErrorMessage());
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add());
Self->Counters.GetScanCounters().OnScanFinished(NColumnShard::TScanCounters::EStatusFinish::CannotAddInFlight, TDuration::Zero());
ctx.Send(scanComputeActor, ev.Release());
return;
}
auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(),
TComputeShardingPolicy(), ScanId, TxId, ScanGen, *requestCookie, Self->TabletID(), TDuration::Max(), ReadMetadataRange,
NKikimrDataEvents::FORMAT_ARROW, Self->Counters.GetScanCounters()));
const ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(readMetadataRange, index);
auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(), TComputeShardingPolicy(),
ScanId, TxId, ScanGen, requestCookie, Self->TabletID(), TDuration::Max(), readMetadataRange, NKikimrDataEvents::FORMAT_ARROW,
Self->Counters.GetScanCounters()));

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxInternalScan started")("actor_id", scanActor)("trace_detailed", detailedInfo);
}

}
} // namespace NKikimr::NOlap::NReader
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class TTxInternalScan: public NTabletFlatExecutor::TTransactionBase<NColumnShard
const ui32 ScanGen = 1;
const ui32 TxId = 1;
const ui32 ScanId = 1;
// Logs a warning and replies to the sender of the internal scan event with a TEvScanError
// (status BAD_REQUEST) whose issue text is built from `problem` and `details`.
void SendError(const TString& problem, const TString& details, const TActorContext& ctx) const;

public:
using TReadMetadataPtr = TReadMetadataBase::TConstPtr;

Expand All @@ -23,9 +25,7 @@ class TTxInternalScan: public NTabletFlatExecutor::TTransactionBase<NColumnShard
TTxType GetTxType() const override { return NColumnShard::TXTYPE_START_INTERNAL_SCAN; }

private:
TString ErrorDescription;
TEvColumnShard::TEvInternalScan::TPtr InternalScanEvent;
TReadMetadataPtr ReadMetadataRange;
};

}
Loading

0 comments on commit 5522a96

Please sign in to comment.