Skip to content

Commit

Permalink
Merge e7e0197 into 2414fb7
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Oct 14, 2024
2 parents 2414fb7 + e7e0197 commit bd7f114
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 1 deletion.
4 changes: 4 additions & 0 deletions ydb/core/persqueue/dread_cache_service/caching_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,10 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
continue; //TODO - no such chunks must be on prod
}

if (!proto.has_codec()) {
proto.set_codec(NPersQueueCommon::RAW);
}

TString sourceId;
if (!r.GetSourceId().empty()) {
sourceId = NPQ::NSourceIdEncoding::Decode(r.GetSourceId());
Expand Down
5 changes: 5 additions & 0 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2126,6 +2126,11 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
if (proto.GetChunkType() != NKikimrPQClient::TDataChunk::REGULAR) {
continue; //TODO - no such chunks must be on prod
}

if (!proto.has_codec()) {
proto.set_codec(NPersQueueCommon::RAW);
}

TString sourceId = "";
if (!r.GetSourceId().empty()) {
if (!NPQ::NSourceIdEncoding::IsValidEncoded(r.GetSourceId())) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/services/persqueue_v1/actors/partition_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,11 @@ bool FillBatchedData(
hasOffset = true;

auto proto(GetDeserializedData(r.GetData()));

if (!proto.has_codec()) {
proto.set_codec(NPersQueueCommon::RAW);
}

if (proto.GetChunkType() != NKikimrPQClient::TDataChunk::REGULAR) {
continue; //TODO - no such chunks must be on prod
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/services/persqueue_v1/actors/read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,7 @@ i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
return ByteSize - prev;
}


template <typename TServerMessage>
i64 TFormedReadResponse<TServerMessage>::ApplyDirectReadResponse(TEvPQProxy::TEvDirectReadResponse::TPtr& ev) {

Expand All @@ -1755,7 +1756,6 @@ i64 TFormedReadResponse<TServerMessage>::ApplyDirectReadResponse(TEvPQProxy::TEv
return diff;
}


template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadResponse::TPtr& ev, const TActorContext& ctx) {
if (!ActualPartitionActors.contains(ev->Sender)) {
Expand Down

0 comments on commit bd7f114

Please sign in to comment.