Add the support of SCALING for bloom filter #1721

Merged: 14 commits, Sep 12, 2023
2 changes: 2 additions & 0 deletions src/storage/redis_metadata.cc
@@ -452,3 +452,5 @@ uint32_t BloomChainMetadata::GetCapacity() const {
}
return static_cast<uint32_t>(base_capacity * (1 - pow(expansion, n_filters)) / (1 - expansion));
}

bool BloomChainMetadata::IsScaling() const { return expansion != 0; }
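As a side note (derived from the code above, not part of the diff): GetCapacity sums the capacities of every filter in the chain. With base capacity $b$, expansion rate $e \neq 1$, and $n$ filters, filter $i$ holds $b \cdot e^i$ items, which gives the closed form in the return statement:

$$
\text{capacity} = \sum_{i=0}^{n-1} b \, e^{i} = b \cdot \frac{1 - e^{n}}{1 - e}
$$

For example, $b = 50$, $e = 2$, $n = 3$ gives $50 \cdot (1 - 8)/(1 - 2) = 350$, which matches the expectations in the test changes below.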
5 changes: 5 additions & 0 deletions src/storage/redis_metadata.h
@@ -245,4 +245,9 @@ class BloomChainMetadata : public Metadata {
///
/// @return the total capacity value
uint32_t GetCapacity() const;

/// Check the bloom chain is scaling or not
Review comment (Contributor): I think it's needless to comment on simple methods such as IsScaling or GetCapacity, since they have self-explanatory names. By the way, the implementation of IsScaling is so short and simple that you could write it in the header file (see the sketch after this hunk).

///
/// @return true if scaling, false if nonscaling
bool IsScaling() const;
};
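A minimal sketch of that suggestion, with an assumed member layout around it; this is illustrative only and not part of the merged patch:

```cpp
// Hypothetical sketch: move the one-line predicate into redis_metadata.h.
// An expansion rate of 0 marks the chain as nonscaling.
class BloomChainMetadata : public Metadata {
 public:
  uint16_t expansion = 0;  // assumed field; 0 means nonscaling

  /// @return true if scaling, false if nonscaling
  bool IsScaling() const { return expansion != 0; }
};
```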
46 changes: 37 additions & 9 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
@@ -56,12 +56,36 @@ rocksdb::Status BloomChain::createBloomChain(const Slice &ns_key, double error_r
block_split_bloom_filter.Init(metadata->bloom_bytes);

auto batch = storage_->GetWriteBatchBase();
-WriteBatchLogData log_data(kRedisBloomFilter, {"createSBChain"});
+WriteBatchLogData log_data(kRedisBloomFilter, {"createBloomChain"});
batch->PutLogData(log_data.Encode());

-std::string sb_chain_meta_bytes;
-metadata->Encode(&sb_chain_meta_bytes);
-batch->Put(metadata_cf_handle_, ns_key, sb_chain_meta_bytes);
+std::string bloom_chain_meta_bytes;
+metadata->Encode(&bloom_chain_meta_bytes);
+batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);

std::string bf_key = getBFKey(ns_key, *metadata, metadata->n_filters - 1);
batch->Put(bf_key, block_split_bloom_filter.GetData());

return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status BloomChain::createBloomFilter(const Slice &ns_key, BloomChainMetadata *metadata) {
uint32_t bloom_filter_bytes = BlockSplitBloomFilter::OptimalNumOfBytes(
static_cast<uint32_t>(metadata->base_capacity * pow(metadata->expansion, metadata->n_filters)),
Review comment (Member): This might not be strongly related to this patch, but should we enforce a hard limit to avoid an overly large bloom filter?

cc @PragmaTwice @git-hulk

Review comment (Member): Does Redis have some limits here?

Review comment (Member): RedisBloom has a maximum n_filters for its Cuckoo filter, but no size limit on the bloom filter itself. I think our bitmap has some limits, and BlockSplitBloomFilter is limited to 128MB; perhaps if a filter would be too large, we should reject it and return an error to the user?

Review comment (Member, @git-hulk, Sep 4, 2023): 128MiB is also too large for one key; maybe we can improve this in the future by splitting a filter into multiple subkeys.

Review comment (Member): I talked with @zncleon about this earlier. Splitting into subkeys doesn't work the way it does for bitmap, since a bloom filter tends to access bits randomly across its whole space. Maybe a maximum size would help? (A hypothetical sketch of such a guard follows this hunk.)

metadata->error_rate);
metadata->n_filters += 1;
metadata->bloom_bytes += bloom_filter_bytes;

BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(bloom_filter_bytes);

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisBloomFilter, {"createBloomFilter"});
batch->PutLogData(log_data.Encode());

std::string bloom_chain_meta_bytes;
metadata->Encode(&bloom_chain_meta_bytes);
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);

std::string bf_key = getBFKey(ns_key, *metadata, metadata->n_filters - 1);
batch->Put(bf_key, block_split_bloom_filter.GetData());
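A minimal sketch of the size guard discussed in the thread above; the constant and helper are assumptions, not part of this patch:

```cpp
// Hypothetical guard: reject a new filter once it would exceed an assumed
// maximum size, instead of creating an ever-larger BlockSplitBloomFilter.
constexpr uint32_t kMaxBloomFilterBytes = 128 * 1024 * 1024;  // 128MiB, per the discussion above

rocksdb::Status CheckBloomFilterBytes(uint32_t bloom_filter_bytes) {
  if (bloom_filter_bytes > kMaxBloomFilterBytes) {
    // Surface the rejection to the user rather than writing an oversized filter.
    return rocksdb::Status::Aborted("bloom filter exceeds the maximum allowed size");
  }
  return rocksdb::Status::OK();
}
```

createBloomFilter could call this right after OptimalNumOfBytes and propagate a non-OK status to the caller.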
@@ -147,18 +171,22 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r

// insert
if (!exist) {
-if (metadata.size + 1 > metadata.GetCapacity()) { // TODO: scaling would be supported later
-return rocksdb::Status::Aborted("filter is full");
+if (metadata.size + 1 > metadata.GetCapacity()) {
+if (metadata.IsScaling()) {
+s = createBloomFilter(ns_key, &metadata);
Review comment (Member, @PragmaTwice, Sep 1, 2023): The status doesn't seem to be checked.

Review comment (Member): I'm not sure it can be done like this. We're holding a "lock" here, and createBloomFilter will end up calling Write directly. I propose a createBloomFilterInBatch or similar: the interface would just put the new bloom filter into the batch. The existing create path could reuse it, and here we could simply add the new filter to the batch first. (A hypothetical sketch of this interface follows the hunk below.)

+} else {
+return rocksdb::Status::Aborted("filter is full and is nonscaling");
+}
}
s = bloomAdd(bf_key_list.back(), item_string);
if (!s.ok()) return s;
*ret = 1;
metadata.size += 1;
}

-std::string sb_chain_metadata_bytes;
-metadata.Encode(&sb_chain_metadata_bytes);
-batch->Put(metadata_cf_handle_, ns_key, sb_chain_metadata_bytes);
+std::string bloom_chain_metadata_bytes;
+metadata.Encode(&bloom_chain_metadata_bytes);
+batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);

return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
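A minimal sketch of the createBloomFilterInBatch interface proposed in the review above; the name, signature, and batch parameter are assumptions rather than the merged code:

```cpp
// Hypothetical sketch of the proposal: stage the new filter in the caller's
// write batch instead of calling storage_->Write here, so the caller that
// holds the lock commits everything with a single Write.
rocksdb::Status BloomChain::createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata *metadata,
                                                     rocksdb::WriteBatchBase *batch) {
  uint32_t bloom_filter_bytes = BlockSplitBloomFilter::OptimalNumOfBytes(
      static_cast<uint32_t>(metadata->base_capacity * pow(metadata->expansion, metadata->n_filters)),
      metadata->error_rate);
  metadata->n_filters += 1;
  metadata->bloom_bytes += bloom_filter_bytes;

  BlockSplitBloomFilter block_split_bloom_filter;
  block_split_bloom_filter.Init(bloom_filter_bytes);

  // Only stage the data; the caller writes the batch (and the updated metadata).
  std::string bf_key = getBFKey(ns_key, *metadata, metadata->n_filters - 1);
  batch->Put(bf_key, block_split_bloom_filter.GetData());
  return rocksdb::Status::OK();
}
```

Add could then stage the new filter and the updated metadata in its own batch and keep the single Write it already performs.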
1 change: 1 addition & 0 deletions src/types/redis_bloom_chain.h
@@ -61,6 +61,7 @@ class BloomChain : public Database {
rocksdb::Status getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata);
rocksdb::Status createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity, uint16_t expansion,
BloomChainMetadata *metadata);
rocksdb::Status createBloomFilter(const Slice &ns_key, BloomChainMetadata *metadata);
rocksdb::Status bloomAdd(const Slice &bf_key, const std::string &item);
rocksdb::Status bloomCheck(const Slice &bf_key, const std::string &item, bool *exist);
};
44 changes: 43 additions & 1 deletion tests/gocase/unit/type/bloom/bloom_test.go
@@ -203,5 +203,47 @@ func TestBloom(t *testing.T) {
require.Equal(t, int64(1), rdb.Do(ctx, "bf.info", key, "items").Val())
})

-// TODO: Add the testcase of get filters of bloom filter after complete the scaling.
t.Run("Bloom filter full and nonscaling", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "50", "nonscaling").Err())

// insert items, suppose false positives is 0
for i := 0; i < 50; i++ {
buf := util.RandString(7, 8, util.Alpha)
Add := rdb.Do(ctx, "bf.add", key, buf)
require.NoError(t, Add.Err())
}
require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, "items").Val())
require.ErrorContains(t, rdb.Do(ctx, "bf.add", key, "xxx").Err(), "filter is full and is nonscaling")
require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, "items").Val())
})

t.Run("Bloom filter full and scaling", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "50", "expansion", "2").Err())

// insert items, suppose false positives is 0
for i := 0; i < 50; i++ {
buf := util.RandString(7, 8, util.Alpha)
Add := rdb.Do(ctx, "bf.add", key, buf)
require.NoError(t, Add.Err())
}
require.Equal(t, []interface{}{"Capacity", int64(50), "Size", int64(256), "Number of filters", int64(1), "Number of items inserted", int64(50), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())

// bloom filter is full and scaling
require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxx").Err())
require.Equal(t, []interface{}{"Capacity", int64(150), "Size", int64(768), "Number of filters", int64(2), "Number of items inserted", int64(51), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())

// insert items, suppose false positives is 0
for i := 0; i < 99; i++ {
buf := util.RandString(7, 8, util.Alpha)
Add := rdb.Do(ctx, "bf.add", key, buf)
require.NoError(t, Add.Err())
}
require.Equal(t, []interface{}{"Capacity", int64(150), "Size", int64(768), "Number of filters", int64(2), "Number of items inserted", int64(150), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())

// bloom filter is full and scaling
require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxxx").Err())
require.Equal(t, []interface{}{"Capacity", int64(350), "Size", int64(1792), "Number of filters", int64(3), "Number of items inserted", int64(151), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
})
}
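As a quick sanity check on the expected values above (all derived from numbers already in the test): with base capacity 50 and expansion rate 2, each new filter doubles the previous one's capacity, so the chain totals are

$$
50, \qquad 50 + 50 \cdot 2 = 150, \qquad 150 + 50 \cdot 2^{2} = 350
$$

The reported sizes grow the same way: 256 bytes for the first filter, then 256 + 512 = 768 after the first scale-out, and 768 + 1024 = 1792 after the second.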