Skip to content

Commit

Permalink
move redirects to base class (ydb-platform#8934)
Browse files Browse the repository at this point in the history
  • Loading branch information
adameat committed Oct 1, 2024
1 parent e09fe6a commit 50ae5d1
Show file tree
Hide file tree
Showing 11 changed files with 274 additions and 375 deletions.
124 changes: 98 additions & 26 deletions ydb/core/viewer/json_pipe_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,26 +93,34 @@ void TViewerPipeClient::SendDelayedRequests() {
}
}

TPathId TViewerPipeClient::GetPathId(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
if (ev->Get()->Request->ResultSet.size() == 1) {
if (ev->Get()->Request->ResultSet.begin()->Self) {
const auto& info = ev->Get()->Request->ResultSet.begin()->Self->Info;
TPathId TViewerPipeClient::GetPathId(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& ev) {
if (ev.Request->ResultSet.size() == 1) {
if (ev.Request->ResultSet.begin()->Self) {
const auto& info = ev.Request->ResultSet.begin()->Self->Info;
return TPathId(info.GetSchemeshardId(), info.GetPathId());
}
if (ev->Get()->Request->ResultSet.begin()->TableId) {
return ev->Get()->Request->ResultSet.begin()->TableId.PathId;
if (ev.Request->ResultSet.begin()->TableId) {
return ev.Request->ResultSet.begin()->TableId.PathId;
}
}
return {};
}

TString TViewerPipeClient::GetPath(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
if (ev->Get()->Request->ResultSet.size() == 1) {
return CanonizePath(ev->Get()->Request->ResultSet.begin()->Path);
TString TViewerPipeClient::GetPath(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& ev) {
if (ev.Request->ResultSet.size() == 1) {
return CanonizePath(ev.Request->ResultSet.begin()->Path);
}
return {};
}

TPathId TViewerPipeClient::GetPathId(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
return GetPathId(*ev->Get());
}

TString TViewerPipeClient::GetPath(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
return GetPath(*ev->Get());
}

bool TViewerPipeClient::IsSuccess(const std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySetResult>& ev) {
return (ev->Request->ResultSet.size() == 1) && (ev->Request->ResultSet.begin()->Status == NSchemeCache::TSchemeCacheNavigate::EStatus::Ok);
}
Expand Down Expand Up @@ -147,6 +155,23 @@ TString TViewerPipeClient::GetError(const std::unique_ptr<TEvTxProxySchemeCache:
}
}

bool TViewerPipeClient::IsSuccess(const std::unique_ptr<TEvStateStorage::TEvBoardInfo>& ev) {
return ev->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok;
}

TString TViewerPipeClient::GetError(const std::unique_ptr<TEvStateStorage::TEvBoardInfo>& ev) {
switch (ev->Status) {
case TEvStateStorage::TEvBoardInfo::EStatus::Unknown:
return "Unknown";
case TEvStateStorage::TEvBoardInfo::EStatus::Ok:
return "Ok";
case TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable:
return "NotAvailable";
default:
return ::ToString(static_cast<int>(ev->Status));
}
}

void TViewerPipeClient::RequestHiveDomainStats(NNodeWhiteboard::TTabletId hiveId) {
TActorId pipeClient = ConnectTabletPipe(hiveId);
THolder<TEvHive::TEvRequestHiveDomainStats> request = MakeHolder<TEvHive::TEvRequestHiveDomainStats>();
Expand Down Expand Up @@ -540,10 +565,10 @@ void TViewerPipeClient::RequestStateStorageMetadataCacheEndpointsLookup(const TS
++Requests;
}

std::vector<TNodeId> TViewerPipeClient::GetNodesFromBoardReply(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
std::vector<TNodeId> TViewerPipeClient::GetNodesFromBoardReply(const TEvStateStorage::TEvBoardInfo& ev) {
std::vector<TNodeId> databaseNodes;
if (ev->Get()->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
for (const auto& [actorId, infoEntry] : ev->Get()->InfoEntries) {
if (ev.Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
for (const auto& [actorId, infoEntry] : ev.InfoEntries) {
databaseNodes.emplace_back(actorId.NodeId());
}
}
Expand All @@ -552,11 +577,20 @@ std::vector<TNodeId> TViewerPipeClient::GetNodesFromBoardReply(TEvStateStorage::
return databaseNodes;
}

std::vector<TNodeId> TViewerPipeClient::GetNodesFromBoardReply(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
return GetNodesFromBoardReply(*ev->Get());
}

void TViewerPipeClient::InitConfig(const TCgiParameters& params) {
Followers = FromStringWithDefault(params.Get("followers"), Followers);
Metrics = FromStringWithDefault(params.Get("metrics"), Metrics);
WithRetry = FromStringWithDefault(params.Get("with_retry"), WithRetry);
MaxRequestsInFlight = FromStringWithDefault(params.Get("max_requests_in_flight"), MaxRequestsInFlight);
Database = params.Get("database");
if (!Database) {
Database = params.Get("tenant");
}
Direct = FromStringWithDefault<bool>(params.Get("direct"), Direct);
}

void TViewerPipeClient::InitConfig(const TRequestSettings& settings) {
Expand Down Expand Up @@ -653,23 +687,43 @@ void TViewerPipeClient::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
}
}

void TViewerPipeClient::HandleResolveResource(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
ResourceNavigateResponse->Set(std::move(ev));
if (ResourceNavigateResponse->IsOk()) {
TSchemeCacheNavigate::TEntry& entry(ResourceNavigateResponse->Get()->Request->ResultSet.front());
SharedDatabase = CanonizePath(entry.Path);
if (SharedDatabase == AppData()->TenantName) {
Direct = true;
return Bootstrap(); // retry bootstrap without redirect this time
}
DatabaseBoardInfoResponse = MakeRequestStateStorageEndpointsLookup(SharedDatabase);
} else {
ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database - shared database not found"));
}
}

void TViewerPipeClient::HandleResolveDatabase(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
if (ev->Get()->Request->ResultSet.size() == 1 && ev->Get()->Request->ResultSet.begin()->Status == NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
TSchemeCacheNavigate::TEntry& entry(ev->Get()->Request->ResultSet.front());
if (entry.DomainInfo) {
if (entry.DomainInfo->ResourcesDomainKey && entry.DomainInfo->DomainKey != entry.DomainInfo->ResourcesDomainKey) {
RequestSchemeCacheNavigate(TPathId(entry.DomainInfo->ResourcesDomainKey));
} else {
RequestStateStorageEndpointsLookup(CanonizePath(entry.Path));
}
DatabaseNavigateResponse->Set(std::move(ev));
if (DatabaseNavigateResponse->IsOk()) {
TSchemeCacheNavigate::TEntry& entry(DatabaseNavigateResponse->Get()->Request->ResultSet.front());
if (entry.DomainInfo && entry.DomainInfo->ResourcesDomainKey && entry.DomainInfo->DomainKey != entry.DomainInfo->ResourcesDomainKey) {
ResourceNavigateResponse = MakeRequestSchemeCacheNavigate(TPathId(entry.DomainInfo->ResourcesDomainKey));
Become(&TViewerPipeClient::StateResolveResource);
return;
}
DatabaseBoardInfoResponse = MakeRequestStateStorageEndpointsLookup(CanonizePath(entry.Path));
} else {
ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database"));
ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database - not found"));
}
}

void TViewerPipeClient::HandleResolveDatabase(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
ReplyAndPassAway(MakeForward(GetNodesFromBoardReply(ev)));
void TViewerPipeClient::HandleResolve(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
DatabaseBoardInfoResponse->Set(std::move(ev));
if (DatabaseBoardInfoResponse->IsOk()) {
ReplyAndPassAway(MakeForward(GetNodesFromBoardReply(DatabaseBoardInfoResponse->GetRef())));
} else {
ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database - no nodes found"));
}
}

void TViewerPipeClient::HandleTimeout() {
Expand All @@ -678,15 +732,33 @@ void TViewerPipeClient::HandleTimeout() {

STATEFN(TViewerPipeClient::StateResolveDatabase) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvStateStorage::TEvBoardInfo, HandleResolveDatabase);
hFunc(TEvStateStorage::TEvBoardInfo, HandleResolve);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleResolveDatabase);
cFunc(TEvents::TEvWakeup::EventType, HandleTimeout);
}
}

STATEFN(TViewerPipeClient::StateResolveResource) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvStateStorage::TEvBoardInfo, HandleResolve);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleResolveResource);
cFunc(TEvents::TEvWakeup::EventType, HandleTimeout);
}
}

void TViewerPipeClient::RedirectToDatabase(const TString& database) {
RequestSchemeCacheNavigate(database);
Become(&TViewerPipeClient::StateResolveDatabase, TDuration::MilliSeconds(1000), new TEvents::TEvWakeup());
DatabaseNavigateResponse = MakeRequestSchemeCacheNavigate(database);
Become(&TViewerPipeClient::StateResolveDatabase);
}

bool TViewerPipeClient::NeedToRedirect() {
Direct |= !Event->Get()->Request.GetHeader("X-Forwarded-From-Node").empty(); // we're already forwarding
Direct |= (Database == AppData()->TenantName) || Database.empty(); // we're already on the right node or don't use database filter
if (Database && !Direct) {
RedirectToDatabase(Database); // to find some dynamic node and redirect query there
return true;
}
return false;
}

void TViewerPipeClient::PassAway() {
Expand Down
19 changes: 18 additions & 1 deletion ydb/core/viewer/json_pipe_req.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
bool Followers = true;
bool Metrics = true;
bool WithRetry = true;
TString Database;
TString SharedDatabase;
bool Direct = false;
ui32 Requests = 0;
ui32 MaxRequestsInFlight = 200;
NWilson::TSpan Span;
Expand Down Expand Up @@ -158,6 +161,10 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
}
};

std::optional<TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult>> DatabaseNavigateResponse;
std::optional<TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult>> ResourceNavigateResponse;
std::optional<TRequestResponse<TEvStateStorage::TEvBoardInfo>> DatabaseBoardInfoResponse;

NTabletPipe::TClientConfig GetPipeClientConfig();

~TViewerPipeClient();
Expand Down Expand Up @@ -211,12 +218,18 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
return MakeBSControllerID();
}

static TPathId GetPathId(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& ev);
static TString GetPath(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& ev);

static TPathId GetPathId(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
static TString GetPath(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);

static bool IsSuccess(const std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySetResult>& ev);
static TString GetError(const std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySetResult>& ev);

static bool IsSuccess(const std::unique_ptr<TEvStateStorage::TEvBoardInfo>& ev);
static TString GetError(const std::unique_ptr<TEvStateStorage::TEvBoardInfo>& ev);

TRequestResponse<TEvHive::TEvResponseHiveDomainStats> MakeRequestHiveDomainStats(TTabletId hiveId);
TRequestResponse<TEvHive::TEvResponseHiveStorageStats> MakeRequestHiveStorageStats(TTabletId hiveId);
TRequestResponse<TEvHive::TEvResponseHiveNodeStats> MakeRequestHiveNodeStats(TTabletId hiveId, TEvHive::TEvRequestHiveNodeStats* request);
Expand Down Expand Up @@ -252,6 +265,7 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
void RequestStateStorageMetadataCacheEndpointsLookup(const TString& path);
TRequestResponse<TEvStateStorage::TEvBoardInfo> MakeRequestStateStorageEndpointsLookup(const TString& path, ui64 cookie = 0);
std::vector<TNodeId> GetNodesFromBoardReply(TEvStateStorage::TEvBoardInfo::TPtr& ev);
std::vector<TNodeId> GetNodesFromBoardReply(const TEvStateStorage::TEvBoardInfo& ev);
void InitConfig(const TCgiParameters& params);
void InitConfig(const TRequestSettings& settings);
void ClosePipes();
Expand Down Expand Up @@ -285,9 +299,12 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
void AddEvent(const TString& name);
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev);
void HandleResolveDatabase(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void HandleResolveDatabase(TEvStateStorage::TEvBoardInfo::TPtr& ev);
void HandleResolveResource(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void HandleResolve(TEvStateStorage::TEvBoardInfo::TPtr& ev);
STATEFN(StateResolveDatabase);
STATEFN(StateResolveResource);
void RedirectToDatabase(const TString& database);
bool NeedToRedirect();
void HandleTimeout();
void PassAway() override;
};
Expand Down
87 changes: 42 additions & 45 deletions ydb/core/viewer/storage_groups.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ class TStorageGroups : public TViewerPipeClient {
using TFieldsType = std::bitset<+EGroupFields::COUNT>;

// Common
std::optional<TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult>> DatabaseNavigateResult;
std::unordered_map<TPathId, TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult>> NavigateKeySetResult;
std::unordered_map<TPathId, TTabletId> PathId2HiveId;
std::unordered_map<TTabletId, TRequestResponse<TEvHive::TEvResponseHiveStorageStats>> HiveStorageStats;
Expand All @@ -145,8 +144,6 @@ class TStorageGroups : public TViewerPipeClient {
ui64 PDiskStateRequestsInFlight = 0;

ui32 Timeout = 0;
TString Database;
bool Direct = false;
TString Filter;
std::unordered_set<TString> DatabaseStoragePools;
std::unordered_set<TString> FilterStoragePools;
Expand Down Expand Up @@ -610,18 +607,11 @@ class TStorageGroups : public TViewerPipeClient {
: TBase(viewer, ev)
{
const auto& params(Event->Get()->Request.GetParams());
InitConfig(params);
Timeout = FromStringWithDefault<ui32>(params.Get("timeout"), 10000);
Database = params.Get("tenant");
if (Database.empty()) {
Database = params.Get("database");
}
if (!Database.empty()) {
FieldsRequired.set(+EGroupFields::PoolName);
NeedFilter = true;
}
Direct = FromStringWithDefault<bool>(params.Get("direct"), Direct);

FieldsRequired.set(+EGroupFields::GroupId);
TString filterStoragePool = params.Get("pool");
if (!filterStoragePool.empty()) {
Expand Down Expand Up @@ -726,32 +716,35 @@ class TStorageGroups : public TViewerPipeClient {

public:
void Bootstrap() override {
Direct |= !TBase::Event->Get()->Request.GetHeader("X-Forwarded-From-Node").empty(); // we're already forwarding
Direct |= (Database == AppData()->TenantName) || Database.empty(); // we're already on the right node or don't use database filter

if (Database && !Direct) {
return RedirectToDatabase(Database); // to find some dynamic node and redirect query there
if (TBase::NeedToRedirect()) {
return;
}
if (Database) {
if (!DatabaseNavigateResponse) {
DatabaseNavigateResponse = MakeRequestSchemeCacheNavigate(Database, 0);
} else {
auto pathId = GetPathId(DatabaseNavigateResponse->GetRef());
auto result = NavigateKeySetResult.emplace(pathId, std::move(*DatabaseNavigateResponse));
ProcessNavigate(result.first->second, true);
}
}
if (FallbackToWhiteboard) {
RequestWhiteboard();
} else {
if (Database) {
DatabaseNavigateResult = MakeRequestSchemeCacheNavigate(Database, 0);
if (FieldsNeeded(FieldsBsGroups)) {
GetGroupsResponse = RequestBSControllerGroups();
}
if (FallbackToWhiteboard) {
RequestWhiteboard();
} else {
if (FieldsNeeded(FieldsBsGroups)) {
GetGroupsResponse = RequestBSControllerGroups();
}
if (FieldsNeeded(FieldsBsPools)) {
GetStoragePoolsResponse = RequestBSControllerPools();
}
if (FieldsNeeded(FieldsBsVSlots)) {
GetVSlotsResponse = RequestBSControllerVSlots();
}
if (FieldsNeeded(FieldsBsPDisks)) {
GetPDisksResponse = RequestBSControllerPDisks();
}
if (FieldsNeeded(FieldsBsPools)) {
GetStoragePoolsResponse = RequestBSControllerPools();
}
if (FieldsNeeded(FieldsBsVSlots)) {
GetVSlotsResponse = RequestBSControllerVSlots();
}
if (FieldsNeeded(FieldsBsPDisks)) {
GetPDisksResponse = RequestBSControllerPDisks();
}
}

if (Requests == 0) {
return ReplyAndPassAway();
}
Expand Down Expand Up @@ -1100,19 +1093,7 @@ class TStorageGroups : public TViewerPipeClient {
}
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
bool firstNavigate = (ev->Cookie == 0);
TPathId pathId = GetPathId(ev);
if (firstNavigate && DatabaseNavigateResult.has_value() && pathId) {
NavigateKeySetResult.emplace(pathId, std::move(*DatabaseNavigateResult));
}
auto itNavigateKeySetResult = NavigateKeySetResult.find(pathId);
if (itNavigateKeySetResult == NavigateKeySetResult.end()) {
BLOG_W("Invalid NavigateKeySetResult PathId: " << pathId << " Path: " << CanonizePath(ev->Get()->Request->ResultSet.begin()->Path));
return RequestDone();
}
auto& navigateResult(itNavigateKeySetResult->second);
navigateResult.Set(std::move(ev));
void ProcessNavigate(TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult>& navigateResult, bool firstNavigate) {
if (navigateResult.IsOk()) {
TString path = CanonizePath(navigateResult->Request->ResultSet.begin()->Path);
TIntrusiveConstPtr<TSchemeCacheNavigate::TDomainDescription> domainDescription = navigateResult->Request->ResultSet.begin()->DomainDescription;
Expand All @@ -1134,6 +1115,22 @@ class TStorageGroups : public TViewerPipeClient {
}
}
}
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
bool firstNavigate = (ev->Cookie == 0);
TPathId pathId = GetPathId(ev);
if (firstNavigate && DatabaseNavigateResponse && pathId) {
NavigateKeySetResult.emplace(pathId, std::move(*DatabaseNavigateResponse));
}
auto itNavigateKeySetResult = NavigateKeySetResult.find(pathId);
if (itNavigateKeySetResult == NavigateKeySetResult.end()) {
BLOG_W("Invalid NavigateKeySetResult PathId: " << pathId << " Path: " << CanonizePath(ev->Get()->Request->ResultSet.begin()->Path));
return RequestDone();
}
auto& navigateResult(itNavigateKeySetResult->second);
navigateResult.Set(std::move(ev));
ProcessNavigate(navigateResult, firstNavigate);
RequestDone();
}

Expand Down
Loading

0 comments on commit 50ae5d1

Please sign in to comment.