Skip to content

Commit

Permalink
YDB-2757 Fix UUID column export (#2789)
Browse files Browse the repository at this point in the history
Fixes #2757
  • Loading branch information
SammyVimes authored Mar 15, 2024
1 parent ab356da commit d388e05
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 1 deletion.
27 changes: 27 additions & 0 deletions ydb/core/io_formats/cell_maker/cell_maker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/library/binary_json/write.h>
#include <ydb/library/dynumber/dynumber.h>
#include <ydb/library/uuid/uuid.h>

#include <ydb/library/yql/minikql/dom/yson.h>
#include <ydb/library/yql/minikql/dom/json.h>
Expand Down Expand Up @@ -132,6 +133,21 @@ namespace {
return true;
}

struct TUuidHolder {
union {
ui16 Array[8];
ui64 Halves[2];
} Buf;
};

template <>
bool TryParse(TStringBuf value, TUuidHolder& result) {
if (!NUuid::ParseUuidToArray(value, result.Buf.Array, false)) {
return false;
}
return true;
}

template <typename T, typename U>
using TConverter = std::function<U(const T&)>;

Expand Down Expand Up @@ -171,6 +187,14 @@ namespace {
return v.Str;
}

TStringBuf UuidToStringBuf(const TUuidHolder& uuid) {
char uuidBuf[16];

NUuid::UuidHalfsToBytes(uuidBuf, 16, uuid.Buf.Halves[1], uuid.Buf.Halves[0]);

return TStringBuf(uuidBuf, 16);
}

template <typename T, typename U = T>
struct TCellMaker {
static bool Make(TCell& c, TStringBuf v, TMemoryPool& pool, TString& err, TConverter<T, U> conv = &Implicit<T, U>) {
Expand Down Expand Up @@ -297,6 +321,8 @@ bool MakeCell(TCell& cell, TStringBuf value, NScheme::TTypeInfo type, TMemoryPoo
return TCellMaker<NYql::NDecimal::TInt128, std::pair<ui64, ui64>>::Make(cell, value, pool, err, &Int128ToPair);
case NScheme::NTypeIds::Pg:
return TCellMaker<NPg::TConvertResult, TStringBuf>::Make(cell, value, pool, err, &PgToStringBuf, type.GetTypeDesc());
case NScheme::NTypeIds::Uuid:
return TCellMaker<TUuidHolder, TStringBuf>::Make(cell, value, pool, err, &UuidToStringBuf);
default:
return false;
}
Expand Down Expand Up @@ -390,6 +416,7 @@ bool CheckCellValue(const TCell& cell, NScheme::TTypeInfo type) {
case NScheme::NTypeIds::JsonDocument: // checked at parsing time
case NScheme::NTypeIds::DyNumber: // checked at parsing time
case NScheme::NTypeIds::Pg: // checked at parsing time
case NScheme::NTypeIds::Uuid: // checked at parsing time
return true;
case NScheme::NTypeIds::Date:
return cell.AsValue<ui16>() < NUdf::MAX_DATE;
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/datashard/export_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,15 @@ bool PgToStream(TStringBuf data, void* typeDesc, IOutputStream& out, TString& er
return true;
}

bool UuidToStream(const std::pair<ui64, ui64>& loHi, IOutputStream& out, TString& err) {
Y_UNUSED(err);

NYdb::TUuidValue uuid(loHi.first, loHi.second);

out << uuid.ToString();

return true;
}

} // NDataShard
} // NKikimr
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/export_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ TString DyNumberToString(TStringBuf data);
bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TString& err);
bool DyNumberToStream(TStringBuf data, IOutputStream& out, TString& err);
bool PgToStream(TStringBuf data, void* typeDesc, IOutputStream& out, TString& err);
bool UuidToStream(const std::pair<ui64, ui64>& loHi, IOutputStream& out, TString& err);

} // NDataShard
} // NKikimr
3 changes: 3 additions & 0 deletions ydb/core/tx/datashard/export_s3_buffer_raw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
case NScheme::NTypeIds::Pg:
serialized = PgToStream(cell.AsBuf(), column.Type.GetTypeDesc(), out, ErrorString);
break;
case NScheme::NTypeIds::Uuid:
serialized = UuidToStream(cell.AsValue<std::pair<ui64, ui64>>(), out, ErrorString);
break;
default:
Y_ABORT("Unsupported type");
}
Expand Down
24 changes: 24 additions & 0 deletions ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,30 @@ Y_UNIT_TEST_SUITE(TBackupTests) {
});
}

Y_UNIT_TEST_WITH_COMPRESSION(BackupUuidColumn) {
TTestBasicRuntime runtime;

Backup(runtime, ToString(Codec), R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Uuid" }
KeyColumnNames: ["key"]
)", [](TTestBasicRuntime& runtime) {
NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, TTestTxConfig::FakeHiveTablets, Sprintf(R"(
(
(let key '( '('key (Uint32 '%d) ) ) )
(let row '( '('value (Uuid '"%s") ) ) )
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", 1, "0000111122223333", "Table"), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
});
}

template<ECompressionCodec Codec>
void ShouldSucceedOnLargeData(ui32 minWriteBatchSize, const std::pair<ui32, ui32>& expectedResult) {
TTestBasicRuntime runtime;
Expand Down
71 changes: 70 additions & 1 deletion ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,76 @@ value {
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, txId, "/MyRoot", Sprintf(R"(
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_prefix: "Backup1"
destination_path: "/MyRoot/Restored"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetImport(runtime, txId, "/MyRoot");
}

Y_UNIT_TEST(ExportImportUuid) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableTablePgTypes(true));
ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Uuid" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

{
TString tablePath = "/MyRoot/Table";
int partitionIdx = 0;

auto tableDesc = DescribePath(runtime, tablePath, true, true);
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
UNIT_ASSERT(partitionIdx < tablePartitions.size());
const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId();

NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, datashardTabletId, Sprintf(R"(
(
(let key '( '('key (Uint32 '%d) ) ) )
(let row '( '('value (Uuid '"%s") ) ) )
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", 1, "0123456789012345", "Table"), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
}

TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Table"
destination_prefix: "Backup1"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
Expand Down

0 comments on commit d388e05

Please sign in to comment.