Skip to content

Commit

Permalink
New field COMPRESSION_LEVEL in Column Family (#10645)
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-gogov authored Oct 22, 2024
1 parent 08196b8 commit 7a1e697
Show file tree
Hide file tree
Showing 16 changed files with 219 additions and 6 deletions.
3 changes: 3 additions & 0 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ bool ConvertCreateTableSettingsToProto(NYql::TKikimrTableMetadataPtr metadata, Y
return false;
}
}
if (family.CompressionLevel) {
familyProto->set_compression_level(family.CompressionLevel.GetRef());
}
}

if (metadata->TableSettings.CompactionPolicy) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,9 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
return SyncError();
}

} else if (name == "compression_level") {
auto level = FromString<i32>(familySetting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
f->set_compression_level(level);
} else {
ctx.AddError(TIssue(ctx.GetPosition(familySetting.Name().Pos()),
TStringBuilder() << "Unknown column family setting name: " << name));
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ struct TColumnFamily {
TString Name;
TMaybe<TString> Data;
TMaybe<TString> Compression;
TMaybe<i32> CompressionLevel;
};

struct TTtlSettings {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,8 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
family.Compression = TString(
familySetting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
);
} else if (name == "compression_level") {
family.CompressionLevel = FromString<i32>(familySetting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
} else {
ctx.AddError(TIssue(ctx.GetPosition(familySetting.Name().Pos()),
TStringBuilder() << "Unknown column family setting name: " << name));
Expand Down
88 changes: 88 additions & 0 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2057,6 +2057,94 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}
}

Y_UNIT_TEST(CreateFamilyWithCompressionLevel) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
TString tableName = "/Root/TableWithCompressionLevel";
auto query = TStringBuilder() << R"(
--!syntax_v1
CREATE TABLE `)" << tableName
<< R"(` (
Key Uint64,
Value1 String,
Value2 Uint32,
PRIMARY KEY (Key),
FAMILY Family1 (
DATA = "test",
COMPRESSION = "lz4",
COMPRESSION_LEVEL = 5
),
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Field `COMPRESSION_LEVEL` is not supported for OLTP tables");
}

Y_UNIT_TEST(AlterCompressionLevelInColumnFamily) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
TString tableName = "/Root/TableWithCompressionLevel";
auto query = TStringBuilder() << R"(
--!syntax_v1
CREATE TABLE `)" << tableName
<< R"(` (
Key Uint64,
Value1 String FAMILY Family1,
Value2 Uint32,
PRIMARY KEY (Key),
FAMILY Family1 (
DATA = "test",
COMPRESSION = "lz4"
),
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto queryAlter = TStringBuilder() << R"(
--!syntax_v1
ALTER TABLE `)" << tableName << R"(`
ALTER FAMILY Family1 SET COMPRESSION_LEVEL 5;)";
auto resultAlter = session.ExecuteSchemeQuery(queryAlter).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resultAlter.GetStatus(), EStatus::BAD_REQUEST, resultAlter.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(resultAlter.GetIssues().ToString(), "Field `COMPRESSION_LEVEL` is not supported for OLTP tables");
}

Y_UNIT_TEST(AddColumnFamilyWithCompressionLevel) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
TString tableName = "/Root/TableWithCompressionLevel";
auto query = TStringBuilder() << R"(
--!syntax_v1
CREATE TABLE `)" << tableName
<< R"(` (
Key Uint64,
Value1 String FAMILY Family1,
Value2 Uint32,
PRIMARY KEY (Key),
FAMILY Family1 (
DATA = "test",
COMPRESSION = "lz4"
),
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto queryAlter = TStringBuilder() << R"(
--!syntax_v1
ALTER TABLE `)" << tableName << R"(`
ADD FAMILY Family2 (
DATA = "test",
COMPRESSION = "lz4",
COMPRESSION_LEVEL = 5
);)";
auto resultAlter = session.ExecuteSchemeQuery(queryAlter).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resultAlter.GetStatus(), EStatus::BAD_REQUEST, resultAlter.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(resultAlter.GetIssues().ToString(), "Field `COMPRESSION_LEVEL` is not supported for OLTP tables");
}

Y_UNIT_TEST(CreateTableWithDefaultFamily) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ message TFamilyDescription {
optional EColumnCache ColumnCache = 7;
optional EColumnStorage Storage = 8; // DEPRECATED: use StorageConfig
optional TStorageConfig StorageConfig = 9;
optional int32 ColumnCodecLevel = 10;
}

message TFastSplitSettings {
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/ydb_convert/column_families.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ namespace NKikimr {
return false;
}

if (familySettings.has_compression_level()) {
*code = Ydb::StatusIds::BAD_REQUEST;
*error = "Field `COMPRESSION_LEVEL` is not supported for OLTP tables";
return false;
}

auto* family = MutableNamedFamily(familySettings.name());

if (familySettings.has_data()) {
Expand Down Expand Up @@ -214,6 +220,15 @@ namespace NKikimr {
return false;
}

for (size_t index = 0; index < PartitionConfig->ColumnFamiliesSize(); ++index) {
auto columnFamily = PartitionConfig->GetColumnFamilies(index);
if (columnFamily.HasColumnCodecLevel()) {
*code = Ydb::StatusIds::BAD_REQUEST;
*error = "Field `COMPRESSION_LEVEL` is not supported for OLTP tables";
return false;
}
}

if (!defaultFamily->HasStorageConfig() ||
!defaultFamily->GetStorageConfig().HasSysLog() ||
!defaultFamily->GetStorageConfig().HasLog())
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/yql/sql/v1/SQLv1.g.in
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,10 @@ table_setting_value:
family_entry: FAMILY an_id family_settings;
family_settings: LPAREN (family_settings_entry (COMMA family_settings_entry)*)? RPAREN;
family_settings_entry: an_id EQUALS family_setting_value;
family_setting_value: STRING_VALUE;
family_setting_value:
STRING_VALUE
| integer
;

split_boundaries:
LPAREN literal_value_list (COMMA literal_value_list)* RPAREN
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/yql/sql/v1/SQLv1Antlr4.g.in
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,10 @@ table_setting_value:
family_entry: FAMILY an_id family_settings;
family_settings: LPAREN (family_settings_entry (COMMA family_settings_entry)*)? RPAREN;
family_settings_entry: an_id EQUALS family_setting_value;
family_setting_value: STRING_VALUE;
family_setting_value:
STRING_VALUE
| integer
;

split_boundaries:
LPAREN literal_value_list (COMMA literal_value_list)* RPAREN
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/sql/v1/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,7 @@ namespace NSQLTranslationV1 {
TIdentifier Name;
TNodePtr Data;
TNodePtr Compression;
TNodePtr CompressionLevel;
};

struct TVectorIndexSettings {
Expand Down
9 changes: 9 additions & 0 deletions ydb/library/yql/sql/v1/query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,9 @@ class TCreateTableNode final: public TAstListNode {
if (family.Compression) {
familyDesc = L(familyDesc, Q(Y(Q("compression"), family.Compression)));
}
if (family.CompressionLevel) {
familyDesc = L(familyDesc, Q(Y(Q("compression_level"), family.CompressionLevel)));
}
columnFamilies = L(columnFamilies, Q(familyDesc));
}
opts = L(opts, Q(Y(Q("columnFamilies"), Q(columnFamilies))));
Expand Down Expand Up @@ -1413,6 +1416,9 @@ class TAlterTableNode final: public TAstListNode {
if (family.Compression) {
familyDesc = L(familyDesc, Q(Y(Q("compression"), family.Compression)));
}
if (family.CompressionLevel) {
familyDesc = L(familyDesc, Q(Y(Q("compression_level"), family.CompressionLevel)));
}
columnFamilies = L(columnFamilies, Q(familyDesc));
}
actions = L(actions, Q(Y(Q("addColumnFamilies"), Q(columnFamilies))));
Expand All @@ -1429,6 +1435,9 @@ class TAlterTableNode final: public TAstListNode {
if (family.Compression) {
familyDesc = L(familyDesc, Q(Y(Q("compression"), family.Compression)));
}
if (family.CompressionLevel) {
familyDesc = L(familyDesc, Q(Y(Q("compression_level"), family.CompressionLevel)));
}
columnFamilies = L(columnFamilies, Q(familyDesc));
}
actions = L(actions, Q(Y(Q("alterColumnFamilies"), Q(columnFamilies))));
Expand Down
10 changes: 8 additions & 2 deletions ydb/library/yql/sql/v1/sql_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2016,16 +2016,22 @@ bool TSqlQuery::AlterTableAlterFamily(const TRule_alter_table_alter_column_famil
<< "' in one alter";
return false;
}
const TString stringValue(Ctx.Token(value.GetToken1()));
const TString stringValue(Ctx.Token(value.GetAlt_family_setting_value1().GetToken1()));
entry->Data = BuildLiteralSmartString(Ctx, stringValue);
} else if (to_lower(settingName.Name) == "compression") {
if (entry->Compression) {
Ctx.Error() << "Redefinition of 'compression' setting for column family '" << name.Name
<< "' in one alter";
return false;
}
const TString stringValue(Ctx.Token(value.GetToken1()));
const TString stringValue(Ctx.Token(value.GetAlt_family_setting_value1().GetToken1()));
entry->Compression = BuildLiteralSmartString(Ctx, stringValue);
} else if (to_lower(settingName.Name) == "compression_level") {
if (entry->CompressionLevel) {
Ctx.Error() << "Redefinition of 'compression_level' setting for column family '" << name.Name << "' in one alter";
return false;
}
entry->CompressionLevel = LiteralNumber(Ctx, value.GetAlt_family_setting_value2().GetRule_integer1());
} else {
Ctx.Error() << "Unknown table setting: " << settingName.Name;
return false;
Expand Down
6 changes: 4 additions & 2 deletions ydb/library/yql/sql/v1/sql_translation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1571,11 +1571,13 @@ bool TSqlTranslation::FillFamilySettingsEntry(const TRule_family_settings_entry&
TIdentifier id = IdEx(settingNode.GetRule_an_id1(), *this);
const TRule_family_setting_value& value = settingNode.GetRule_family_setting_value3();
if (to_lower(id.Name) == "data") {
const TString stringValue(Ctx.Token(value.GetToken1()));
const TString stringValue(Ctx.Token(value.GetAlt_family_setting_value1().GetToken1()));
family.Data = BuildLiteralSmartString(Ctx, stringValue);
} else if (to_lower(id.Name) == "compression") {
const TString stringValue(Ctx.Token(value.GetToken1()));
const TString stringValue(Ctx.Token(value.GetAlt_family_setting_value1().GetToken1()));
family.Compression = BuildLiteralSmartString(Ctx, stringValue);
} else if (to_lower(id.Name) == "compression_level") {
family.CompressionLevel = LiteralNumber(Ctx, value.GetAlt_family_setting_value2().GetRule_integer1());
} else {
Ctx.Error() << "Unknown table setting: " << id.Name;
return false;
Expand Down
36 changes: 36 additions & 0 deletions ydb/library/yql/sql/v1/sql_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7424,3 +7424,39 @@ Y_UNIT_TEST_SUITE(Restore) {
UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
}
}

Y_UNIT_TEST_SUITE(ColumnFamily) {
Y_UNIT_TEST(CompressionLevel) {
NYql::TAstParseResult res = SqlToYql(R"( use plato;
CREATE TABLE tableName (
Key Uint32 FAMILY default,
Value String FAMILY family1,
PRIMARY KEY (Key),
FAMILY default (
DATA = "test",
COMPRESSION = "lz4",
COMPRESSION_LEVEL = 5
),
FAMILY family1 (
DATA = "test",
COMPRESSION = "lz4",
COMPRESSION_LEVEL = 3
)
);
)");
UNIT_ASSERT(res.IsOk());
UNIT_ASSERT(res.Issues.Size() == 0);
TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
if (word == "Write") {
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("compression_level"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("5"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("3"));
}
};

TWordCountHive elementStat = { { TString("Write"), 0 }, { TString("compression_level"), 0 } };
VerifyProgram(res, elementStat, verifyLine);
UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
UNIT_ASSERT_VALUES_EQUAL(2, elementStat["compression_level"]);
}
}
36 changes: 36 additions & 0 deletions ydb/library/yql/sql/v1/sql_ut_antlr4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7396,3 +7396,39 @@ Y_UNIT_TEST_SUITE(Restore) {
UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
}
}

Y_UNIT_TEST_SUITE(ColumnFamily) {
Y_UNIT_TEST(CompressionLevel) {
NYql::TAstParseResult res = SqlToYql(R"( use plato;
CREATE TABLE tableName (
Key Uint32 FAMILY default,
Value String FAMILY family1,
PRIMARY KEY (Key),
FAMILY default (
DATA = "test",
COMPRESSION = "lz4",
COMPRESSION_LEVEL = 5
),
FAMILY family1 (
DATA = "test",
COMPRESSION = "lz4",
COMPRESSION_LEVEL = 3
)
);
)");
UNIT_ASSERT(res.IsOk());
UNIT_ASSERT(res.Issues.Size() == 0);
TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
if (word == "Write") {
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("compression_level"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("5"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("3"));
}
};

TWordCountHive elementStat = { { TString("Write"), 0 }, { TString("compression_level"), 0 } };
VerifyProgram(res, elementStat, verifyLine);
UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
UNIT_ASSERT_VALUES_EQUAL(2, elementStat["compression_level"]);
}
}
4 changes: 4 additions & 0 deletions ydb/public/api/protos/ydb_table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,10 @@ message ColumnFamily {
// When enabled table data will be kept in memory
// WARNING: DO NOT USE
Ydb.FeatureFlag.Status keep_in_memory = 4;

// Not all compression algorithms support
// Set if want to change default value
optional int32 compression_level = 5;
}

message PartitioningSettings {
Expand Down

0 comments on commit 7a1e697

Please sign in to comment.