Skip to content

Commit

Permalink
reduce memory in schemas loading (#9548)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 20, 2024
1 parent e4aa46b commit 89871e4
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 22 deletions.
40 changes: 21 additions & 19 deletions ydb/core/tx/columnshard/tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ bool TTablesManager::FillMonitoringReport(NTabletFlatExecutor::TTransactionConte
}

bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
using TTableVersionsInfo = TVersionedSchema<NKikimrTxColumnShard::TTableVersionInfo>;

THashMap<ui32, TSchemaPreset> schemaPresets;
THashMap<ui32, TTableVersionsInfo> tableVersions;
{
TMemoryProfileGuard g("TTablesManager/InitFromDB::Tables");
auto rowset = db.Table<Schema::TableInfo>().Select();
Expand All @@ -64,7 +61,6 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
PathsToDrop.insert(table.GetPathId());
}

AFL_VERIFY(tableVersions.emplace(table.GetPathId(), TTableVersionsInfo()).second);
AFL_VERIFY(Tables.emplace(table.GetPathId(), std::move(table)).second);

if (!rowset.Next()) {
Expand Down Expand Up @@ -115,7 +111,6 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
rowset.GetValue<Schema::TableVersionInfo::SinceTxId>());

auto& table = Tables[pathId];
auto& versionsInfo = tableVersions[pathId];
NKikimrTxColumnShard::TTableVersionInfo versionInfo;
Y_ABORT_UNLESS(versionInfo.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "load_table_version")("path_id", pathId)("snapshot", version)("version", versionInfo.HasSchema() ? versionInfo.GetSchema().GetVersion() : -1);
Expand All @@ -125,15 +120,17 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
auto& ttlSettings = versionInfo.GetTtlSettings();
if (ttlSettings.HasEnabled()) {
auto vIt = lastVersion.find(pathId);
if (vIt == lastVersion.end() || vIt->second < version) {
if (vIt == lastVersion.end()) {
vIt = lastVersion.emplace(pathId, version).first;
}
if (vIt->second <= version) {
TTtl::TDescription description(ttlSettings.GetEnabled());
Ttl.SetPathTtl(pathId, std::move(description));
lastVersion.emplace(pathId, version);
vIt->second = version;
}
}
}
table.AddVersion(version);
versionsInfo.AddVersion(version, versionInfo);
if (!rowset.Next()) {
return false;
}
Expand All @@ -152,8 +149,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
Y_ABORT_UNLESS(schemaPresets.contains(id));
auto& preset = schemaPresets[id];
NOlap::TSnapshot version(
rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(),
rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>());
rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>());

TSchemaPreset::TSchemaPresetVersionInfo info;
Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
Expand All @@ -166,21 +162,27 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
}

TMemoryProfileGuard g("TTablesManager/InitFromDB::Other");
for (const auto& [id, preset] : schemaPresets) {
for (auto& [id, preset] : schemaPresets) {
if (isFakePresetOnly) {
Y_ABORT_UNLESS(id == 0);
} else {
Y_ABORT_UNLESS(id > 0);
}
for (const auto& [version, schemaInfo] : preset.GetVersionsById()) {
if (schemaInfo.HasSchema()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "index_schema")("preset_id", id)("snapshot", version)("version", schemaInfo.GetSchema().GetVersion());
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, StoragesManager, preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInfo.GetSchema());
} else {
PrimaryIndex->RegisterSchemaVersion(preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInfo.GetSchema());
}
for (auto it = preset.MutableVersionsById().begin(); it != preset.MutableVersionsById().end();) {
const auto version = it->first;
const auto& schemaInfo = it->second;
if (!schemaInfo.HasSchema()) {
continue;
}
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "index_schema")("preset_id", id)("snapshot", version)(
"version", schemaInfo.GetSchema().GetVersion());
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(
TabletId, StoragesManager, preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInfo.GetSchema());
} else {
PrimaryIndex->RegisterSchemaVersion(preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInfo.GetSchema());
}
it = preset.MutableVersionsById().erase(it);
}
}
for (auto&& i : Tables) {
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/tx/columnshard/tables_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class TVersionedSchema {
return VersionsById;
}

TMap<ui64, TVersionData>& MutableVersionsById() {
return VersionsById;
}

NOlap::TSnapshot GetMinVersionForId(const ui64 sVersion) const {
auto it = MinVersionById.find(sVersion);
Y_ABORT_UNLESS(it != MinVersionById.end());
Expand All @@ -42,10 +46,11 @@ class TVersionedSchema {
VersionsById.emplace(ssVersion, versionInfo);
Y_ABORT_UNLESS(Versions.emplace(snapshot, ssVersion).second);

if (MinVersionById.contains(ssVersion)) {
MinVersionById.emplace(ssVersion, std::min(snapshot, MinVersionById.at(ssVersion)));
} else {
auto it = MinVersionById.find(ssVersion);
if (it == MinVersionById.end()) {
MinVersionById.emplace(ssVersion, snapshot);
} else {
it->second = std::min(snapshot, it->second);
}
}
};
Expand Down

0 comments on commit 89871e4

Please sign in to comment.