From a2023e50d7e72defd56d43b56c7ff05e84b9abc1 Mon Sep 17 00:00:00 2001 From: Alek5andr-Kotov Date: Wed, 3 Jul 2024 22:33:19 +0300 Subject: [PATCH] LongTxService subscription id (#6250) --- .../kqp/executer_actor/kqp_data_executer.cpp | 4 +- .../kqp/session_actor/kqp_session_actor.cpp | 5 +- ydb/core/persqueue/events/internal.h | 5 +- ydb/core/persqueue/partition.cpp | 4 +- ydb/core/persqueue/partition_id.h | 6 +- ydb/core/persqueue/pq_impl.cpp | 38 ++++---- ydb/core/persqueue/pq_impl.h | 8 +- ydb/core/persqueue/transaction.cpp | 6 +- ydb/core/persqueue/transaction.h | 2 +- ydb/core/persqueue/ut/internals_ut.cpp | 2 +- ydb/core/persqueue/ut/partition_ut.cpp | 10 +- ydb/core/persqueue/ut/pqtablet_ut.cpp | 18 ++-- ydb/core/persqueue/write_id.cpp | 91 +++++++++++++++++++ ydb/core/persqueue/write_id.h | 61 +++++++++++++ ydb/core/persqueue/writer/writer.cpp | 14 +-- ydb/core/persqueue/writer/writer.h | 7 +- ydb/core/persqueue/ya.make | 1 + ydb/core/protos/kqp.proto | 7 +- ydb/core/protos/msgbus_pq.proto | 2 +- ydb/core/protos/pqconfig.proto | 11 ++- .../client/ydb_topic/ut/topic_to_table_ut.cpp | 26 +++--- ydb/services/persqueue_v1/ut/kqp_mock.cpp | 4 +- .../partition_writer_cache_actor_fixture.cpp | 4 +- 23 files changed, 256 insertions(+), 80 deletions(-) create mode 100644 ydb/core/persqueue/write_id.cpp create mode 100644 ydb/core/persqueue/write_id.h diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index f35d3f298c14..9bc57b456d02 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2594,7 +2594,9 @@ class TKqpDataExecuter : public TKqpExecuterBase(); if (writeId.Defined()) { - transaction.SetWriteId(*writeId); + auto* w = transaction.MutableWriteId(); + w->SetNodeId(SelfId().NodeId()); + w->SetKeyId(*writeId); } transaction.SetImmediate(ImmediateTx); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 318ea251c2e2..a7d9bce31447 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1723,7 +1723,10 @@ class TKqpSessionActor : public TActorBootstrapped { if (replyTopicOperations) { if (HasTopicWriteId()) { - response->MutableTopicOperations()->SetWriteId(GetTopicWriteId()); + auto* w = response->MutableTopicOperations(); + auto* writeId = w->MutableWriteId(); + writeId->SetNodeId(SelfId().NodeId()); + writeId->SetKeyId(GetTopicWriteId()); } } diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index bf0354ec4634..b588ba9c32b8 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -1158,12 +1159,12 @@ struct TEvPQ { }; struct TEvTransactionCompleted : TEventLocal { - explicit TEvTransactionCompleted(TMaybe writeId) : + explicit TEvTransactionCompleted(const TMaybe& writeId) : WriteId(writeId) { } - TMaybe WriteId; + TMaybe WriteId; }; }; diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 8141f7635d53..8a16e3f7ec04 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -3366,9 +3366,9 @@ void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransac Y_ABORT_UNLESS(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); Y_ABORT_UNLESS(tx.HasData()); - TMaybe writeId; + TMaybe writeId; if (tx.GetData().HasWriteId()) { - writeId = tx.GetData().GetWriteId(); + writeId = GetWriteId(tx.GetData()); } Replies.emplace_back(Tablet, diff --git a/ydb/core/persqueue/partition_id.h b/ydb/core/persqueue/partition_id.h index 7367e66af1a1..5ef5c4fa75e2 100644 --- a/ydb/core/persqueue/partition_id.h +++ b/ydb/core/persqueue/partition_id.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -20,7 +22,7 @@ class TPartitionId { { } - TPartitionId(ui32 originalPartitionId, TMaybe writeId, ui32 internalPartitionId) : + TPartitionId(ui32 originalPartitionId, const TMaybe& writeId, ui32 internalPartitionId) : OriginalPartitionId(originalPartitionId), WriteId(writeId), InternalPartitionId(internalPartitionId) @@ -55,7 +57,7 @@ class TPartitionId { } ui32 OriginalPartitionId = 0; - TMaybe WriteId; + TMaybe WriteId; ui32 InternalPartitionId = 0; }; diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index d7e103a1eedb..0a7c0ff0d000 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -957,7 +957,7 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info, for (size_t i = 0; i != info.TxWritesSize(); ++i) { auto& txWrite = info.GetTxWrites(i); - ui64 writeId = txWrite.GetWriteId(); + const TWriteId writeId = GetWriteId(txWrite); ui32 partitionId = txWrite.GetOriginalPartitionId(); TPartitionId shadowPartitionId(partitionId, writeId, txWrite.GetInternalPartitionId()); @@ -2539,7 +2539,7 @@ const TPartitionInfo& TPersQueue::GetPartitionInfo(const NKikimrClient::TPersQue { Y_ABORT_UNLESS(req.HasWriteId()); - ui64 writeId = req.GetWriteId(); + const TWriteId writeId = GetWriteId(req); ui32 originalPartitionId = req.GetPartition(); Y_ABORT_UNLESS(TxWrites.contains(writeId) && TxWrites.at(writeId).Partitions.contains(originalPartitionId)); @@ -2624,7 +2624,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie, return; } - ui64 writeId = req.GetWriteId(); + const TWriteId writeId = GetWriteId(req); ui32 originalPartitionId = req.GetPartition(); if (TxWrites.contains(writeId) && TxWrites.at(writeId).Partitions.contains(originalPartitionId)) { @@ -3136,7 +3136,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc } bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation, - ui64 writeId) const + const TWriteId& writeId) const { TPartitionId partitionId(operation.GetPartitionId(), writeId, @@ -3151,7 +3151,7 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod return true; } - ui64 writeId = txBody.GetWriteId(); + const TWriteId writeId = GetWriteId(txBody); PQ_LOG_D("writeId=" << writeId); for (auto& operation : txBody.GetOperations()) { @@ -3421,18 +3421,18 @@ bool TPersQueue::CanProcessTxWrites() const return !NewSupportivePartitions.empty(); } -void TPersQueue::SubscribeWriteId(ui64 writeId, +void TPersQueue::SubscribeWriteId(const TWriteId& writeId, const TActorContext& ctx) { - ctx.Send(NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()), - new NLongTxService::TEvLongTxService::TEvSubscribeLock(writeId, ctx.SelfID.NodeId())); + ctx.Send(NLongTxService::MakeLongTxServiceID(writeId.NodeId), + new NLongTxService::TEvLongTxService::TEvSubscribeLock(writeId.KeyId, writeId.NodeId)); } -void TPersQueue::UnsubscribeWriteId(ui64 writeId, +void TPersQueue::UnsubscribeWriteId(const TWriteId& writeId, const TActorContext& ctx) { - ctx.Send(NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()), - new NLongTxService::TEvLongTxService::TEvUnsubscribeLock(writeId, ctx.SelfID.NodeId())); + ctx.Send(NLongTxService::MakeLongTxServiceID(writeId.NodeId), + new NLongTxService::TEvLongTxService::TEvUnsubscribeLock(writeId.KeyId, writeId.NodeId)); } void TPersQueue::CreateSupportivePartitionActors(const TActorContext& ctx) @@ -3538,7 +3538,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) TabletID()); if (tx.WriteId.Defined()) { - ui64 writeId = *tx.WriteId; + const TWriteId& writeId = *tx.WriteId; Y_ABORT_UNLESS(TxWrites.contains(writeId)); TTxWriteInfo& writeInfo = TxWrites.at(writeId); writeInfo.TxId = tx.TxId; @@ -3717,7 +3717,7 @@ void TPersQueue::SaveTxWrites(NKikimrPQ::TTabletTxInfo& info) for (auto& [writeId, write] : TxWrites) { for (auto [partitionId, shadowPartitionId] : write.Partitions) { auto* txWrite = info.MutableTxWrites()->Add(); - txWrite->SetWriteId(writeId); + SetWriteId(*txWrite, writeId); txWrite->SetOriginalPartitionId(partitionId); txWrite->SetInternalPartitionId(shadowPartitionId.InternalPartitionId); } @@ -3810,7 +3810,7 @@ TMaybe TPersQueue::FindPartitionId(const NKikimrPQ::TDataTransacti ui32 partitionId = txBody.GetOperations(0).GetPartitionId(); if (txBody.HasWriteId() && hasWriteOperation(txBody)) { - ui64 writeId = txBody.GetWriteId(); + const TWriteId writeId = GetWriteId(txBody); if (!TxWrites.contains(writeId)) { PQ_LOG_D("unknown WriteId " << writeId); return Nothing(); @@ -3848,7 +3848,7 @@ void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx, } if (tx.WriteId.Defined()) { - ui64 writeId = *tx.WriteId; + const TWriteId& writeId = *tx.WriteId; Y_ABORT_UNLESS(TxWrites.contains(writeId)); const TTxWriteInfo& writeInfo = TxWrites.at(writeId); @@ -4521,7 +4521,7 @@ void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partiti void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext&) { auto& record = ev->Get()->Record; - ui64 writeId = record.GetLockId(); + const TWriteId writeId(record.GetLockNode(), record.GetLockId()); if (!TxWrites.contains(writeId)) { // the transaction has already been completed @@ -4559,7 +4559,7 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon { auto* event = ev->Get(); Y_ABORT_UNLESS(event->PartitionId.WriteId.Defined()); - const ui64 writeId = *event->PartitionId.WriteId; + const TWriteId& writeId = *event->PartitionId.WriteId; Y_ABORT_UNLESS(TxWrites.contains(writeId)); TTxWriteInfo& writeInfo = TxWrites.at(writeId); Y_ABORT_UNLESS(writeInfo.Partitions.contains(event->PartitionId.OriginalPartitionId)); @@ -4593,7 +4593,7 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo return; } - const ui64 writeId = *event->WriteId; + const TWriteId& writeId = *event->WriteId; Y_ABORT_UNLESS(TxWrites.contains(writeId)); TTxWriteInfo& writeInfo = TxWrites.at(writeId); Y_ABORT_UNLESS(writeInfo.Partitions.size() == 1); @@ -4604,7 +4604,7 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo void TPersQueue::BeginDeleteTx(const TDistributedTransaction& tx) { Y_ABORT_UNLESS(tx.WriteId.Defined()); - const ui64 writeId = *tx.WriteId; + const TWriteId& writeId = *tx.WriteId; if (!TxWrites.contains(writeId)) { // the transaction has already been completed return; diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 5740f101329b..a754c6828749 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -204,7 +204,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { bool Deleting = false; }; - THashMap TxWrites; + THashMap TxWrites; bool TxWritesChanged = false; ui32 NextSupportivePartitionId = 100'000; @@ -488,8 +488,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void CreateSupportivePartitionActors(const TActorContext& ctx); void CreateSupportivePartitionActor(const TPartitionId& shadowPartitionId, const TActorContext& ctx); NKikimrPQ::TPQTabletConfig MakeSupportivePartitionConfig() const; - void SubscribeWriteId(ui64 writeId, const TActorContext& ctx); - void UnsubscribeWriteId(ui64 writeId, const TActorContext& ctx); + void SubscribeWriteId(const TWriteId& writeId, const TActorContext& ctx); + void UnsubscribeWriteId(const TWriteId& writeId, const TActorContext& ctx); bool AllOriginalPartitionsInited() const; @@ -501,7 +501,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void BeginDeletePartitions(TTxWriteInfo& writeInfo); bool CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation, - ui64 writeId) const; + const TWriteId& writeId) const; bool CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBody) const; }; diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index 6f4775bfda09..50fc8e21bfc6 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -47,7 +47,7 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& SourceActor = ActorIdFromProto(tx.GetSourceActor()); if (tx.HasWriteId()) { - WriteId = tx.GetWriteId(); + WriteId = GetWriteId(tx); } } @@ -139,7 +139,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac InitPartitions(txBody.GetOperations()); if (txBody.HasWriteId() && HasWriteOperations) { - WriteId = txBody.GetWriteId(); + WriteId = GetWriteId(txBody); } else { WriteId = Nothing(); } @@ -350,7 +350,7 @@ void TDistributedTransaction::AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx) tx.SetAggrPredicate(ParticipantsDecision == NKikimrTx::TReadSetData::DECISION_COMMIT); } if (WriteId.Defined()) { - tx.SetWriteId(*WriteId); + SetWriteId(tx, *WriteId); } } diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index 61f2658518da..151dac913233 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -49,7 +49,7 @@ struct TDistributedTransaction { THashSet Senders; // список отправителей TEvReadSet THashSet Receivers; // список получателей TEvReadSet TVector Operations; - TMaybe WriteId; + TMaybe WriteId; EDecision SelfDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN; EDecision ParticipantsDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN; diff --git a/ydb/core/persqueue/ut/internals_ut.cpp b/ydb/core/persqueue/ut/internals_ut.cpp index 5e6cade31d0e..14c3414e2bbb 100644 --- a/ydb/core/persqueue/ut/internals_ut.cpp +++ b/ydb/core/persqueue/ut/internals_ut.cpp @@ -300,7 +300,7 @@ Y_UNIT_TEST(StoreKeys) { TKey keyOld(TKeyPrefix::TypeData, TPartitionId{9}, 8, 7, 6, 5, false); UNIT_ASSERT_VALUES_EQUAL(keyOld.ToString(), "d0000000009_00000000000000000008_00007_0000000006_00005"); - TKey keyNew(TKeyPrefix::TypeData, TPartitionId{5, 1, 9}, 8, 7, 6, 5, false); + TKey keyNew(TKeyPrefix::TypeData, TPartitionId{5, TWriteId{0, 1}, 9}, 8, 7, 6, 5, false); UNIT_ASSERT_VALUES_EQUAL(keyNew.ToString(), "D0000000009_00000000000000000008_00007_0000000006_00005"); keyNew.SetType(TKeyPrefix::TypeInfo); diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 5be3dcaf9bf9..8d93159cee4b 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -1028,7 +1028,7 @@ void CompareVectors(const TVector& expected, const TIterable& actual) { } void TPartitionFixture::ShadowPartitionCountersTest(bool isFirstClass) { - const TPartitionId partition{0, 1111, 123}; + const TPartitionId partition{0, TWriteId{0, 1111}, 123}; const ui64 begin = 0; const ui64 end = 10; const TString session = "session"; @@ -2320,7 +2320,7 @@ Y_UNIT_TEST_F(GetPartitionWriteInfoSuccess, TPartitionFixture) { Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetEnableQuoting(false); CreatePartition({ - .Partition=TPartitionId{2, 10, 100'001}, + .Partition=TPartitionId{2, TWriteId{0, 10}, 100'001}, // // partition configuration // @@ -2389,7 +2389,7 @@ Y_UNIT_TEST_F(GetPartitionWriteInfoSuccess, TPartitionFixture) { Y_UNIT_TEST_F(GetPartitionWriteInfoError, TPartitionFixture) { CreatePartition({ - .Partition=TPartitionId{2, 10, 100'001}, + .Partition=TPartitionId{2, TWriteId{0, 10}, 100'001}, .Begin=0, .End=10, // // partition configuration @@ -2444,7 +2444,7 @@ Y_UNIT_TEST_F(ShadowPartitionCountersFirstClass, TPartitionFixture) { } Y_UNIT_TEST_F(ShadowPartitionCountersRestore, TPartitionFixture) { - const TPartitionId partitionId{0, 1111, 123}; + const TPartitionId partitionId{0, TWriteId{0, 1111}, 123}; const ui64 begin = 0; const ui64 end = 10; const TString session = "session"; @@ -3126,7 +3126,7 @@ Y_UNIT_TEST_F(TestBatchingWithProposeConfig, TPartitionTxTestHelper) { Y_UNIT_TEST_F(GetUsedStorage, TPartitionFixture) { auto* actor = CreatePartition({ - .Partition=TPartitionId{2, 10, 100'001}, + .Partition=TPartitionId{2, TWriteId{0, 10}, 100'001}, .Begin=0, .End=10, // // partition configuration diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 67fb566a9bae..6159a8780b0c 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -47,7 +47,7 @@ struct TProposeTransactionParams { TVector Receivers; TVector TxOps; TMaybe Configs; - TMaybe WriteId; + TMaybe WriteId; }; struct TPlanStepParams { @@ -74,7 +74,7 @@ struct TCancelTransactionProposalParams { struct TGetOwnershipRequestParams { TMaybe Partition; TMaybe MsgNo; - TMaybe WriteId; + TMaybe WriteId; TMaybe NeedSupportivePartition; TMaybe Owner; // o TMaybe Cookie; @@ -85,7 +85,7 @@ struct TWriteRequestParams { TMaybe Partition; TMaybe Owner; TMaybe MsgNo; - TMaybe WriteId; + TMaybe WriteId; TMaybe SourceId; // w TMaybe SeqNo; // w TMaybe Data; // w @@ -327,7 +327,7 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa body->AddReceivingShards(tabletId); } if (params.WriteId) { - body->SetWriteId(*params.WriteId); + SetWriteId(*body, *params.WriteId); } body->SetImmediate(params.Senders.empty() && params.Receivers.empty() && (partitions.size() == 1) && !params.WriteId.Defined()); } @@ -557,7 +557,7 @@ std::unique_ptr TPQTabletFixture::MakeGetOwnershipRequ request->SetMessageNo(*params.MsgNo); } if (params.WriteId.Defined()) { - request->SetWriteId(*params.WriteId); + SetWriteId(*request, *params.WriteId); } if (params.NeedSupportivePartition.Defined()) { request->SetNeedSupportivePartition(*params.NeedSupportivePartition); @@ -642,7 +642,7 @@ void TPQTabletFixture::SendWriteRequest(const TWriteRequestParams& params) request->SetMessageNo(*params.MsgNo); } if (params.WriteId.Defined()) { - request->SetWriteId(*params.WriteId); + SetWriteId(*request, *params.WriteId); } if (params.Cookie.Defined()) { request->SetCookie(*params.Cookie); @@ -1253,7 +1253,7 @@ Y_UNIT_TEST_F(ProposeTx_Unknown_WriteId, TPQTabletFixture) PQTabletPrepare({.partitions=1}, {}, *Ctx); const ui64 txId = 2; - const ui64 writeId = 3; + const TWriteId writeId(0, 3); SendProposeTransactionRequest({.TxId=txId, .TxOps={{.Partition=0, .Path="/topic"}}, @@ -1267,7 +1267,7 @@ Y_UNIT_TEST_F(ProposeTx_Unknown_Partition_2, TPQTabletFixture) PQTabletPrepare({.partitions=2}, {}, *Ctx); const ui64 txId = 2; - const ui64 writeId = 3; + const TWriteId writeId(0, 3); const ui64 cookie = 4; SendGetOwnershipRequest({.Partition=0, @@ -1289,7 +1289,7 @@ Y_UNIT_TEST_F(ProposeTx_Command_After_Propose, TPQTabletFixture) const ui32 partitionId = 0; const ui64 txId = 2; - const ui64 writeId = 3; + const TWriteId writeId(0, 3); SyncGetOwnership({.Partition=partitionId, .WriteId=writeId, diff --git a/ydb/core/persqueue/write_id.cpp b/ydb/core/persqueue/write_id.cpp new file mode 100644 index 000000000000..028359bf71b9 --- /dev/null +++ b/ydb/core/persqueue/write_id.cpp @@ -0,0 +1,91 @@ +#include "write_id.h" + +namespace NKikimr::NPQ { + +TWriteId::TWriteId(ui64 nodeId, ui64 keyId) : + NodeId(nodeId), + KeyId(keyId) +{ +} + +bool TWriteId::operator==(const TWriteId& rhs) const +{ + return std::make_tuple(NodeId, KeyId) == std::make_tuple(rhs.NodeId, rhs.KeyId); +} + +bool TWriteId::operator<(const TWriteId& rhs) const +{ + return std::make_tuple(NodeId, KeyId) < std::make_tuple(rhs.NodeId, rhs.KeyId); +} + +void TWriteId::ToStream(IOutputStream& s) const +{ + s << '{' << NodeId << ", " << KeyId << '}'; +} + +template +TWriteId GetWriteIdImpl(const T& m) +{ + const auto& writeId = m.GetWriteId(); + return {writeId.GetNodeId(), writeId.GetKeyId()}; +} + +template +void SetWriteIdImpl(T& m, const TWriteId& writeId) +{ + auto* w = m.MutableWriteId(); + w->SetNodeId(writeId.NodeId); + w->SetKeyId(writeId.KeyId); +} + +TWriteId GetWriteId(const NKikimrPQ::TTransaction& m) +{ + return GetWriteIdImpl(m); +} + +void SetWriteId(NKikimrPQ::TTransaction& m, const TWriteId& writeId) +{ + SetWriteIdImpl(m, writeId); +} + +TWriteId GetWriteId(const NKikimrPQ::TDataTransaction& m) +{ + return GetWriteIdImpl(m); +} + +void SetWriteId(NKikimrPQ::TDataTransaction& m, const TWriteId& writeId) +{ + SetWriteIdImpl(m, writeId); +} + +TWriteId GetWriteId(const NKikimrPQ::TTabletTxInfo::TTxWriteInfo& m) +{ + return GetWriteIdImpl(m); +} + +void SetWriteId(NKikimrPQ::TTabletTxInfo::TTxWriteInfo& m, const TWriteId& writeId) +{ + SetWriteIdImpl(m, writeId); +} + +TWriteId GetWriteId(const NKikimrClient::TPersQueuePartitionRequest& m) +{ + return GetWriteIdImpl(m); +} + +void SetWriteId(NKikimrClient::TPersQueuePartitionRequest& m, const NKikimr::NPQ::TWriteId& writeId) +{ + SetWriteIdImpl(m, writeId); +} + +TWriteId GetWriteId(const NKikimrKqp::TTopicOperationsResponse& m) +{ + return GetWriteIdImpl(m); +} + +void SetWriteId(NKikimrKqp::TTopicOperationsResponse& m, const TWriteId& writeId) +{ + SetWriteIdImpl(m, writeId); +} + +} diff --git a/ydb/core/persqueue/write_id.h b/ydb/core/persqueue/write_id.h new file mode 100644 index 000000000000..662790f94684 --- /dev/null +++ b/ydb/core/persqueue/write_id.h @@ -0,0 +1,61 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include + +namespace NKikimr::NPQ { + +struct TWriteId { + TWriteId() = default; + TWriteId(ui64 nodeId, ui64 keyId); + + bool operator==(const TWriteId& rhs) const; + bool operator<(const TWriteId& rhs) const; + + void ToStream(IOutputStream& s) const; + + size_t GetHash() const + { + return MultiHash(NodeId, KeyId); + } + + ui64 NodeId = 0; + ui64 KeyId = 0; +}; + +inline +IOutputStream& operator<<(IOutputStream& s, const TWriteId& v) +{ + v.ToStream(s); + return s; +} + +TWriteId GetWriteId(const NKikimrPQ::TTransaction& m); +void SetWriteId(NKikimrPQ::TTransaction& m, const TWriteId& writeId); + +TWriteId GetWriteId(const NKikimrPQ::TDataTransaction& m); +void SetWriteId(NKikimrPQ::TDataTransaction& m, const TWriteId& writeId); + +TWriteId GetWriteId(const NKikimrPQ::TTabletTxInfo::TTxWriteInfo& m); +void SetWriteId(NKikimrPQ::TTabletTxInfo::TTxWriteInfo& m, const TWriteId& writeId); + +TWriteId GetWriteId(const NKikimrClient::TPersQueuePartitionRequest& m); +void SetWriteId(NKikimrClient::TPersQueuePartitionRequest& m, const TWriteId& writeId); + +TWriteId GetWriteId(const NKikimrKqp::TTopicOperationsResponse& m); +void SetWriteId(NKikimrKqp::TTopicOperationsResponse& m, const TWriteId& writeId); + +} + +template <> +struct THash { + inline size_t operator()(const NKikimr::NPQ::TWriteId& v) const + { + return v.GetHash(); + } +}; diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 285c3918cccf..466f57b60c91 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -42,8 +42,8 @@ TString TEvPartitionWriter::TEvInitResult::TSuccess::ToString() const { auto out = TStringBuilder() << "Success {" << " OwnerCookie: " << OwnerCookie << " SourceIdInfo: " << SourceIdInfo.ShortDebugString(); - if (WriteId != INVALID_WRITE_ID) { - out << " WriteId: " << WriteId; + if (WriteId.Defined()) { + out << " WriteId: " << *WriteId; } out << " }"; return out; @@ -171,7 +171,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl BecomeZombie(EErrorCode::InternalError, "Init error"); } - void InitResult(const TString& ownerCookie, const TEvPartitionWriter::TEvInitResult::TSourceIdInfo& sourceIdInfo, ui64 writeId) { + void InitResult(const TString& ownerCookie, const TEvPartitionWriter::TEvInitResult::TSourceIdInfo& sourceIdInfo, const TMaybe& writeId) { SendInitResult(ownerCookie, sourceIdInfo, writeId); } @@ -253,7 +253,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl return InitResult("Invalid KQP session", record); } - WriteId = record.GetResponse().GetTopicOperations().GetWriteId(); + WriteId = NPQ::GetWriteId(record.GetResponse().GetTopicOperations()); LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "SessionId: " << Opts.SessionId << @@ -304,7 +304,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl void SetWriteId(NKikimrClient::TPersQueuePartitionRequest& request) { if (HasWriteId()) { - request.SetWriteId(WriteId); + NPQ::SetWriteId(request, *WriteId); } } @@ -881,7 +881,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl private: bool HasWriteId() const { - return WriteId != INVALID_WRITE_ID; + return WriteId.Defined(); } bool HasSupportivePartitionId() const { @@ -922,7 +922,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl EErrorCode ErrorCode = EErrorCode::InternalError; - ui64 WriteId = INVALID_WRITE_ID; + TMaybe WriteId; ui32 SupportivePartitionId = INVALID_PARTITION_ID; using IRetryPolicy = IRetryPolicy; diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h index e95fc06a1477..d31b11771442 100644 --- a/ydb/core/persqueue/writer/writer.h +++ b/ydb/core/persqueue/writer/writer.h @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -13,8 +14,6 @@ namespace NKikimr::NPQ { -constexpr ui64 INVALID_WRITE_ID = Max(); - struct TEvPartitionWriter { enum EEv { EvInitResult = EventSpaceBegin(TKikimrEvents::ES_PQ_PARTITION_WRITER), @@ -36,7 +35,7 @@ struct TEvPartitionWriter { struct TSuccess { TString OwnerCookie; TSourceIdInfo SourceIdInfo; - ui64 WriteId = INVALID_WRITE_ID; + TMaybe WriteId; TString ToString() const; }; @@ -53,7 +52,7 @@ struct TEvPartitionWriter { std::variant Result; TEvInitResult(const TString& sessionId, const TString& txId, - const TString& ownerCookie, const TSourceIdInfo& sourceIdInfo, ui64 writeId) + const TString& ownerCookie, const TSourceIdInfo& sourceIdInfo, const TMaybe& writeId) : SessionId(sessionId) , TxId(txId) , Result(TSuccess{ownerCookie, sourceIdInfo, writeId}) diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make index c6dc649a4b94..3d72a67cbea4 100644 --- a/ydb/core/persqueue/ya.make +++ b/ydb/core/persqueue/ya.make @@ -43,6 +43,7 @@ SRCS( write_quoter.cpp microseconds_sliding_window.cpp dread_cache_service/caching_service.cpp + write_id.cpp ) GENERATE_ENUM_SERIALIZATION(read_balancer__balancing.h) diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 6aa51c698ab8..fe0ef81a862f 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -79,7 +79,12 @@ message TTopicOperationsRequest { } message TTopicOperationsResponse { - optional int64 WriteId = 1; + message TWriteId { + optional uint64 NodeId = 1; + optional uint64 KeyId = 2; + } + + optional TWriteId WriteId = 1; }; message TQueryRequest { diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index f6034600b8ee..88112500d1e0 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -144,7 +144,7 @@ message TPersQueuePartitionRequest { optional string OwnerCookie = 3; //mandatory for write optional int64 MessageNo = 12; //mandatory for write optional int64 CmdWriteOffset = 13; //optional - optional int64 WriteId = 23; + optional NKikimrPQ.TWriteId WriteId = 23; optional bool NeedSupportivePartition = 28; repeated TCmdWrite CmdWrite = 4; diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index ce82b799b4d2..9c7be1e9d6f1 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -946,6 +946,11 @@ message TPartitionOperation { optional uint32 SupportivePartition = 6; }; +message TWriteId { + optional uint64 NodeId = 1; + optional fixed64 KeyId = 2; +}; + message TDataTransaction { enum ELocksOp { Unspecified = 0; @@ -959,7 +964,7 @@ message TDataTransaction { repeated uint64 SendingShards = 3; repeated uint64 ReceivingShards = 4; optional bool Immediate = 5; - optional fixed64 WriteId = 6; + optional TWriteId WriteId = 6; } message TConfigTransaction { @@ -1116,12 +1121,12 @@ message TTransaction { // optional NActorsProto.TActorId SourceActor = 14; - optional uint64 WriteId = 15; + optional TWriteId WriteId = 15; }; message TTabletTxInfo { message TTxWriteInfo { - optional uint64 WriteId = 1; + optional TWriteId WriteId = 1; optional uint32 OriginalPartitionId = 2; optional uint32 InternalPartitionId = 3; }; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index b60de8347a21..d63e47ca3def 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -162,15 +162,15 @@ class TFixture : public NUnitTest::TBaseFixture { ui32 partition); TVector GetTabletKeys(const TActorId& actorId, ui64 tabletId); - ui64 GetTransactionWriteId(const TActorId& actorId, - ui64 tabletId); + NPQ::TWriteId GetTransactionWriteId(const TActorId& actorId, + ui64 tabletId); void SendLongTxLockStatus(const TActorId& actorId, ui64 tabletId, - ui64 writeId, + const NPQ::TWriteId& writeId, NKikimrLongTxService::TEvLockStatus::EStatus status); void WaitForTheTabletToDeleteTheWriteInfo(const TActorId& actorId, ui64 tabletId, - ui64 writeId); + const NPQ::TWriteId& writeId); std::unique_ptr Setup; std::unique_ptr Driver; @@ -1244,8 +1244,8 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture) } } -ui64 TFixture::GetTransactionWriteId(const TActorId& actorId, - ui64 tabletId) +NPQ::TWriteId TFixture::GetTransactionWriteId(const TActorId& actorId, + ui64 tabletId) { using TEvKeyValue = NKikimr::TEvKeyValue; @@ -1272,22 +1272,24 @@ ui64 TFixture::GetTransactionWriteId(const TActorId& actorId, auto& writeInfo = info.GetTxWrites(0); UNIT_ASSERT(writeInfo.HasWriteId()); - return writeInfo.GetWriteId(); + return NPQ::GetWriteId(writeInfo); } void TFixture::SendLongTxLockStatus(const TActorId& actorId, ui64 tabletId, - ui64 writeId, + const NPQ::TWriteId& writeId, NKikimrLongTxService::TEvLockStatus::EStatus status) { - auto event = std::make_unique(writeId, 0, status); + auto event = + std::make_unique(writeId.KeyId, writeId.NodeId, + status); auto& runtime = Setup->GetRuntime(); runtime.SendToPipe(tabletId, actorId, event.release()); } void TFixture::WaitForTheTabletToDeleteTheWriteInfo(const TActorId& actorId, ui64 tabletId, - ui64 writeId) + const NPQ::TWriteId& writeId) { while (true) { using TEvKeyValue = NKikimr::TEvKeyValue; @@ -1315,7 +1317,7 @@ void TFixture::WaitForTheTabletToDeleteTheWriteInfo(const TActorId& actorId, for (size_t i = 0; i < info.TxWritesSize(); ++i) { auto& writeInfo = info.GetTxWrites(i); UNIT_ASSERT(writeInfo.HasWriteId()); - if (writeInfo.GetWriteId() == writeId) { + if (NPQ::GetWriteId(writeInfo) == writeId) { found = true; break; } @@ -1344,7 +1346,7 @@ void TFixture::DeleteSupportivePartition(const TString& topicName, ui32 partitio auto& runtime = Setup->GetRuntime(); TActorId edge = runtime.AllocateEdgeActor(); ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicName, partition); - ui64 writeId = GetTransactionWriteId(edge, tabletId); + NPQ::TWriteId writeId = GetTransactionWriteId(edge, tabletId); SendLongTxLockStatus(edge, tabletId, writeId, NKikimrLongTxService::TEvLockStatus::STATUS_NOT_FOUND); diff --git a/ydb/services/persqueue_v1/ut/kqp_mock.cpp b/ydb/services/persqueue_v1/ut/kqp_mock.cpp index 39291a9119a7..949f08bed8fb 100644 --- a/ydb/services/persqueue_v1/ut/kqp_mock.cpp +++ b/ydb/services/persqueue_v1/ut/kqp_mock.cpp @@ -1,4 +1,5 @@ #include "kqp_mock.h" +#include namespace NKikimr::NPersQueueTests { @@ -24,7 +25,8 @@ void TKqpProxyServiceMock::Handle(NKqp::TEvKqp::TEvQueryRequest::TPtr& ev, const auto queryResponse = std::make_unique(); auto* response = queryResponse->Record.GetRef().MutableResponse(); - response->MutableTopicOperations()->SetWriteId(NextWriteId++); + NPQ::TWriteId writeId(0, NextWriteId++); + NPQ::SetWriteId(*response->MutableTopicOperations(), writeId); ctx.Send(ev->Sender, std::move(queryResponse)); } diff --git a/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp index 486e406fd988..d4e973a3a3c9 100644 --- a/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp +++ b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp @@ -3,6 +3,7 @@ #include #include +#include namespace NKikimr::NPersQueueTests { @@ -117,7 +118,8 @@ void TPartitionWriterCacheActorFixture::SetupEventObserver() // auto response = std::make_unique(); response->Record.GetRef().SetYdbStatus(Ydb::StatusIds::SUCCESS); - response->Record.GetRef().MutableResponse()->MutableTopicOperations()->SetWriteId(NextWriteId++); + NPQ::SetWriteId(*response->Record.GetRef().MutableResponse()->MutableTopicOperations(), + NPQ::TWriteId(0, NextWriteId++)); Ctx->Runtime->Send(ev->Sender, ev->Recipient, response.release(), 0, true); return TTestActorRuntime::EEventAction::DROP; }