Skip to content

Commit

Permalink
add unit case
Browse files Browse the repository at this point in the history
  • Loading branch information
AntiTopQuark committed Aug 28, 2024
1 parent 3622d44 commit ba0b85d
Show file tree
Hide file tree
Showing 24 changed files with 589 additions and 248 deletions.
2 changes: 1 addition & 1 deletion src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ Config::Config() {
{"rocksdb.write_options.low_pri", true, new YesNoField(&rocks_db.write_options.low_pri, false)},
{"rocksdb.write_options.memtable_insert_hint_per_batch", true,
new YesNoField(&rocks_db.write_options.memtable_insert_hint_per_batch, false)},
{"rocksdb.write_options.write_batch_max_bytes", true,
{"rocksdb.write_options.write_batch_max_bytes", false,
new IntField(&rocks_db.write_options.write_batch_max_bytes, 0, 0, INT_MAX)},

/* rocksdb read options */
Expand Down
65 changes: 50 additions & 15 deletions src/search/hnsw_indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,15 @@ StatusOr<HnswNodeFieldMetadata> HnswNode::DecodeMetadata(engine::Context& ctx, c
return metadata;
}

void HnswNode::PutMetadata(HnswNodeFieldMetadata* node_meta, const SearchKey& search_key, engine::Storage* storage,
Status HnswNode::PutMetadata(HnswNodeFieldMetadata* node_meta, const SearchKey& search_key, engine::Storage* storage,
rocksdb::WriteBatchBase* batch) const {
std::string updated_metadata;
node_meta->Encode(&updated_metadata);
batch->Put(storage->GetCFHandle(ColumnFamilyID::Search), search_key.ConstructHnswNode(level, key), updated_metadata);
auto s = batch->Put(storage->GetCFHandle(ColumnFamilyID::Search), search_key.ConstructHnswNode(level, key), updated_metadata);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
return Status::OK();
}

void HnswNode::DecodeNeighbours(engine::Context& ctx, const SearchKey& search_key) {
Expand All @@ -75,11 +79,16 @@ void HnswNode::DecodeNeighbours(engine::Context& ctx, const SearchKey& search_ke
Status HnswNode::AddNeighbour(engine::Context& ctx, const NodeKey& neighbour_key, const SearchKey& search_key,
rocksdb::WriteBatchBase* batch) const {
auto edge_index_key = search_key.ConstructHnswEdge(level, key, neighbour_key);
batch->Put(ctx.storage->GetCFHandle(ColumnFamilyID::Search), edge_index_key, Slice());

auto rocket_s = batch->Put(ctx.storage->GetCFHandle(ColumnFamilyID::Search), edge_index_key, Slice());
if (!rocket_s.ok()) {
return {Status::NotOK, rocket_s.ToString()};
}
HnswNodeFieldMetadata node_metadata = GET_OR_RET(DecodeMetadata(ctx, search_key));
node_metadata.num_neighbours++;
PutMetadata(&node_metadata, search_key, ctx.storage, batch);
auto s = PutMetadata(&node_metadata, search_key, ctx.storage, batch);
if (!s.IsOK()) {
return s;
}
return Status::OK();
}

Expand All @@ -93,7 +102,10 @@ Status HnswNode::RemoveNeighbour(engine::Context& ctx, const NodeKey& neighbour_

HnswNodeFieldMetadata node_metadata = GET_OR_RET(DecodeMetadata(ctx, search_key));
node_metadata.num_neighbours--;
PutMetadata(&node_metadata, search_key, ctx.storage, batch);
auto redis_status = PutMetadata(&node_metadata, search_key, ctx.storage, batch);
if (!redis_status.IsOK()) {
return redis_status;
}
return Status::OK();
}

Expand Down Expand Up @@ -445,7 +457,10 @@ Status HnswIndex::InsertVectorEntryInternal(engine::Context& ctx, std::string_vi

// Update inserted node metadata
HnswNodeFieldMetadata node_metadata(static_cast<uint16_t>(connected_edges_set.size()), vector);
node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
auto s = node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}

// Update modified nodes metadata
for (const auto& node_edges : deleted_edges_map) {
Expand All @@ -458,14 +473,20 @@ Status HnswIndex::InsertVectorEntryInternal(engine::Context& ctx, std::string_vi
connected_edges_set.erase(current_node_key);
}
current_node_metadata.num_neighbours = new_num_neighbours;
current_node.PutMetadata(&current_node_metadata, search_key, storage, batch.Get());
s = current_node.PutMetadata(&current_node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
}

for (const auto& current_node_key : connected_edges_set) {
auto current_node = HnswNode(current_node_key, level);
HnswNodeFieldMetadata current_node_metadata = GET_OR_RET(current_node.DecodeMetadata(ctx, search_key));
current_node_metadata.num_neighbours++;
current_node.PutMetadata(&current_node_metadata, search_key, storage, batch.Get());
s = current_node.PutMetadata(&current_node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
}

entry_points.clear();
Expand All @@ -476,21 +497,30 @@ Status HnswIndex::InsertVectorEntryInternal(engine::Context& ctx, std::string_vi
} else {
auto node = HnswNode(std::string(key), 0);
HnswNodeFieldMetadata node_metadata(0, vector);
node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
auto s = node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
metadata->num_levels = 1;
}

while (target_level > metadata->num_levels - 1) {
auto node = HnswNode(std::string(key), metadata->num_levels);
HnswNodeFieldMetadata node_metadata(0, vector);
node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
auto s = node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
metadata->num_levels++;
}

std::string encoded_index_metadata;
metadata->Encode(&encoded_index_metadata);
auto index_meta_key = search_key.ConstructFieldMeta();
batch->Put(cf_handle, index_meta_key, encoded_index_metadata);
auto s = batch->Put(cf_handle, index_meta_key, encoded_index_metadata);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

return Status::OK();
}
Expand Down Expand Up @@ -524,7 +554,10 @@ Status HnswIndex::DeleteVectorEntry(engine::Context& ctx, std::string_view key,
auto neighbour_node = HnswNode(neighbour_key, level);
HnswNodeFieldMetadata neighbour_node_metadata = GET_OR_RET(neighbour_node.DecodeMetadata(ctx, search_key));
neighbour_node_metadata.num_neighbours--;
neighbour_node.PutMetadata(&neighbour_node_metadata, search_key, storage, batch.Get());
auto s = neighbour_node.PutMetadata(&neighbour_node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
}
}

Expand Down Expand Up @@ -559,8 +592,10 @@ Status HnswIndex::DeleteVectorEntry(engine::Context& ctx, std::string_view key,
std::string encoded_index_metadata;
metadata->Encode(&encoded_index_metadata);
auto index_meta_key = search_key.ConstructFieldMeta();
batch->Put(storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key, encoded_index_metadata);

auto s = batch->Put(storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key, encoded_index_metadata);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion src/search/hnsw_indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct HnswNode {
HnswNode(NodeKey key, uint16_t level);

StatusOr<HnswNodeFieldMetadata> DecodeMetadata(engine::Context& ctx, const SearchKey& search_key) const;
void PutMetadata(HnswNodeFieldMetadata* node_meta, const SearchKey& search_key, engine::Storage* storage,
Status PutMetadata(HnswNodeFieldMetadata* node_meta, const SearchKey& search_key, engine::Storage* storage,
rocksdb::WriteBatchBase* batch) const;
void DecodeNeighbours(engine::Context& ctx, const SearchKey& search_key);

Expand Down
35 changes: 28 additions & 7 deletions src/search/index_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,28 @@ struct IndexManager {

std::string meta_val;
info->metadata.Encode(&meta_val);
batch->Put(cf, index_key.ConstructIndexMeta(), meta_val);
auto s = batch->Put(cf, index_key.ConstructIndexMeta(), meta_val);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

std::string prefix_val;
info->prefixes.Encode(&prefix_val);
batch->Put(cf, index_key.ConstructIndexPrefixes(), prefix_val);
s = batch->Put(cf, index_key.ConstructIndexPrefixes(), prefix_val);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

for (const auto &[_, field_info] : info->fields) {
SearchKey field_key(info->ns, info->name, field_info.name);

std::string field_val;
field_info.metadata->Encode(&field_val);

batch->Put(cf, field_key.ConstructFieldMeta(), field_val);
s = batch->Put(cf, field_key.ConstructFieldMeta(), field_val);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
}

if (auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); !s.ok()) {
Expand Down Expand Up @@ -231,16 +240,28 @@ struct IndexManager {

auto batch = storage->GetWriteBatchBase();

batch->Delete(cf, index_key.ConstructIndexMeta());
batch->Delete(cf, index_key.ConstructIndexPrefixes());
auto s =batch->Delete(cf, index_key.ConstructIndexMeta());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
s = batch->Delete(cf, index_key.ConstructIndexPrefixes());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

auto begin = index_key.ConstructAllFieldMetaBegin();
auto end = index_key.ConstructAllFieldMetaEnd();
batch->DeleteRange(cf, begin, end);
s = batch->DeleteRange(cf, begin, end);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

begin = index_key.ConstructAllFieldDataBegin();
end = index_key.ConstructAllFieldDataEnd();
batch->DeleteRange(cf, begin, end);
s = batch->DeleteRange(cf, begin, end);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

auto no_txn_ctx = engine::Context::NoTransactionContext(storage);
if (auto s = storage->Write(no_txn_ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); !s.ok()) {
Expand Down
20 changes: 16 additions & 4 deletions src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,19 @@ Status IndexUpdater::UpdateTagIndex(engine::Context &ctx, std::string_view key,
for (const auto &tag : tags_to_delete) {
auto index_key = search_key.ConstructTagFieldData(tag, key);

batch->Delete(cf_handle, index_key);
auto rocker_status = batch->Delete(cf_handle, index_key);
if (!rocker_status.ok()) {
return {Status::NotOK, rocker_status.ToString()};
}
}

for (const auto &tag : tags_to_add) {
auto index_key = search_key.ConstructTagFieldData(tag, key);

batch->Put(cf_handle, index_key, Slice());
auto rocker_status = batch->Put(cf_handle, index_key, Slice());
if (!rocker_status.ok()) {
return {Status::NotOK, rocker_status.ToString()};
}
}

auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
Expand All @@ -253,13 +259,19 @@ Status IndexUpdater::UpdateNumericIndex(engine::Context &ctx, std::string_view k
if (!original.IsNull()) {
auto index_key = search_key.ConstructNumericFieldData(original.Get<kqir::Numeric>(), key);

batch->Delete(cf_handle, index_key);
auto s = batch->Delete(cf_handle, index_key);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
}

if (!current.IsNull()) {
auto index_key = search_key.ConstructNumericFieldData(current.Get<kqir::Numeric>(), key);

batch->Put(cf_handle, index_key, Slice());
auto s = batch->Put(cf_handle, index_key, Slice());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
}
auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
Expand Down
43 changes: 34 additions & 9 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,14 @@ rocksdb::Status Database::Expire(engine::Context &ctx, const Slice &user_key, ui
}
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisNone, {std::to_string(kRedisCmdExpire)});
batch->PutLogData(log_data.Encode());
batch->Put(metadata_cf_handle_, ns_key, value);
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) {
return s;
}
s = batch->Put(metadata_cf_handle_, ns_key, value);
if (!s.ok()) {
return s;
}
s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
return s;
}
Expand Down Expand Up @@ -170,7 +176,10 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisNone);
batch->PutLogData(log_data.Encode());
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) {
return s;
}

std::vector<Slice> slice_keys;
slice_keys.reserve(lock_keys.size());
Expand All @@ -194,7 +203,8 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k
if (!s.ok()) continue;
if (metadata.Expired()) continue;

batch->Delete(metadata_cf_handle_, lock_keys[i]);
s = batch->Delete(metadata_cf_handle_, lock_keys[i]);
if (!s.ok()) return s;
*deleted_cnt += 1;
}

Expand Down Expand Up @@ -662,16 +672,25 @@ rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, con

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(type);
batch->PutLogData(log_data.Encode());
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) {
return s;
}

engine::DBIterator iter(ctx, ctx.GetReadOptions());
iter.Seek(key);

if (delete_old) {
batch->Delete(metadata_cf_handle_, key);
s = batch->Delete(metadata_cf_handle_, key);
if (!s.ok()) {
return s;
}
}
// copy metadata
batch->Put(metadata_cf_handle_, new_key, iter.Value());
s = batch->Put(metadata_cf_handle_, new_key, iter.Value());
if (!s.ok()) {
return s;
}

auto subkey_iter = iter.GetSubKeyIterator();

Expand All @@ -683,7 +702,10 @@ rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, con
std::string to_ikey =
InternalKey(new_key, from_ikey.GetSubKey(), from_ikey.GetVersion(), storage_->IsSlotIdEncoded()).Encode();
// copy sub key
batch->Put(subkey_iter->ColumnFamilyHandle(), to_ikey, subkey_iter->Value());
auto s = batch->Put(subkey_iter->ColumnFamilyHandle(), to_ikey, subkey_iter->Value());
if (!s.ok()) {
return s;
}

// The ZSET type stores an extra score and member field inside `zset_score` column family
// while compared to other composed data structures. The purpose is to allow to seek by score.
Expand All @@ -693,7 +715,10 @@ rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, con
// copy score key
std::string score_key =
InternalKey(new_key, score_bytes, from_ikey.GetVersion(), storage_->IsSlotIdEncoded()).Encode();
batch->Put(zset_score_cf, score_key, Slice());
auto s = batch->Put(zset_score_cf, score_key, Slice());
if (!s.ok()) {
return s;
}
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/storage/redis_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ rocksdb::Status PubSub::Publish(engine::Context &ctx, const Slice &channel, cons
return rocksdb::Status::NotSupported("can't publish to db in slave mode");
}
auto batch = storage_->GetWriteBatchBase();
batch->Put(pubsub_cf_handle_, channel, value);
auto s = batch->Put(pubsub_cf_handle_, channel, value);
if (!s.ok()) {
return s;
}
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

Expand Down
Loading

0 comments on commit ba0b85d

Please sign in to comment.