Skip to content

Commit

Permalink
Support online SchemeBoard subscriber reconfiguration (#5724)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Jun 19, 2024
1 parent 1266d9f commit 6efb9e8
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 22 deletions.
7 changes: 5 additions & 2 deletions ydb/core/base/statestorage_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,18 @@ struct TEvStateStorage::TEvResolveSchemeBoard : public TEventLocal<TEvResolveSch
const TPathId PathId;

const EKeyType KeyType;
const bool Subscribe;

TEvResolveSchemeBoard(const TString &path)
TEvResolveSchemeBoard(const TString &path, bool subscribe = false)
: Path(path)
, KeyType(KeyTypePath)
, Subscribe(subscribe)
{}

TEvResolveSchemeBoard(const TPathId& pathId)
TEvResolveSchemeBoard(const TPathId& pathId, bool subscribe = false)
: PathId(pathId)
, KeyType(KeyTypePathId)
, Subscribe(subscribe)
{}
};

Expand Down
13 changes: 9 additions & 4 deletions ydb/core/base/statestorage_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ class TStateStorageProxy : public TActor<TStateStorageProxy> {

TMap<TActorId, TActorId> ReplicaProbes;

THashMap<std::tuple<TActorId, ui64>, ui64> Subscriptions;
THashMap<std::tuple<TActorId, ui64>, std::tuple<ui64, TIntrusivePtr<TStateStorageInfo> TThis::*>> Subscriptions;
THashSet<std::tuple<TActorId, ui64>> SchemeBoardSubscriptions;

void Handle(TEvStateStorage::TEvRequestReplicasDumps::TPtr &ev) {
Expand All @@ -768,7 +768,7 @@ class TStateStorageProxy : public TActor<TStateStorageProxy> {

void Handle(TEvStateStorage::TEvResolveReplicas::TPtr &ev) {
if (ev->Get()->Subscribe) {
Subscriptions.emplace(std::make_tuple(ev->Sender, ev->Cookie), ev->Get()->TabletID);
Subscriptions.try_emplace(std::make_tuple(ev->Sender, ev->Cookie), ev->Get()->TabletID, &TThis::Info);
}
ResolveReplicas(ev, ev->Get()->TabletID, Info);
}
Expand Down Expand Up @@ -813,6 +813,10 @@ class TStateStorageProxy : public TActor<TStateStorageProxy> {
Y_ABORT("unreachable");
}

if (ev->Get()->Subscribe) {
Subscriptions.try_emplace(std::make_tuple(ev->Sender, ev->Cookie), fakeTabletId, &TThis::SchemeBoardInfo);
}

ResolveReplicas(ev, fakeTabletId, SchemeBoardInfo);
}

Expand Down Expand Up @@ -866,10 +870,11 @@ class TStateStorageProxy : public TActor<TStateStorageProxy> {

RegisterDerivedServices(TlsActivationContext->ExecutorThread.ActorSystem, old.Get());

for (const auto& [key, tabletId] : Subscriptions) {
for (const auto& [key, value] : Subscriptions) {
const auto& [sender, cookie] = key;
const auto& [tabletId, ptr] = value;
struct { TActorId Sender; ui64 Cookie; } ev{sender, cookie};
ResolveReplicas(&ev, tabletId, Info);
ResolveReplicas(&ev, tabletId, this->*ptr);
}
for (const auto& [sender, cookie] : SchemeBoardSubscriptions) {
Send(sender, new TEvStateStorage::TEvListSchemeBoardResult(SchemeBoardInfo), 0, cookie);
Expand Down
13 changes: 5 additions & 8 deletions ydb/core/blobstorage/nodewarden/distconf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@ namespace NKikimr::NStorage {
void TDistributedConfigKeeper::Bootstrap() {
STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");

// report initial node listing for static node
if (IsSelfStatic) {
auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig);
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>();
for (const auto& [nodeId, item] : ns->StaticNodeTable) {
ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
}
Send(SelfId(), ev.release());
auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig);
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>();
for (const auto& [nodeId, item] : ns->StaticNodeTable) {
ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
}
Send(SelfId(), ev.release());

// and subscribe for the node list too
Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true));
Expand Down
50 changes: 42 additions & 8 deletions ydb/core/tx/scheme_board/subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ namespace {
struct TEvPrivate {
enum EEv {
EvReplicaMissing = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
EvSwitchReplica,

EvEnd,
};
Expand Down Expand Up @@ -610,11 +611,16 @@ class TSubscriberProxy: public TMonitorableActor<TDerived> {
NJson::TJsonMap MonAttributes() const override {
return {
{"Parent", TMonitorableActor<TDerived>::PrintActorIdAttr(NKikimrServices::TActivity::SCHEME_BOARD_SUBSCRIBER_ACTOR, Parent)},
{"Replica", TMonitorableActor<TDerived>::PrintActorIdAttr(NKikimrServices::TActivity::SCHEME_BOARD_REPLICA_ACTOR, Replica)},
{"ReplicaIndex", TStringBuilder() << ReplicaIndex << '/' << TotalReplicas},
{"Path", ToString(Path)},
};
}

void HandleSwitchReplica(STATEFN_SIG) {
Replica = ev->Sender;
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, ReplicaSubscriber, this->SelfId(), nullptr, 0));
}

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::SCHEME_BOARD_SUBSCRIBER_PROXY_ACTOR;
Expand All @@ -626,10 +632,14 @@ class TSubscriberProxy: public TMonitorableActor<TDerived> {

explicit TSubscriberProxy(
const TActorId& parent,
const ui32 replicaIndex,
const ui32 totalReplicas,
const TActorId& replica,
const TPath& path,
const ui64 domainOwnerId)
: Parent(parent)
, ReplicaIndex(replicaIndex)
, TotalReplicas(totalReplicas)
, Replica(replica)
, Path(path)
, DomainOwnerId(domainOwnerId)
Expand All @@ -656,6 +666,8 @@ class TSubscriberProxy: public TMonitorableActor<TDerived> {
hFunc(TEvents::TEvGone, Handle);
hFunc(TEvPrivate::TEvReplicaMissing, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);

fFunc(TEvPrivate::EvSwitchReplica, HandleSwitchReplica);
}
}

Expand All @@ -667,14 +679,18 @@ class TSubscriberProxy: public TMonitorableActor<TDerived> {

CFunc(TEvents::TEvWakeup::EventType, Bootstrap);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);

fFunc(TEvPrivate::EvSwitchReplica, HandleSwitchReplica);
}
}

using TBase = TSubscriberProxy<TPath, TDerived, TReplicaDerived>;

private:
const TActorId Parent;
const TActorId Replica;
const ui32 ReplicaIndex;
const ui32 TotalReplicas;
TActorId Replica;
const TPath Path;
const ui64 DomainOwnerId;

Expand Down Expand Up @@ -778,7 +794,7 @@ class TSubscriber: public TMonitorableActor<TDerived> {
DelayedSyncRequest = 0;

Y_ABORT_UNLESS(PendingSync.empty());
for (const auto& proxy : Proxies) {
for (const auto& [proxy, replica] : Proxies) {
this->Send(proxy, new NInternalEvents::TEvSyncVersionRequest(Path), 0, CurrentSyncRequest);
PendingSync.emplace(proxy);
}
Expand Down Expand Up @@ -946,13 +962,26 @@ class TSubscriber: public TMonitorableActor<TDerived> {
const auto& replicas = ev->Get()->Replicas;

if (replicas.empty()) {
Y_ABORT_UNLESS(Proxies.empty());
SBS_LOG_E("Subscribe on unconfigured SchemeBoard");
this->Become(&TDerived::StateCalm);
return;
}

for (const auto& replica : replicas) {
Proxies.emplace(this->RegisterWithSameMailbox(new TProxyDerived(this->SelfId(), replica, Path, DomainOwnerId)));
Y_ABORT_UNLESS(Proxies.empty() || Proxies.size() == replicas.size());

if (Proxies.empty()) {
for (size_t i = 0; i < replicas.size(); ++i) {
Proxies.emplace_back(this->RegisterWithSameMailbox(new TProxyDerived(this->SelfId(), i, replicas.size(),
replicas[i], Path, DomainOwnerId)), replicas[i]);
}
} else {
for (size_t i = 0; i < replicas.size(); ++i) {
if (auto& [proxy, replica] = Proxies[i]; replica != replicas[i]) {
TActivationContext::Send(new IEventHandle(TEvPrivate::EvSwitchReplica, 0, proxy, replicas[i], nullptr, 0));
replica = replicas[i];
}
}
}

this->Become(&TDerived::StateWork);
Expand Down Expand Up @@ -1011,10 +1040,13 @@ class TSubscriber: public TMonitorableActor<TDerived> {
}

void PassAway() override {
for (const auto& proxy : Proxies) {
for (const auto& [proxy, replica] : Proxies) {
this->Send(proxy, new TEvents::TEvPoisonPill());
}

TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, MakeStateStorageProxyID(),
this->SelfId(), nullptr, 0));

TMonitorableActor<TDerived>::PassAway();
}

Expand Down Expand Up @@ -1050,7 +1082,7 @@ class TSubscriber: public TMonitorableActor<TDerived> {
TMonitorableActor<TDerived>::Bootstrap();

const TActorId proxy = MakeStateStorageProxyID();
this->Send(proxy, new TEvStateStorage::TEvResolveSchemeBoard(Path), IEventHandle::FlagTrackDelivery);
this->Send(proxy, new TEvStateStorage::TEvResolveSchemeBoard(Path, true), IEventHandle::FlagTrackDelivery);
this->Become(&TDerived::StateResolve);
}

Expand All @@ -1069,6 +1101,8 @@ class TSubscriber: public TMonitorableActor<TDerived> {

STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvStateStorage::TEvResolveReplicasList, Handle);

hFunc(NInternalEvents::TEvNotify, Handle);
hFunc(NInternalEvents::TEvSyncRequest, Handle); // from owner (cache)
hFunc(NInternalEvents::TEvSyncVersionResponse, Handle); // from proxies
Expand All @@ -1094,7 +1128,7 @@ class TSubscriber: public TMonitorableActor<TDerived> {
const TPath Path;
const ui64 DomainOwnerId;

TSet<TActorId> Proxies;
std::vector<std::tuple<TActorId, TActorId>> Proxies;
TMap<TActorId, TState> States;
TMap<TActorId, TNotifyResponse> InitialResponses;
TMaybe<TState> State;
Expand Down

0 comments on commit 6efb9e8

Please sign in to comment.