Skip to content

Commit

Permalink
Add sdk_build_info label; use GetFullTopicPath function
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq committed Oct 23, 2024
1 parent d043cbd commit 2d495a5
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "data_plane_helpers.h"
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>

namespace NKikimr::NPersQueueTests {

Expand Down Expand Up @@ -51,7 +53,8 @@ namespace NKikimr::NPersQueueTests {
std::optional<ui32> partitionGroup,
std::optional<TString> codec,
std::optional<bool> reconnectOnFailure,
THashMap<TString, TString> sessionMeta
THashMap<TString, TString> sessionMeta,
const TString& userAgent
) {
auto settings = TWriteSessionSettings().Path(topic).MessageGroupId(sourceId);
if (partitionGroup) settings.PartitionGroupId(*partitionGroup);
Expand All @@ -66,6 +69,9 @@ namespace NKikimr::NPersQueueTests {
}
settings.MaxMemoryUsage(1024*1024*1024*1024ll);
settings.Meta_.Fields = sessionMeta;
if (!userAgent.empty()) {
settings.Header({{NYdb::YDB_APPLICATION_NAME, userAgent}});
}
return CreateSimpleWriter(driver, settings);
}

Expand All @@ -79,6 +85,21 @@ namespace NKikimr::NPersQueueTests {
return TPersQueueClient(driver, clientSettings).CreateReadSession(TReadSessionSettings(settings).DisableClusterDiscovery(true));
}

std::shared_ptr<NYdb::NTopic::IReadSession> CreateReader(
NYdb::TDriver& driver,
const NYdb::NTopic::TReadSessionSettings& settings,
std::shared_ptr<NYdb::ICredentialsProviderFactory> creds,
const TString& userAgent
) {
NYdb::NTopic::TTopicClientSettings clientSettings;
if (creds) clientSettings.CredentialsProviderFactory(creds);
auto readerSettings = settings;
if (!userAgent.empty()) {
readerSettings.Header({{NYdb::YDB_APPLICATION_NAME, userAgent}});
}
return NYdb::NTopic::TTopicClient(driver, clientSettings).CreateReadSession(readerSettings);
}

TMaybe<TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment(std::shared_ptr<IReadSession>& reader, TDuration timeout) {
while (true) {
auto future = reader->WaitEvent();
Expand All @@ -99,4 +120,25 @@ namespace NKikimr::NPersQueueTests {
}
return {};
}

TMaybe<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment(std::shared_ptr<NYdb::NTopic::IReadSession>& reader, TDuration timeout) {
while (true) {
auto future = reader->WaitEvent();
future.Wait(timeout);

TMaybe<NYdb::NTopic::TReadSessionEvent::TEvent> event = reader->GetEvent(false, 1);
if (!event)
return {};
if (auto e = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) {
return *e;
} else if (auto* e = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
e->Confirm();
} else if (auto* e = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
e->Confirm();
} else if (std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) {
return {};
}
}
return {};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>

namespace NKikimr::NPersQueueTests {

Expand Down Expand Up @@ -34,16 +35,24 @@ namespace NKikimr::NPersQueueTests {
std::optional<ui32> partitionGroup = {},
std::optional<TString> codec = {},
std::optional<bool> reconnectOnFailure = {},
THashMap<TString, TString> sessionMeta = {}
THashMap<TString, TString> sessionMeta = {},
const TString& userAgent = {}
);

std::shared_ptr<NYdb::NPersQueue::IReadSession> CreateReader(
NYdb::TDriver& driver,
const NYdb::NPersQueue::TReadSessionSettings& settings,
std::shared_ptr<NYdb::ICredentialsProviderFactory> creds = nullptr
);

std::shared_ptr<NYdb::NTopic::IReadSession> CreateReader(
NYdb::TDriver& driver,
const NYdb::NTopic::TReadSessionSettings& settings,
std::shared_ptr<NYdb::ICredentialsProviderFactory> creds = nullptr,
const TString& userAgent = ""
);

TMaybe<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment(std::shared_ptr<NYdb::NPersQueue::IReadSession>& reader, TDuration timeout = TDuration::Max());
TMaybe<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment(std::shared_ptr<NYdb::NTopic::IReadSession>& reader, TDuration timeout = TDuration::Max());

}
33 changes: 33 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/callback_context_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include <library/cpp/testing/unittest/registar.h>

#include <ydb/public/sdk/cpp/client/ydb_topic/common/callback_context.h>

namespace NYdb::NTopic::NTests {

Y_UNIT_TEST_SUITE(CallbackContext) {


class TNumber : public TEnableSelfContext<TNumber> {
public:
size_t x = 0;
};

Y_UNIT_TEST(T) {
// TODO Benchmark

auto ptr = MakeWithCallbackContext<TNumber>();

for (size_t i = 0; i < 100; ++i) {
auto p = ptr->LockShared();
if (p) {
++p->x;
}
}

Cerr << "Total: " << ptr->LockShared()->x << Endl;

}

}

}
4 changes: 4 additions & 0 deletions ydb/services/persqueue_v1/actors/read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(
if (auto values = Request->GetStreamCtx()->GetPeerMetaValues(NYdb::YDB_APPLICATION_NAME); !values.empty()) {
UserAgent = values[0];
}
if (auto values = Request->GetStreamCtx()->GetPeerMetaValues(NYdb::YDB_SDK_BUILD_INFO_HEADER); !values.empty()) {
SdkBuildInfo = values[0];
}
}

template <bool UseMigrationProtocol>
Expand Down Expand Up @@ -895,6 +898,7 @@ void TReadSessionActor<UseMigrationProtocol>::SetupBytesReadByUserAgentCounter()
->GetSubgroup("host", "")
->GetSubgroup("protocol", protocol)
->GetSubgroup("consumer", ClientPath)
->GetSubgroup("sdk_build_info", CleanupCounterValueString(SdkBuildInfo))
->GetSubgroup("user_agent", CleanupCounterValueString(UserAgent))
->GetExpiringNamedCounter("sensor", "BytesReadByUserAgent", true);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/services/persqueue_v1/actors/read_session_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ class TReadSessionActor
const TString ClientDC;
const TInstant StartTimestamp;

TString SdkBuildInfo;
TString UserAgent = UseMigrationProtocol ? "pqv1 server" : "topic server";

TActorId SchemeCache;
Expand Down
17 changes: 10 additions & 7 deletions ydb/services/persqueue_v1/actors/write_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ TWriteSessionActor<UseMigrationProtocol>::TWriteSessionActor(
if (auto values = Request->GetStreamCtx()->GetPeerMetaValues(NYdb::YDB_APPLICATION_NAME); !values.empty()) {
UserAgent = values[0];
}
if (auto values = Request->GetStreamCtx()->GetPeerMetaValues(NYdb::YDB_SDK_BUILD_INFO_HEADER); !values.empty()) {
SdkBuildInfo = values[0];
}
}

template<bool UseMigrationProtocol>
Expand Down Expand Up @@ -489,16 +492,16 @@ void TWriteSessionActor<UseMigrationProtocol>::InitAfterDiscovery(const TActorCo
SLITotal = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"RequestsTotal"}, true, "sensor", false);
SLIErrors = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"RequestsError"}, true, "sensor", false);
SLITotal.Inc();

}

template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::SetupBytesWrittenByUserAgentCounter() {
void TWriteSessionActor<UseMigrationProtocol>::SetupBytesWrittenByUserAgentCounter(const TString& topicPath) {
static constexpr auto protocol = UseMigrationProtocol ? "pqv1" : "topic";
BytesWrittenByUserAgent = GetServiceCounters(Counters, "pqproxy|userAgents")
->GetSubgroup("host", "")
->GetSubgroup("protocol", protocol)
->GetSubgroup("topic", FullConverter->GetFederationPath())
->GetSubgroup("topic", topicPath)
->GetSubgroup("sdk_build_info", CleanupCounterValueString(SdkBuildInfo))
->GetSubgroup("user_agent", CleanupCounterValueString(UserAgent))
->GetExpiringNamedCounter("sensor", "BytesWrittenByUserAgent", true);
}
Expand Down Expand Up @@ -533,11 +536,11 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters()
SessionsCreated.Inc();
SessionsActive.Inc();

SetupBytesWrittenByUserAgentCounter();
SetupBytesWrittenByUserAgentCounter(FullConverter->GetPrimaryPath());
}

template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId)
void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TActorContext& ctx, const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId)
{
if (SessionsCreated) {
return;
Expand All @@ -554,7 +557,7 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& clou
SessionsCreated.Inc();
SessionsActive.Inc();

SetupBytesWrittenByUserAgentCounter();
SetupBytesWrittenByUserAgentCounter(NPersQueue::GetFullTopicPath(ctx, dbPath, FullConverter->GetPrimaryPath()));
}

template<bool UseMigrationProtocol>
Expand Down Expand Up @@ -609,7 +612,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvDescribeTopicsResponse:

if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
const auto& tabletConfig = Config.GetPQTabletConfig();
SetupCounters(tabletConfig.GetYcCloudId(), tabletConfig.GetYdbDatabaseId(),
SetupCounters(ctx, tabletConfig.GetYcCloudId(), tabletConfig.GetYdbDatabaseId(),
tabletConfig.GetYdbDatabasePath(), entry.DomainInfo->IsServerless(),
tabletConfig.GetYcFolderId());
} else {
Expand Down
5 changes: 3 additions & 2 deletions ydb/services/persqueue_v1/actors/write_session_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class TWriteSessionActor
static constexpr ui32 CODEC_ID_SIZE = 1;

TString UserAgent = UseMigrationProtocol ? "pqv1 server" : "topic server";
TString SdkBuildInfo;
static constexpr auto ProtoName = UseMigrationProtocol ? "v1" : "topic";

public:
Expand Down Expand Up @@ -161,9 +162,9 @@ class TWriteSessionActor
void PrepareRequest(THolder<TEvWrite>&& ev, const TActorContext& ctx);
void SendWriteRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx);

void SetupBytesWrittenByUserAgentCounter();
void SetupBytesWrittenByUserAgentCounter(const TString& topicPath);
void SetupCounters();
void SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId);
void SetupCounters(const TActorContext& ctx, const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId);

private:
void CreatePartitionWriterCache(const TActorContext& ctx);
Expand Down
Loading

0 comments on commit 2d495a5

Please sign in to comment.