Skip to content

Commit

Permalink
Added Hot Archive BucketListDB tests and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
SirTyson committed Aug 1, 2024
1 parent 2154b5a commit 76b6594
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 55 deletions.
1 change: 1 addition & 0 deletions src/bucket/BucketIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class BucketIndex : public NonMovableOrCopyable
// the largest buckets) and should only be called once. If pageSize == 0 or
// if file size is less than the cutoff, individual key index is used.
// Otherwise range index is used, with the range defined by pageSize.
// BucketEntryT is the entry type read from the bucket file while indexing. It
// must be BucketEntry or HotArchiveBucketEntry (enforced by a static_assert
// in the definition); the definition lives in BucketIndexImpl.cpp, which
// explicitly instantiates the template for both types.
template <class BucketEntryT>
static std::unique_ptr<BucketIndex const>
createIndex(BucketManager& bm, std::filesystem::path const& filename,
Hash const& hash);
Expand Down
93 changes: 65 additions & 28 deletions src/bucket/BucketIndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "util/XDRStream.h"

#include "lib/bloom_filter.hpp"
#include "xdr/Stellar-ledger.h"
#include <Tracy.hpp>
#include <cereal/archives/binary.hpp>
#include <cereal/types/memory.hpp>
Expand All @@ -23,6 +24,7 @@
#include <fmt/format.h>

#include <thread>
#include <type_traits>

namespace stellar
{
Expand Down Expand Up @@ -63,13 +65,18 @@ BucketIndex::typeNotSupported(LedgerEntryType t)
}

template <class IndexT>
template <class BucketEntryT>
BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
std::filesystem::path const& filename,
std::streamoff pageSize,
Hash const& hash)
Hash const& hash,
BucketEntryT const& typeTag)
: mBloomMissMeter(bm.getBloomMissMeter())
, mBloomLookupMeter(bm.getBloomLookupMeter())
{
static_assert(std::is_same_v<BucketEntryT, BucketEntry> ||
std::is_same_v<BucketEntryT, HotArchiveBucketEntry>);

ZoneScoped;
releaseAssert(!filename.empty());

Expand Down Expand Up @@ -123,7 +130,7 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
in.open(filename.string());
std::streamoff pos = 0;
std::streamoff pageUpperBound = 0;
BucketEntry be;
BucketEntryT be;
size_t iter = 0;
size_t count = 0;
while (in && in.readOne(be))
Expand All @@ -140,35 +147,51 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
}
}

if (be.type() != METAENTRY)
// Live and hot archive buckets tag their metadata entry with different
// enum values, so pick the comparison that matches BucketEntryT at compile
// time. The lambda stays generic so that the branch not taken is discarded
// rather than type-checked against the wrong entry type.
auto isMeta = [](auto const& entry) {
    if constexpr (!std::is_same_v<BucketEntryT, BucketEntry>)
    {
        return entry.type() == HOT_ARCHIVE_METAENTRY;
    }
    else
    {
        return entry.type() == METAENTRY;
    }
};

if (!isMeta(be))
{
++count;
LedgerKey key = getBucketLedgerKey(be);

// We need an asset to poolID mapping for
// loadPoolshareTrustlineByAccountAndAsset queries. For this
// query, we only need to index INIT entries because:
// 1. PoolID is the hash of the Assets it refers to, so this
// index cannot be invalidated by newer LIVEENTRY updates
// 2. We do a join over all bucket indexes so we avoid storing
// multiple redundant index entries (i.e. LIVEENTRY updates)
// 3. We only use this index to collect the possible set of
// Trustline keys, then we load those keys. This means that
// we don't need to keep track of DEADENTRY. Even if a given
// INITENTRY has been deleted by a newer DEADENTRY, the
// trustline load will not return deleted trustlines, so the
// load result is still correct even if the index has a few
// deleted mappings.
if (be.type() == INITENTRY && key.type() == LIQUIDITY_POOL)
if constexpr (std::is_same_v<BucketEntryT, BucketEntry>)
{
auto const& poolParams = be.liveEntry()
.data.liquidityPool()
.body.constantProduct()
.params;
mData.assetToPoolID[poolParams.assetA].emplace_back(
key.liquidityPool().liquidityPoolID);
mData.assetToPoolID[poolParams.assetB].emplace_back(
key.liquidityPool().liquidityPoolID);
// We need an asset to poolID mapping for
// loadPoolshareTrustlineByAccountAndAsset queries. For this
// query, we only need to index INIT entries because:
// 1. PoolID is the hash of the Assets it refers to, so this
// index cannot be invalidated by newer LIVEENTRY updates
// 2. We do a join over all bucket indexes so we avoid
// storing multiple redundant index entries (i.e.
// LIVEENTRY updates)
// 3. We only use this index to collect the possible set of
// Trustline keys, then we load those keys. This means
// that we don't need to keep track of DEADENTRY. Even if
// a given INITENTRY has been deleted by a newer
// DEADENTRY, the trustline load will not return deleted
// trustlines, so the load result is still correct even
// if the index has a few deleted mappings.
if (be.type() == INITENTRY && key.type() == LIQUIDITY_POOL)
{
auto const& poolParams = be.liveEntry()
.data.liquidityPool()
.body.constantProduct()
.params;
mData.assetToPoolID[poolParams.assetA].emplace_back(
key.liquidityPool().liquidityPoolID);
mData.assetToPoolID[poolParams.assetB].emplace_back(
key.liquidityPool().liquidityPoolID);
}
}

if constexpr (std::is_same<IndexT, RangeIndex>::value)
Expand Down Expand Up @@ -329,11 +352,15 @@ upper_bound_pred(LedgerKey const& key, IndexEntryT const& indexEntry)
}
}

template <class BucketEntryT>
std::unique_ptr<BucketIndex const>
BucketIndex::createIndex(BucketManager& bm,
std::filesystem::path const& filename,
Hash const& hash)
{
static_assert(std::is_same_v<BucketEntryT, BucketEntry> ||
std::is_same_v<BucketEntryT, HotArchiveBucketEntry>);

ZoneScoped;
auto const& cfg = bm.getConfig();
releaseAssertOrThrow(cfg.isUsingBucketListDB());
Expand All @@ -349,7 +376,8 @@ BucketIndex::createIndex(BucketManager& bm,
"bucket {}",
filename);
return std::unique_ptr<BucketIndexImpl<IndividualIndex> const>(
new BucketIndexImpl<IndividualIndex>(bm, filename, 0, hash));
new BucketIndexImpl<IndividualIndex>(bm, filename, 0, hash,
BucketEntryT{}));
}
else
{
Expand All @@ -359,7 +387,8 @@ BucketIndex::createIndex(BucketManager& bm,
"{} in bucket {}",
pageSize, filename);
return std::unique_ptr<BucketIndexImpl<RangeIndex> const>(
new BucketIndexImpl<RangeIndex>(bm, filename, pageSize, hash));
new BucketIndexImpl<RangeIndex>(bm, filename, pageSize, hash,
BucketEntryT{}));
}
}
// BucketIndexImpl throws if BucketManager shuts down before index finishes,
Expand Down Expand Up @@ -596,4 +625,12 @@ BucketIndexImpl<BucketIndex::RangeIndex>::markBloomLookup() const
{
mBloomLookupMeter.Mark();
}

// BucketIndex::createIndex is a member template whose definition lives in this
// translation unit, so callers in other files can only link against
// specializations that are explicitly instantiated here. These are the two
// entry types accepted by the static_assert in createIndex.
template std::unique_ptr<BucketIndex const>
BucketIndex::createIndex<BucketEntry>(BucketManager& bm,
std::filesystem::path const& filename,
Hash const& hash);
template std::unique_ptr<BucketIndex const>
BucketIndex::createIndex<HotArchiveBucketEntry>(
BucketManager& bm, std::filesystem::path const& filename, Hash const& hash);
}
7 changes: 6 additions & 1 deletion src/bucket/BucketIndexImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,13 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
medida::Meter& mBloomMissMeter;
medida::Meter& mBloomLookupMeter;

// Constructor templates are valid C++, but their template arguments cannot
// be specified explicitly at the call site; they can only be deduced from
// the constructor's arguments. The typeTag parameter exists so that
// BucketEntryT can be deduced.
template <class BucketEntryT>
BucketIndexImpl(BucketManager& bm, std::filesystem::path const& filename,
std::streamoff pageSize, Hash const& hash);
std::streamoff pageSize, Hash const& hash,
BucketEntryT const& typeTag);

template <class Archive>
BucketIndexImpl(BucketManager const& bm, Archive& ar,
Expand Down
5 changes: 3 additions & 2 deletions src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,9 @@ class BucketManager : NonMovableOrCopyable
virtual void
addHotArchiveBatch(Application& app, uint32_t currLedger,
uint32_t currLedgerProtocol,
std::vector<LedgerEntry> const& initEntries,
std::vector<LedgerKey> const& deadEntries) = 0;
std::vector<LedgerEntry> const& archivedEntries,
std::vector<LedgerKey> const& restoredEntries,
std::vector<LedgerKey> const& deletedEntries) = 0;

// Update the given LedgerHeader's bucketListHash to reflect the current
// state of the bucket list.
Expand Down
13 changes: 8 additions & 5 deletions src/bucket/BucketManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1017,8 +1017,9 @@ BucketManagerImpl::addLiveBatch(Application& app, uint32_t currLedger,
void
BucketManagerImpl::addHotArchiveBatch(
Application& app, uint32_t currLedger, uint32_t currLedgerProtocol,
std::vector<LedgerEntry> const& initEntries,
std::vector<LedgerKey> const& deadEntries)
std::vector<LedgerEntry> const& archivedEntries,
std::vector<LedgerKey> const& restoredEntries,
std::vector<LedgerKey> const& deletedEntries)
{
ZoneScoped;
releaseAssertOrThrow(app.getConfig().MODE_ENABLES_BUCKETLIST);
Expand All @@ -1031,13 +1032,15 @@ BucketManagerImpl::addHotArchiveBatch(
}
#endif
auto timer = mBucketAddArchiveBatch.TimeScope();
mBucketArchiveObjectInsertBatch.Mark(initEntries.size() +
deadEntries.size());
mBucketArchiveObjectInsertBatch.Mark(archivedEntries.size() +
restoredEntries.size() +
deletedEntries.size());

// Hot archive should never modify an existing entry, so there are never
// live entries
mHotArchiveBucketList->addBatch(app, currLedger, currLedgerProtocol,
initEntries, {}, deadEntries);
archivedEntries, restoredEntries,
deletedEntries);
mArchiveBucketListSizeCounter.set_count(mHotArchiveBucketList->getSize());

if (app.getConfig().isUsingBucketListDB())
Expand Down
10 changes: 6 additions & 4 deletions src/bucket/BucketManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,12 @@ class BucketManagerImpl : public BucketManager
std::vector<LedgerEntry> const& initEntries,
std::vector<LedgerEntry> const& liveEntries,
std::vector<LedgerKey> const& deadEntries) override;
void addHotArchiveBatch(Application& app, uint32_t currLedger,
uint32_t currLedgerProtocol,
std::vector<LedgerEntry> const& initEntries,
std::vector<LedgerKey> const& deadEntries) override;
void
addHotArchiveBatch(Application& app, uint32_t currLedger,
uint32_t currLedgerProtocol,
std::vector<LedgerEntry> const& archivedEntries,
std::vector<LedgerKey> const& restoredEntries,
std::vector<LedgerKey> const& deletedEntries) override;
void snapshotLedger(LedgerHeader& currentHeader) override;
void maybeSetIndex(std::shared_ptr<Bucket> b,
std::unique_ptr<BucketIndex const>&& index) override;
Expand Down
3 changes: 2 additions & 1 deletion src/bucket/BucketOutputIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ BucketOutputIterator<BucketT>::getBucket(BucketManager& bucketManager,
if (auto b = bucketManager.getBucketIfExists(hash);
!b || !b->isIndexed())
{
index = BucketIndex::createIndex(bucketManager, mFilename, hash);
index = BucketIndex::createIndex<BucketEntryT>(bucketManager,
mFilename, hash);
}
}

Expand Down
19 changes: 10 additions & 9 deletions src/bucket/BucketSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,15 @@ BucketSnapshotBase<BucketT>::loadKeys(
{
auto [entryOp, bloomMiss] = getEntryAtOffset(
*currKeyIt, *offOp, mBucket->getIndex().getPageSize());

if (entryOp)
{
// Only live bucket loads can be metered
if constexpr (std::is_same_v<BucketT, LiveBucket>)
// Don't return tombstone entries, as these do not exist wrt
// ledger state
if (!BucketT::isTombstoneEntry(*entryOp))
{
// Don't meter tombstone entries, as these do not exist wrt
// ledger state
if (!LiveBucket::isTombstoneEntry(*entryOp))
// Only live bucket loads can be metered
if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
bool addEntry = true;
if (lkMeter)
Expand All @@ -165,10 +166,10 @@ BucketSnapshotBase<BucketT>::loadKeys(
result.push_back(entryOp->liveEntry());
}
}
}
else
{
result.push_back(*entryOp);
else
{
result.push_back(*entryOp);
}
}

currKeyIt = keys.erase(currKeyIt);
Expand Down
5 changes: 3 additions & 2 deletions src/bucket/BucketSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ class BucketSnapshotManager : NonMovableOrCopyable

friend void BucketManagerImpl::addHotArchiveBatch(
Application& app, uint32_t currLedger, uint32_t currLedgerProtocol,
std::vector<LedgerEntry> const& initEntries,
std::vector<LedgerKey> const& deadEntries);
std::vector<LedgerEntry> const& archivedEntries,
std::vector<LedgerKey> const& restoredEntries,
std::vector<LedgerKey> const& deletedEntries);
friend void
BucketManagerImpl::addLiveBatch(Application& app, uint32_t currLedger,
uint32_t currLedgerProtocol,
Expand Down
Loading

0 comments on commit 76b6594

Please sign in to comment.