Skip to content

Commit

Permalink
Iterate over deleted rows in read iterators only once KIKIMR-19276
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury committed Sep 16, 2023
1 parent 5e9a163 commit ebcd04f
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 71 deletions.
60 changes: 52 additions & 8 deletions ydb/core/tablet_flat/flat_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,12 @@ class TTableItBase : TNonCopyable {

if (!TOps::EndKey(NextEntry)) {
// We know that everything to +inf is erased
It->Iterators.clear();
It->Active = It->Iterators.end();
It->Clear();
It->Ready = EReady::Gone;
return;
}

if (!It->SkipTo(TOps::EndKey(NextEntry), !TOps::EndInclusive(NextEntry))) {
if (!It->SkipErase(TOps::EndKey(NextEntry), TOps::EndInclusive(NextEntry))) {
// We've got some missing page, cannot iterate further
return;
}
Expand Down Expand Up @@ -149,7 +148,7 @@ class TTableItBase : TNonCopyable {
}

Cache.Touch(res.first);
if (!It->SkipTo(TOps::EndKey(res.first), !TOps::EndInclusive(res.first))) {
if (!It->SkipErase(TOps::EndKey(res.first), TOps::EndInclusive(res.first))) {
// We've got some missing page, cannot iterate further
return;
}
Expand Down Expand Up @@ -229,6 +228,20 @@ class TTableItBase : TNonCopyable {
bool FutureEntryValid = false;
};

// Seeks past an erased range that ends at endKey by forwarding to SkipTo.
//
// endKey    - key cells marking the end of the erased range
// inclusive - whether endKey itself is part of the erased range
//
// Returns whatever SkipTo returns; callers treat a false result as
// "some page is missing, cannot iterate further".
bool SkipErase(TArrayRef<const TCell> endKey, bool inclusive = true) noexcept {
    if (!inclusive) {
        // Exclusive end: no key is remembered, only the seek is performed.
        return SkipTo(endKey, true);
    }

    // Inclusive end: behave as if endKey was the last key we saw. The
    // pointer correctness here is very subtle. Seeking to the end of an
    // erased range only happens when there is no new cached range, so
    // afterwards we either reposition to a new key, or hit a page fault
    // that leaves this iterator unusable, and Flush will not disturb erase
    // cache records on the way out.
    LastKey.assign(endKey.begin(), endKey.end());
    LastKeyPage = {};
    return SkipTo(endKey, false);
}

public:
TTableItBase(
const TRowScheme* scheme, TTagsRef tags, ui64 lim = Max<ui64>(),
Expand Down Expand Up @@ -328,6 +341,8 @@ class TTableItBase : TNonCopyable {
ui64 Limit = 0;

TRowState State;
TVector<TCell> LastKey;
TSharedData LastKeyPage;

// RowVersion of a persistent snapshot that we are reading
// By default iterator is initialized with the HEAD snapshot
Expand Down Expand Up @@ -392,6 +407,12 @@ class TTableItBase : TNonCopyable {
Iterators.clear();
Active = Iterators.end();
Inactive = Active;
ClearKey();
}

// Forgets the last remembered key: empties LastKey and resets LastKeyPage
// to a default-constructed value.
void ClearKey() {
LastKey.clear();
// NOTE(review): presumably this drops the reference that kept the page
// backing the key cells alive; confirm against TSharedData semantics.
LastKeyPage = {};
}

// ITERATORS STORAGE
Expand Down Expand Up @@ -557,6 +578,7 @@ inline EReady TTableItBase<TIteratorOps>::Start() noexcept
Iterators.front().IteratorId.Type == EType::Stop ||
Limit == 0)
{
ClearKey();
return EReady::Gone;
}

Expand Down Expand Up @@ -588,6 +610,7 @@ inline EReady TTableItBase<TIteratorOps>::Turn() noexcept
{
if (!Limit) {
// Optimization: avoid calling Next after returning the last row
ClearKey();
return EReady::Gone;
}

Expand Down Expand Up @@ -854,10 +877,10 @@ inline EReady TTableItBase<TIteratorOps>::Apply() noexcept
{
State.Reset(Remap.CellDefaults());

const TDbTupleRef key = GetKey();
TArrayRef<const TCell> key = Iterators.back().Key;

for (auto &pin: Remap.KeyPins())
State.Set(pin.Pos, { ECellOp::Set, ELargeObj::Inline }, key.Columns[pin.Key]);
State.Set(pin.Pos, { ECellOp::Set, ELargeObj::Inline }, key[pin.Key]);

// We must have at least one active iterator
Y_VERIFY_DEBUG(Active != Inactive);
Expand Down Expand Up @@ -918,15 +941,36 @@ inline EReady TTableItBase<TIteratorOps>::Apply() noexcept
return EReady::Page;
}

LastKey.assign(key.begin(), key.end());

TIteratorId ai = Iterators.back().IteratorId;
switch (ai.Type) {
case EType::Mem: {
// We keep mem table snapshot in memory, no page reference needed
LastKeyPage = {};
break;
}
case EType::Run: {
auto& it = *RunIters[ai.Index];
const TSharedData& page = it.GetKeyPage();
if (LastKeyPage.data() != page.data()) {
LastKeyPage = page;
}
break;
}
default: {
Y_FAIL("Unexpected iterator type");
}
}

Stage = EStage::Done;
return EReady::Data;
}

template<class TIteratorOps>
inline TDbTupleRef TTableItBase<TIteratorOps>::GetKey() const noexcept
{
auto key = Iterators.back().Key;
return { Scheme->Keys->BasicTypes().data(), key.data(), static_cast<ui32>(key.size()) };
return { Scheme->Keys->BasicTypes().data(), LastKey.data(), static_cast<ui32>(LastKey.size()) };
}

template<class TIteratorOps>
Expand Down
36 changes: 22 additions & 14 deletions ydb/core/tablet_flat/flat_page_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,12 @@ namespace NPage {

NPage::TLabel Label() const noexcept
{
return ReadUnaligned<NPage::TLabel>(Raw.data());
return ReadUnaligned<NPage::TLabel>(Decoded.data());
}

explicit operator bool() const noexcept
{
return bool(Raw);
return bool(Decoded);
}

const TBlock* operator->() const noexcept
Expand All @@ -192,16 +192,16 @@ namespace NPage {

TRowId BaseRow() const noexcept
{
return Raw ? BaseRow_ : Max<TRowId>();
return BaseRow_;
}

TDataPage& Set(const TSharedData *raw = nullptr) noexcept
{
Page = { };

if (Raw = raw ? *raw : TSharedData{ }) {
const void* base = Raw.data();
auto data = NPage::TLabelWrapper().Read(Raw, EPage::DataPage);
if (raw) {
const void* base = raw->data();
auto data = NPage::TLabelWrapper().Read(*raw, EPage::DataPage);

Y_VERIFY(data.Version == 1, "Unknown EPage::DataPage version");

Expand All @@ -216,15 +216,17 @@ namespace NPage {
// We expect original page had the same label size as a compressed page
size_t labelSize = reinterpret_cast<const char*>(data.Page.data()) - reinterpret_cast<const char*>(base);

Decoded.Resize(labelSize + size);
Decoded = TSharedData::Uninitialized(labelSize + size);

size = Codec->Decompress(data.Page, Decoded.Begin() + labelSize);
size = Codec->Decompress(data.Page, Decoded.mutable_begin() + labelSize);

Decoded.Resize(labelSize + size);
::memset(Decoded.Begin(), 0, labelSize);
Decoded.TrimBack(labelSize + size);
::memcpy(Decoded.mutable_begin(), base, labelSize);

base = Decoded.Begin();
data.Page = { Decoded.Begin() + labelSize, Decoded.End() };
base = Decoded.begin();
data.Page = { Decoded.begin() + labelSize, Decoded.end() };
} else {
Decoded = *raw;
}

auto *recordsHeader = TDeref<TRecordsHeader>::At(data.Page.data(), 0);
Expand All @@ -235,11 +237,18 @@ namespace NPage {
auto offsetsOffset = data.Page.size() - count * sizeof(TPgSize);
Page.Offsets = TDeref<const TRecordsEntry>::At(recordsHeader, offsetsOffset);
Page.Count = count;
} else {
Decoded = {};
BaseRow_ = Max<TRowId>();
}

return *this;
}

// Returns a reference to the TSharedData stored in Decoded.
// Set() leaves Decoded empty when it is called without a page.
const TSharedData& GetData() const noexcept {
return Decoded;
}

TIter LookupKey(TCells key, const TPartScheme::TGroupInfo &group,
ESeek seek, const TKeyCellDefaults *keyDefaults) const noexcept
{
Expand Down Expand Up @@ -321,8 +330,7 @@ namespace NPage {
private:
using ICodec = NBlockCodecs::ICodec;

TBuffer Decoded;
TSharedData Raw;
TSharedData Decoded;
TBlock Page;
TRowId BaseRow_ = Max<TRowId>();
const ICodec *Codec = nullptr;
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/tablet_flat/flat_part_iter_multi.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ namespace NTable {
return RowId;
}

// Returns the shared data of Page, as reported by Page.GetData().
const TSharedData& GetPageData() const noexcept {
return Page.GetData();
}

private:
Y_FORCE_INLINE EReady Exhausted() noexcept
{
Expand Down Expand Up @@ -813,6 +817,11 @@ namespace NTable {
return TDbTupleRef(KeyCellDefaults->BasicTypes().begin(), Key.begin(), Key.size());
}

// Returns the page data reported by Main.GetPageData().
// NOTE(review): judging by the name, this is the page that backs the
// current key cells; confirm against the Main iterator.
const TSharedData& GetKeyPage() const noexcept
{
return Main.GetPageData();
}

TCells GetRawKey() const noexcept
{
InitKey();
Expand Down Expand Up @@ -1595,6 +1604,12 @@ namespace NTable {
return CurrentIt->GetKey();
}

// Returns the key page of the current part iterator by forwarding to
// CurrentIt->GetKeyPage().
const TSharedData& GetKeyPage() const noexcept
{
// CurrentIt must be set before calling; this is only checked by a debug
// assertion.
Y_VERIFY_DEBUG(CurrentIt);
return CurrentIt->GetKeyPage();
}

void Apply(TRowState& row,
NTable::ITransactionMapSimplePtr committedTransactions,
NTable::ITransactionObserverSimplePtr transactionObserver) const noexcept
Expand Down
Loading

0 comments on commit ebcd04f

Please sign in to comment.