Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dont repeat requests for already applied modifications #4376

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void TBehaviourRegistrator::Handle(TEvTableDescriptionFailed::TPtr& ev) {

void TBehaviourRegistrator::Handle(TEvStartRegistration::TPtr& /*ev*/) {
NInitializer::TDSAccessorInitialized::Execute(ReqConfig,
Behaviour->GetTypeId(), Behaviour->GetInitializer(), InternalController, RegistrationData->GetInitializationSnapshot());
Behaviour->GetTypeId(), Behaviour->GetInitializer(), InternalController, RegistrationData->GetSnapshotOwner());
}

void TBehaviourRegistrator::Handle(NInitializer::TEvInitializationFinished::TPtr& ev) {
Expand Down
8 changes: 4 additions & 4 deletions ydb/services/metadata/ds_table/registration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,8 @@ void TRegistrationData::InitializationFinished(const TString& initId) {
}

void TRegistrationData::SetInitializationSnapshot(NFetcher::ISnapshot::TPtr s) {
const bool notInitializedBefore = !InitializationSnapshot;
InitializationSnapshot = dynamic_pointer_cast<NInitializer::TSnapshot>(s);
Y_ABORT_UNLESS(InitializationSnapshot);
const bool notInitializedBefore = !SnapshotOwner->HasInitializationSnapshot();
SnapshotOwner->SetInitializationSnapshot(s);
if (notInitializedBefore) {
EventsWaiting->TryResendOne();
}
Expand All @@ -91,11 +90,12 @@ void TRegistrationData::StartInitialization() {
}

TRegistrationData::TRegistrationData() {
SnapshotOwner = std::make_shared<TInitializationSnapshotOwner>();
InitializationFetcher = std::make_shared<NInitializer::TFetcher>();
}

void TRegistrationData::NoInitializationSnapshot() {
InitializationSnapshot = std::make_shared<NInitializer::TSnapshot>(TInstant::Zero());
SnapshotOwner->NoInitializationSnapshot();
EventsWaiting->TryResendOne();
}

Expand Down
33 changes: 32 additions & 1 deletion ydb/services/metadata/ds_table/registration.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,37 @@ class TEventsCollector {
void Initialized(const TString& initId);
};

class TRegistrationData;

class TInitializationSnapshotOwner {
private:
mutable TMutex Mutex;
std::shared_ptr<NInitializer::TSnapshot> InitializationSnapshot;
friend class TRegistrationData;
void SetInitializationSnapshot(NFetcher::ISnapshot::TPtr s) {
auto snapshot = dynamic_pointer_cast<NInitializer::TSnapshot>(s);
Y_ABORT_UNLESS(snapshot);
TGuard<TMutex> g(Mutex);
InitializationSnapshot = snapshot;
}

void NoInitializationSnapshot() {
TGuard<TMutex> g(Mutex);
InitializationSnapshot = std::make_shared<NInitializer::TSnapshot>(TInstant::Zero());
}
public:
bool HasInitializationSnapshot() const {
TGuard<TMutex> g(Mutex);
return !!InitializationSnapshot;
}
bool HasModification(const TString& componentId, const TString& modificationId) const {
NInitializer::TDBInitializationKey key(componentId, modificationId);
TGuard<TMutex> g(Mutex);
return InitializationSnapshot->GetObjects().contains(key);
}

};

class TRegistrationData {
public:
enum class EStage {
Expand All @@ -170,8 +201,8 @@ class TRegistrationData {
};
private:
YDB_READONLY(EStage, Stage, EStage::Created);
YDB_READONLY_DEF(std::shared_ptr<NInitializer::TSnapshot>, InitializationSnapshot);
YDB_READONLY_DEF(std::shared_ptr<NInitializer::TFetcher>, InitializationFetcher);
YDB_READONLY_DEF(std::shared_ptr<TInitializationSnapshotOwner>, SnapshotOwner);
public:
TRegistrationData();

Expand Down
2 changes: 1 addition & 1 deletion ydb/services/metadata/ds_table/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ IActor* CreateService(const TConfig& config) {

void TService::PrepareManagers(std::vector<IClassBehaviour::TPtr> managers, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender) {
TBehavioursId id(managers);
if (RegistrationData->GetInitializationSnapshot()) {
if (RegistrationData->GetSnapshotOwner()->HasInitializationSnapshot()) {
auto bInitializer = NInitializer::TDBObjectBehaviour::GetInstance();
switch (RegistrationData->GetStage()) {
case TRegistrationData::EStage::Created:
Expand Down
23 changes: 15 additions & 8 deletions ydb/services/metadata/initializer/accessor_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
namespace NKikimr::NMetadata::NInitializer {

void TDSAccessorInitialized::DoNextModifier(const bool doPop) {
if (InitializationSnapshotOwner->HasInitializationSnapshot() && !doPop) {
while (Modifiers.size() && !doPop) {
if (InitializationSnapshotOwner->HasModification(ComponentId, Modifiers.front()->GetModificationId())) {
Modifiers.pop_front();
} else {
break;
}
}
}
if (doPop) {
Modifiers.pop_front();
}
Expand All @@ -27,11 +36,11 @@ void TDSAccessorInitialized::DoNextModifier(const bool doPop) {
TDSAccessorInitialized::TDSAccessorInitialized(const NRequest::TConfig& config,
const TString& componentId,
IInitializationBehaviour::TPtr initializationBehaviour,
IInitializerOutput::TPtr controller, std::shared_ptr<TSnapshot> initializationSnapshot)
IInitializerOutput::TPtr controller, const std::shared_ptr<NProvider::TInitializationSnapshotOwner>& snapshotOwner)
: Config(config)
, InitializationBehaviour(initializationBehaviour)
, ExternalController(controller)
, InitializationSnapshot(initializationSnapshot)
, InitializationSnapshotOwner(snapshotOwner)
, ComponentId(componentId)
{
}
Expand All @@ -40,7 +49,7 @@ void TDSAccessorInitialized::OnModificationFinished(const TString& modificationI
ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "modifiers count: " << Modifiers.size();
Y_ABORT_UNLESS(Modifiers.size());
Y_ABORT_UNLESS(Modifiers.front()->GetModificationId() == modificationId);
if (NProvider::TServiceOperator::IsEnabled() && InitializationSnapshot) {
if (NProvider::TServiceOperator::IsEnabled() && InitializationSnapshotOwner->HasInitializationSnapshot()) {
TDBInitialization dbInit(ComponentId, Modifiers.front()->GetModificationId());
NModifications::IOperationsManager::TExternalModificationContext extContext;
extContext.SetUserToken(NACLib::TSystemUsers::Metadata());
Expand All @@ -58,9 +67,6 @@ void TDSAccessorInitialized::OnModificationFinished(const TString& modificationI
void TDSAccessorInitialized::OnPreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) {
for (auto&& i : modifiers) {
TDBInitializationKey key(ComponentId, i->GetModificationId());
if (InitializationSnapshot && InitializationSnapshot->GetObjects().contains(key)) {
continue;
}
Modifiers.emplace_back(i);
}
DoNextModifier(false);
Expand Down Expand Up @@ -93,10 +99,11 @@ void TDSAccessorInitialized::OnAlteringFinished() {

void TDSAccessorInitialized::Execute(const NRequest::TConfig& config, const TString& componentId,
IInitializationBehaviour::TPtr initializationBehaviour, IInitializerOutput::TPtr controller,
std::shared_ptr<TSnapshot> initializationSnapshot)
const std::shared_ptr<NProvider::TInitializationSnapshotOwner>& snapshotOwner)
{
AFL_VERIFY(snapshotOwner);
std::shared_ptr<TDSAccessorInitialized> initializer(new TDSAccessorInitialized(config,
componentId, initializationBehaviour, controller, initializationSnapshot));
componentId, initializationBehaviour, controller, snapshotOwner));
initializer->SelfPtr = initializer;

initializationBehaviour->Prepare(initializer);
Expand Down
7 changes: 4 additions & 3 deletions ydb/services/metadata/initializer/accessor_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/services/metadata/abstract/common.h>
#include <ydb/services/metadata/abstract/initialization.h>
#include <ydb/services/metadata/ds_table/config.h>
#include <ydb/services/metadata/ds_table/registration.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/event_local.h>
Expand All @@ -22,7 +23,7 @@ class TDSAccessorInitialized: public IInitializerInput,
const NRequest::TConfig Config;
IInitializationBehaviour::TPtr InitializationBehaviour;
IInitializerOutput::TPtr ExternalController;
std::shared_ptr<TSnapshot> InitializationSnapshot;
const std::shared_ptr<NProvider::TInitializationSnapshotOwner> InitializationSnapshotOwner;
const TString ComponentId;
std::shared_ptr<TDSAccessorInitialized> SelfPtr;

Expand All @@ -38,12 +39,12 @@ class TDSAccessorInitialized: public IInitializerInput,
TDSAccessorInitialized(const NRequest::TConfig& config,
const TString& componentId,
IInitializationBehaviour::TPtr initializationBehaviour,
IInitializerOutput::TPtr controller, std::shared_ptr<TSnapshot> initializationSnapshot);
IInitializerOutput::TPtr controller, const std::shared_ptr<NProvider::TInitializationSnapshotOwner>& snapshotOwner);
public:
static void Execute(const NRequest::TConfig& config,
const TString& componentId,
IInitializationBehaviour::TPtr initializationBehaviour,
IInitializerOutput::TPtr controller, std::shared_ptr<TSnapshot> initializationSnapshot);
IInitializerOutput::TPtr controller, const std::shared_ptr<NProvider::TInitializationSnapshotOwner>& initializationSnapshotOwner);

};

Expand Down
Loading