Skip to content

Commit

Permalink
Merge ae85998 into 1738d8c
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Oct 10, 2024
2 parents 1738d8c + ae85998 commit 3b41317
Showing 1 changed file with 27 additions and 4 deletions.
31 changes: 27 additions & 4 deletions ydb/services/persqueue_v1/actors/read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1720,16 +1720,37 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvRead::TPtr&
ProcessReads(ctx);
}


template <typename TServerMessage>
i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
constexpr bool UseMigrationProtocol = std::is_same_v<TServerMessage, PersQueue::V1::MigrationStreamingReadServerMessage>;

if constexpr (UseMigrationProtocol) {
Y_ABORT_UNLESS(resp.data_batch().partition_data_size() == 1);
Response.mutable_data_batch()->add_partition_data()->Swap(resp.mutable_data_batch()->mutable_partition_data(0));
auto* partition_data = resp.mutable_data_batch()->mutable_partition_data(0);
Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0);

// Проходим по всем батчам и устанавливаем codec в 0, если он равен -1
for (auto& batch : *partition_data->mutable_batches()) {
for (auto& message_data : *batch.mutable_message_data()) {
if (message_data.codec() == Ydb::PersQueue::V1::CODEC_UNSPECIFIED) {
message_data.set_codec(Ydb::PersQueue::V1::CODEC_RAW);
}
}
}

Response.mutable_data_batch()->add_partition_data()->Swap(partition_data);
} else {
Y_ABORT_UNLESS(resp.read_response().partition_data_size() == 1);
Response.mutable_read_response()->add_partition_data()->Swap(resp.mutable_read_response()->mutable_partition_data(0));
auto* partition_data = resp.mutable_read_response()->mutable_partition_data(0);
Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0);

// Проходим по всем батчам и устанавливаем codec в 0, если он равен -1
for (auto& batch : *partition_data->mutable_batches()) {
if (batch.codec() == Ydb::Topic::CODEC_UNSPECIFIED) {
batch.set_codec(Ydb::Topic::CODEC_RAW);
}
}

Response.mutable_read_response()->add_partition_data()->Swap(partition_data);
}

Response.set_status(Ydb::StatusIds::SUCCESS);
Expand All @@ -1739,6 +1760,8 @@ i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
return ByteSize - prev;
}



template <typename TServerMessage>
i64 TFormedReadResponse<TServerMessage>::ApplyDirectReadResponse(TEvPQProxy::TEvDirectReadResponse::TPtr& ev) {

Expand Down

0 comments on commit 3b41317

Please sign in to comment.