Skip to content

Commit

Permalink
Reuser memory pool & pass schema version
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed May 20, 2024
1 parent 19a2355 commit f90a94a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 14 deletions.
16 changes: 12 additions & 4 deletions ydb/core/tx/replication/service/json_change_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ static bool ParseValue(TVector<NTable::TTag>& tags, TVector<TCell>& cells,
return true;
}

void TChangeRecord::Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const {
void TChangeRecord::Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record, TMemoryPool& pool) const {
record.SetSourceOffset(GetOrder());
// TODO: fill WriteTxId

TMemoryPool pool(256);
TString error;

if (JsonBody.Has("key") && JsonBody["key"].IsArray()) {
Expand Down Expand Up @@ -104,9 +103,13 @@ void TChangeRecord::Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TC
}
}

TConstArrayRef<TCell> TChangeRecord::GetKey() const {
void TChangeRecord::Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const {
TMemoryPool pool(256);
Serialize(record, pool);
}

TConstArrayRef<TCell> TChangeRecord::GetKey(TMemoryPool& pool) const {
if (!Key) {
TMemoryPool pool(256);
TString error;

if (JsonBody.Has("key") && JsonBody["key"].IsArray()) {
Expand All @@ -126,4 +129,9 @@ TConstArrayRef<TCell> TChangeRecord::GetKey() const {
return *Key;
}

TConstArrayRef<TCell> TChangeRecord::GetKey() const {
TMemoryPool pool(256);
return GetKey(pool);
}

}
4 changes: 4 additions & 0 deletions ydb/core/tx/replication/service/json_change_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <util/generic/maybe.h>
#include <util/generic/ptr.h>
#include <util/generic/vector.h>
#include <util/memory/pool.h>

namespace NKikimr::NReplication::NService {

Expand All @@ -26,6 +27,7 @@ struct TLightweightSchema: public TThrRefBase {

TVector<NScheme::TTypeInfo> KeyColumns;
THashMap<TString, TColumn> ValueColumns;
ui64 Version = 0;
};

class TChangeRecordBuilder;
Expand All @@ -39,8 +41,10 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
ui64 GetTxId() const override;
EKind GetKind() const override;

void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record, TMemoryPool& pool) const;
void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const;

TConstArrayRef<TCell> GetKey(TMemoryPool& pool) const;
TConstArrayRef<TCell> GetKey() const;

private:
Expand Down
32 changes: 22 additions & 10 deletions ydb/core/tx/replication/service/table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/change_exchange/change_sender_common_ops.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
Expand All @@ -25,7 +26,7 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
if (!LogPrefix) {
LogPrefix = TStringBuilder()
<< "[TablePartitionWriter]"
<< TablePathId
<< TableId
<< "[" << TabletId << "]"
<< SelfId() << " ";
}
Expand Down Expand Up @@ -71,13 +72,14 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {

auto event = MakeHolder<TEvDataShard::TEvApplyReplicationChanges>();
auto& tableId = *event->Record.MutableTableId();
tableId.SetOwnerId(TablePathId.OwnerId);
tableId.SetTableId(TablePathId.LocalPathId);
// TODO: SetSchemaVersion?
tableId.SetOwnerId(TableId.PathId.OwnerId);
tableId.SetTableId(TableId.PathId.LocalPathId);
tableId.SetSchemaVersion(TableId.SchemaVersion);

for (auto recordPtr : ev->Get()->Records) {
MemoryPool.Clear();
const auto& record = *recordPtr->Get<TChangeRecord>();
record.Serialize(*event->Record.AddChanges());
record.Serialize(*event->Record.AddChanges(), MemoryPool);
// TODO: set WriteTxId, Source
}

Expand Down Expand Up @@ -147,10 +149,11 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
return NKikimrServices::TActivity::REPLICATION_TABLE_PARTITION_WRITER;
}

explicit TTablePartitionWriter(const TActorId& parent, ui64 tabletId, const TPathId& tablePathId)
explicit TTablePartitionWriter(const TActorId& parent, ui64 tabletId, const TTableId& tableId)
: Parent(parent)
, TabletId(tabletId)
, TablePathId(tablePathId)
, TableId(tableId)
, MemoryPool(256)
{
}

Expand All @@ -168,10 +171,11 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
private:
const TActorId Parent;
const ui64 TabletId;
const TPathId TablePathId;
const TTableId TableId;
mutable TMaybe<TString> LogPrefix;

TActorId LeaderPipeCache;
TMemoryPool MemoryPool;

}; // TTablePartitionWriter

Expand Down Expand Up @@ -309,6 +313,10 @@ class TLocalTableWriter
}

auto schema = MakeIntrusive<TLightweightSchema>();
if (entry.Self && entry.Self->Info.HasVersion()) {
schema->Version = entry.Self->Info.GetVersion().GetTableSchemaVersion();
}

for (const auto& [_, column] : entry.Columns) {
if (column.KeyOrder >= 0) {
if (schema->KeyColumns.size() <= static_cast<ui32>(column.KeyOrder)) {
Expand Down Expand Up @@ -382,14 +390,15 @@ class TLocalTableWriter
}

IActor* CreateSender(ui64 partitionId) override {
return new TTablePartitionWriter(SelfId(), partitionId, PathId);
return new TTablePartitionWriter(SelfId(), partitionId, TTableId(PathId, Schema->Version));
}

ui64 GetPartitionId(NChangeExchange::IChangeRecord::TPtr record) const override {
Y_ABORT_UNLESS(KeyDesc);
Y_ABORT_UNLESS(KeyDesc->GetPartitions());

const auto range = TTableRange(record->Get<TChangeRecord>()->GetKey());
MemoryPool.Clear();
const auto range = TTableRange(record->Get<TChangeRecord>()->GetKey(MemoryPool));
Y_ABORT_UNLESS(range.Point);

TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound(
Expand Down Expand Up @@ -492,6 +501,7 @@ class TLocalTableWriter
explicit TLocalTableWriter(const TPathId& tablePathId)
: TActor(&TThis::StateWork)
, TBaseChangeSender(this, this, tablePathId)
, MemoryPool(256)
{
}

Expand All @@ -517,9 +527,11 @@ class TLocalTableWriter
ui64 TableVersion = 0;
THolder<TKeyDesc> KeyDesc;
TLightweightSchema::TCPtr Schema;

bool Resolving = false;

TMap<ui64, NChangeExchange::IChangeRecord::TPtr> PendingRecords;
mutable TMemoryPool MemoryPool;

}; // TLocalTableWriter

Expand Down

0 comments on commit f90a94a

Please sign in to comment.