Skip to content

Commit

Permalink
YQ-3459 fix resource pools permissions (#6989)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Jul 24, 2024
1 parent 60451f5 commit 1a58392
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 45 deletions.
113 changes: 96 additions & 17 deletions ydb/core/kqp/workload_service/actors/scheme_actors.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include "actors.h"

#include <ydb/core/base/path.h>
#include <ydb/core/base/tablet_pipe.h>

#include <ydb/core/kqp/common/simple/services.h>
#include <ydb/core/kqp/workload_service/common/events.h>
#include <ydb/core/kqp/workload_service/common/helpers.h>

#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/tx_proxy/proxy.h>

#include <ydb/library/table_creator/table_creator.h>
Expand Down Expand Up @@ -64,7 +66,13 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
for (const TString& usedSid : AppData()->AdministrationAllowedSIDs) {
diffAcl.AddAccess(NACLib::EAccessType::Allow, NACLib::EAccessRights::GenericFull, usedSid);
}
diffAcl.AddAccess(NACLib::EAccessType::Allow, NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema, AppData()->AllAuthenticatedUsers);

auto useAccess = NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema;
for (const auto& userSID : AppData()->DefaultUserSIDs) {
diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, userSID);
}
diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, AppData()->AllAuthenticatedUsers);
diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, BUILTIN_ACL_ROOT);

auto token = MakeIntrusive<NACLib::TUserToken>(BUILTIN_ACL_METADATA, TVector<NACLib::TSID>{});
Register(CreatePoolCreatorActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, NResourcePool::TPoolSettings(), token, diffAcl));
Expand Down Expand Up @@ -116,7 +124,7 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {

class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
public:
TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
: ReplyActorId(replyActorId)
, Database(database)
, PoolId(poolId)
Expand Down Expand Up @@ -255,38 +263,67 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
}

// Handles the scheme transaction proposal status for the pool create request.
// Terminal success (pool created or already present) replies SUCCESS; a
// concurrent modification of the same path is handled by subscribing to the
// competing transaction; shard unavailability is retried; everything else is
// reported as SCHEME_ERROR with the issues extracted from the response.
// NOTE(review): reconstructed from a diff view that interleaved removed and
// added lines of this method — confirm against the committed source.
void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
    const auto& response = ev->Get()->Record;
    const auto ssStatus = response.GetSchemeShardStatus();
    const auto status = ev->Get()->Status();
    switch (status) {
        case NTxProxy::TResultStatus::ExecComplete:
        case NTxProxy::TResultStatus::ExecAlready:
            // Both "just created" and "already exists" are success for an idempotent create.
            if (ssStatus == NKikimrScheme::EStatus::StatusSuccess || ssStatus == NKikimrScheme::EStatus::StatusAlreadyExists) {
                Reply(Ydb::StatusIds::SUCCESS);
            } else {
                Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Invalid creation status: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
            }
            return;
        case NTxProxy::TResultStatus::ExecError:
            // Another transaction is modifying the same path — wait for it to
            // complete instead of failing the request.
            if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications) {
                SubscribeOnTransactionOrRetry(status, response);
            } else {
                Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Execution error: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
            }
            return;
        case NTxProxy::TResultStatus::ExecInProgress:
            // Transaction accepted but not yet finished — subscribe for completion.
            SubscribeOnTransactionOrRetry(status, response);
            return;
        case NTxProxy::TResultStatus::ProxyShardNotAvailable:
            ScheduleRetry(response, "Retry shard unavailable error");
            return;
        default:
            Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Failed to create resource pool: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
            return;
    }
}

// Pipe connection result from the scheme shard tablet: on success just trace;
// on failure drop the pipe and schedule a retry of the whole request.
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
    const auto connectStatus = ev->Get()->Status;
    if (connectStatus != NKikimrProto::OK) {
        ClosePipeClient();
        ScheduleRetry(TStringBuilder() << "Tablet to pipe not connected: " << NKikimrProto::EReplyStatus_Name(connectStatus));
        return;
    }

    LOG_T("Tablet to pipe successfully connected");
}

// Pipe destruction: only an unexpected destruction (one we did not initiate
// via ClosePipeClient) counts as an error and triggers a retry.
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
    const auto& destroyedClient = ev->Get()->ClientId;
    if (ClosedSchemePipeActors.contains(destroyedClient)) {
        return;  // we closed this pipe ourselves — nothing to do
    }

    ClosePipeClient();
    ScheduleRetry("Tablet to pipe destroyed");
}

// The concurrent scheme transaction we subscribed to has finished — schedule
// a retry so the create request is re-proposed and the result re-checked.
void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) {
    const auto completedTxId = ev->Get()->Record.GetTxId();
    ScheduleRetry(TStringBuilder() << "Transaction " << completedTxId << " completed, doublechecking");
}

STFUNC(StateFunc) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle)
hFunc(TEvTabletPipe::TEvClientConnected, Handle)
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle)
hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle)
IgnoreFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered)

default:
StateFuncBase(ev);
}
Expand All @@ -301,13 +338,12 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
schemeTx.SetWorkingDir(JoinPath({Database, ".resource_pools"}));
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateResourcePool);
schemeTx.SetInternal(true);
schemeTx.SetAllowAccessToPrivatePaths(true);

BuildCreatePoolRequest(*schemeTx.MutableCreateResourcePool());
BuildModifyAclRequest(*schemeTx.MutableModifyACL());

if (UserToken) {
event->Record.SetUserToken(UserToken->GetSerializedToken());
event->Record.SetUserToken(UserToken->SerializeAsString());
}

Send(MakeTxProxyID(), std::move(event));
Expand All @@ -322,10 +358,42 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
}

private:
void ScheduleRetry(ui32 status, const TString& message, bool longDelay = false) {
auto ssStatus = static_cast<NKikimrScheme::EStatus>(status);
if (!TBase::ScheduleRetry(TStringBuilder() << message << ", status: " << ssStatus, longDelay)) {
Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus);
// Subscribes to completion of the scheme shard transaction that is blocking
// our create request. ExecInProgress carries our own tx id; otherwise the id
// of the concurrent path-creating tx is used. Falls back to a long-delay
// retry when no transaction id is available in the response.
void SubscribeOnTransactionOrRetry(NTxProxy::TResultStatus::EStatus status, const NKikimrTxUserProxy::TEvProposeTransactionStatus& response) {
    ui64 subscribeTxId = 0;
    if (status == NTxProxy::TResultStatus::ExecInProgress) {
        subscribeTxId = response.GetTxId();
    } else {
        subscribeTxId = response.GetPathCreateTxId();
    }

    if (!subscribeTxId) {
        ScheduleRetry(response, "Unable to subscribe to concurrent transaction", true);
        return;
    }

    SchemePipeActorId = Register(NTabletPipe::CreateClient(SelfId(), response.GetSchemeShardTabletId()));

    auto notifyRequest = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
    notifyRequest->Record.SetTxId(subscribeTxId);
    NTabletPipe::SendData(SelfId(), SchemePipeActorId, std::move(notifyRequest));
    LOG_D("Subscribe on create pool tx: " << subscribeTxId);
}

// Closes the scheme shard pipe (if any) and records it as intentionally
// closed so the subsequent TEvClientDestroyed is not treated as an error.
void ClosePipeClient() {
    if (!SchemePipeActorId) {
        return;
    }

    ClosedSchemePipeActors.insert(SchemePipeActorId);
    NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
    SchemePipeActorId = {};
}

// Schedules a retry after a failed proposal, attaching the issues carried in
// the scheme shard response; replies UNAVAILABLE once the retry budget is
// exhausted.
void ScheduleRetry(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false) {
    ClosePipeClient();

    const auto ssStatus = static_cast<NKikimrScheme::EStatus>(response.GetSchemeShardStatus());
    const auto retryIssues = ExtractIssues(response, TStringBuilder() << message << ", status: " << ssStatus);
    if (TBase::ScheduleRetry(retryIssues, longDelay)) {
        return;
    }

    Reply(Ydb::StatusIds::UNAVAILABLE, ExtractIssues(response, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus));
}

// Retry variant for errors that carry no proposal response (pipe failures,
// completion notifications); replies UNAVAILABLE when retries are exhausted.
void ScheduleRetry(const TString& message, bool longDelay = false) {
    ClosePipeClient();

    const bool retryScheduled = TBase::ScheduleRetry(message, longDelay);
    if (!retryScheduled) {
        Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on error: " << message);
    }
}

Expand Down Expand Up @@ -358,18 +426,29 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
LOG_W("Failed to create pool, " << status << ", issues: " << issues.ToOneLineString());
}

ClosePipeClient();

Issues.AddIssues(std::move(issues));
Send(ReplyActorId, new TEvPrivate::TEvCreatePoolResponse(status, std::move(Issues)));
PassAway();
}

// Converts the proto issues from a proposal response into NYql::TIssues,
// grouped under a single top-level message.
static NYql::TIssues ExtractIssues(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message) {
    NYql::TIssues responseIssues;
    NYql::IssuesFromMessage(response.GetIssues(), responseIssues);
    return GroupIssues(responseIssues, message);
}

private:
const TActorId ReplyActorId;
const TString Database;
const TString PoolId;
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
const NACLibProto::TDiffACL DiffAcl;
NResourcePool::TPoolSettings PoolConfig;

std::unordered_set<TActorId> ClosedSchemePipeActors;
TActorId SchemePipeActorId;
};

} // anonymous namespace
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/kqp/workload_service/common/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,25 @@ class TSchemeActorBase : public NActors::TActorBootstrapped<TDerived> {
virtual TString LogPrefix() const = 0;

protected:
bool ScheduleRetry(const TString& message, bool longDelay = false) {
// Schedules a retry wake-up with the given issues appended to the accumulated
// ones. Returns false when the retry budget is exhausted, in which case the
// caller should fail the request.
// NOTE(review): the scraped diff interleaved the old single-message lines
// (AddIssue(message) / old LOG_W) with the new TIssues-based ones; this is
// the reconciled post-commit version.
bool ScheduleRetry(NYql::TIssues issues, bool longDelay = false) {
    if (!RetryState) {
        RetryState = CreateRetryState();
    }

    if (const auto delay = RetryState->GetNextRetryDelay(longDelay)) {
        Issues.AddIssues(issues);
        this->Schedule(*delay, new TEvents::TEvWakeup());
        LOG_W("Scheduled retry for error: " << issues.ToOneLineString());
        return true;
    }

    return false;
}

bool ScheduleRetry(const TString& message, bool longDelay = false) {
return ScheduleRetry({NYql::TIssue(message)}, longDelay);
}

private:
static TRetryPolicy::IRetryState::TPtr CreateRetryState() {
return TRetryPolicy::GetFixedIntervalPolicy(
Expand Down
22 changes: 0 additions & 22 deletions ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,6 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
return;
}

// Add AllAuthenticatedUsers group SID into user token
ev->Get()->UserToken = GetUserToken(ev->Get()->UserToken);

LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId);
bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database));
Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless));
Expand Down Expand Up @@ -475,25 +472,6 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)}));
}

static TIntrusivePtr<NACLib::TUserToken> GetUserToken(TIntrusiveConstPtr<NACLib::TUserToken> userToken) {
auto token = MakeIntrusive<NACLib::TUserToken>(userToken ? userToken->GetUserSID() : NACLib::TSID(), TVector<NACLib::TSID>{});

bool hasAllAuthenticatedUsersSID = false;
const auto& allAuthenticatedUsersSID = AppData()->AllAuthenticatedUsers;
if (userToken) {
for (const auto& groupSID : userToken->GetGroupSIDs()) {
token->AddGroupSID(groupSID);
hasAllAuthenticatedUsersSID = hasAllAuthenticatedUsersSID || groupSID == allAuthenticatedUsersSID;
}
}

if (!hasAllAuthenticatedUsersSID) {
token->AddGroupSID(allAuthenticatedUsersSID);
}

return token;
}

TPoolState* GetPoolState(const TString& database, const TString& poolId) {
return GetPoolState(GetPoolKey(database, poolId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr<IYdbSetup> ydb, c
auto runtime = ydb->GetRuntime();
const auto& edgeActor = runtime->AllocateEdgeActor();

runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{}), true));
auto userToken = MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{});
userToken->SaveSerializationInfo();
runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, userToken, true));
return runtime->GrabEdgeEvent<TEvPrivate::TEvFetchPoolResponse>(edgeActor, FUTURE_WAIT_TIMEOUT);
}

Expand Down Expand Up @@ -108,7 +110,8 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) {

// Check default pool access
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(userSID)));
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID("")));
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(ydb->GetRuntime()->GetAppData().AllAuthenticatedUsers)));
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(BUILTIN_ACL_ROOT)));
}

Y_UNIT_TEST(TestDefaultPoolAdminPermissions) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/library/table_creator/table_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,9 @@ THolder<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigateRequest(cons
auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
auto databasePath = SplitPath(database);
request->DatabaseName = CanonizePath(databasePath);
request->UserToken = userToken;
if (userToken && !userToken->GetSerializedToken().empty()) {
request->UserToken = userToken;
}

for (const auto& pathComponents : pathsComponents) {
auto& entry = request->ResultSet.emplace_back();
Expand Down

0 comments on commit 1a58392

Please sign in to comment.