Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge UUID related fixes for export #6697

Merged
merged 2 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions ydb/core/io_formats/cell_maker/cell_maker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/library/binary_json/write.h>
#include <ydb/library/dynumber/dynumber.h>
#include <ydb/library/uuid/uuid.h>

#include <ydb/library/yql/minikql/dom/yson.h>
#include <ydb/library/yql/minikql/dom/json.h>
Expand Down Expand Up @@ -132,6 +133,21 @@ namespace {
return true;
}

struct TUuidHolder {
union {
ui16 Array[8];
ui64 Halves[2];
} Buf;
};

template <>
bool TryParse(TStringBuf value, TUuidHolder& result) {
if (!NUuid::ParseUuidToArray(value, result.Buf.Array, false)) {
return false;
}
return true;
}

template <typename T, typename U>
using TConverter = std::function<U(const T&)>;

Expand Down Expand Up @@ -171,6 +187,14 @@ namespace {
return v.Str;
}

TStringBuf UuidToStringBuf(const TUuidHolder& uuid) {
char uuidBuf[16];

NUuid::UuidHalfsToBytes(uuidBuf, 16, uuid.Buf.Halves[1], uuid.Buf.Halves[0]);

return TStringBuf(uuidBuf, 16);
}

template <typename T, typename U = T>
struct TCellMaker {
static bool Make(TCell& c, TStringBuf v, TMemoryPool& pool, TString& err, TConverter<T, U> conv = &Implicit<T, U>) {
Expand Down Expand Up @@ -297,6 +321,8 @@ bool MakeCell(TCell& cell, TStringBuf value, NScheme::TTypeInfo type, TMemoryPoo
return TCellMaker<NYql::NDecimal::TInt128, std::pair<ui64, ui64>>::Make(cell, value, pool, err, &Int128ToPair);
case NScheme::NTypeIds::Pg:
return TCellMaker<NPg::TConvertResult, TStringBuf>::Make(cell, value, pool, err, &PgToStringBuf, type.GetTypeDesc());
case NScheme::NTypeIds::Uuid:
return TCellMaker<TUuidHolder, TStringBuf>::Make(cell, value, pool, err, &UuidToStringBuf);
default:
return false;
}
Expand Down Expand Up @@ -390,6 +416,7 @@ bool CheckCellValue(const TCell& cell, NScheme::TTypeInfo type) {
case NScheme::NTypeIds::JsonDocument: // checked at parsing time
case NScheme::NTypeIds::DyNumber: // checked at parsing time
case NScheme::NTypeIds::Pg: // checked at parsing time
case NScheme::NTypeIds::Uuid: // checked at parsing time
return true;
case NScheme::NTypeIds::Date:
return cell.AsValue<ui16>() < NUdf::MAX_DATE;
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,12 @@ class TxPlanSerializer {
}
}

if (auto literal = key.Maybe<TCoUuid>()) {
TStringStream out;
NUuid::UuidBytesToString(literal.Cast().Literal().Value().Data(), out);
return out.Str();
}

if (auto literal = key.Maybe<TCoDataCtor>()) {
return literal.Cast().Literal().StringValue();
}
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,12 @@ void FillLiteralProtoImpl(const NNodes::TCoDataCtor& literal, TProto& proto) {
protoValue.SetHi128(*reinterpret_cast<ui64*>(p + 8));
break;
}
case EDataSlot::Uuid: {
const ui64* uuidData = reinterpret_cast<const ui64*>(value.Data());
protoValue.SetLow128(uuidData[0]);
protoValue.SetHi128(uuidData[1]);
break;
}

default:
YQL_ENSURE(false, "Unexpected type slot " << slot);
Expand Down Expand Up @@ -738,6 +744,12 @@ void FillLiteralProto(const NNodes::TCoDataCtor& literal, Ydb::TypedValue& proto
protoValue.set_high_128(*reinterpret_cast<ui64*>(p + 8));
break;
}
case EDataSlot::Uuid: {
const ui64* uuidData = reinterpret_cast<const ui64*>(value.Data());
protoValue.set_low_128(uuidData[0]);
protoValue.set_high_128(uuidData[1]);
break;
}

default:
YQL_ENSURE(false, "Unexpected type slot " << slot);
Expand Down
88 changes: 88 additions & 0 deletions ydb/core/kqp/ut/yql/kqp_yql_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,94 @@ Y_UNIT_TEST_SUITE(KqpYql) {
}
}

Y_UNIT_TEST(TestUuidPrimaryKeyPrefixSearch) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetEnableUuidAsPrimaryKey(true)
.SetKqpSettings({setting});
TKikimrRunner kikimr(serverSettings.SetWithSampleTables(false));

auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

TVector<TString> testUuids = {
"5b99a330-04ef-4f1a-9b64-ba6d5f44eafe",
"afcbef30-9ac3-481a-aa6a-8d9b785dbb0a",
"b91cd23b-861c-4cc1-9119-801a4dac1cb9",
"65df9ecc-a97d-47b2-ae56-3c023da6ee8c",
};

{
const auto query = Q_(R"(
CREATE TABLE test(
key uuid NOT NULL,
val int,
PRIMARY KEY (key)
);
)");
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
{
int val = 0;
for (const auto& uuid : testUuids) {
const auto query = Sprintf("\
INSERT INTO test (key, val)\n\
VALUES (Uuid(\"%s\"), %u);\n\
", uuid.Data(), val++);
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
}
{
int val = 0;
for (const auto& uuid : testUuids) {
const auto query = Sprintf("SELECT * FROM test WHERE key=Uuid(\"%s\");", uuid.Data());
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

TResultSetParser parser(result.GetResultSetParser(0));
UNIT_ASSERT(parser.TryNextRow());
UNIT_ASSERT_VALUES_EQUAL(parser.ColumnParser("val").GetOptionalInt32().GetRef(), val++);
UNIT_ASSERT_VALUES_EQUAL(parser.RowsCount(), 1);
}
}
}

Y_UNIT_TEST(TestUuidDefaultColumn) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetEnableUuidAsPrimaryKey(true)
.SetKqpSettings({setting});
TKikimrRunner kikimr(serverSettings.SetWithSampleTables(false));

auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

{
const auto query = Q_(R"(
CREATE TABLE test(
key int NOT NULL,
val uuid NOT NULL DEFAULT Uuid("65df9ecc-a97d-47b2-ae56-3c023da6ee8c"),
PRIMARY KEY (key)
);
)");
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
{
const auto query = "INSERT INTO test (key) VALUES (0);";
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
}

Y_UNIT_TEST(UuidPrimaryKeyBulkUpsert) {
auto settings = TKikimrSettings()
.SetEnableUuidAsPrimaryKey(true)
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/datashard/export_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,15 @@ bool PgToStream(TStringBuf data, void* typeDesc, IOutputStream& out, TString& er
return true;
}

bool UuidToStream(const std::pair<ui64, ui64>& loHi, IOutputStream& out, TString& err) {
Y_UNUSED(err);

NYdb::TUuidValue uuid(loHi.first, loHi.second);

out << uuid.ToString();

return true;
}

} // NDataShard
} // NKikimr
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/export_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ TString DyNumberToString(TStringBuf data);
bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TString& err);
bool DyNumberToStream(TStringBuf data, IOutputStream& out, TString& err);
bool PgToStream(TStringBuf data, void* typeDesc, IOutputStream& out, TString& err);
bool UuidToStream(const std::pair<ui64, ui64>& loHi, IOutputStream& out, TString& err);

} // NDataShard
} // NKikimr
3 changes: 3 additions & 0 deletions ydb/core/tx/datashard/export_s3_buffer_raw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
case NScheme::NTypeIds::Pg:
serialized = PgToStream(cell.AsBuf(), column.Type.GetTypeDesc(), out, ErrorString);
break;
case NScheme::NTypeIds::Uuid:
serialized = UuidToStream(cell.AsValue<std::pair<ui64, ui64>>(), out, ErrorString);
break;
default:
Y_ABORT("Unsupported type");
}
Expand Down
24 changes: 24 additions & 0 deletions ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,30 @@ Y_UNIT_TEST_SUITE(TBackupTests) {
});
}

Y_UNIT_TEST_WITH_COMPRESSION(BackupUuidColumn) {
TTestBasicRuntime runtime;

Backup(runtime, ToString(Codec), R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Uuid" }
KeyColumnNames: ["key"]
)", [](TTestBasicRuntime& runtime) {
NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, TTestTxConfig::FakeHiveTablets, Sprintf(R"(
(
(let key '( '('key (Uint32 '%d) ) ) )
(let row '( '('value (Uuid '"%s") ) ) )
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", 1, "0000111122223333", "Table"), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
});
}

template<ECompressionCodec Codec>
void ShouldSucceedOnLargeData(ui32 minWriteBatchSize, const std::pair<ui32, ui32>& expectedResult) {
TTestBasicRuntime runtime;
Expand Down
71 changes: 70 additions & 1 deletion ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,76 @@ value {
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, txId, "/MyRoot", Sprintf(R"(
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_prefix: "Backup1"
destination_path: "/MyRoot/Restored"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetImport(runtime, txId, "/MyRoot");
}

Y_UNIT_TEST(ExportImportUuid) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableTablePgTypes(true));
ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Uuid" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

{
TString tablePath = "/MyRoot/Table";
int partitionIdx = 0;

auto tableDesc = DescribePath(runtime, tablePath, true, true);
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
UNIT_ASSERT(partitionIdx < tablePartitions.size());
const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId();

NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, datashardTabletId, Sprintf(R"(
(
(let key '( '('key (Uint32 '%d) ) ) )
(let row '( '('value (Uuid '"%s") ) ) )
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", 1, "0123456789012345", "Table"), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
}

TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Table"
destination_prefix: "Backup1"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
Expand Down
6 changes: 6 additions & 0 deletions ydb/library/uuid/uuid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ static void WriteHex(ui16 bytes, IOutputStream& out, bool reverseBytes = false)
}
}

void UuidBytesToString(TString in, IOutputStream& out) {
ui16 dw[8];
std::memcpy(dw, in.Data(), sizeof(dw));
NUuid::UuidToString(dw, out);
}

void UuidToString(ui16 dw[8], IOutputStream& out) {
WriteHex(dw[1], out);
WriteHex(dw[0], out);
Expand Down
1 change: 1 addition & 0 deletions ydb/library/uuid/uuid.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace NUuid {

static constexpr ui32 UUID_LEN = 16;

void UuidBytesToString(TString in, IOutputStream& out);
void UuidToString(ui16 dw[8], IOutputStream& out);
void UuidHalfsToByteString(ui64 low, ui64 hi, IOutputStream& out);

Expand Down
Loading