Skip to content

Commit

Permalink
feat: use protobuf to store wmb in KV (#1782)
Browse files Browse the repository at this point in the history
  • Loading branch information
yhl25 authored Jul 1, 2024
1 parent 73bd723 commit 5f3766a
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 184 deletions.
2 changes: 2 additions & 0 deletions hack/generate-proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,5 @@ gen-protoc3(){
gen-protoc2 pkg/apis/proto/daemon/daemon.proto

gen-protoc3 pkg/apis/proto/isb/message.proto

gen-protoc3 pkg/apis/proto/wmb/wmb.proto
199 changes: 199 additions & 0 deletions pkg/apis/proto/wmb/wmb.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions pkg/apis/proto/wmb/wmb.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

syntax = "proto3";
option go_package = "github.com/numaproj/numaflow/pkg/apis/proto/isb";

package wmb;

// WMB is used in the KV offset timeline bucket as the value for the given processor entity key.
message WMB {
// Idle is set to true if the given processor entity hasn't published anything
// to the offset timeline bucket in a batch processing cycle.
// Idle is used to signal an idle watermark.
bool idle = 1;

// Offset is the monotonically increasing index/offset of the buffer (buffer is the physical representation
// of the partition of the edge).
int64 offset = 2;

// Watermark is tightly coupled with the offset and will be monotonically increasing for a given ProcessorEntity
// as the offset increases.
// When it is idling (Idle==true), for a given offset, the watermark can monotonically increase without offset
// increasing.
int64 watermark = 3;

// Partition to identify the partition to which the watermark belongs.
int32 partition = 4;
}
34 changes: 20 additions & 14 deletions pkg/watermark/wmb/wmb.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ limitations under the License.
package wmb

import (
"bytes"
"encoding/binary"
"google.golang.org/protobuf/proto"

wmbpb "github.com/numaproj/numaflow/pkg/apis/proto/wmb"
)

// WMB is used in the KV offset timeline bucket as the value for the given processor entity key.
Expand All @@ -40,23 +41,28 @@ type WMB struct {
Partition int32
}

// EncodeToBytes encodes a WMB object into byte array.
// EncodeToBytes encodes a WMB object into byte array using protobuf.
func (w WMB) EncodeToBytes() ([]byte, error) {
buf := new(bytes.Buffer)
err := binary.Write(buf, binary.LittleEndian, w)
if err != nil {
return nil, err
pb := &wmbpb.WMB{
Idle: w.Idle,
Offset: w.Offset,
Watermark: w.Watermark,
Partition: w.Partition,
}
return buf.Bytes(), nil
return proto.Marshal(pb)
}

// DecodeToWMB decodes the given byte array into a WMB object.
// DecodeToWMB decodes the given byte array into a WMB object using protobuf.
func DecodeToWMB(b []byte) (WMB, error) {
var v WMB
buf := bytes.NewReader(b)
err := binary.Read(buf, binary.LittleEndian, &v)
if err != nil {
var pb wmbpb.WMB
if err := proto.Unmarshal(b, &pb); err != nil {
return WMB{}, err
}
return v, nil

return WMB{
Idle: pb.Idle,
Offset: pb.Offset,
Watermark: pb.Watermark,
Partition: pb.Partition,
}, nil
}
Loading

0 comments on commit 5f3766a

Please sign in to comment.