diff --git a/src/commands/cmd_hll.cc b/src/commands/cmd_hll.cc new file mode 100644 index 00000000000..343aa322b4e --- /dev/null +++ b/src/commands/cmd_hll.cc @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include + +#include + +#include "commander.h" +#include "commands/command_parser.h" +#include "commands/error_constants.h" +#include "error_constants.h" +#include "parse_util.h" +#include "server/redis_reply.h" +#include "server/server.h" +#include "storage/redis_metadata.h" + +namespace redis { + +/// PFADD key [element [element ...]] +/// Complexity: O(1) for each element added. +class CommandPfAdd final : public Commander { + public: + Status Execute(Server *srv, Connection *conn, std::string *output) override { + redis::HyperLogLog hll(srv->storage, conn->GetNamespace()); + std::vector hashes(args_.size() - 1); + for (size_t i = 1; i < args_.size(); i++) { + hashes[i - 1] = redis::HyperLogLog::HllHash(args_[i]); + } + uint64_t ret{}; + auto s = hll.Add(args_[0], hashes, &ret); + if (!s.ok() && !s.IsNotFound()) { + return {Status::RedisExecErr, s.ToString()}; + } + *output = redis::Integer(ret); + return Status::OK(); + } +}; + +/// PFCOUNT key [key ...] +/// Complexity: O(1) with a very small average constant time when called with a single key. +/// O(N) with N being the number of keys, and much bigger constant times, +/// when called with multiple keys. +/// +/// TODO(mwish): Currently we don't supports merge, so only one key is supported. +class CommandPfCount final : public Commander { + Status Execute(Server *srv, Connection *conn, std::string *output) override { + redis::HyperLogLog hll(srv->storage, conn->GetNamespace()); + uint64_t ret{}; + auto s = hll.Count(args_[0], &ret); + if (!s.ok() && !s.IsNotFound()) { + return {Status::RedisExecErr, s.ToString()}; + } + if (s.IsNotFound()) { + ret = 0; + } + *output = redis::Integer(ret); + return Status::OK(); + } +}; + +REDIS_REGISTER_COMMANDS(MakeCmdAttr("pfadd", -2, "write", 1, 1, 1), + MakeCmdAttr("pfcount", 2, "read-only", 1, 1, 1), ); + +} // namespace redis diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc index e44b39cad7c..76403faaef3 100644 --- a/src/storage/redis_metadata.cc +++ b/src/storage/redis_metadata.cc @@ -329,7 +329,7 @@ bool Metadata::ExpireAt(uint64_t expired_ts) const { bool Metadata::IsSingleKVType() const { return Type() == kRedisString || Type() == kRedisJson; } bool Metadata::IsEmptyableType() const { - return IsSingleKVType() || Type() == kRedisStream || Type() == kRedisBloomFilter; + return IsSingleKVType() || Type() == kRedisStream || Type() == kRedisBloomFilter || Type() == kRedisHyperLogLog; } bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); } @@ -472,3 +472,26 @@ rocksdb::Status JsonMetadata::Decode(Slice *input) { return rocksdb::Status::OK(); } + +void HyperLogLogMetadata::Encode(std::string *dst) const { + Metadata::Encode(dst); + PutFixed8(dst, static_cast(this->encode_type)); +} + +rocksdb::Status HyperLogLogMetadata::Decode(Slice *input) { + if (auto s = Metadata::Decode(input); !s.ok()) { + return s; + } + + uint8_t encoded_type = 0; + if (!GetFixed8(input, &encoded_type)) { + return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); + } + // Check validity of encode type + if (encoded_type > 0) { + return rocksdb::Status::InvalidArgument(fmt::format("Invalid encode type {}", encoded_type)); + } + this->encode_type = static_cast(encoded_type); + + return rocksdb::Status::OK(); +} diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index 68f36b2c994..5590609be37 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -49,6 +49,7 @@ enum RedisType : uint8_t { kRedisStream = 8, kRedisBloomFilter = 9, kRedisJson = 10, + kRedisHyperLogLog = 11, }; struct RedisTypes { @@ -90,8 +91,9 @@ enum RedisCommand { kRedisCmdLMove, }; -const std::vector RedisTypeNames = {"none", "string", "hash", "list", "set", "zset", - "bitmap", "sortedint", "stream", "MBbloom--", "ReJSON-RL"}; +const std::vector RedisTypeNames = {"none", "string", "hash", "list", + "set", "zset", "bitmap", "sortedint", + "stream", "MBbloom--", "ReJSON-RL", "hyperloglog"}; constexpr const char *kErrMsgWrongType = "WRONGTYPE Operation against a key holding the wrong kind of value"; constexpr const char *kErrMsgKeyExpired = "the key was expired"; @@ -313,3 +315,23 @@ class JsonMetadata : public Metadata { void Encode(std::string *dst) const override; rocksdb::Status Decode(Slice *input) override; }; + +class HyperLogLogMetadata : public Metadata { + public: + enum class EncodeType : uint8_t { + // Redis-style dense encoding implement as bitmap like sub keys to + // store registers by segment in data column family. + // The registers are stored in 6-bit format and each segment contains + // 768 registers. + DENSE = 0, + // TODO(mwish): sparse encoding + // SPARSE = 1, + }; + + explicit HyperLogLogMetadata(bool generate_version = true) : Metadata(kRedisHyperLogLog, generate_version) {} + + void Encode(std::string *dst) const override; + rocksdb::Status Decode(Slice *input) override; + + EncodeType encode_type = EncodeType::DENSE; +}; diff --git a/src/storage/storage.h b/src/storage/storage.h index 7f31fc451a2..aa93fa27d4a 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -42,6 +42,10 @@ #include "observer_or_unique.h" #include "status.h" +#if defined(__sparc__) || defined(__arm__) +#define USE_ALIGNED_ACCESS +#endif + enum class StorageEngineType : uint16_t { RocksDB, Speedb, diff --git a/src/types/hyperloglog.cc b/src/types/hyperloglog.cc new file mode 100644 index 00000000000..80923181d2f --- /dev/null +++ b/src/types/hyperloglog.cc @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* Redis HyperLogLog probabilistic cardinality approximation. + * This file implements the algorithm and the exported Redis commands. + * + * Copyright (c) 2014, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +// NOTE: this file is copy from redis's source: `src/hyperloglog.c` + +#include "hyperloglog.h" + +#include "vendor/murmurhash2.h" + +uint8_t HllDenseGetRegister(const uint8_t *registers, uint32_t register_index) { + uint32_t byte = (register_index * kHyperLogLogRegisterBits) / 8; + uint8_t fb = (register_index * kHyperLogLogRegisterBits) & 7; + uint8_t fb8 = 8 - fb; + uint8_t b0 = registers[byte]; + uint8_t b1 = 0; + if (fb > 8 - kHyperLogLogRegisterBits) { + b1 = registers[byte + 1]; + } + return ((b0 >> fb) | (b1 << fb8)) & kHyperLogLogRegisterMax; +} + +void HllDenseSetRegister(uint8_t *registers, uint32_t register_index, uint8_t val) { + uint32_t byte = register_index * kHyperLogLogRegisterBits / 8; + uint8_t fb = register_index * kHyperLogLogRegisterBits & 7; + uint8_t fb8 = 8 - fb; + uint8_t v = val; + registers[byte] &= ~(kHyperLogLogRegisterMax << fb); + registers[byte] |= v << fb; + if (fb > 8 - kHyperLogLogRegisterBits) { + registers[byte + 1] &= ~(kHyperLogLogRegisterMax >> fb8); + registers[byte + 1] |= v >> fb8; + } +} + +/* ========================= HyperLogLog algorithm ========================= */ + +// Reference: +// https://github.com/valkey-io/valkey/blob/14e09e981e0039edbf8c41a208a258c18624cbb7/src/hyperloglog.c#L457 +// +// Given a string element to add to the HyperLogLog, returns the length of the pattern 000..1 of the element +// hash. As a side effect 'regp' is *set to the register index this element hashes to +DenseHllResult ExtractDenseHllResult(uint64_t hash) { + /* Count the number of zeroes starting from bit kHyperLogLogRegisterCount + * (that is a power of two corresponding to the first bit we don't use + * as index). The max run can be 64-kHyperLogLogRegisterCountPow+1 = kHyperLogLogHashBitCount+1 bits. + * + * Note that the final "1" ending the sequence of zeroes must be + * included in the count, so if we find "001" the count is 3, and + * the smallest count possible is no zeroes at all, just a 1 bit + * at the first position, that is a count of 1. + * + * This may sound like inefficient, but actually in the average case + * there are high probabilities to find a 1 after a few iterations. */ + uint32_t index = hash & kHyperLogLogRegisterCountMask; /* Register index. */ + DCHECK_LT(index, kHyperLogLogRegisterCount); + hash >>= kHyperLogLogRegisterCountPow; /* Remove bits used to address the register. */ + hash |= (static_cast(1U) << kHyperLogLogHashBitCount); + uint8_t ctz = __builtin_ctzll(hash) + 1; + return DenseHllResult{index, ctz}; +} + +/* + * Compute the register histogram in the dense representation. + */ +void HllDenseRegHisto(nonstd::span registers, int *reghisto) { + /* Redis default is to use 16384 registers 6 bits each. The code works + * with other values by modifying the defines, but for our target value + * we take a faster path with unrolled loops. */ + const uint8_t *r = registers.data(); + unsigned long r0 = 0, r1 = 0, r2 = 0, r3 = 0, r4 = 0, r5 = 0, r6 = 0, r7 = 0, r8 = 0, r9 = 0, r10 = 0, r11 = 0, + r12 = 0, r13 = 0, r14 = 0, r15 = 0; + for (size_t j = 0; j < kHyperLogLogSegmentRegisters / 16; j++) { + /* Handle 16 registers per iteration. */ + r0 = r[0] & kHyperLogLogRegisterMax; + r1 = (r[0] >> 6 | r[1] << 2) & kHyperLogLogRegisterMax; + r2 = (r[1] >> 4 | r[2] << 4) & kHyperLogLogRegisterMax; + r3 = (r[2] >> 2) & kHyperLogLogRegisterMax; + r4 = r[3] & kHyperLogLogRegisterMax; + r5 = (r[3] >> 6 | r[4] << 2) & kHyperLogLogRegisterMax; + r6 = (r[4] >> 4 | r[5] << 4) & kHyperLogLogRegisterMax; + r7 = (r[5] >> 2) & kHyperLogLogRegisterMax; + r8 = r[6] & kHyperLogLogRegisterMax; + r9 = (r[6] >> 6 | r[7] << 2) & kHyperLogLogRegisterMax; + r10 = (r[7] >> 4 | r[8] << 4) & kHyperLogLogRegisterMax; + r11 = (r[8] >> 2) & kHyperLogLogRegisterMax; + r12 = r[9] & kHyperLogLogRegisterMax; + r13 = (r[9] >> 6 | r[10] << 2) & kHyperLogLogRegisterMax; + r14 = (r[10] >> 4 | r[11] << 4) & kHyperLogLogRegisterMax; + r15 = (r[11] >> 2) & kHyperLogLogRegisterMax; + + reghisto[r0]++; + reghisto[r1]++; + reghisto[r2]++; + reghisto[r3]++; + reghisto[r4]++; + reghisto[r5]++; + reghisto[r6]++; + reghisto[r7]++; + reghisto[r8]++; + reghisto[r9]++; + reghisto[r10]++; + reghisto[r11]++; + reghisto[r12]++; + reghisto[r13]++; + reghisto[r14]++; + reghisto[r15]++; + + r += 12; + } +} + +/* ========================= HyperLogLog Count ============================== + * This is the core of the algorithm where the approximated count is computed. + * The function uses the lower level HllDenseRegHisto() + * functions as helpers to compute histogram of register values part of the + * computation, which is representation-specific, while all the rest is common. */ + +/* Helper function sigma as defined in + * "New cardinality estimation algorithms for HyperLogLog sketches" + * Otmar Ertl, arXiv:1702.01284 */ +double HllSigma(double x) { + if (x == 1.) return INFINITY; + double z_prime = NAN; + double y = 1; + double z = x; + do { + x *= x; + z_prime = z; + z += x * y; + y += y; + } while (z_prime != z); + return z; +} + +/* Helper function tau as defined in + * "New cardinality estimation algorithms for HyperLogLog sketches" + * Otmar Ertl, arXiv:1702.01284 */ +double HllTau(double x) { + if (x == 0. || x == 1.) return 0.; + double z_prime = NAN; + double y = 1.0; + double z = 1 - x; + do { + x = sqrt(x); + z_prime = z; + y *= 0.5; + z -= pow(1 - x, 2) * y; + } while (z_prime != z); + return z / 3; +} + +/* Return the approximated cardinality of the set based on the harmonic + * mean of the registers values. */ +uint64_t HllDenseEstimate(const std::vector> ®isters) { + constexpr double m = kHyperLogLogRegisterCount; + int j = 0; + /* Note that reghisto size could be just kHyperLogLogHashBitCount+2, because kHyperLogLogHashBitCount+1 is + * the maximum frequency of the "000...1" sequence the hash function is + * able to return. However it is slow to check for sanity of the + * input: instead we history array at a safe size: overflows will + * just write data to wrong, but correctly allocated, places. */ + int reghisto[64] = {0}; + + /* Compute register histogram */ + for (const auto &r : registers) { + if (r.empty()) { + // Empty segment + reghisto[0] += kHyperLogLogSegmentRegisters; + } else { + HllDenseRegHisto(r, reghisto); + } + } + + /* Estimate cardinality from register histogram. See: + * "New cardinality estimation algorithms for HyperLogLog sketches" + * Otmar Ertl, arXiv:1702.01284 */ + double z = m * HllTau((m - reghisto[kHyperLogLogHashBitCount + 1]) / m); + for (j = kHyperLogLogHashBitCount; j >= 1; --j) { + z += reghisto[j]; + z *= 0.5; + } + z += m * HllSigma(reghisto[0] / m); + return static_cast(llroundl(kHyperLogLogAlpha * m * m / z)); +} diff --git a/src/types/hyperloglog.h b/src/types/hyperloglog.h new file mode 100644 index 00000000000..c99efe5c76f --- /dev/null +++ b/src/types/hyperloglog.h @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include +#include +#include + +#include "redis_bitmap.h" + +/* The greater is Pow, the smaller the error. */ +constexpr uint32_t kHyperLogLogRegisterCountPow = 14; +/* The number of bits of the hash value used for determining the number of leading zeros. */ +constexpr uint32_t kHyperLogLogHashBitCount = 50; +constexpr uint32_t kHyperLogLogRegisterCount = 1 << kHyperLogLogRegisterCountPow; /* With Pow=14, 16384 registers. */ + +constexpr size_t kHyperLogLogSegmentBytes = 768; +constexpr size_t kHyperLogLogSegmentRegisters = 1024; + +constexpr uint32_t kHyperLogLogSegmentCount = kHyperLogLogRegisterCount / kHyperLogLogSegmentRegisters; +constexpr uint32_t kHyperLogLogRegisterBits = 6; +constexpr uint32_t kHyperLogLogRegisterCountMask = kHyperLogLogRegisterCount - 1; /* Mask to index register. */ +constexpr uint32_t kHyperLogLogRegisterMax = ((1 << kHyperLogLogRegisterBits) - 1); +/* constant for 0.5/ln(2) */ +constexpr double kHyperLogLogAlpha = 0.721347520444481703680; +constexpr uint32_t kHyperLogLogRegisterBytes = (kHyperLogLogRegisterCount * kHyperLogLogRegisterBits + 7) / 8; +// Copied from redis +// https://github.com/valkey-io/valkey/blob/14e09e981e0039edbf8c41a208a258c18624cbb7/src/hyperloglog.c#L472 +constexpr uint32_t kHyperLogLogHashSeed = 0xadc83b19; + +struct DenseHllResult { + uint32_t register_index; + uint8_t hll_trailing_zero; +}; + +DenseHllResult ExtractDenseHllResult(uint64_t hash); + +/** + * Store the value of the register at position 'index' into variable 'val'. + * 'registers' is an array of unsigned bytes. + */ +uint8_t HllDenseGetRegister(const uint8_t *registers, uint32_t register_index); +/** + * Set the value of the register at position 'index' to 'val'. + * 'registers' is an array of unsigned bytes. + */ +void HllDenseSetRegister(uint8_t *registers, uint32_t register_index, uint8_t val); +/** + * Estimate the cardinality of the HyperLogLog data structure. + * + * @param registers The HyperLogLog data structure. The element should be either empty + * or a kHyperLogLogSegmentBytes sized array. + */ +uint64_t HllDenseEstimate(const std::vector> ®isters); diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc index 9a08c1fe5fd..9d108d3b5b1 100644 --- a/src/types/redis_bitmap.cc +++ b/src/types/redis_bitmap.cc @@ -665,7 +665,6 @@ class Bitmap::SegmentCacheStore { metadata_cf_handle_(metadata_cf_handle), ns_key_(std::move(namespace_key)), metadata_(bitmap_metadata) {} - // Get a read-only segment by given index rocksdb::Status Get(uint32_t index, const std::string **cache) { std::string *res = nullptr; diff --git a/src/types/redis_hyperloglog.cc b/src/types/redis_hyperloglog.cc new file mode 100644 index 00000000000..aef0cc6680c --- /dev/null +++ b/src/types/redis_hyperloglog.cc @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "redis_hyperloglog.h" + +#include +#include + +#include "hyperloglog.h" +#include "vendor/murmurhash2.h" + +namespace redis { + +/// Cache for writing to a HyperLogLog. +/// +/// This is a bit like Bitmap::SegmentCacheStore, but simpler because +/// 1. We would only use it for writing, hll reading always traverses all segments. +/// 2. Some write access doesn't mark the segment as dirty, because the update value +/// is less than the current value. So that we need to export `SegmentEntry` to +/// the caller. +/// +/// When read from storage, if the segment exists and it size is not equal to +/// `kHyperLogLogRegisterBytesPerSegment`, it will be treated as a corruption. +class HllSegmentCache { + public: + struct SegmentEntry { + /// The segment data, it's would always equal to `kHyperLogLogRegisterBytesPerSegment`. + std::string data; + bool dirty; + }; + std::map segments; + + /// Get the segment from cache or storage. + /// + /// If the segment in not in the cache and storage, it will be initialized with + /// string(kHyperLogLogSegmentBytes, 0) and return OK. + template + rocksdb::Status Get(uint32_t segment_index, const GetSegmentFn &get_segment, SegmentEntry **entry) { + auto iter = segments.find(segment_index); + if (iter == segments.end()) { + std::string segment_data; + auto s = get_segment(segment_index, &segment_data); + if (!s.ok()) { + if (s.IsNotFound()) { + iter = segments.emplace(segment_index, SegmentEntry{std::move(segment_data), false}).first; + // Initialize the segment with 0 + iter->second.data.resize(kHyperLogLogSegmentBytes, 0); + *entry = &iter->second; + return rocksdb::Status::OK(); + } + return s; + } + iter = segments.emplace(segment_index, SegmentEntry{std::move(segment_data), false}).first; + } + if (iter->second.data.size() != kHyperLogLogSegmentBytes) { + return rocksdb::Status::Corruption("invalid segment size: expect=" + std::to_string(kHyperLogLogSegmentBytes) + + ", actual=" + std::to_string(iter->second.data.size())); + } + *entry = &iter->second; + return rocksdb::Status::OK(); + } +}; + +rocksdb::Status HyperLogLog::GetMetadata(Database::GetOptions get_options, const Slice &ns_key, + HyperLogLogMetadata *metadata) { + return Database::GetMetadata(get_options, {kRedisHyperLogLog}, ns_key, metadata); +} + +uint64_t HyperLogLog::HllHash(std::string_view element) { + DCHECK(element.size() <= std::numeric_limits::max()); + return HllMurMurHash64A(element.data(), static_cast(element.size()), kHyperLogLogHashSeed); +} + +/* the max 0 pattern counter of the subset the element belongs to is incremented if needed */ +rocksdb::Status HyperLogLog::Add(const Slice &user_key, const std::vector &element_hashes, uint64_t *ret) { + *ret = 0; + std::string ns_key = AppendNamespacePrefix(user_key); + + LockGuard guard(storage_->GetLockManager(), ns_key); + HyperLogLogMetadata metadata{}; + rocksdb::Status s = GetMetadata(GetOptions(), ns_key, &metadata); + if (!s.ok() && !s.IsNotFound()) return s; + + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisHyperLogLog); + batch->PutLogData(log_data.Encode()); + + HllSegmentCache cache; + for (uint64_t element_hash : element_hashes) { + DenseHllResult dense_hll_result = ExtractDenseHllResult(element_hash); + uint32_t segment_index = dense_hll_result.register_index / kHyperLogLogSegmentRegisters; + uint32_t register_index_in_segment = dense_hll_result.register_index % kHyperLogLogSegmentRegisters; + HllSegmentCache::SegmentEntry *entry{nullptr}; + s = cache.Get( + segment_index, + [this, &ns_key, &metadata](uint32_t segment_index, std::string *segment) -> rocksdb::Status { + std::string sub_key = + InternalKey(ns_key, std::to_string(segment_index), metadata.version, storage_->IsSlotIdEncoded()) + .Encode(); + return storage_->Get(rocksdb::ReadOptions(), sub_key, segment); + }, + &entry); + if (!s.ok()) return s; + DCHECK(entry != nullptr); + DCHECK_EQ(kHyperLogLogSegmentBytes, entry->data.size()); + auto *segment_data = reinterpret_cast(entry->data.data()); + uint8_t old_count = HllDenseGetRegister(segment_data, register_index_in_segment); + if (dense_hll_result.hll_trailing_zero > old_count) { + HllDenseSetRegister(segment_data, register_index_in_segment, dense_hll_result.hll_trailing_zero); + entry->dirty = true; + *ret = 1; + } + } + // Nothing changed, no need to flush the segments + if (*ret == 0) return rocksdb::Status::OK(); + + // Flush dirty segments + // Release memory after batch is written + for (auto &[segment_index, entry] : cache.segments) { + if (entry.dirty) { + std::string sub_key = + InternalKey(ns_key, std::to_string(segment_index), metadata.version, storage_->IsSlotIdEncoded()).Encode(); + batch->Put(sub_key, entry.data); + entry.data.clear(); + } + } + cache.segments.clear(); + // Update metadata + { + metadata.encode_type = HyperLogLogMetadata::EncodeType::DENSE; + std::string bytes; + metadata.Encode(&bytes); + batch->Put(metadata_cf_handle_, ns_key, bytes); + } + return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + +rocksdb::Status HyperLogLog::Count(const Slice &user_key, uint64_t *ret) { + std::string ns_key = AppendNamespacePrefix(user_key); + *ret = 0; + std::vector registers; + { + LatestSnapShot ss(storage_); + Database::GetOptions get_options(ss.GetSnapShot()); + auto s = getRegisters(get_options, ns_key, ®isters); + if (!s.ok()) return s; + } + DCHECK_EQ(kHyperLogLogSegmentCount, registers.size()); + std::vector> register_segments; + register_segments.reserve(kHyperLogLogSegmentCount); + for (const auto ®ister_segment : registers) { + if (register_segment.empty()) { + // Empty segment + register_segments.emplace_back(); + continue; + } + // NOLINTNEXTLINE + const uint8_t *segment_data_ptr = reinterpret_cast(register_segment.data()); + register_segments.emplace_back(segment_data_ptr, register_segment.size()); + } + *ret = HllDenseEstimate(register_segments); + return rocksdb::Status::OK(); +} + +rocksdb::Status HyperLogLog::getRegisters(Database::GetOptions get_options, const Slice &ns_key, + std::vector *register_segments) { + HyperLogLogMetadata metadata; + rocksdb::Status s = GetMetadata(get_options, ns_key, &metadata); + if (!s.ok()) { + if (s.IsNotFound()) { + // return empty registers with the right size. + register_segments->resize(kHyperLogLogSegmentCount); + return rocksdb::Status::OK(); + } + return s; + } + + rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions(); + read_options.snapshot = get_options.snapshot; + // Multi get all segments + std::vector sub_segment_keys; + sub_segment_keys.reserve(kHyperLogLogSegmentCount); + for (uint32_t i = 0; i < kHyperLogLogSegmentCount; i++) { + std::string sub_key = + InternalKey(ns_key, std::to_string(i), metadata.version, storage_->IsSlotIdEncoded()).Encode(); + sub_segment_keys.push_back(std::move(sub_key)); + } + std::vector sub_segment_slices; + sub_segment_slices.reserve(kHyperLogLogSegmentCount); + for (const auto &sub_key : sub_segment_keys) { + sub_segment_slices.emplace_back(sub_key); + } + std::vector values(kHyperLogLogSegmentCount); + std::vector statuses(kHyperLogLogSegmentCount); + storage_->MultiGet(read_options, storage_->GetDB()->DefaultColumnFamily(), kHyperLogLogSegmentCount, + sub_segment_slices.data(), values.data(), statuses.data()); + for (size_t i = 0; i < kHyperLogLogSegmentCount; i++) { + if (!statuses[i].ok() && !statuses[i].IsNotFound()) { + return statuses[i]; + } + register_segments->push_back(std::move(values[i])); + } + return rocksdb::Status::OK(); +} + +} // namespace redis diff --git a/src/types/redis_hyperloglog.h b/src/types/redis_hyperloglog.h new file mode 100644 index 00000000000..d18e0335980 --- /dev/null +++ b/src/types/redis_hyperloglog.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include "storage/redis_db.h" +#include "storage/redis_metadata.h" + +namespace redis { + +class HyperLogLog : public Database { + public: + explicit HyperLogLog(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {} + rocksdb::Status Add(const Slice &user_key, const std::vector &element_hashes, uint64_t *ret); + rocksdb::Status Count(const Slice &user_key, uint64_t *ret); + // TODO(mwish): Supports merge operation and related commands + // rocksdb::Status Merge(const std::vector &user_keys); + + static uint64_t HllHash(std::string_view); + + private: + rocksdb::Status GetMetadata(Database::GetOptions get_options, const Slice &ns_key, HyperLogLogMetadata *metadata); + /// Using multi-get to acquire the register_segments + rocksdb::Status getRegisters(Database::GetOptions get_options, const Slice &ns_key, + std::vector *register_segments); +}; + +} // namespace redis diff --git a/src/vendor/murmurhash2.h b/src/vendor/murmurhash2.h new file mode 100644 index 00000000000..1cd7f6639d0 --- /dev/null +++ b/src/vendor/murmurhash2.h @@ -0,0 +1,106 @@ +/* Redis HyperLogLog probabilistic cardinality approximation. + * This file implements the algorithm and the exported Redis commands. + * + * Copyright (c) 2014, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#pragma once + +#include + +#ifndef USE_ALIGNED_ACCESS +#if defined(__sparc__) || defined(__arm__) +#define USE_ALIGNED_ACCESS +#endif +#endif + +// NOLINTBEGIN + +/* MurmurHash2, 64 bit version. + * It was modified for Redis in order to provide the same result in + * big and little endian archs (endian neutral). */ +inline uint64_t HllMurMurHash64A(const void *key, int len, uint32_t seed) { + const uint64_t m = 0xc6a4a7935bd1e995; + const int r = 47; + uint64_t h = seed ^ (len * m); + const auto *data = (const uint8_t *)key; + const uint8_t *end = data + (len - (len & 7)); + + while (data != end) { + uint64_t k = 0; + +#if (__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__) +#ifdef USE_ALIGNED_ACCESS + memcpy(&k, data, sizeof(uint64_t)); +#else + k = *((uint64_t *)data); +#endif +#else + k = (uint64_t)data[0]; + k |= (uint64_t)data[1] << 8; + k |= (uint64_t)data[2] << 16; + k |= (uint64_t)data[3] << 24; + k |= (uint64_t)data[4] << 32; + k |= (uint64_t)data[5] << 40; + k |= (uint64_t)data[6] << 48; + k |= (uint64_t)data[7] << 56; +#endif + + k *= m; + k ^= k >> r; + k *= m; + h ^= k; + h *= m; + data += 8; + } + + switch (len & 7) { + case 7: + h ^= (uint64_t)data[6] << 48; /* fall-thru */ + case 6: + h ^= (uint64_t)data[5] << 40; /* fall-thru */ + case 5: + h ^= (uint64_t)data[4] << 32; /* fall-thru */ + case 4: + h ^= (uint64_t)data[3] << 24; /* fall-thru */ + case 3: + h ^= (uint64_t)data[2] << 16; /* fall-thru */ + case 2: + h ^= (uint64_t)data[1] << 8; /* fall-thru */ + case 1: + h ^= (uint64_t)data[0]; + h *= m; /* fall-thru */ + }; + + h ^= h >> r; + h *= m; + h ^= h >> r; + return h; +} + +// NOLINTEND diff --git a/tests/cppunit/types/hyperloglog_test.cc b/tests/cppunit/types/hyperloglog_test.cc new file mode 100644 index 00000000000..bf7c4914499 --- /dev/null +++ b/tests/cppunit/types/hyperloglog_test.cc @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include + +#include + +#include "test_base.h" +#include "types/redis_hyperloglog.h" + +class RedisHyperLogLogTest : public TestBase { + protected: + explicit RedisHyperLogLogTest() : TestBase() { + hll_ = std::make_unique(storage_.get(), "hll_ns"); + } + ~RedisHyperLogLogTest() override = default; + + std::unique_ptr hll_; + + static std::vector computeHashes(const std::vector &elements) { + std::vector hashes; + hashes.reserve(elements.size()); + for (const auto &element : elements) { + hashes.push_back(redis::HyperLogLog::HllHash(element)); + } + return hashes; + } +}; + +TEST_F(RedisHyperLogLogTest, PFADD) { + uint64_t ret = 0; + ASSERT_TRUE(hll_->Add("hll", {}, &ret).ok() && ret == 0); + // Approximated cardinality after creation is zero + ASSERT_TRUE(hll_->Count("hll", &ret).ok() && ret == 0); + // PFADD returns 1 when at least 1 reg was modified + ASSERT_TRUE(hll_->Add("hll", computeHashes({"a", "b", "c"}), &ret).ok()); + ASSERT_EQ(1, ret); + ASSERT_TRUE(hll_->Count("hll", &ret).ok()); + ASSERT_EQ(3, ret); + // PFADD returns 0 when no reg was modified + ASSERT_TRUE(hll_->Add("hll", computeHashes({"a", "b", "c"}), &ret).ok() && ret == 0); + // PFADD works with empty string + ASSERT_TRUE(hll_->Add("hll", computeHashes({""}), &ret).ok() && ret == 1); + // PFADD works with similar hash, which is likely to be in the same bucket + ASSERT_TRUE(hll_->Add("hll", {1, 2, 3, 2, 1}, &ret).ok() && ret == 1); + ASSERT_TRUE(hll_->Count("hll", &ret).ok()); + ASSERT_EQ(7, ret); +} + +TEST_F(RedisHyperLogLogTest, PFCOUNT_returns_approximated_cardinality_of_set) { + uint64_t ret = 0; + // pf add "1" to "5" + ASSERT_TRUE(hll_->Add("hll", computeHashes({"1", "2", "3", "4", "5"}), &ret).ok() && ret == 1); + // pf count is 5 + ASSERT_TRUE(hll_->Count("hll", &ret).ok() && ret == 5); + // pf add "6" to "10" + ASSERT_TRUE(hll_->Add("hll", computeHashes({"6", "7", "8", "8", "9", "10"}), &ret).ok() && ret == 1); + // pf count is 10 + ASSERT_TRUE(hll_->Count("hll", &ret).ok() && ret == 10); +}