Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add deduplication options checks (#2254) #2416

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions ydb/core/persqueue/writer/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace NKikimr::NPQ {
#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);
#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);

static const ui64 WRITE_BLOCK_SIZE = 4_KB;
static const ui64 WRITE_BLOCK_SIZE = 4_KB;

TString TEvPartitionWriter::TEvInitResult::TSuccess::ToString() const {
auto out = TStringBuilder() << "Success {"
Expand Down Expand Up @@ -106,7 +106,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
using EErrorCode = TEvPartitionWriter::TEvWriteResponse::EErrorCode;

static constexpr size_t MAX_QUOTA_INFLIGHT = 3;

static void FillHeader(NKikimrClient::TPersQueuePartitionRequest& request,
ui32 partitionId, const TActorId& pipeClient)
{
Expand Down Expand Up @@ -272,12 +272,9 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
void GetOwnership() {
auto ev = MakeRequest(PartitionId, PipeClient);

auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetOwnership();
if (Opts.UseDeduplication) {
cmd.SetOwner(SourceId);
} else {
cmd.SetOwner(CreateGuidAsString());
}
auto& request = *ev->Record.MutablePartitionRequest();
auto& cmd = *request.MutableCmdGetOwnership();
cmd.SetOwner(SourceId);
cmd.SetForce(true);

NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());
Expand Down Expand Up @@ -741,15 +738,14 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
PendingQuota.clear();

ProcessQuota();

break;

case EWakeupTag::RlNoResource:
// Re-requesting the quota. We do this until we get a quota.
// Re-requesting the quota. We do this until we get a quota.
// We do not request a quota with a long waiting time because the writer may already be a destroyer, and the quota will still be waiting to be received.
RequestDataQuota(PendingQuotaAmount, ctx);
break;

default:
Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast<ui64>(tag));
}
Expand All @@ -770,7 +766,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
, TabletId(tabletId)
, PartitionId(partitionId)
, ExpectedGeneration(opts.ExpectedGeneration)
, SourceId(opts.SourceId)
, SourceId(opts.UseDeduplication ? opts.SourceId : CreateGuidAsString())
, Opts(opts)
{
if (Opts.MeteringMode) {
Expand Down Expand Up @@ -856,7 +852,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
IActor* CreatePartitionWriter(const TActorId& client,
// const NKikimrSchemeOp::TPersQueueGroupDescription& config,
ui64 tabletId,
ui32 partitionId,
ui32 partitionId,
const TPartitionWriterOpts& opts) {
return new TPartitionWriter(client, tabletId, partitionId, opts);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
};

THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action

public:
TWriteSessionImpl(const TWriteSessionSettings& settings,
std::shared_ptr<TTopicClient::TImpl> client,
Expand Down
26 changes: 23 additions & 3 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,9 +697,29 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
UNIT_ASSERT_VALUES_EQUAL(stats->GetEndOffset(), count);

}
} // Y_UNIT_TEST_SUITE(BasicUsage)

Y_UNIT_TEST_SUITE(TSettingsValidation) {
Y_UNIT_TEST(TWriteSessionProducerSettings) {
TTopicSdkTestSetup setup(TEST_CASE_NAME);
TTopicClient client = setup.MakeClient();

{
auto writeSettings = TWriteSessionSettings()
.Path(TEST_TOPIC)
.ProducerId("something")
.DeduplicationEnabled(false);
try {
auto writeSession = client.CreateWriteSession(writeSettings);
auto event = writeSession->GetEvent(true);
UNIT_ASSERT(event.Defined());
auto* closedEvent = std::get_if<TSessionClosedEvent>(&event.GetRef());
UNIT_ASSERT(closedEvent);
} catch (NYdb::TContractViolation&) {
//pass
}
}
}
} // Y_UNIT_TEST_SUITE(TSettingsValidation)

}

}
} // namespace
24 changes: 15 additions & 9 deletions ydb/services/persqueue_v1/actors/write_session_actor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,12 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
// 1.2. non-empty partition_id (explicit partitioning)
// 1.3. non-empty partition_with_generation (explicit partitioning && direct write to partition host)
// 2. Empty producer id (no deduplication, partition is selected using round-robin).
bool isScenarioSupported =
bool isScenarioSupported =
!InitRequest.producer_id().empty() && (
InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() ||
InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() ||
InitRequest.has_partition_id() ||
InitRequest.has_partition_with_generation()) ||
InitRequest.has_partition_with_generation())
||
InitRequest.producer_id().empty();

if (!isScenarioSupported) {
Expand Down Expand Up @@ -424,7 +425,6 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
return InitRequest.has_message_group_id() ? InitRequest.message_group_id() : InitRequest.producer_id();
}
}();

LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << " " << InitRequest.ShortDebugString() << " from " << PeerName);
if (!UseDeduplication) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << ". Disable deduplication for empty producer id");
Expand Down Expand Up @@ -467,8 +467,9 @@ template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::InitAfterDiscovery(const TActorContext& ctx) {
Y_UNUSED(ctx);

if (SourceId.empty()) {
Y_ABORT_UNLESS(!UseDeduplication);
if (SourceId.empty() && UseDeduplication) {
CloseSession("Internal server error: got empty SourceId with enabled deduplication", PersQueue::ErrorCode::ERROR, ctx);
return;
}

InitMeta = GetInitialDataChunk(InitRequest, FullConverter->GetClientsideName(), PeerName); // ToDo[migration] - check?
Expand Down Expand Up @@ -835,9 +836,14 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
OwnerCookie = result.GetResult().OwnerCookie;

const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo();
if (!UseDeduplication) {
Y_ABORT_UNLESS(maxSeqNo == 0);
}

// ToDo: uncomment after fixing KIKIMR-21124
// if (!UseDeduplication) {
// if (maxSeqNo != 0) {
// return CloseSession("Internal server error: have maxSeqNo != with deduplication disabled",
// PersQueue::ErrorCode::ERROR, ctx);
// }
// }

OwnerCookie = result.GetResult().OwnerCookie;
MakeAndSentInitResponse(maxSeqNo, ctx);
Expand Down
49 changes: 49 additions & 0 deletions ydb/services/persqueue_v1/persqueue_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6694,6 +6694,55 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}
}

Y_UNIT_TEST(DisableWrongSettings) {
NPersQueue::TTestServer server;
server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::BLACKBOX_VALIDATOR });
server.EnableLogs({NKikimrServices::PERSQUEUE}, NActors::NLog::EPriority::PRI_INFO);
TString topicFullName = "rt3.dc1--acc--topic1";
auto driver = SetupTestAndGetDriver(server, topicFullName, 3);

std::shared_ptr<grpc::Channel> Channel_;
std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStubP_;
{
Channel_ = grpc::CreateChannel("localhost:" + ToString(server.GrpcPort), grpc::InsecureChannelCredentials());
TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_);
}

{
grpc::ClientContext rcontext1;
auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1);
UNIT_ASSERT(writeStream1);
Ydb::Topic::StreamWriteMessage::FromClient req;
Ydb::Topic::StreamWriteMessage::FromServer resp;

req.mutable_init_request()->set_path("acc/topic1");
req.mutable_init_request()->set_message_group_id("some-group");
if (!writeStream1->Write(req)) {
ythrow yexception() << "write fail";
}
UNIT_ASSERT(writeStream1->Read(&resp));
Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST);
}
{
grpc::ClientContext rcontext1;
auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1);
UNIT_ASSERT(writeStream1);
Ydb::Topic::StreamWriteMessage::FromClient req;
Ydb::Topic::StreamWriteMessage::FromServer resp;

req.mutable_init_request()->set_path("acc/topic1");
req.mutable_init_request()->set_message_group_id("some-group");
req.mutable_init_request()->set_producer_id("producer");
if (!writeStream1->Write(req)) {
ythrow yexception() << "write fail";
}
UNIT_ASSERT(writeStream1->Read(&resp));
Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST);
}
}

Y_UNIT_TEST(DisableDeduplication) {
NPersQueue::TTestServer server;
TString topicFullName = "rt3.dc1--topic1";
Expand Down
Loading