Skip to content

Commit

Permalink
temp save
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang committed Oct 9, 2024
1 parent 2fffe3b commit 5b0c653
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 34 deletions.
51 changes: 51 additions & 0 deletions dbms/src/IO/Encryption/AESCTRCipherStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Logger.h>
#include <Common/RedactHelpers.h>
#include <IO/Encryption/AESCTRCipherStream.h>
#include <IO/Endian.h>
#include <common/logger_useful.h>


namespace DB
Expand Down Expand Up @@ -45,7 +48,31 @@ void AESCTRCipherStream::encrypt(UInt64 file_offset, char * data, size_t data_si
UInt64 block_index = file_offset / block_size;
unsigned char iv[block_size];
initIV(block_index, iv);
LOG_DEBUG(
DB::Logger::get("ffff"),
"AESCTRCipher encrypt, before,"
" block_index={} init_iv_high/low={}/{}"
" file_offset={} data_size={} iv={} data={}",
block_index,
initial_iv_high,
initial_iv_low,
file_offset,
data_size,
Redact::keyToHexString(reinterpret_cast<char *>(iv), block_size),
Redact::keyToHexString(data, data_size));
DB::Encryption::Cipher(file_offset, data, data_size, key, method, iv, /*is_encrypt=*/true);
LOG_DEBUG(
DB::Logger::get("ffff"),
"AESCTRCipher encrypt, after,"
" block_index={} init_iv_high/low={}/{}"
" file_offset={} data_size={} iv={} data={}",
block_index,
initial_iv_high,
initial_iv_low,
file_offset,
data_size,
Redact::keyToHexString(reinterpret_cast<char *>(iv), block_size),
Redact::keyToHexString(data, data_size));
}

void AESCTRCipherStream::decrypt(UInt64 file_offset, char * data, size_t data_size)
Expand All @@ -54,7 +81,31 @@ void AESCTRCipherStream::decrypt(UInt64 file_offset, char * data, size_t data_si
UInt64 block_index = file_offset / block_size;
unsigned char iv[block_size];
initIV(block_index, iv);
LOG_DEBUG(
DB::Logger::get("ffff"),
"AESCTRCipher decrypt, before,"
" block_index={} init_iv_high/low={}/{}"
" file_offset={} data_size={} iv={} data={}",
block_index,
initial_iv_high,
initial_iv_low,
file_offset,
data_size,
Redact::keyToHexString(reinterpret_cast<char *>(iv), block_size),
Redact::keyToHexString(data, data_size));
DB::Encryption::Cipher(file_offset, data, data_size, key, method, iv, /*is_encrypt=*/false);
LOG_DEBUG(
DB::Logger::get("ffff"),
"AESCTRCipher decrypt, after,"
" block_index={} init_iv_high/low={}/{}"
" file_offset={} data_size={} iv={} data={}",
block_index,
initial_iv_high,
initial_iv_low,
file_offset,
data_size,
Redact::keyToHexString(reinterpret_cast<char *>(iv), block_size),
Redact::keyToHexString(data, data_size));
}

} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/IO/Encryption/DataKeyManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ DataKeyManager::DataKeyManager(EngineStoreServerWrap * tiflash_instance_wrap_)
FileEncryptionInfo DataKeyManager::getInfo(const EncryptionPath & ep)
{
auto r = tiflash_instance_wrap->proxy_helper->getFile(Poco::Path(ep.full_path).toString());
LOG_DEBUG(Logger::get("fff"), "KeyManager::getInfo -> {}", ep.full_path);
if (unlikely(!r.isValid()))
{
throw DB::TiFlashException(
Expand All @@ -40,6 +41,7 @@ FileEncryptionInfo DataKeyManager::getInfo(const EncryptionPath & ep)
FileEncryptionInfo DataKeyManager::newInfo(const EncryptionPath & ep)
{
auto r = tiflash_instance_wrap->proxy_helper->newFile(Poco::Path(ep.full_path).toString());
LOG_DEBUG(Logger::get("fff"), "KeyManager::newInfo -> {}", ep.full_path);
if (unlikely(!r.isValid()))
{
throw DB::TiFlashException(
Expand All @@ -54,6 +56,7 @@ FileEncryptionInfo DataKeyManager::newInfo(const EncryptionPath & ep)
void DataKeyManager::deleteInfo(const EncryptionPath & ep, bool throw_on_error)
{
auto r = tiflash_instance_wrap->proxy_helper->deleteFile(Poco::Path(ep.full_path).toString());
LOG_DEBUG(Logger::get("fff"), "KeyManager::deleteInfo -> {}", ep.full_path);
if (unlikely(!r.isValid() && throw_on_error))
{
throw DB::TiFlashException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,15 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment)
if (!build_info.indexes_to_build || build_info.indexes_to_build->empty() || build_info.dm_files.empty())
return false;

#if 0
if (auto encryption_enabled = global_context.getFileProvider()->isEncryptionEnabled(); encryption_enabled)
{
segment->setIndexBuildError(
build_info.indexesIDs(),
"Encryption-at-rest on TiFlash is enabled, which does not support building vector index");
return false;
}
#endif

auto store_weak_ptr = weak_from_this();
auto tracing_id
Expand Down
30 changes: 20 additions & 10 deletions dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,26 +327,36 @@ void DMFileMetaV2::finalizeSmallFiles(
WriteLimiterPtr & write_limiter)
{
auto copy_file_to_cur = [&](const String & fname, UInt64 fsize) {
// check whether we need to create a new .merged file
checkMergedFile(writer, file_provider, write_limiter);

auto file = openForRead(
file_provider,
subFilePath(fname),
EncryptionPath(encryptionBasePath(), fname, keyspace_id),
fsize);
std::vector<char> read_buf(fsize);
auto read_size = file.readBig(read_buf.data(), read_buf.size());
std::vector<char> in_mem_buf(fsize);
auto read_size = file.readBig(in_mem_buf.data(), in_mem_buf.size());
RUNTIME_CHECK(read_size == fsize, fname, read_size, fsize);

writer.buffer->write(read_buf.data(), read_buf.size());
merged_sub_file_infos.emplace(
writer.buffer->write(in_mem_buf.data(), in_mem_buf.size());
auto merged_info = MergedSubFileInfo(
fname,
MergedSubFileInfo(
fname,
writer.file_info.number,
/*offset*/ writer.file_info.size,
/*size*/ read_buf.size()));
writer.file_info.size += read_buf.size();
writer.file_info.number,
/*offset*/ writer.file_info.size,
/*size*/ in_mem_buf.size());
merged_sub_file_infos.emplace(fname, merged_info);
LOG_DEBUG(
log,
"merged, fname={} fsize={} merged_num={} offset={} size={} plaintext={}",
fname,
fsize,
merged_info.number,
merged_info.offset,
merged_info.size,
Redact::keyToHexString(in_mem_buf.data(), in_mem_buf.size()));

writer.file_info.size += in_mem_buf.size();
};

std::vector<String> delete_file_name;
Expand Down
23 changes: 15 additions & 8 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,15 @@ void DMFilePackFilter::loadIndex(
if (info == dmfile_meta->merged_sub_file_infos.end())
{
throw Exception(
fmt::format("Unknown index file {}", dmfile->colIndexPath(file_name_base)),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::LOGICAL_ERROR,
"Unknown index file {}",
dmfile->colIndexPath(file_name_base));
}

auto file_path = dmfile->meta->mergedPath(info->second.number);
auto encryp_path = dmfile_meta->encryptionMergedPath(info->second.number);
auto offset = info->second.offset;
auto data_size = info->second.size;
const auto file_path = dmfile->meta->mergedPath(info->second.number);
const auto encryp_path = dmfile_meta->encryptionMergedPath(info->second.number);
const auto offset = info->second.offset;
const auto data_size = info->second.size;

auto buffer = ReadBufferFromRandomAccessFileBuilder::build(
file_provider,
Expand All @@ -216,12 +217,18 @@ void DMFilePackFilter::loadIndex(

String raw_data;
raw_data.resize(data_size);

buffer.read(reinterpret_cast<char *>(raw_data.data()), data_size);
LOG_DEBUG(
Logger::get(),
"read from merged, fname={} fsize={} offset={} plaintext={}",
dmfile->colIndexPath(file_name_base),
data_size,
offset,
Redact::keyToHexString(raw_data.data(), data_size));

auto buf = ChecksumReadBufferBuilder::build(
std::move(raw_data),
dmfile->colDataPath(file_name_base),
dmfile->colIndexPath(file_name_base),
dmfile->getConfiguration()->getChecksumFrameLength(),
dmfile->getConfiguration()->getChecksumAlgorithm(),
dmfile->getConfiguration()->getChecksumFrameLength());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ void DMFileV3IncrementWriter::writeAndIncludeMetaFile()
options.file_provider,
meta_file_path_for_write, // Must not use meta->metaPath(), because DMFile may be a S3 DMFile
EncryptionPath(local_path, meta_file_name),
/*create_new_encryption_info*/ true,
/*create_new_encryption_info*/ false,
options.write_limiter,
DMFileMetaV2::meta_buffer_size);

Expand Down
24 changes: 22 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
}
};
#endif

// TODO: move this callback into DMFileMeta/DMFileMetaV2
auto callback = [&](const IDataType::SubstreamPath & substream) {
const auto stream_name = DMFile::getFileNameBase(col_id, substream);
auto & stream = column_streams.at(stream_name);
Expand Down Expand Up @@ -310,11 +312,11 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
#endif

// write index info into merged_file_writer
if (stream->minmaxes and !is_empty_file)
if (stream->minmaxes && !is_empty_file)
{
dmfile_meta->checkMergedFile(merged_file, file_provider, write_limiter);

auto fname = colIndexFileName(stream_name);
const auto fname = colIndexFileName(stream_name);

auto buffer = ChecksumWriteBufferBuilder::build(
merged_file.buffer,
Expand All @@ -334,6 +336,15 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)

merged_file.file_info.size += col_stat.index_bytes;
buffer->next();

LOG_DEBUG(
Logger::get(),
"merged, fname={} fsize={} merged_num={} offset={} size={}",
fname,
col_stat.index_bytes,
info.number,
info.offset,
info.size);
}

// write mark into merged_file_writer
Expand Down Expand Up @@ -361,6 +372,15 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
merged_file.file_info.size += mark_size;
buffer->next();

// Debug trace for the mark sub-file that was just appended to the merged file.
// `fsize` reports `mark_size` (the amount added to merged_file.file_info.size above),
// not `col_stat.index_bytes`, which is the size accounted for the index sub-file.
LOG_DEBUG(
    Logger::get(),
    "merged, fname={} fsize={} merged_num={} offset={} size={}",
    fname,
    mark_size,
    info.number,
    info.offset,
    info.size);

if (is_null)
{
col_stat.nullmap_mark_bytes = mark_size;
Expand Down
23 changes: 21 additions & 2 deletions dbms/src/Storages/DeltaMerge/ScanContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,11 @@ String ScanContext::toJson() const

json->set("read_bytes", user_read_bytes.load());

json->set("disagg_cache_hit_size", disagg_read_cache_hit_size.load());
json->set("disagg_cache_miss_size", disagg_read_cache_miss_size.load());
if (disagg_read_cache_hit_size.load() > 0 && disagg_read_cache_miss_size.load() > 0)
{
json->set("disagg_cache_hit_size", disagg_read_cache_hit_size.load());
json->set("disagg_cache_miss_size", disagg_read_cache_miss_size.load());
}

json->set("num_segments", num_segments.load());
json->set("num_read_tasks", num_read_tasks.load());
Expand Down Expand Up @@ -164,6 +167,22 @@ String ScanContext::toJson() const
};
json->set("region_num_of_instance", to_json_array(region_num_of_instance));

if (total_vector_idx_load_from_cache.load() //
+ total_vector_idx_load_from_disk.load() //
+ total_vector_idx_load_from_s3.load()
> 0)
{
Poco::JSON::Object::Ptr vec_idx = new Poco::JSON::Object();
vec_idx->set("tot_load", total_vector_idx_load_time_ms.load());
vec_idx->set("load_s3", total_vector_idx_load_from_s3.load());
vec_idx->set("load_disk", total_vector_idx_load_from_disk.load());
vec_idx->set("load_cache", total_vector_idx_load_from_cache.load());
vec_idx->set("tot_search", total_vector_idx_search_time_ms.load());
vec_idx->set("read_vec", total_vector_idx_read_vec_time_ms.load());
vec_idx->set("read_others", total_vector_idx_read_others_time_ms.load());
json->set("vector_idx", vec_idx);
}

std::stringstream buf;
json->stringify(buf);
return buf.str();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,12 +612,7 @@ try
ASSERT_EQ(store->getLocalIndexInfosSnapshot(), nullptr);
}

const size_t num_rows_write = 128;

// write to store before index built
write(num_rows_write);
// trigger mergeDelta for all segments
triggerMergeDelta();
const size_t num_rows_write = 5;

{
// Add vector index
Expand Down Expand Up @@ -645,14 +640,19 @@ try
ASSERT_EQ(store->local_index_infos->size(), 1);
}

// write to store before index built
write(num_rows_write);
// trigger mergeDelta for all segments
triggerMergeDelta();

// check stable index has built for all segments
waitStableIndexReady();

const auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size);

// read from store
{
read(range, EMPTY_FILTER, colVecFloat32("[0, 128)", vec_column_name, vec_column_id));
read(range, EMPTY_FILTER, colVecFloat32("[0, 5)", vec_column_name, vec_column_id));
}

auto ann_query_info = std::make_shared<tipb::ANNQueryInfo>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class VectorIndexTestUtils
vec.push_back(static_cast<Float64>(v));
data_in_array.push_back(vec);
}
return ::DB::tests::createVecFloat32Column<Array>(data_in_array, name, column_id);
return ::DB::tests::createNullableVecFloat32Column<Array>(data_in_array, name, column_id);
}

static String encodeVectorFloat32(const std::vector<Float32> & vec)
Expand All @@ -68,7 +68,7 @@ class VectorIndexTestUtils
ColumnDefine cdVec() const
{
// When used in read, no need to assign vector_index.
return ColumnDefine(vec_column_id, vec_column_name, ::DB::tests::typeFromString("Array(Float32)"));
return ColumnDefine(vec_column_id, vec_column_name, ::DB::tests::typeFromString("Nullable(Array(Float32))"));
}

static size_t cleanVectorCacheEntries(const std::shared_ptr<VectorIndexCache> & cache)
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Storages/KVStore/FFI/FileEncryption.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Logger.h>
#include <Common/RedactHelpers.h>
#include <Common/TiFlashException.h>
#include <IO/Encryption/AESCTRCipherStream.h>
#include <IO/Endian.h>
#include <Storages/KVStore/FFI/FileEncryption.h>
#include <common/logger_useful.h>
#include <openssl/md5.h>

#include <magic_enum.hpp>
Expand Down Expand Up @@ -51,6 +54,15 @@ BlockAccessCipherStreamPtr FileEncryptionInfo::createCipherStream(
DB::Encryption::blockSize(method));
auto iv_high = readBigEndian<uint64_t>(reinterpret_cast<const char *>(iv->data()));
auto iv_low = readBigEndian<uint64_t>(reinterpret_cast<const char *>(iv->data() + sizeof(uint64_t)));
// Debug trace of which encryption path a cipher stream is being created for.
// The data key is deliberately NOT logged: key material must never reach log files.
// NOTE(review): the IV is still hex-dumped here; confirm that is acceptable before merging.
LOG_DEBUG(
    DB::Logger::get("ffff"),
    "createCipherStream, before,"
    " enc_info.full_path/file_name={}/{}"
    " iv={}",
    encryption_path.full_path,
    encryption_path.file_name,
    Redact::keyToHexString(iv->data(), iv->size()));
// Currently all encryption info is stored in one file called file.dict.
// Every update of file.dict will sync the whole file.
// So when the file is too large, the update cost increases.
Expand Down
Loading

0 comments on commit 5b0c653

Please sign in to comment.