Skip to content

Commit

Permalink
split table and portion cleanups in different activities (#2737)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Mar 14, 2024
1 parent b6f128a commit 6b2e97b
Show file tree
Hide file tree
Showing 18 changed files with 298 additions and 102 deletions.
31 changes: 22 additions & 9 deletions ydb/core/tx/columnshard/background_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class TBackgroundController {
using TCurrentCompaction = THashMap<ui64, NOlap::TPlanCompactionInfo>;
TCurrentCompaction ActiveCompactionInfo;

bool ActiveCleanup = false;
bool ActiveCleanupPortions = false;
bool ActiveCleanupTables = false;
YDB_READONLY(TMonotonic, LastIndexationInstant, TMonotonic::Zero());
public:
THashSet<NOlap::TPortionAddress> GetConflictTTLPortions() const;
Expand All @@ -78,16 +79,28 @@ class TBackgroundController {
return ActiveIndexationTasks.size();
}

void StartCleanup() {
Y_ABORT_UNLESS(!ActiveCleanup);
ActiveCleanup = true;
void StartCleanupPortions() {
Y_ABORT_UNLESS(!ActiveCleanupPortions);
ActiveCleanupPortions = true;
}
void FinishCleanup() {
Y_ABORT_UNLESS(ActiveCleanup);
ActiveCleanup = false;
void FinishCleanupPortions() {
Y_ABORT_UNLESS(ActiveCleanupPortions);
ActiveCleanupPortions = false;
}
bool IsCleanupActive() const {
return ActiveCleanup;
bool IsCleanupPortionsActive() const {
return ActiveCleanupPortions;
}

void StartCleanupTables() {
Y_ABORT_UNLESS(!ActiveCleanupTables);
ActiveCleanupTables = true;
}
void FinishCleanupTables() {
Y_ABORT_UNLESS(ActiveCleanupTables);
ActiveCleanupTables = false;
}
bool IsCleanupTablesActive() const {
return ActiveCleanupTables;
}
};

Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/common/const.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "const.h"

namespace NKikimr::NOlap::NBlobOperations {

}
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/blobs_action/common/const.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <util/generic/string.h>

namespace NKikimr::NOlap::NBlobOperations {

Expand Down
58 changes: 36 additions & 22 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
#include "common/tablet_id.h"
#include "blobs_reader/task.h"
#include "blobs_reader/events.h"
#include "engines/changes/ttl.h"
#include "engines/changes/cleanup.h"
#include "blobs_action/bs/storage.h"
#include "resource_subscriber/task.h"

Expand All @@ -23,6 +21,11 @@
#include "data_sharing/source/session/source.h"
#include "data_sharing/common/transactions/tx_extension.h"

#include "engines/changes/indexation.h"
#include "engines/changes/cleanup_portions.h"
#include "engines/changes/cleanup_tables.h"
#include "engines/changes/ttl.h"

#include "resource_subscriber/counters.h"

#include "hooks/abstract/abstract.h"
Expand Down Expand Up @@ -522,24 +525,12 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivit
// !!!!!! MUST BE FIRST THROUGH DATA HAVE TO BE SAME IN SESSIONS AFTER TABLET RESTART
SharingSessionsManager->Start(*this);

if (activity.HasIndexation()) {
SetupIndexation();
}

if (activity.HasCompaction()) {
SetupCompaction();
}

if (activity.HasCleanup()) {
SetupCleanup();
}

if (activity.HasTtl()) {
SetupTtl();
}

SetupIndexation();
SetupCompaction();
SetupCleanupPortions();
SetupCleanupTables();
SetupTtl();
SetupGC();

SetupCleanupInsertTable();
}

Expand Down Expand Up @@ -764,16 +755,39 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
return true;
}

void TColumnShard::SetupCleanup() {
void TColumnShard::SetupCleanupPortions() {
CSCounters.OnSetupCleanup();
if (BackgroundController.IsCleanupActive()) {
if (BackgroundController.IsCleanupPortionsActive()) {
ACFL_DEBUG("background", "cleanup")("skip_reason", "in_progress");
return;
}

NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0};

auto changes = TablesManager.MutablePrimaryIndex().StartCleanup(cleanupSnapshot, TablesManager.MutablePathsToDrop(), DataLocksManager);
auto changes = TablesManager.MutablePrimaryIndex().StartCleanupPortions(cleanupSnapshot, TablesManager.GetPathsToDrop(), DataLocksManager);
if (!changes) {
ACFL_DEBUG("background", "cleanup")("skip_reason", "no_changes");
return;
}

ACFL_DEBUG("background", "cleanup")("changes_info", changes->DebugString());
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, changes, false);
ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write

changes->Start(*this);

Send(SelfId(), ev.release());
}

void TColumnShard::SetupCleanupTables() {
CSCounters.OnSetupCleanup();
if (BackgroundController.IsCleanupTablesActive()) {
ACFL_DEBUG("background", "cleanup")("skip_reason", "in_progress");
return;
}

auto changes = TablesManager.MutablePrimaryIndex().StartCleanupTables(TablesManager.MutablePathsToDrop());
if (!changes) {
ACFL_DEBUG("background", "cleanup")("skip_reason", "no_changes");
return;
Expand Down
9 changes: 6 additions & 3 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
#include <ydb/services/metadata/service.h>

namespace NKikimr::NOlap {
class TCleanupColumnEngineChanges;
class TCleanupPortionsColumnEngineChanges;
class TCleanupTablesColumnEngineChanges;
class TTTLColumnEngineChanges;
class TChangesWithAppend;
class TCompactColumnEngineChanges;
Expand Down Expand Up @@ -142,7 +143,8 @@ class TColumnShard
friend class TTxMonitoring;
friend class TTxRemoveSharedBlobs;

friend class NOlap::TCleanupColumnEngineChanges;
friend class NOlap::TCleanupPortionsColumnEngineChanges;
friend class NOlap::TCleanupTablesColumnEngineChanges;
friend class NOlap::TTTLColumnEngineChanges;
friend class NOlap::TChangesWithAppend;
friend class NOlap::TCompactColumnEngineChanges;
Expand Down Expand Up @@ -543,7 +545,8 @@ class TColumnShard
void SetupIndexation();
void SetupCompaction();
bool SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls = {});
void SetupCleanup();
void SetupCleanupPortions();
void SetupCleanupTables();
void SetupCleanupInsertTable();
void SetupGC();

Expand Down
40 changes: 34 additions & 6 deletions ydb/core/tx/columnshard/data_locks/locks/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class TListPortionsLock: public ILock {
private:
using TBase = ILock;
THashSet<TPortionAddress> Portions;
THashSet<TTabletId> Granules;
THashSet<ui64> Granules;
protected:
virtual std::optional<TString> DoIsLocked(const TPortionInfo& portion) const override {
if (Portions.contains(portion.GetAddress())) {
Expand All @@ -18,7 +18,7 @@ class TListPortionsLock: public ILock {
return {};
}
virtual std::optional<TString> DoIsLocked(const TGranuleMeta& granule) const override {
if (Granules.contains((TTabletId)granule.GetPathId())) {
if (Granules.contains(granule.GetPathId())) {
return GetLockName();
}
return {};
Expand All @@ -32,15 +32,15 @@ class TListPortionsLock: public ILock {
{
for (auto&& p : portions) {
Portions.emplace(p->GetAddress());
Granules.emplace((TTabletId)p->GetPathId());
Granules.emplace(p->GetPathId());
}
}

TListPortionsLock(const TString& lockName, const std::vector<TPortionInfo>& portions, const bool readOnly = false)
: TBase(lockName, readOnly) {
for (auto&& p : portions) {
Portions.emplace(p.GetAddress());
Granules.emplace((TTabletId)p.GetPathId());
Granules.emplace(p.GetPathId());
}
}

Expand All @@ -50,7 +50,7 @@ class TListPortionsLock: public ILock {
for (auto&& p : portions) {
const auto address = g(p);
Portions.emplace(address);
Granules.emplace((TTabletId)address.GetPathId());
Granules.emplace(address.GetPathId());
}
}

Expand All @@ -60,9 +60,37 @@ class TListPortionsLock: public ILock {
for (auto&& p : portions) {
const auto address = p.first;
Portions.emplace(address);
Granules.emplace((TTabletId)address.GetPathId());
Granules.emplace(address.GetPathId());
}
}
};

class TListTablesLock: public ILock {
private:
using TBase = ILock;
THashSet<ui64> Tables;
protected:
virtual std::optional<TString> DoIsLocked(const TPortionInfo& portion) const override {
if (Tables.contains(portion.GetPathId())) {
return GetLockName();
}
return {};
}
virtual std::optional<TString> DoIsLocked(const TGranuleMeta& granule) const override {
if (Tables.contains(granule.GetPathId())) {
return GetLockName();
}
return {};
}
bool DoIsEmpty() const override {
return Tables.empty();
}
public:
TListTablesLock(const TString& lockName, const THashSet<ui64>& tables, const bool readOnly = false)
: TBase(lockName, readOnly)
, Tables(tables)
{
}
};

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#include "cleanup.h"
#include "cleanup_portions.h"
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>

namespace NKikimr::NOlap {

void TCleanupColumnEngineChanges::DoDebugString(TStringOutput& out) const {
void TCleanupPortionsColumnEngineChanges::DoDebugString(TStringOutput& out) const {
if (ui32 dropped = PortionsToDrop.size()) {
out << "drop " << dropped << " portions";
for (auto& portionInfo : PortionsToDrop) {
Expand All @@ -15,7 +15,7 @@ void TCleanupColumnEngineChanges::DoDebugString(TStringOutput& out) const {
}
}

void TCleanupColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) {
void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) {
THashSet<ui64> pathIds;
if (self) {
THashMap<TString, THashSet<TUnifiedBlobId>> blobIdsByStorage;
Expand All @@ -31,22 +31,15 @@ void TCleanupColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnSha
action->DeclareRemove((TTabletId)self->TabletID(), b);
}
}

if (context.DB) {
for (auto&& p : pathIds) {
self->TablesManager.TryFinalizeDropPath(*context.DB, p);
}
}
}
}

void TCleanupColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
for (auto& portionInfo : PortionsToDrop) {
if (!context.EngineLogs.ErasePortion(portionInfo)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo.DebugString());
}
}
context.TriggerActivity = NeedRepeat ? NColumnShard::TBackgroundActivity::Cleanup() : NColumnShard::TBackgroundActivity::None();
if (self) {
self->IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size());
for (auto&& p : PortionsToDrop) {
Expand All @@ -55,15 +48,15 @@ void TCleanupColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnSh
}
}

void TCleanupColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
self.BackgroundController.StartCleanup();
void TCleanupPortionsColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
self.BackgroundController.StartCleanupPortions();
}

void TCleanupColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& /*context*/) {
self.BackgroundController.FinishCleanup();
void TCleanupPortionsColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& /*context*/) {
self.BackgroundController.FinishCleanupPortions();
}

NColumnShard::ECumulativeCounters TCleanupColumnEngineChanges::GetCounterIndex(const bool isSuccess) const {
NColumnShard::ECumulativeCounters TCleanupPortionsColumnEngineChanges::GetCounterIndex(const bool isSuccess) const {
return isSuccess ? NColumnShard::COUNTER_CLEANUP_SUCCESS : NColumnShard::COUNTER_CLEANUP_FAIL;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace NKikimr::NOlap {

class TCleanupColumnEngineChanges: public TColumnEngineChanges {
class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges {
private:
using TBase = TColumnEngineChanges;
THashMap<TString, THashSet<NOlap::TEvictedBlob>> BlobsToForget;
Expand Down Expand Up @@ -32,13 +32,12 @@ class TCleanupColumnEngineChanges: public TColumnEngineChanges {
}

public:
TCleanupColumnEngineChanges(const std::shared_ptr<IStoragesManager>& storagesManager)
TCleanupPortionsColumnEngineChanges(const std::shared_ptr<IStoragesManager>& storagesManager)
: TBase(storagesManager, StaticTypeName()) {

}

std::vector<TPortionInfo> PortionsToDrop;
bool NeedRepeat = false;

virtual ui32 GetWritePortionsCount() const override {
return 0;
Expand All @@ -47,11 +46,11 @@ class TCleanupColumnEngineChanges: public TColumnEngineChanges {
return nullptr;
}
virtual bool NeedWritePortion(const ui32 /*index*/) const override {
return true;
return false;
}

static TString StaticTypeName() {
return "CS::CLEANUP";
return "CS::CLEANUP::PORTIONS";
}

virtual TString TypeString() const override {
Expand Down
Loading

0 comments on commit 6b2e97b

Please sign in to comment.