Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the support of SCALING for bloom filter #1721

Merged
merged 14 commits into from
Sep 12, 2023
5 changes: 2 additions & 3 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ class BloomChainMetadata : public Metadata {
using Metadata::Decode;
rocksdb::Status Decode(Slice *bytes) override;

/// Get the total capacity of the bloom chain (the sum capacity of all sub-filters)
///
/// @return the total capacity value
uint32_t GetCapacity() const;

bool IsScaling() const { return expansion != 0; };
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
};
61 changes: 42 additions & 19 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,31 +56,44 @@ rocksdb::Status BloomChain::createBloomChain(const Slice &ns_key, double error_r
block_split_bloom_filter.Init(metadata->bloom_bytes);

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisBloomFilter, {"createSBChain"});
WriteBatchLogData log_data(kRedisBloomFilter, {"createBloomChain"});
batch->PutLogData(log_data.Encode());

std::string sb_chain_meta_bytes;
metadata->Encode(&sb_chain_meta_bytes);
batch->Put(metadata_cf_handle_, ns_key, sb_chain_meta_bytes);
std::string bloom_chain_meta_bytes;
metadata->Encode(&bloom_chain_meta_bytes);
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);

std::string bf_key = getBFKey(ns_key, *metadata, metadata->n_filters - 1);
batch->Put(bf_key, block_split_bloom_filter.GetData());

return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status BloomChain::bloomAdd(const Slice &bf_key, const std::string &item) {
std::string bf_data;
rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), bf_key, &bf_data);
if (!s.ok()) return s;
void BloomChain::createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata *metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, std::string *bf_data) {
uint32_t bloom_filter_bytes = BlockSplitBloomFilter::OptimalNumOfBytes(
static_cast<uint32_t>(metadata->base_capacity * pow(metadata->expansion, metadata->n_filters)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might not strong related to this patch, should we enforce a "hardlimit" to avoid too large BloomFilter?

cc @PragmaTwice @git-hulk

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Redis have some limits here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RedisBloom has a maximum Cuckoo Filter n_filters. And don't have limit

I think our bitmap has some limits. And BlockSplitBloomFilter limit 128MB, perhaps if filter is too large, we need to reject and return to user?

Copy link
Member

@git-hulk git-hulk Sep 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

128MiB is also too large for one key, maybe we can improve this by separating them into multi subkeys in future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously I talked with @zncleon . Split into subkeys doesn't work like bitmap, since bloom filter would tent to random access all bits in it's space. Maybe a maximum size would help?

metadata->error_rate);
metadata->n_filters += 1;
metadata->bloom_bytes += bloom_filter_bytes;

BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(bf_data));
block_split_bloom_filter.Init(bloom_filter_bytes);
*bf_data = block_split_bloom_filter.GetData();

std::string bloom_chain_meta_bytes;
metadata->Encode(&bloom_chain_meta_bytes);
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
}

void BloomChain::bloomAdd(const std::string &item, std::string *bf_data) {
BlockSplitBloomFilter block_split_bloom_filter;
std::string in_data = *bf_data;
block_split_bloom_filter.Init(std::move(in_data));
zncleon marked this conversation as resolved.
Show resolved Hide resolved

uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
block_split_bloom_filter.InsertHash(h);
auto batch = storage_->GetWriteBatchBase();
batch->Put(bf_key, block_split_bloom_filter.GetData());
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
*bf_data = block_split_bloom_filter.GetData();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface is ok, but what about adding a MoveData to avoid an around of copying?

Copy link
Member

@PragmaTwice PragmaTwice Sep 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a glance, I think we just add a string&& GetData() &&.

Copy link
Member

@PragmaTwice PragmaTwice Sep 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
*bf_data = block_split_bloom_filter.GetData();
*bf_data = std::move(block_split_bloom_filter).GetData();

And add a overload to GetData:

std::string&& GetData() && { return data_; }

Copy link
Member

@PragmaTwice PragmaTwice Sep 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @mapleFU that seems better than a MoveData method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, this also LGTM

}

rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string &item, bool *exist) {
Expand Down Expand Up @@ -147,18 +160,28 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r

// insert
if (!exist) {
if (metadata.size + 1 > metadata.GetCapacity()) { // TODO: scaling would be supported later
return rocksdb::Status::Aborted("filter is full");
}
s = bloomAdd(bf_key_list.back(), item_string);
std::string bf_data;
s = storage_->Get(rocksdb::ReadOptions(), bf_key_list.back(), &bf_data);
if (!s.ok()) return s;

if (metadata.size + 1 > metadata.GetCapacity()) {
if (metadata.IsScaling()) {
batch->Put(bf_key_list.back(), bf_data);
createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
} else {
return rocksdb::Status::Aborted("filter is full and is nonscaling");
}
}
bloomAdd(item_string, &bf_data);
batch->Put(bf_key_list.back(), bf_data);
*ret = 1;
metadata.size += 1;
}

std::string sb_chain_metadata_bytes;
metadata.Encode(&sb_chain_metadata_bytes);
batch->Put(metadata_cf_handle_, ns_key, sb_chain_metadata_bytes);
std::string bloom_chain_metadata_bytes;
metadata.Encode(&bloom_chain_metadata_bytes);
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);

return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
Expand Down
4 changes: 3 additions & 1 deletion src/types/redis_bloom_chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ class BloomChain : public Database {
rocksdb::Status getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata);
rocksdb::Status createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity, uint16_t expansion,
BloomChainMetadata *metadata);
rocksdb::Status bloomAdd(const Slice &bf_key, const std::string &item);
void createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata *metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, std::string *bf_data);
static void bloomAdd(const std::string &item, std::string *bf_data);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: when using pointer as an argument, it looks like an output. Maybe we can add a comment to mark this like:

/// bf_data: [in/out] The content string of bloomfilter.

rocksdb::Status bloomCheck(const Slice &bf_key, const std::string &item, bool *exist);
};
} // namespace redis
44 changes: 43 additions & 1 deletion tests/gocase/unit/type/bloom/bloom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,5 +203,47 @@ func TestBloom(t *testing.T) {
require.Equal(t, int64(1), rdb.Do(ctx, "bf.info", key, "items").Val())
})

// TODO: Add the testcase of get filters of bloom filter after complete the scaling.
t.Run("Bloom filter full and nonscaling", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "50", "nonscaling").Err())

// insert items, suppose false positives is 0
for i := 0; i < 50; i++ {
buf := util.RandString(7, 8, util.Alpha)
Add := rdb.Do(ctx, "bf.add", key, buf)
require.NoError(t, Add.Err())
}
require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, "items").Val())
require.ErrorContains(t, rdb.Do(ctx, "bf.add", key, "xxx").Err(), "filter is full and is nonscaling")
require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, "items").Val())
})

t.Run("Bloom filter full and scaling", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "50", "expansion", "2").Err())

// insert items, suppose false positives is 0
for i := 0; i < 50; i++ {
buf := util.RandString(7, 8, util.Alpha)
Add := rdb.Do(ctx, "bf.add", key, buf)
require.NoError(t, Add.Err())
}
require.Equal(t, []interface{}{"Capacity", int64(50), "Size", int64(256), "Number of filters", int64(1), "Number of items inserted", int64(50), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())

// bloom filter is full and scaling
require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxx").Err())
require.Equal(t, []interface{}{"Capacity", int64(150), "Size", int64(768), "Number of filters", int64(2), "Number of items inserted", int64(51), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())

// insert items, suppose false positives is 0
for i := 0; i < 99; i++ {
buf := util.RandString(7, 8, util.Alpha)
Add := rdb.Do(ctx, "bf.add", key, buf)
require.NoError(t, Add.Err())
}
require.Equal(t, []interface{}{"Capacity", int64(150), "Size", int64(768), "Number of filters", int64(2), "Number of items inserted", int64(150), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())

// bloom filter is full and scaling
require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxxx").Err())
require.Equal(t, []interface{}{"Capacity", int64(350), "Size", int64(1792), "Number of filters", int64(3), "Number of items inserted", int64(151), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
})
}