Skip to content

Commit

Permalink
LongTxService subscription id (#6250)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored Jul 3, 2024
1 parent 75959d3 commit a2023e5
Show file tree
Hide file tree
Showing 23 changed files with 256 additions and 80 deletions.
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2594,7 +2594,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();

if (writeId.Defined()) {
transaction.SetWriteId(*writeId);
auto* w = transaction.MutableWriteId();
w->SetNodeId(SelfId().NodeId());
w->SetKeyId(*writeId);
}
transaction.SetImmediate(ImmediateTx);

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1723,7 +1723,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

if (replyTopicOperations) {
if (HasTopicWriteId()) {
response->MutableTopicOperations()->SetWriteId(GetTopicWriteId());
auto* w = response->MutableTopicOperations();
auto* writeId = w->MutableWriteId();
writeId->SetNodeId(SelfId().NodeId());
writeId->SetKeyId(GetTopicWriteId());
}
}

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/sourceid_info.h>
#include <ydb/core/persqueue/metering_sink.h>
#include <ydb/core/persqueue/write_id.h>
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>

Expand Down Expand Up @@ -1158,12 +1159,12 @@ struct TEvPQ {
};

struct TEvTransactionCompleted : TEventLocal<TEvTransactionCompleted, EvTransactionCompleted> {
explicit TEvTransactionCompleted(TMaybe<ui64> writeId) :
explicit TEvTransactionCompleted(const TMaybe<NPQ::TWriteId>& writeId) :
WriteId(writeId)
{
}

TMaybe<ui64> WriteId;
TMaybe<NPQ::TWriteId> WriteId;
};
};

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3366,9 +3366,9 @@ void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransac
Y_ABORT_UNLESS(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
Y_ABORT_UNLESS(tx.HasData());

TMaybe<ui64> writeId;
TMaybe<TWriteId> writeId;
if (tx.GetData().HasWriteId()) {
writeId = tx.GetData().GetWriteId();
writeId = GetWriteId(tx.GetData());
}

Replies.emplace_back(Tablet,
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/persqueue/partition_id.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <ydb/core/persqueue/write_id.h>

#include <util/generic/maybe.h>
#include <util/stream/output.h>
#include <util/system/types.h>
Expand All @@ -20,7 +22,7 @@ class TPartitionId {
{
}

TPartitionId(ui32 originalPartitionId, TMaybe<ui64> writeId, ui32 internalPartitionId) :
TPartitionId(ui32 originalPartitionId, const TMaybe<TWriteId>& writeId, ui32 internalPartitionId) :
OriginalPartitionId(originalPartitionId),
WriteId(writeId),
InternalPartitionId(internalPartitionId)
Expand Down Expand Up @@ -55,7 +57,7 @@ class TPartitionId {
}

ui32 OriginalPartitionId = 0;
TMaybe<ui64> WriteId;
TMaybe<TWriteId> WriteId;
ui32 InternalPartitionId = 0;
};

Expand Down
38 changes: 19 additions & 19 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info,

for (size_t i = 0; i != info.TxWritesSize(); ++i) {
auto& txWrite = info.GetTxWrites(i);
ui64 writeId = txWrite.GetWriteId();
const TWriteId writeId = GetWriteId(txWrite);
ui32 partitionId = txWrite.GetOriginalPartitionId();
TPartitionId shadowPartitionId(partitionId, writeId, txWrite.GetInternalPartitionId());

Expand Down Expand Up @@ -2539,7 +2539,7 @@ const TPartitionInfo& TPersQueue::GetPartitionInfo(const NKikimrClient::TPersQue
{
Y_ABORT_UNLESS(req.HasWriteId());

ui64 writeId = req.GetWriteId();
const TWriteId writeId = GetWriteId(req);
ui32 originalPartitionId = req.GetPartition();

Y_ABORT_UNLESS(TxWrites.contains(writeId) && TxWrites.at(writeId).Partitions.contains(originalPartitionId));
Expand Down Expand Up @@ -2624,7 +2624,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,
return;
}

ui64 writeId = req.GetWriteId();
const TWriteId writeId = GetWriteId(req);
ui32 originalPartitionId = req.GetPartition();

if (TxWrites.contains(writeId) && TxWrites.at(writeId).Partitions.contains(originalPartitionId)) {
Expand Down Expand Up @@ -3136,7 +3136,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
}

bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
ui64 writeId) const
const TWriteId& writeId) const
{
TPartitionId partitionId(operation.GetPartitionId(),
writeId,
Expand All @@ -3151,7 +3151,7 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
return true;
}

ui64 writeId = txBody.GetWriteId();
const TWriteId writeId = GetWriteId(txBody);
PQ_LOG_D("writeId=" << writeId);

for (auto& operation : txBody.GetOperations()) {
Expand Down Expand Up @@ -3421,18 +3421,18 @@ bool TPersQueue::CanProcessTxWrites() const
return !NewSupportivePartitions.empty();
}

void TPersQueue::SubscribeWriteId(ui64 writeId,
void TPersQueue::SubscribeWriteId(const TWriteId& writeId,
const TActorContext& ctx)
{
ctx.Send(NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()),
new NLongTxService::TEvLongTxService::TEvSubscribeLock(writeId, ctx.SelfID.NodeId()));
ctx.Send(NLongTxService::MakeLongTxServiceID(writeId.NodeId),
new NLongTxService::TEvLongTxService::TEvSubscribeLock(writeId.KeyId, writeId.NodeId));
}

void TPersQueue::UnsubscribeWriteId(ui64 writeId,
void TPersQueue::UnsubscribeWriteId(const TWriteId& writeId,
const TActorContext& ctx)
{
ctx.Send(NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()),
new NLongTxService::TEvLongTxService::TEvUnsubscribeLock(writeId, ctx.SelfID.NodeId()));
ctx.Send(NLongTxService::MakeLongTxServiceID(writeId.NodeId),
new NLongTxService::TEvLongTxService::TEvUnsubscribeLock(writeId.KeyId, writeId.NodeId));
}

void TPersQueue::CreateSupportivePartitionActors(const TActorContext& ctx)
Expand Down Expand Up @@ -3538,7 +3538,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
TabletID());

if (tx.WriteId.Defined()) {
ui64 writeId = *tx.WriteId;
const TWriteId& writeId = *tx.WriteId;
Y_ABORT_UNLESS(TxWrites.contains(writeId));
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
writeInfo.TxId = tx.TxId;
Expand Down Expand Up @@ -3717,7 +3717,7 @@ void TPersQueue::SaveTxWrites(NKikimrPQ::TTabletTxInfo& info)
for (auto& [writeId, write] : TxWrites) {
for (auto [partitionId, shadowPartitionId] : write.Partitions) {
auto* txWrite = info.MutableTxWrites()->Add();
txWrite->SetWriteId(writeId);
SetWriteId(*txWrite, writeId);
txWrite->SetOriginalPartitionId(partitionId);
txWrite->SetInternalPartitionId(shadowPartitionId.InternalPartitionId);
}
Expand Down Expand Up @@ -3810,7 +3810,7 @@ TMaybe<TPartitionId> TPersQueue::FindPartitionId(const NKikimrPQ::TDataTransacti
ui32 partitionId = txBody.GetOperations(0).GetPartitionId();

if (txBody.HasWriteId() && hasWriteOperation(txBody)) {
ui64 writeId = txBody.GetWriteId();
const TWriteId writeId = GetWriteId(txBody);
if (!TxWrites.contains(writeId)) {
PQ_LOG_D("unknown WriteId " << writeId);
return Nothing();
Expand Down Expand Up @@ -3848,7 +3848,7 @@ void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx,
}

if (tx.WriteId.Defined()) {
ui64 writeId = *tx.WriteId;
const TWriteId& writeId = *tx.WriteId;
Y_ABORT_UNLESS(TxWrites.contains(writeId));
const TTxWriteInfo& writeInfo = TxWrites.at(writeId);

Expand Down Expand Up @@ -4521,7 +4521,7 @@ void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partiti
void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext&)
{
auto& record = ev->Get()->Record;
ui64 writeId = record.GetLockId();
const TWriteId writeId(record.GetLockNode(), record.GetLockId());

if (!TxWrites.contains(writeId)) {
// the transaction has already been completed
Expand Down Expand Up @@ -4559,7 +4559,7 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
{
auto* event = ev->Get();
Y_ABORT_UNLESS(event->PartitionId.WriteId.Defined());
const ui64 writeId = *event->PartitionId.WriteId;
const TWriteId& writeId = *event->PartitionId.WriteId;
Y_ABORT_UNLESS(TxWrites.contains(writeId));
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
Y_ABORT_UNLESS(writeInfo.Partitions.contains(event->PartitionId.OriginalPartitionId));
Expand Down Expand Up @@ -4593,7 +4593,7 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo
return;
}

const ui64 writeId = *event->WriteId;
const TWriteId& writeId = *event->WriteId;
Y_ABORT_UNLESS(TxWrites.contains(writeId));
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
Y_ABORT_UNLESS(writeInfo.Partitions.size() == 1);
Expand All @@ -4604,7 +4604,7 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo
void TPersQueue::BeginDeleteTx(const TDistributedTransaction& tx)
{
Y_ABORT_UNLESS(tx.WriteId.Defined());
const ui64 writeId = *tx.WriteId;
const TWriteId& writeId = *tx.WriteId;
if (!TxWrites.contains(writeId)) {
// the transaction has already been completed
return;
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
bool Deleting = false;
};

THashMap<ui64, TTxWriteInfo> TxWrites;
THashMap<TWriteId, TTxWriteInfo> TxWrites;
bool TxWritesChanged = false;
ui32 NextSupportivePartitionId = 100'000;

Expand Down Expand Up @@ -488,8 +488,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void CreateSupportivePartitionActors(const TActorContext& ctx);
void CreateSupportivePartitionActor(const TPartitionId& shadowPartitionId, const TActorContext& ctx);
NKikimrPQ::TPQTabletConfig MakeSupportivePartitionConfig() const;
void SubscribeWriteId(ui64 writeId, const TActorContext& ctx);
void UnsubscribeWriteId(ui64 writeId, const TActorContext& ctx);
void SubscribeWriteId(const TWriteId& writeId, const TActorContext& ctx);
void UnsubscribeWriteId(const TWriteId& writeId, const TActorContext& ctx);

bool AllOriginalPartitionsInited() const;

Expand All @@ -501,7 +501,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void BeginDeletePartitions(TTxWriteInfo& writeInfo);

bool CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
ui64 writeId) const;
const TWriteId& writeId) const;
bool CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBody) const;
};

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction&
SourceActor = ActorIdFromProto(tx.GetSourceActor());

if (tx.HasWriteId()) {
WriteId = tx.GetWriteId();
WriteId = GetWriteId(tx);
}
}

Expand Down Expand Up @@ -139,7 +139,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac
InitPartitions(txBody.GetOperations());

if (txBody.HasWriteId() && HasWriteOperations) {
WriteId = txBody.GetWriteId();
WriteId = GetWriteId(txBody);
} else {
WriteId = Nothing();
}
Expand Down Expand Up @@ -350,7 +350,7 @@ void TDistributedTransaction::AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx)
tx.SetAggrPredicate(ParticipantsDecision == NKikimrTx::TReadSetData::DECISION_COMMIT);
}
if (WriteId.Defined()) {
tx.SetWriteId(*WriteId);
SetWriteId(tx, *WriteId);
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct TDistributedTransaction {
THashSet<ui64> Senders; // список отправителей TEvReadSet
THashSet<ui64> Receivers; // список получателей TEvReadSet
TVector<NKikimrPQ::TPartitionOperation> Operations;
TMaybe<ui64> WriteId;
TMaybe<TWriteId> WriteId;

EDecision SelfDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN;
EDecision ParticipantsDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/internals_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ Y_UNIT_TEST(StoreKeys) {
TKey keyOld(TKeyPrefix::TypeData, TPartitionId{9}, 8, 7, 6, 5, false);
UNIT_ASSERT_VALUES_EQUAL(keyOld.ToString(), "d0000000009_00000000000000000008_00007_0000000006_00005");

TKey keyNew(TKeyPrefix::TypeData, TPartitionId{5, 1, 9}, 8, 7, 6, 5, false);
TKey keyNew(TKeyPrefix::TypeData, TPartitionId{5, TWriteId{0, 1}, 9}, 8, 7, 6, 5, false);
UNIT_ASSERT_VALUES_EQUAL(keyNew.ToString(), "D0000000009_00000000000000000008_00007_0000000006_00005");

keyNew.SetType(TKeyPrefix::TypeInfo);
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/persqueue/ut/partition_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ void CompareVectors(const TVector<ui64>& expected, const TIterable& actual) {
}

void TPartitionFixture::ShadowPartitionCountersTest(bool isFirstClass) {
const TPartitionId partition{0, 1111, 123};
const TPartitionId partition{0, TWriteId{0, 1111}, 123};
const ui64 begin = 0;
const ui64 end = 10;
const TString session = "session";
Expand Down Expand Up @@ -2320,7 +2320,7 @@ Y_UNIT_TEST_F(GetPartitionWriteInfoSuccess, TPartitionFixture) {
Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetEnableQuoting(false);

CreatePartition({
.Partition=TPartitionId{2, 10, 100'001},
.Partition=TPartitionId{2, TWriteId{0, 10}, 100'001},
//
// partition configuration
//
Expand Down Expand Up @@ -2389,7 +2389,7 @@ Y_UNIT_TEST_F(GetPartitionWriteInfoSuccess, TPartitionFixture) {

Y_UNIT_TEST_F(GetPartitionWriteInfoError, TPartitionFixture) {
CreatePartition({
.Partition=TPartitionId{2, 10, 100'001},
.Partition=TPartitionId{2, TWriteId{0, 10}, 100'001},
.Begin=0, .End=10,
//
// partition configuration
Expand Down Expand Up @@ -2444,7 +2444,7 @@ Y_UNIT_TEST_F(ShadowPartitionCountersFirstClass, TPartitionFixture) {
}

Y_UNIT_TEST_F(ShadowPartitionCountersRestore, TPartitionFixture) {
const TPartitionId partitionId{0, 1111, 123};
const TPartitionId partitionId{0, TWriteId{0, 1111}, 123};
const ui64 begin = 0;
const ui64 end = 10;
const TString session = "session";
Expand Down Expand Up @@ -3126,7 +3126,7 @@ Y_UNIT_TEST_F(TestBatchingWithProposeConfig, TPartitionTxTestHelper) {

Y_UNIT_TEST_F(GetUsedStorage, TPartitionFixture) {
auto* actor = CreatePartition({
.Partition=TPartitionId{2, 10, 100'001},
.Partition=TPartitionId{2, TWriteId{0, 10}, 100'001},
.Begin=0, .End=10,
//
// partition configuration
Expand Down
Loading

0 comments on commit a2023e5

Please sign in to comment.