feat: support limit WriteBatch size #2508

Merged 9 commits on Sep 4, 2024
7 changes: 7 additions & 0 deletions kvrocks.conf
@@ -967,5 +967,12 @@ rocksdb.rate_limiter_auto_tuned yes
# Default: yes
# rocksdb.avoid_unnecessary_blocking_io yes

# Specifies the maximum size in bytes for a write batch in RocksDB.
# If set to 0, there is no size limit for write batches.
# This option can help control memory usage by capping how large a single write batch is allowed to grow.
#
# Default: 0
# rocksdb.write_options.write_batch_max_bytes 0

################################ NAMESPACE #####################################
# namespace.test change.me
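
Note on the new `rocksdb.write_options.write_batch_max_bytes` option above: RocksDB's `WriteBatch` constructor takes a `max_bytes` cap, and once a batch would grow past that cap, further mutations fail with `Status::MemoryLimit` instead of allocating more memory. That is also why the call sites in this PR now check the status returned by `Put`/`Delete`/`PutLogData`. A minimal standalone sketch of the behavior (the 1 KiB cap and key name are illustrative, not taken from kvrocks):

```cpp
#include <cstddef>
#include <iostream>
#include <string>

#include <rocksdb/write_batch.h>

int main() {
  // Illustrative cap of 1 KiB; kvrocks reads the cap from
  // rocksdb.write_options.write_batch_max_bytes (0 means unlimited).
  constexpr size_t kMaxBatchBytes = 1024;

  // WriteBatch(reserved_bytes, max_bytes): with a non-zero max_bytes,
  // a mutation that would push the batch past the cap is rejected.
  rocksdb::WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/kMaxBatchBytes);

  std::string big_value(2048, 'x');
  rocksdb::Status s = batch.Put("key", big_value);
  if (!s.ok()) {
    // Expected: a 2 KiB value cannot fit into a 1 KiB batch, so Put
    // returns Status::MemoryLimit and the rejected write is rolled back.
    std::cout << "write rejected: " << s.ToString() << std::endl;
  }
  return 0;
}
```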
2 changes: 2 additions & 0 deletions src/config/config.cc
@@ -254,6 +254,8 @@ Config::Config() {
{"rocksdb.write_options.low_pri", true, new YesNoField(&rocks_db.write_options.low_pri, false)},
{"rocksdb.write_options.memtable_insert_hint_per_batch", true,
new YesNoField(&rocks_db.write_options.memtable_insert_hint_per_batch, false)},
{"rocksdb.write_options.write_batch_max_bytes", false,
new IntField(&rocks_db.write_options.write_batch_max_bytes, 0, 0, INT_MAX)},

/* rocksdb read options */
{"rocksdb.read_options.async_io", false, new YesNoField(&rocks_db.read_options.async_io, true)},
1 change: 1 addition & 0 deletions src/config/config.h
@@ -218,6 +218,7 @@ struct Config {
bool no_slowdown;
bool low_pri;
bool memtable_insert_hint_per_batch;
int write_batch_max_bytes;
} write_options;

struct ReadOptions {
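
The config.cc/config.h hunks above only add the plumbing for the new option (the conf entry, the `IntField` registration, and the struct member); the spot where the value is actually handed to RocksDB is not part of this excerpt. A plausible sketch of how a storage layer could apply it when handing out batches, with a hypothetical `MakeWriteBatch` helper that is not the real kvrocks code:

```cpp
#include <cstddef>
#include <memory>

#include <rocksdb/write_batch.h>

// Hypothetical helper (not the actual kvrocks code): build a batch that
// honors the configured cap. A value of 0 keeps RocksDB's default of
// "no limit", matching the behavior documented in kvrocks.conf.
std::unique_ptr<rocksdb::WriteBatch> MakeWriteBatch(int write_batch_max_bytes) {
  return std::make_unique<rocksdb::WriteBatch>(
      /*reserved_bytes=*/0, /*max_bytes=*/static_cast<size_t>(write_batch_max_bytes));
}
```

Because every mutation on a size-capped batch can fail with `Status::MemoryLimit`, the helpers below (`PutMetadata` and friends) now return a `Status` instead of `void`, and every call site propagates it.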
64 changes: 46 additions & 18 deletions src/search/hnsw_indexer.cc
@@ -49,11 +49,16 @@ StatusOr<HnswNodeFieldMetadata> HnswNode::DecodeMetadata(engine::Context& ctx, c
return metadata;
}

void HnswNode::PutMetadata(HnswNodeFieldMetadata* node_meta, const SearchKey& search_key, engine::Storage* storage,
rocksdb::WriteBatchBase* batch) const {
Status HnswNode::PutMetadata(HnswNodeFieldMetadata* node_meta, const SearchKey& search_key, engine::Storage* storage,
rocksdb::WriteBatchBase* batch) const {
std::string updated_metadata;
node_meta->Encode(&updated_metadata);
batch->Put(storage->GetCFHandle(ColumnFamilyID::Search), search_key.ConstructHnswNode(level, key), updated_metadata);
auto s = batch->Put(storage->GetCFHandle(ColumnFamilyID::Search), search_key.ConstructHnswNode(level, key),
updated_metadata);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
return Status::OK();
}

void HnswNode::DecodeNeighbours(engine::Context& ctx, const SearchKey& search_key) {
@@ -75,12 +80,13 @@ void HnswNode::DecodeNeighbours(engine::Context& ctx, const SearchKey& search_ke
Status HnswNode::AddNeighbour(engine::Context& ctx, const NodeKey& neighbour_key, const SearchKey& search_key,
rocksdb::WriteBatchBase* batch) const {
auto edge_index_key = search_key.ConstructHnswEdge(level, key, neighbour_key);
batch->Put(ctx.storage->GetCFHandle(ColumnFamilyID::Search), edge_index_key, Slice());

auto rocket_s = batch->Put(ctx.storage->GetCFHandle(ColumnFamilyID::Search), edge_index_key, Slice());
if (!rocket_s.ok()) {
return {Status::NotOK, rocket_s.ToString()};
}
HnswNodeFieldMetadata node_metadata = GET_OR_RET(DecodeMetadata(ctx, search_key));
node_metadata.num_neighbours++;
PutMetadata(&node_metadata, search_key, ctx.storage, batch);
return Status::OK();
return PutMetadata(&node_metadata, search_key, ctx.storage, batch);
}

Status HnswNode::RemoveNeighbour(engine::Context& ctx, const NodeKey& neighbour_key, const SearchKey& search_key,
@@ -93,8 +99,7 @@ Status HnswNode::RemoveNeighbour(engine::Context& ctx, const NodeKey& neighbour_

HnswNodeFieldMetadata node_metadata = GET_OR_RET(DecodeMetadata(ctx, search_key));
node_metadata.num_neighbours--;
PutMetadata(&node_metadata, search_key, ctx.storage, batch);
return Status::OK();
return PutMetadata(&node_metadata, search_key, ctx.storage, batch);
}

Status VectorItem::Create(NodeKey key, const kqir::NumericArray& vector, const HnswVectorFieldMetadata* metadata,
@@ -445,7 +450,10 @@ Status HnswIndex::InsertVectorEntryInternal(engine::Context& ctx, std::string_vi

// Update inserted node metadata
HnswNodeFieldMetadata node_metadata(static_cast<uint16_t>(connected_edges_set.size()), vector);
node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
auto s = node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}

// Update modified nodes metadata
for (const auto& node_edges : deleted_edges_map) {
@@ -458,14 +466,20 @@ Status HnswIndex::InsertVectorEntryInternal(engine::Context& ctx, std::string_vi
connected_edges_set.erase(current_node_key);
}
current_node_metadata.num_neighbours = new_num_neighbours;
current_node.PutMetadata(&current_node_metadata, search_key, storage, batch.Get());
s = current_node.PutMetadata(&current_node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
}

for (const auto& current_node_key : connected_edges_set) {
auto current_node = HnswNode(current_node_key, level);
HnswNodeFieldMetadata current_node_metadata = GET_OR_RET(current_node.DecodeMetadata(ctx, search_key));
current_node_metadata.num_neighbours++;
current_node.PutMetadata(&current_node_metadata, search_key, storage, batch.Get());
s = current_node.PutMetadata(&current_node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
}

entry_points.clear();
@@ -476,21 +490,30 @@ Status HnswIndex::InsertVectorEntryInternal(engine::Context& ctx, std::string_vi
} else {
auto node = HnswNode(std::string(key), 0);
HnswNodeFieldMetadata node_metadata(0, vector);
node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
auto s = node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
metadata->num_levels = 1;
}

while (target_level > metadata->num_levels - 1) {
auto node = HnswNode(std::string(key), metadata->num_levels);
HnswNodeFieldMetadata node_metadata(0, vector);
node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
auto s = node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
metadata->num_levels++;
}

std::string encoded_index_metadata;
metadata->Encode(&encoded_index_metadata);
auto index_meta_key = search_key.ConstructFieldMeta();
batch->Put(cf_handle, index_meta_key, encoded_index_metadata);
auto s = batch->Put(cf_handle, index_meta_key, encoded_index_metadata);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

return Status::OK();
}
@@ -524,7 +547,10 @@ Status HnswIndex::DeleteVectorEntry(engine::Context& ctx, std::string_view key,
auto neighbour_node = HnswNode(neighbour_key, level);
HnswNodeFieldMetadata neighbour_node_metadata = GET_OR_RET(neighbour_node.DecodeMetadata(ctx, search_key));
neighbour_node_metadata.num_neighbours--;
neighbour_node.PutMetadata(&neighbour_node_metadata, search_key, storage, batch.Get());
auto s = neighbour_node.PutMetadata(&neighbour_node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
}
}

@@ -559,8 +585,10 @@ Status HnswIndex::DeleteVectorEntry(engine::Context& ctx, std::string_view key,
std::string encoded_index_metadata;
metadata->Encode(&encoded_index_metadata);
auto index_meta_key = search_key.ConstructFieldMeta();
batch->Put(storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key, encoded_index_metadata);

auto s = batch->Put(storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key, encoded_index_metadata);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
return Status::OK();
}

4 changes: 2 additions & 2 deletions src/search/hnsw_indexer.h
@@ -42,8 +42,8 @@ struct HnswNode {
HnswNode(NodeKey key, uint16_t level);

StatusOr<HnswNodeFieldMetadata> DecodeMetadata(engine::Context& ctx, const SearchKey& search_key) const;
void PutMetadata(HnswNodeFieldMetadata* node_meta, const SearchKey& search_key, engine::Storage* storage,
rocksdb::WriteBatchBase* batch) const;
Status PutMetadata(HnswNodeFieldMetadata* node_meta, const SearchKey& search_key, engine::Storage* storage,
rocksdb::WriteBatchBase* batch) const;
void DecodeNeighbours(engine::Context& ctx, const SearchKey& search_key);

// For testing purpose
35 changes: 28 additions & 7 deletions src/search/index_manager.h
@@ -153,19 +153,28 @@ struct IndexManager {

std::string meta_val;
info->metadata.Encode(&meta_val);
batch->Put(cf, index_key.ConstructIndexMeta(), meta_val);
auto s = batch->Put(cf, index_key.ConstructIndexMeta(), meta_val);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

std::string prefix_val;
info->prefixes.Encode(&prefix_val);
batch->Put(cf, index_key.ConstructIndexPrefixes(), prefix_val);
s = batch->Put(cf, index_key.ConstructIndexPrefixes(), prefix_val);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

for (const auto &[_, field_info] : info->fields) {
SearchKey field_key(info->ns, info->name, field_info.name);

std::string field_val;
field_info.metadata->Encode(&field_val);

batch->Put(cf, field_key.ConstructFieldMeta(), field_val);
s = batch->Put(cf, field_key.ConstructFieldMeta(), field_val);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
}

if (auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); !s.ok()) {
@@ -231,16 +240,28 @@

auto batch = storage->GetWriteBatchBase();

batch->Delete(cf, index_key.ConstructIndexMeta());
batch->Delete(cf, index_key.ConstructIndexPrefixes());
auto s = batch->Delete(cf, index_key.ConstructIndexMeta());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
s = batch->Delete(cf, index_key.ConstructIndexPrefixes());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

auto begin = index_key.ConstructAllFieldMetaBegin();
auto end = index_key.ConstructAllFieldMetaEnd();
batch->DeleteRange(cf, begin, end);
s = batch->DeleteRange(cf, begin, end);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

begin = index_key.ConstructAllFieldDataBegin();
end = index_key.ConstructAllFieldDataEnd();
batch->DeleteRange(cf, begin, end);
s = batch->DeleteRange(cf, begin, end);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

auto no_txn_ctx = engine::Context::NoTransactionContext(storage);
if (auto s = storage->Write(no_txn_ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); !s.ok()) {
20 changes: 16 additions & 4 deletions src/search/indexer.cc
@@ -226,13 +226,19 @@ Status IndexUpdater::UpdateTagIndex(engine::Context &ctx, std::string_view key,
for (const auto &tag : tags_to_delete) {
auto index_key = search_key.ConstructTagFieldData(tag, key);

batch->Delete(cf_handle, index_key);
auto s = batch->Delete(cf_handle, index_key);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
}

for (const auto &tag : tags_to_add) {
auto index_key = search_key.ConstructTagFieldData(tag, key);

batch->Put(cf_handle, index_key, Slice());
auto s = batch->Put(cf_handle, index_key, Slice());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
}

auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
@@ -253,13 +259,19 @@ Status IndexUpdater::UpdateNumericIndex(engine::Context &ctx, std::string_view k
if (!original.IsNull()) {
auto index_key = search_key.ConstructNumericFieldData(original.Get<kqir::Numeric>(), key);

batch->Delete(cf_handle, index_key);
auto s = batch->Delete(cf_handle, index_key);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
}

if (!current.IsNull()) {
auto index_key = search_key.ConstructNumericFieldData(current.Get<kqir::Numeric>(), key);

batch->Put(cf_handle, index_key, Slice());
auto s = batch->Put(cf_handle, index_key, Slice());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
}
auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
43 changes: 34 additions & 9 deletions src/storage/redis_db.cc
@@ -135,8 +135,14 @@ rocksdb::Status Database::Expire(engine::Context &ctx, const Slice &user_key, ui
}
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisNone, {std::to_string(kRedisCmdExpire)});
batch->PutLogData(log_data.Encode());
batch->Put(metadata_cf_handle_, ns_key, value);
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) {
return s;
}
s = batch->Put(metadata_cf_handle_, ns_key, value);
if (!s.ok()) {
return s;
}
s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
return s;
}
@@ -170,7 +176,10 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisNone);
batch->PutLogData(log_data.Encode());
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) {
return s;
}

std::vector<Slice> slice_keys;
slice_keys.reserve(lock_keys.size());
Expand All @@ -194,7 +203,8 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k
if (!s.ok()) continue;
if (metadata.Expired()) continue;

batch->Delete(metadata_cf_handle_, lock_keys[i]);
s = batch->Delete(metadata_cf_handle_, lock_keys[i]);
if (!s.ok()) return s;
*deleted_cnt += 1;
}

@@ -662,16 +672,25 @@ rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, con

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(type);
batch->PutLogData(log_data.Encode());
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) {
return s;
}

engine::DBIterator iter(ctx, ctx.GetReadOptions());
iter.Seek(key);

if (delete_old) {
batch->Delete(metadata_cf_handle_, key);
s = batch->Delete(metadata_cf_handle_, key);
if (!s.ok()) {
return s;
}
}
// copy metadata
batch->Put(metadata_cf_handle_, new_key, iter.Value());
s = batch->Put(metadata_cf_handle_, new_key, iter.Value());
if (!s.ok()) {
return s;
}

auto subkey_iter = iter.GetSubKeyIterator();

@@ -683,7 +702,10 @@ rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, con
std::string to_ikey =
InternalKey(new_key, from_ikey.GetSubKey(), from_ikey.GetVersion(), storage_->IsSlotIdEncoded()).Encode();
// copy sub key
batch->Put(subkey_iter->ColumnFamilyHandle(), to_ikey, subkey_iter->Value());
auto s = batch->Put(subkey_iter->ColumnFamilyHandle(), to_ikey, subkey_iter->Value());
if (!s.ok()) {
return s;
}

// The ZSET type stores an extra score and member field inside the `zset_score` column family
// compared to other composed data structures. The purpose is to allow seeking by score.
Expand All @@ -693,7 +715,10 @@ rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, con
// copy score key
std::string score_key =
InternalKey(new_key, score_bytes, from_ikey.GetVersion(), storage_->IsSlotIdEncoded()).Encode();
batch->Put(zset_score_cf, score_key, Slice());
auto s = batch->Put(zset_score_cf, score_key, Slice());
if (!s.ok()) {
return s;
}
}
}
}
5 changes: 4 additions & 1 deletion src/storage/redis_pubsub.cc
@@ -27,7 +27,10 @@ rocksdb::Status PubSub::Publish(engine::Context &ctx, const Slice &channel, cons
return rocksdb::Status::NotSupported("can't publish to db in slave mode");
}
auto batch = storage_->GetWriteBatchBase();
batch->Put(pubsub_cf_handle_, channel, value);
auto s = batch->Put(pubsub_cf_handle_, channel, value);
if (!s.ok()) {
return s;
}
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
