Skip to content

Commit

Permalink
Fix message cookie and add more logging (ydb-platform#5897)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Jun 25, 2024
1 parent 323bf1b commit 1078162
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 16 deletions.
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ownerinfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace NPQ {
THolder<TEvPersQueue::TEvResponse> response = MakeHolder<TEvPersQueue::TEvResponse>();
response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
response->Record.SetErrorCode(NPersQueue::NErrorCode::BAD_REQUEST);
response->Record.SetErrorReason("ownership session is killed by another session with id " + OwnerCookie);
response->Record.SetErrorReason("ownership session is killed by another session with id " + OwnerCookie + " partition id " + partition.OriginalPartitionId);
ctx.Send(Sender, response.Release());
}
Sender = sender;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
NKikimrPQ::TStatusResponse::TPartResult result;
result.SetPartition(Partition.InternalPartitionId);
result.SetGeneration(TabletGeneration);
result.SetCookie(++PQRBCookie);

if (DiskIsFull || WaitingForSubDomainQuota(ctx)) {
result.SetStatus(NKikimrPQ::TStatusResponse::STATUS_DISK_IS_FULL);
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c

if (!CanEnqueue()) {
ReplyError(ctx, ev->Get()->Cookie, InactivePartitionErrorCode,
TStringBuilder() << "Write to inactive partition");
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
return;
}

Expand Down Expand Up @@ -884,7 +884,7 @@ void TPartition::CancelOneWriteOnWrite(const TActorContext& ctx,
TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMsg& msg) {
if (!CanWrite()) {
ScheduleReplyError(msg.Cookie, InactivePartitionErrorCode,
TStringBuilder() << "Write to inactive partition");
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
return EProcessResult::ContinueDrop;
}
if (DiskIsFull) {
Expand Down Expand Up @@ -916,7 +916,7 @@ void TPartition::ExecRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& p
TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroupMsg& msg) {
if (!CanWrite()) {
ScheduleReplyError(msg.Cookie, InactivePartitionErrorCode,
TStringBuilder() << "Write to inactive partition");
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
return EProcessResult::ContinueDrop;
}
if (DiskIsFull) {
Expand All @@ -940,7 +940,7 @@ void TPartition::ExecRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters&
TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg& msg) {
if (!CanWrite()) {
ScheduleReplyError(msg.Cookie, InactivePartitionErrorCode,
TStringBuilder() << "Write to inactive partition");
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
return EProcessResult::ContinueDrop;
}
if (DiskIsFull) {
Expand Down Expand Up @@ -985,7 +985,7 @@ void TPartition::ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& para
TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) {
if (!CanWrite()) {
ScheduleReplyError(p.Cookie, InactivePartitionErrorCode,
TStringBuilder() << "Write to inactive partition");
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
return EProcessResult::ContinueDrop;
}
if (DiskIsFull) {
Expand All @@ -1004,7 +1004,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) {
bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request) {
if (!CanWrite()) {
ScheduleReplyError(p.Cookie, InactivePartitionErrorCode,
TStringBuilder() << "Write to inactive partition");
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
return false;
}
if (DiskIsFull) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/writer/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
}

void Disconnected(EErrorCode errorCode) {
Send(Client, new TEvPartitionWriter::TEvDisconnected());
BecomeZombie(errorCode, "Disconnected");
Send(Client, new TEvPartitionWriter::TEvDisconnected(errorCode));
}

/// GetWriteId
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/persqueue/writer/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ struct TEvPartitionWriter {
};

struct TEvDisconnected: public TEventLocal<TEvDisconnected, EvDisconnected> {
TEvDisconnected(TEvWriteResponse::EErrorCode errorCode)
: ErrorCode(errorCode) {
}

const TEvWriteResponse::EErrorCode ErrorCode;
};

struct TEvTxWriteRequest : public TEventLocal<TEvTxWriteRequest, EvTxWriteRequest> {
Expand Down
16 changes: 8 additions & 8 deletions ydb/services/persqueue_v1/actors/write_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1034,10 +1034,6 @@ void TWriteSessionActor<UseMigrationProtocol>::ProcessWriteResponse(

template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx) {
if (State != ES_INITED) {
return CloseSession("got write response but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
}

const auto& result = *ev->Get();
if (!result.IsSuccess()) {
const auto& record = result.Record;
Expand All @@ -1048,9 +1044,12 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}
}

if (State != ES_INITED) {
return CloseSession(TStringBuilder() << "got write response but not wait for it (" << static_cast<int>(State) << ")", PersQueue::ErrorCode::BAD_REQUEST, ctx);
}

if (AcceptedRequests.empty()) {
CloseSession("got too many replies from server, internal error", PersQueue::ErrorCode::ERROR, ctx);
return;
return CloseSession("got too many replies from server, internal error", PersQueue::ErrorCode::ERROR, ctx);
}

const auto& writeRequest = AcceptedRequests.front();
Expand All @@ -1064,8 +1063,9 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}

template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr&, const TActorContext& ctx) {
CloseSession("pipe to partition's tablet is dead", PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, ctx);
void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr& ev, const TActorContext& ctx) {
CloseSession(TStringBuilder() << "pipe to partition's " << Partition << " tablet is dead #" << static_cast<int>(ev->Get()->ErrorCode),
PersQueue::ErrorCode::TABLET_PIPE_DISCONNECTED, ctx);
}

template<bool UseMigrationProtocol>
Expand Down

0 comments on commit 1078162

Please sign in to comment.