Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move streams implementation to storage layer. #2242

Merged
merged 14 commits into from
Feb 1, 2024

Conversation

KKorpse
Copy link
Contributor

@KKorpse KKorpse commented Dec 25, 2023

Issue:#2241
Impl doc:#1717

Main modifications

Stream 元数据格式

同其它五类元数据不一样的是,Stream 没有超时机制,不需要时间戳。所以 Stream 的元数据并没有继承于 BaseMetaValue,而是独立设计的格式。

Stream 的元数据格式如下,其中包括了所有 stream 必要的元数据信息。

class StreamMetaValue {
 //...
 private:
  tree_id_t groups_id_ = kINVALID_TREE_ID;
  size_t entries_added_{0};
  streamID first_id_;
  streamID last_id_;
  streamID max_deleted_entry_id_;
  int32_t length_{0};  // number of the messages in the stream
  int32_t version_{0};
};
|-----------------------|-----------|
|     groups_id_        |  int32_t  |
|-----------------------|-----------|
|    entries_added_     | uint64_t  |
|-----------------------|-----------|
|     first_id_ms       | uint64_t  |
|-----------------------|-----------|
|     first_id_seq      | uint64_t  |
|-----------------------|-----------|
|      last_id_ms       | uint64_t  |
|-----------------------|-----------|
|      last_id_seq      | uint64_t  |
|-----------------------|-----------|
| max_deleted_entry_ms  | uint64_t  |
|-----------------------|-----------|
| max_deleted_entry_seq | uint64_t  |
|-----------------------|-----------|
|       length_         | int32_t   |
|-----------------------|-----------|
|       version_        |  int32_t  |
|-----------------------|-----------|

元数据在一个单独的 cf 中存储,存储格式为 key - StreamMetaValue

Stream 数据格式

Stream 的数据类似于 key - value 形式,其中 key 是每条消息的 message-id,而 value 是 message 本身的内容。

其中 key 的格式可以复用 BaseDataKey 的格式,包括了 key, version ,data 三个字段,data 存储的是序列化后的 message-id。

而 value 字段无需额外的编码,只需要将 message 序列化即可,具体细节可以看 #1717

Tree 抽象

在实现 stream base 指令时,无需存储 cgroup 相关的元数据,所以也无需 rax-tree 的抽象了,故删除此部分,未来加入 xgroup 命令支持时,建议用第三个 cf 来存储相关元数据的 pel 数据。

此处删除的类为:class TreeIDGenerator

结构变更

pika_stream_base.h 和 pika_stream_base.cc 中的定义和实现移动到了 redis_stream.h 和 redis_stream.cc 中。

不支持的 Redis 父类接口

由于 stream 不应该支持设置超时,故其不支持下列 Redis 父类中定义的这些接口:

  //===--------------------------------------------------------------------===//
  // Not needed for streams
  //===--------------------------------------------------------------------===//
  Status Expire(const Slice& key, int32_t ttl) override;
  bool PKExpireScan(const std::string& start_key, int32_t min_timestamp, int32_t max_timestamp,
                    std::vector<std::string>* keys, int64_t* leftover_visits, std::string* next_key) override;
  Status Expireat(const Slice& key, int32_t timestamp) override;
  Status Persist(const Slice& key) override;
  Status TTL(const Slice& key, int64_t* timestamp) override;

这些接口如果被调用,会返回错误,它们都以下列形式实现:

Status RedisStreams::TTL(const Slice& key, int64_t* timestamp) {
  rocksdb::Status s(rocksdb::Status::NotSupported("RedisStreams::TTL not supported by stream"));
  return Status::Corruption(s.ToString());
}

@github-actions github-actions bot added the ✏️ Feature New feature or request label Dec 25, 2023
@cheniujh
Copy link
Collaborator

cheniujh commented Jan 6, 2024

当Pika内有stream数据时,使用 key * 命令应当可以扫描出stream类型的key,但目前key * 不会返回stream keys,请做一下支持。

@KKorpse
Copy link
Contributor Author

KKorpse commented Jan 6, 2024

当Pika内有stream数据时,使用 key * 命令应当可以扫描出stream类型的key,但目前key * 不会返回stream keys,请做一下支持。
ok

* KEYS
* Storage::DoCompactRange()
* Storage::PKHScanRange()
* Storage::PKHRSranRange()
@github-actions github-actions bot added the 📒 Documentation Improvements or additions to documentation label Jan 11, 2024
@@ -25,7 +28,7 @@ using streamID = struct streamID {
// We must store the streamID in memory in big-endian format. This way, our comparison of the serialized streamID byte
// code will be equivalent to the comparison of the uint64_t numbers.
inline void EncodeUint64InBigEndian(char* buf, uint64_t value) const {
if (storage::kLittleEndian) {
if (kLittleEndian) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这也算是通用函数了, 放到 coding.h 里统一起来会不会更合适点

inline void EncodeFixed64(char* buf, uint64_t value) {

就像这里, 下面的 inline uint64_t DecodeUint64OfBigEndian 同理

Copy link
Contributor Author

@KKorpse KKorpse Jan 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rocksdb 里面 key 应该是默认字典序?stream id 是 data key 的结尾,以大端编码能正常 range scan。pstd::EncodeFixed64 正好是小端编码,所以直接用会报错。
我看 List 的 data key 格式也是类似的,需要对 index 做 range scan,但它是实现了一个单独的 Comparator ,比较的时候还得解析出 index,感觉代码复杂度和效率都会低一些。

@AlexStocks
Copy link
Collaborator

@Mixficsol own Review

src/storage/src/redis_streams.cc Outdated Show resolved Hide resolved
src/storage/src/redis_streams.cc Outdated Show resolved Hide resolved
src/storage/src/redis_streams.cc Outdated Show resolved Hide resolved
src/storage/src/redis_streams.cc Outdated Show resolved Hide resolved
@AlexStocks AlexStocks merged commit a885758 into OpenAtomFoundation:unstable Feb 1, 2024
10 of 13 checks passed
bigdaronlee163 pushed a commit to bigdaronlee163/pika that referenced this pull request Jun 8, 2024
…on#2242)

* feat: move stream implememtation to storage with some bugs.

* fix: fix all problem of stream commands

* refactor: adjust stream code structure.

1. remove logic of tree_id generate, the tree will be implemented when supporting
   XGROUP commands.
2. move all the helper functions to redis_stream.h.

* feat: support some basic function of storage layer.

* fix: Add licence.

* fix: compile problem in macos.

* fix: length of stream now using int32_t.

* fix: Support some commands and API of blackwidow.

* KEYS
* Storage::DoCompactRange()
* Storage::PKHScanRange()
* Storage::PKHRSranRange()

* fix: build problem on ubuntu.

* fix: remote key lock.
cheniujh pushed a commit to cheniujh/pika that referenced this pull request Sep 24, 2024
…on#2242)

* feat: move stream implememtation to storage with some bugs.

* fix: fix all problem of stream commands

* refactor: adjust stream code structure.

1. remove logic of tree_id generate, the tree will be implemented when supporting
   XGROUP commands.
2. move all the helper functions to redis_stream.h.

* feat: support some basic function of storage layer.

* fix: Add licence.

* fix: compile problem in macos.

* fix: length of stream now using int32_t.

* fix: Support some commands and API of blackwidow.

* KEYS
* Storage::DoCompactRange()
* Storage::PKHScanRange()
* Storage::PKHRSranRange()

* fix: build problem on ubuntu.

* fix: remote key lock.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
📒 Documentation Improvements or additions to documentation ✏️ Feature New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants