Skip to content

Commit

Permalink
Merge 273e018 into 9b503c2
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Feb 7, 2024
2 parents 9b503c2 + 273e018 commit 0ac74bb
Show file tree
Hide file tree
Showing 15 changed files with 255 additions and 112 deletions.
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/backpressure/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class TBlobStorageQueue {
return Queues.InFlight.size();
}

ui64 GetInFlightCost() const {
return InFlightCost;
}

void UpdateCostModel(TInstant now, const NKikimrBlobStorage::TVDiskCostSettings& settings,
const TBlobStorageGroupType& type);
void InvalidateCosts();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,11 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
<< " msgId# " << msgId << " sequenceId# " << sequenceId
<< " expectedMsgId# " << expectedMsgId << " expectedSequenceId# " << expectedSequenceId
<< " status# " << NKikimrProto::EReplyStatus_Name(status)
<< " ws# " << NKikimrBlobStorage::TWindowFeedback_EStatus_Name(ws));
<< " ws# " << NKikimrBlobStorage::TWindowFeedback_EStatus_Name(ws)
<< " InFlightCost# " << Queue.GetInFlightCost()
<< " InFlightCount# " << Queue.InFlightCount()
<< " ItemsWaiting# " << Queue.GetItemsWaiting()
<< " BytesWaiting# " << Queue.GetBytesWaiting());

switch (ws) {
case NKikimrBlobStorage::TWindowFeedback::IncorrectMsgId:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,7 @@ namespace NKikimr {
}
}

TWindowStatus *Processed(bool checkMsgId, const TMessageId &msgId, ui64 cost, TWindowStatus *opStatus) {
Y_UNUSED(checkMsgId);
Y_UNUSED(msgId);
TWindowStatus *Processed(bool /*checkMsgId*/, const TMessageId& /*msgId*/, ui64 cost, TWindowStatus *opStatus) {
Y_ABORT_UNLESS(Cost >= cost);
Cost -= cost;
--InFlight;
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,13 @@ void TGetImpl::PrepareRequests(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBl
msg->SetId(ReaderTabletData->Id);
msg->SetGeneration(ReaderTabletData->Generation);
}
R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# " << get.OrderNumber
<< " vget# " << vget->ToString());
}

for (auto& vget : gets) {
if (vget) {
R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# "
<< Info->GetTopology().GetOrderNumber(VDiskIDFromVDiskID(vget->Record.GetVDiskID()))
<< " vget# " << vget->ToString());
outVGets.push_back(std::move(vget));
++RequestIndex;
}
Expand Down
26 changes: 17 additions & 9 deletions ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
TStackVec<bool, TypicalDisksInSubring> ReceivedResponseFlags;
TStackVec<bool, TypicalDisksInSubring> EmptyResponseFlags;
TStackVec<bool, TypicalDisksInSubring> ErrorResponseFlags;
TStackVec<bool, TypicalDisksInSubring> ForceStopFlags;
TBlobStorageGroupInfo::TVDiskIds VDisks;

bool UseVPatch = false;
Expand Down Expand Up @@ -332,8 +333,15 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
}

void Handle(TEvBlobStorage::TEvVPatchResult::TPtr &ev) {
ReceivedResults++;
NKikimrBlobStorage::TEvVPatchResult &record = ev->Get()->Record;

Y_ABORT_UNLESS(record.HasCookie());
ui8 subgroupIdx = record.GetCookie();
if (ForceStopFlags[subgroupIdx]) {
return; // ignore force stop response
}
ReceivedResults++;

PullOutStatusFlagsAndFressSpace(record);
Y_ABORT_UNLESS(record.HasStatus());
NKikimrProto::EReplyStatus status = record.GetStatus();
Expand All @@ -342,9 +350,6 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
errorReason = record.GetErrorReason();
}

Y_ABORT_UNLESS(record.HasCookie());
ui8 subgroupIdx = record.GetCookie();

PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA23, "Received VPatchResult",
(Status, status),
(SubgroupIdx, (ui32)subgroupIdx),
Expand Down Expand Up @@ -413,15 +418,16 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
void SendStopDiffs() {
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA18, "Send stop diffs");
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPatchDiff>> events;
for (ui32 vdiskIdx = 0; vdiskIdx < VDisks.size(); ++vdiskIdx) {
if (!ErrorResponseFlags[vdiskIdx] && !EmptyResponseFlags[vdiskIdx] && ReceivedResponseFlags[vdiskIdx]) {
for (ui32 subgroupIdx = 0; subgroupIdx < VDisks.size(); ++subgroupIdx) {
if (!ErrorResponseFlags[subgroupIdx] && !EmptyResponseFlags[subgroupIdx] && ReceivedResponseFlags[subgroupIdx]) {
std::unique_ptr<TEvBlobStorage::TEvVPatchDiff> ev = std::make_unique<TEvBlobStorage::TEvVPatchDiff>(
OriginalId, PatchedId, VDisks[vdiskIdx], 0, Deadline, vdiskIdx);
OriginalId, PatchedId, VDisks[subgroupIdx], 0, Deadline, subgroupIdx);
ev->SetForceEnd();
ForceStopFlags[subgroupIdx] = true;
events.emplace_back(std::move(ev));
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA19, "Send stop message",
(VDiskIdxInSubgroup, vdiskIdx),
(VDiskId, VDisks[vdiskIdx]));
(VDiskIdxInSubgroup, subgroupIdx),
(VDiskId, VDisks[subgroupIdx]));
}
}
SendToQueues(events, false);
Expand Down Expand Up @@ -495,6 +501,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA20, "Send TEvVPatchDiff",
(VDiskIdxInSubgroup, idxInSubgroup),
(PatchedVDiskIdxInSubgroup, patchedIdxInSubgroup),
(PartId, (ui64)partPlacement.PartId),
(DiffsForPart, diffsForPart.size()),
(ParityPlacements, parityPlacements.size()),
(WaitedXorDiffs, waitedXorDiffs));
Expand Down Expand Up @@ -586,6 +593,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
ReceivedResponseFlags.assign(VDisks.size(), false);
ErrorResponseFlags.assign(VDisks.size(), false);
EmptyResponseFlags.assign(VDisks.size(), false);
ForceStopFlags.assign(VDisks.size(), false);

TDeque<std::unique_ptr<TEvBlobStorage::TEvVPatchStart>> events;

Expand Down
9 changes: 9 additions & 0 deletions ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ struct TPDiskMockState::TImpl {
}
}

bool HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end) {
const ui64 chunkBegin = ui64(chunkIdx) * ChunkSize;
return static_cast<bool>(Corrupted & TIntervalSet{chunkBegin + begin, chunkBegin + end});
}

std::set<ui32> GetChunks() {
std::set<ui32> res;
for (auto& [ownerId, owner] : Owners) {
Expand Down Expand Up @@ -290,6 +295,10 @@ void TPDiskMockState::SetCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end, bool
Impl->SetCorruptedArea(chunkIdx, begin, end, enabled);
}

bool TPDiskMockState::HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end) {
return Impl->HasCorruptedArea(chunkIdx, begin, end);
}

std::set<ui32> TPDiskMockState::GetChunks() {
return Impl->GetChunks();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/pdisk/mock/pdisk_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace NKikimr {
~TPDiskMockState();

void SetCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end, bool enabled);
bool HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end);
std::set<ui32> GetChunks();
TMaybe<NPDisk::TOwnerRound> GetOwnerRound(const TVDiskID& vDiskId) const;
ui32 GetChunkSize() const;
Expand Down
152 changes: 109 additions & 43 deletions ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,64 +16,130 @@ void Test() {

TString data = TString::Uninitialized(8_MB);
memset(data.Detach(), 'X', data.size());
TLogoBlobID id(1, 1, 1, 0, data.size(), 0);

{ // write data to group
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max()));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
}

auto checkReadable = [&](NKikimrProto::EReplyStatus status) {
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
auto& r = res->Get()->Responses[0];
UNIT_ASSERT_VALUES_EQUAL(r.Status, status);
if (status == NKikimrProto::OK) {
UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data);
for (ui32 step = 1; step < 100; ++step) {
TLogoBlobID id(1, 1, step, 0, data.size(), 0);

{ // write data to group
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max()));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
}
};

checkReadable(NKikimrProto::OK);
auto checkReadable = [&] {
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
auto& r = res->Get()->Responses[0];
UNIT_ASSERT_VALUES_EQUAL(r.Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data);

ui32 partsMask = 0;
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
const TVDiskID& vdiskId = info->GetVDiskId(i);
env.WithQueueId(vdiskId, NKikimrBlobStorage::EVDiskQueueId::GetFastRead, [&](TActorId queueId) {
const TActorId sender = runtime->AllocateEdgeActor(1);
auto ev = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vdiskId, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead);
ev->AddExtremeQuery(id, 0, 0);
runtime->Send(new IEventHandle(queueId, sender, ev.release()), sender.NodeId());
auto reply = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(sender);
auto& record = reply->Get()->Record;
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(record.ResultSize(), 1);
for (const auto& result : record.GetResult()) {
if (result.GetStatus() == NKikimrProto::OK) {
const TLogoBlobID& id = LogoBlobIDFromLogoBlobID(result.GetBlobID());
UNIT_ASSERT(id.PartId());
const ui32 partIdx = id.PartId() - 1;
const ui32 mask = 1 << partIdx;
UNIT_ASSERT(!(partsMask & mask));
partsMask |= mask;
} else {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrProto::NODATA);
}
}
});
}
UNIT_ASSERT_VALUES_EQUAL(partsMask, (1 << info->Type.TotalPartCount()) - 1);
};

for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
const TActorId vdiskActorId = info->GetActorId(i);
checkReadable();

ui32 nodeId, pdiskId;
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());
ui32 mask = 0;

const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
const TActorId vdiskActorId = info->GetActorId(i);

for (auto& item : res->Get()->Layout) {
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob) {
const TDiskPart& part = item.Location;
it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + part.Size, true);
break;
ui32 nodeId, pdiskId;
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());

const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);

for (auto& item : res->Get()->Layout) {
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob && item.BlobId.FullID() == id) {
const TDiskPart& part = item.Location;
mask |= 1 << i;
it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + 1 + RandomNumber(part.Size), true);
break;
}
}

checkReadable();
}

checkReadable(NKikimrProto::OK);
}
env.Sim(TDuration::Seconds(60));

env.Sim(TDuration::Seconds(60));
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
if (~mask >> i & 1) {
continue;
}

const TActorId vdiskActorId = info->GetActorId(i);

ui32 nodeId, pdiskId;
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());

const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);

bool anyPartReadable = false;

for (auto& item : res->Get()->Layout) {
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob && item.BlobId.FullID() == id) {
const TDiskPart& part = item.Location;
anyPartReadable = !it->second->HasCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + part.Size);
if (anyPartReadable) {
break;
}
}
}

UNIT_ASSERT(anyPartReadable);
}
}
}

Y_UNIT_TEST_SUITE(ScrubFast) {
Y_UNIT_TEST(SingleBlob) {
Test();
}
}

13 changes: 11 additions & 2 deletions ydb/core/blobstorage/vdisk/common/vdisk_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,12 @@ namespace NKikimr {
: public TEventLocal<TEvVDiskRequestCompleted, TEvBlobStorage::EvVDiskRequestCompleted> {
TVMsgContext Ctx;
std::unique_ptr<IEventHandle> Event;
bool DoNotResend;

TEvVDiskRequestCompleted(const TVMsgContext &ctx, std::unique_ptr<IEventHandle> event)
TEvVDiskRequestCompleted(const TVMsgContext &ctx, std::unique_ptr<IEventHandle> event, bool doNotResend = false)
: Ctx(ctx)
, Event(std::move(event))
, DoNotResend(doNotResend)
{
Y_DEBUG_ABORT_UNLESS(Ctx.ExtQueueId != NKikimrBlobStorage::EVDiskQueueId::Unknown);
Y_DEBUG_ABORT_UNLESS(Ctx.IntQueueId != NKikimrBlobStorage::EVDiskInternalQueueId::IntUnknown);
Expand Down Expand Up @@ -468,6 +470,9 @@ namespace NKikimr {
TActorIDPtr SkeletonFrontIDPtr;
THPTimer ExecutionTimer;

protected:
bool DoNotResendFromSkeletonFront = false;

public:
TEvVResultBaseWithQoSPB() = default;

Expand Down Expand Up @@ -526,7 +531,7 @@ namespace NKikimr {
byteSize, this->ToString().data());

if (SkeletonFrontIDPtr && MsgCtx.IntQueueId != NKikimrBlobStorage::IntUnknown) {
ctx.Send(*SkeletonFrontIDPtr, new TEvVDiskRequestCompleted(MsgCtx, std::move(ev)));
ctx.Send(*SkeletonFrontIDPtr, new TEvVDiskRequestCompleted(MsgCtx, std::move(ev), DoNotResendFromSkeletonFront));
} else {
TActivationContext::Send(ev.release());
}
Expand Down Expand Up @@ -2182,6 +2187,10 @@ namespace NKikimr {
Record.SetApproximateFreeSpaceShare(approximateFreeSpaceShare);
}

void SetForceEndResponse() {
DoNotResendFromSkeletonFront = true;
}

void MakeError(NKikimrProto::EReplyStatus status, const TString &errorReason,
const NKikimrBlobStorage::TEvVPatchDiff &request)
{
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/vdisk/scrub/blob_recovery_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ namespace NKikimr {

// a map to fill upon receiving VGet result
struct TPerBlobInfo {
const TInstant Deadline;
std::weak_ptr<TInFlightContext> Context;
TEvRecoverBlobResult::TItem *Item; // item to update
ui32 BlobReplyCounter = 0; // number of unreplied queries for this blob
};
std::unordered_multimap<TLogoBlobID, TPerBlobInfo, THash<TLogoBlobID>> VGetResultMap;
std::set<std::tuple<TVDiskIdShort, TLogoBlobID>> GetsInFlight;

void AddBlobQuery(const TLogoBlobID& id, NMatrix::TVectorType needed, const std::shared_ptr<TInFlightContext>& context, TEvRecoverBlobResult::TItem *item);
void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 worstReplySize);
void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 idxInSubgroup);
void SendPendingQueries();
void Handle(TEvBlobStorage::TEvVGetResult::TPtr ev);
NKikimrProto::EReplyStatus ProcessItemData(TEvRecoverBlobResult::TItem& item);
Expand Down
Loading

0 comments on commit 0ac74bb

Please sign in to comment.