diff --git a/pkg/apis/proto/daemon/daemon.pb.go b/pkg/apis/proto/daemon/daemon.pb.go
index 650faa9445..7668ea82ca 100644
--- a/pkg/apis/proto/daemon/daemon.pb.go
+++ b/pkg/apis/proto/daemon/daemon.pb.go
@@ -534,7 +534,7 @@ func (m *GetVertexMetricsResponse) GetVertexMetrics() []*VertexMetrics {
type VertexWatermark struct {
Pipeline *string `protobuf:"bytes,1,req,name=pipeline" json:"pipeline,omitempty"`
Vertex *string `protobuf:"bytes,2,req,name=vertex" json:"vertex,omitempty"`
- Watermarks []int64 `protobuf:"varint,3,rep,name=watermarks" json:"watermarks,omitempty"`
+ Watermark *int64 `protobuf:"varint,3,req,name=watermark" json:"watermark,omitempty"`
IsWatermarkEnabled *bool `protobuf:"varint,4,req,name=isWatermarkEnabled" json:"isWatermarkEnabled,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
@@ -588,11 +588,11 @@ func (m *VertexWatermark) GetVertex() string {
return ""
}
-func (m *VertexWatermark) GetWatermarks() []int64 {
- if m != nil {
- return m.Watermarks
+func (m *VertexWatermark) GetWatermark() int64 {
+ if m != nil && m.Watermark != nil {
+ return *m.Watermark
}
- return nil
+ return 0
}
func (m *VertexWatermark) GetIsWatermarkEnabled() bool {
@@ -824,62 +824,62 @@ func init() {
var fileDescriptor_93e327fd0d673221 = []byte{
// 894 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x5f, 0x6f, 0xdc, 0x44,
- 0x10, 0x97, 0xed, 0x26, 0xbd, 0x4c, 0x38, 0x25, 0x0c, 0xfd, 0xe3, 0xba, 0xe5, 0x30, 0x4b, 0x80,
- 0x23, 0x2a, 0x67, 0x88, 0x04, 0x8a, 0x52, 0x89, 0x96, 0x94, 0x36, 0x42, 0x4a, 0x20, 0x32, 0x7f,
- 0x2a, 0xf1, 0x82, 0x9c, 0xcb, 0xde, 0xc5, 0xdc, 0xd9, 0x6b, 0xbc, 0x7b, 0x17, 0xa2, 0x2a, 0x2f,
- 0xfd, 0x0a, 0xa8, 0x8f, 0x7c, 0x1f, 0x9e, 0x10, 0x12, 0x8f, 0xbc, 0xa0, 0x88, 0x0f, 0x82, 0xbc,
- 0xbb, 0xf6, 0xd9, 0x77, 0xbe, 0x4b, 0xe8, 0x53, 0x3c, 0x33, 0xbf, 0x99, 0xdf, 0x6f, 0x77, 0x66,
- 0x36, 0x07, 0x24, 0x19, 0xf4, 0xbd, 0x20, 0x09, 0xb9, 0x97, 0xa4, 0x4c, 0x30, 0xef, 0x38, 0xa0,
- 0x11, 0x8b, 0xf5, 0x9f, 0x8e, 0xf4, 0xe1, 0xb2, 0xb2, 0x9c, 0x7b, 0x7d, 0xc6, 0xfa, 0x43, 0x9a,
- 0xc1, 0xbd, 0x20, 0x8e, 0x99, 0x08, 0x44, 0xc8, 0x62, 0xae, 0x50, 0xce, 0x5d, 0x1d, 0x95, 0xd6,
- 0xd1, 0xa8, 0xe7, 0xd1, 0x28, 0x11, 0x67, 0x2a, 0x48, 0x5e, 0x58, 0x00, 0xbb, 0xa3, 0x5e, 0x8f,
- 0xa6, 0x5f, 0xc6, 0x3d, 0x86, 0x0e, 0x34, 0x92, 0x30, 0xa1, 0xc3, 0x30, 0xa6, 0xb6, 0xe1, 0x9a,
- 0xed, 0x15, 0xbf, 0xb0, 0xb1, 0x05, 0xd0, 0x4b, 0x59, 0xf4, 0x3d, 0x4d, 0x05, 0xfd, 0xc5, 0x36,
- 0x65, 0xb4, 0xe4, 0xc9, 0x72, 0x05, 0xd3, 0x51, 0x4b, 0xe5, 0xe6, 0x76, 0x96, 0x7b, 0x24, 0x59,
- 0xbe, 0x0a, 0x22, 0x6a, 0x5f, 0x53, 0xb9, 0x13, 0x0f, 0x12, 0x78, 0x2d, 0xa1, 0xf1, 0x71, 0x18,
- 0xf7, 0x1f, 0xb3, 0x51, 0x2c, 0xec, 0x25, 0xd7, 0x6c, 0x5b, 0x7e, 0xc5, 0x87, 0x6d, 0x58, 0x0b,
- 0xba, 0x83, 0xc3, 0x32, 0x6c, 0x59, 0xc2, 0xa6, 0xdd, 0xb8, 0x01, 0x4d, 0xc1, 0x44, 0x30, 0x3c,
- 0xa0, 0x9c, 0x07, 0x7d, 0xca, 0xed, 0xeb, 0x12, 0x57, 0x75, 0x66, 0x9c, 0x4a, 0xc1, 0x3e, 0x8d,
- 0xfb, 0xe2, 0xc4, 0x6e, 0x28, 0xce, 0xb2, 0x0f, 0x37, 0x61, 0x5d, 0xd9, 0xdf, 0x65, 0x39, 0xfb,
- 0x61, 0x14, 0x0a, 0x7b, 0xc5, 0x35, 0xdb, 0x86, 0x3f, 0xe3, 0x47, 0x17, 0x56, 0x4b, 0x3e, 0x1b,
- 0x24, 0xac, 0xec, 0xc2, 0x5b, 0xb0, 0x1c, 0xf2, 0xa7, 0xa3, 0xe1, 0xd0, 0x5e, 0x75, 0xcd, 0x76,
- 0xc3, 0xd7, 0x16, 0xf9, 0xdb, 0x84, 0xa6, 0xba, 0xa8, 0x03, 0x2a, 0xd2, 0xb0, 0xcb, 0x17, 0xf6,
- 0xe1, 0x16, 0x2c, 0x8f, 0xcb, 0x3d, 0xd0, 0x16, 0x7e, 0x0b, 0x6b, 0x49, 0xca, 0xba, 0x94, 0xf3,
- 0x30, 0xee, 0xfb, 0x81, 0xa0, 0xdc, 0xb6, 0x5c, 0xab, 0xbd, 0xba, 0xb5, 0xd9, 0xd1, 0x53, 0x53,
- 0xe1, 0xe8, 0x1c, 0x56, 0xc1, 0x4f, 0x62, 0x91, 0x9e, 0xf9, 0xd3, 0x25, 0xf0, 0x21, 0x34, 0x74,
- 0x17, 0xb8, 0x7d, 0x4d, 0x96, 0x7b, 0x67, 0x4e, 0x39, 0x8d, 0x52, 0x75, 0x8a, 0x24, 0x67, 0x17,
- 0x6e, 0xd4, 0x31, 0xe1, 0x3a, 0x58, 0x03, 0x7a, 0x66, 0x1b, 0xae, 0xd1, 0x5e, 0xf1, 0xb3, 0x4f,
- 0xbc, 0x01, 0x4b, 0xe3, 0x60, 0x38, 0xa2, 0xb6, 0xe9, 0x1a, 0x6d, 0xc3, 0x57, 0xc6, 0x8e, 0xb9,
- 0x6d, 0x38, 0x0f, 0xa0, 0x59, 0x29, 0x7f, 0x59, 0xb2, 0x55, 0x4a, 0x26, 0x1f, 0x01, 0xee, 0x87,
- 0x5c, 0xa8, 0x29, 0xe7, 0x3e, 0xfd, 0x79, 0x44, 0xb9, 0x58, 0x74, 0xc3, 0xe4, 0x31, 0xbc, 0x51,
- 0xc9, 0xe0, 0x09, 0x8b, 0x39, 0xc5, 0xfb, 0x70, 0x5d, 0x75, 0x93, 0xdb, 0x86, 0xbc, 0x09, 0xcc,
- 0x6f, 0x62, 0xb2, 0x41, 0x7e, 0x0e, 0x21, 0x4f, 0x61, 0x7d, 0x8f, 0xea, 0x1a, 0x57, 0x20, 0xcd,
- 0xda, 0xaa, 0x52, 0xf3, 0xb6, 0x2a, 0x8b, 0x3c, 0x84, 0xd7, 0x4b, 0x75, 0xb4, 0x94, 0xcd, 0x02,
- 0x9c, 0x95, 0xa9, 0x57, 0x92, 0x17, 0x38, 0x80, 0xdb, 0x7b, 0x54, 0x54, 0x9a, 0x55, 0xa7, 0xc7,
- 0x9c, 0x3b, 0x66, 0x56, 0x79, 0xcc, 0xc8, 0x33, 0xb0, 0x67, 0xcb, 0x69, 0x59, 0x0f, 0xa0, 0x39,
- 0x2e, 0x07, 0xf4, 0x3d, 0xdd, 0xac, 0x9d, 0x18, 0xbf, 0x8a, 0x25, 0x2f, 0x0d, 0x58, 0x53, 0x80,
- 0x67, 0x81, 0xa0, 0x69, 0x14, 0xa4, 0x83, 0x57, 0xda, 0x83, 0x16, 0xc0, 0x69, 0x5e, 0x40, 0xad,
- 0x80, 0xe5, 0x97, 0x3c, 0xd8, 0x01, 0x0c, 0x79, 0x41, 0xf1, 0x24, 0x0e, 0x8e, 0x86, 0xf4, 0x58,
- 0xbe, 0x49, 0x0d, 0xbf, 0x26, 0x42, 0x7e, 0x04, 0xa7, 0x38, 0x70, 0x11, 0x2c, 0x8e, 0xfc, 0x39,
- 0xac, 0x8d, 0xab, 0x21, 0xdd, 0x92, 0xdb, 0xd5, 0x43, 0x4f, 0x32, 0xa7, 0xf1, 0xe4, 0x6b, 0xb8,
- 0x53, 0x47, 0x70, 0xa5, 0x91, 0xa9, 0xbb, 0x01, 0x72, 0x02, 0x6f, 0xee, 0x51, 0x71, 0xa8, 0x61,
- 0x45, 0xc9, 0x49, 0x9f, 0xf6, 0x00, 0x93, 0x99, 0xa8, 0x6e, 0xd6, 0x5c, 0xdd, 0x35, 0x29, 0x64,
- 0x07, 0xee, 0xcd, 0x61, 0xba, 0x54, 0xfd, 0xd6, 0x1f, 0x4b, 0xd0, 0xfc, 0x42, 0x52, 0x7d, 0x43,
- 0xd3, 0x71, 0xd8, 0xa5, 0x28, 0x60, 0xb5, 0xb4, 0x77, 0xe8, 0xe4, 0x4a, 0x66, 0xd7, 0xd7, 0xb9,
- 0x5b, 0x1b, 0x53, 0xc7, 0x23, 0xf7, 0x5f, 0xfc, 0xf5, 0xef, 0xaf, 0xe6, 0x7b, 0xb8, 0x21, 0xff,
- 0x23, 0x8e, 0x3f, 0xf6, 0x72, 0x4e, 0xee, 0x3d, 0xcf, 0x3f, 0xcf, 0x3d, 0xbd, 0xa8, 0x78, 0x0a,
- 0x2b, 0xc5, 0x82, 0xa1, 0x9d, 0xd7, 0x9d, 0xde, 0x5d, 0xe7, 0x4e, 0x4d, 0x44, 0xf3, 0x7d, 0x22,
- 0xf9, 0x3c, 0xfc, 0xf0, 0x2a, 0x7c, 0xde, 0x73, 0xf5, 0x71, 0x8e, 0x2f, 0x0d, 0xf9, 0x44, 0x54,
- 0x5f, 0xfe, 0xb7, 0x4a, 0x34, 0x75, 0x3b, 0xeb, 0xb8, 0xf3, 0x01, 0x5a, 0xce, 0x67, 0x52, 0xce,
- 0x36, 0x7e, 0xba, 0x50, 0x4e, 0x36, 0x2b, 0x61, 0x37, 0xf3, 0xa9, 0xa9, 0x39, 0xf7, 0x22, 0x2d,
- 0xe1, 0x37, 0x03, 0x70, 0x76, 0x20, 0xf1, 0xed, 0x19, 0xe2, 0xe9, 0x61, 0x75, 0xc8, 0x22, 0x88,
- 0x56, 0xf7, 0x48, 0xaa, 0xdb, 0xc1, 0xed, 0xff, 0xa9, 0xae, 0xd8, 0xe0, 0xec, 0xde, 0x6e, 0xd6,
- 0x4e, 0x1d, 0x6e, 0x94, 0xf8, 0xe7, 0x0e, 0xa5, 0xf3, 0xee, 0x25, 0x28, 0x2d, 0xd4, 0x93, 0x42,
- 0x3f, 0xc0, 0xf7, 0x17, 0x0a, 0x9d, 0x3c, 0x2c, 0xbb, 0x8f, 0x7e, 0xbf, 0x68, 0x19, 0x7f, 0x5e,
- 0xb4, 0x8c, 0x7f, 0x2e, 0x5a, 0xc6, 0x0f, 0x5b, 0xfd, 0x50, 0x9c, 0x8c, 0x8e, 0x3a, 0x5d, 0x16,
- 0x79, 0xf1, 0x28, 0x0a, 0x92, 0x94, 0xfd, 0x24, 0x3f, 0x7a, 0x43, 0x76, 0xea, 0xd5, 0xfe, 0xba,
- 0xfb, 0x2f, 0x00, 0x00, 0xff, 0xff, 0xdb, 0xe7, 0xd0, 0x32, 0xf5, 0x09, 0x00, 0x00,
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x5f, 0x6f, 0xe3, 0x44,
+ 0x10, 0x97, 0xed, 0x6b, 0x2f, 0x99, 0x12, 0xb5, 0x0c, 0xf7, 0xc7, 0xe7, 0x2b, 0xc5, 0x2c, 0x05,
+ 0x42, 0x75, 0xc4, 0x50, 0x09, 0x54, 0xf5, 0x24, 0xee, 0xe8, 0x71, 0x57, 0x21, 0xb5, 0x50, 0x99,
+ 0x3f, 0x27, 0xf1, 0x82, 0xdc, 0x74, 0x93, 0x9a, 0xc6, 0x5e, 0xe3, 0xdd, 0xa4, 0x54, 0xa7, 0xbe,
+ 0xdc, 0x57, 0x38, 0xf5, 0x91, 0xef, 0xc3, 0x13, 0x42, 0xe2, 0x91, 0x17, 0x54, 0xf1, 0x41, 0x90,
+ 0x77, 0xd7, 0x8e, 0x9d, 0x38, 0x69, 0xe1, 0xa9, 0x3b, 0x33, 0xbf, 0x99, 0xdf, 0x6f, 0x77, 0x66,
+ 0xdc, 0x00, 0x49, 0x4e, 0xfa, 0x5e, 0x90, 0x84, 0xdc, 0x4b, 0x52, 0x26, 0x98, 0x77, 0x14, 0xd0,
+ 0x88, 0xc5, 0xfa, 0x4f, 0x47, 0xfa, 0x70, 0x51, 0x59, 0xce, 0x6a, 0x9f, 0xb1, 0xfe, 0x80, 0x66,
+ 0x70, 0x2f, 0x88, 0x63, 0x26, 0x02, 0x11, 0xb2, 0x98, 0x2b, 0x94, 0x73, 0x5f, 0x47, 0xa5, 0x75,
+ 0x38, 0xec, 0x79, 0x34, 0x4a, 0xc4, 0x99, 0x0a, 0x92, 0x97, 0x16, 0xc0, 0xce, 0xb0, 0xd7, 0xa3,
+ 0xe9, 0x97, 0x71, 0x8f, 0xa1, 0x03, 0x8d, 0x24, 0x4c, 0xe8, 0x20, 0x8c, 0xa9, 0x6d, 0xb8, 0x66,
+ 0xbb, 0xe9, 0x17, 0x36, 0xae, 0x01, 0xf4, 0x52, 0x16, 0x7d, 0x4f, 0x53, 0x41, 0x7f, 0xb1, 0x4d,
+ 0x19, 0x2d, 0x79, 0xb2, 0x5c, 0xc1, 0x74, 0xd4, 0x52, 0xb9, 0xb9, 0x9d, 0xe5, 0x1e, 0x4a, 0x96,
+ 0xaf, 0x82, 0x88, 0xda, 0x37, 0x54, 0xee, 0xd8, 0x83, 0x04, 0x5e, 0x4b, 0x68, 0x7c, 0x14, 0xc6,
+ 0xfd, 0x27, 0x6c, 0x18, 0x0b, 0x7b, 0xc1, 0x35, 0xdb, 0x96, 0x5f, 0xf1, 0x61, 0x1b, 0x96, 0x83,
+ 0xee, 0xc9, 0x41, 0x19, 0xb6, 0x28, 0x61, 0x93, 0x6e, 0x5c, 0x87, 0x96, 0x60, 0x22, 0x18, 0xec,
+ 0x53, 0xce, 0x83, 0x3e, 0xe5, 0xf6, 0x4d, 0x89, 0xab, 0x3a, 0x33, 0x4e, 0xa5, 0x60, 0x8f, 0xc6,
+ 0x7d, 0x71, 0x6c, 0x37, 0x14, 0x67, 0xd9, 0x87, 0x1b, 0xb0, 0xa2, 0xec, 0xef, 0xb2, 0x9c, 0xbd,
+ 0x30, 0x0a, 0x85, 0xdd, 0x74, 0xcd, 0xb6, 0xe1, 0x4f, 0xf9, 0xd1, 0x85, 0xa5, 0x92, 0xcf, 0x06,
+ 0x09, 0x2b, 0xbb, 0xf0, 0x0e, 0x2c, 0x86, 0xfc, 0xd9, 0x70, 0x30, 0xb0, 0x97, 0x5c, 0xb3, 0xdd,
+ 0xf0, 0xb5, 0x45, 0xfe, 0x32, 0xa1, 0xa5, 0x1e, 0x6a, 0x9f, 0x8a, 0x34, 0xec, 0xf2, 0xb9, 0x7d,
+ 0xb8, 0x03, 0x8b, 0xa3, 0x72, 0x0f, 0xb4, 0x85, 0xdf, 0xc2, 0x72, 0x92, 0xb2, 0x2e, 0xe5, 0x3c,
+ 0x8c, 0xfb, 0x7e, 0x20, 0x28, 0xb7, 0x2d, 0xd7, 0x6a, 0x2f, 0x6d, 0x6e, 0x74, 0xf4, 0xd4, 0x54,
+ 0x38, 0x3a, 0x07, 0x55, 0xf0, 0xd3, 0x58, 0xa4, 0x67, 0xfe, 0x64, 0x09, 0x7c, 0x04, 0x0d, 0xdd,
+ 0x05, 0x6e, 0xdf, 0x90, 0xe5, 0xde, 0x99, 0x51, 0x4e, 0xa3, 0x54, 0x9d, 0x22, 0xc9, 0xd9, 0x81,
+ 0x5b, 0x75, 0x4c, 0xb8, 0x02, 0xd6, 0x09, 0x3d, 0xb3, 0x0d, 0xd7, 0x68, 0x37, 0xfd, 0xec, 0x88,
+ 0xb7, 0x60, 0x61, 0x14, 0x0c, 0x86, 0xd4, 0x36, 0x5d, 0xa3, 0x6d, 0xf8, 0xca, 0xd8, 0x36, 0xb7,
+ 0x0c, 0xe7, 0x21, 0xb4, 0x2a, 0xe5, 0xaf, 0x4a, 0xb6, 0x4a, 0xc9, 0xe4, 0x23, 0xc0, 0xbd, 0x90,
+ 0x0b, 0x35, 0xe5, 0xdc, 0xa7, 0x3f, 0x0f, 0x29, 0x17, 0xf3, 0x5e, 0x98, 0x3c, 0x81, 0x37, 0x2a,
+ 0x19, 0x3c, 0x61, 0x31, 0xa7, 0xf8, 0x00, 0x6e, 0xaa, 0x6e, 0x72, 0xdb, 0x90, 0x2f, 0x81, 0xf9,
+ 0x4b, 0x8c, 0x37, 0xc8, 0xcf, 0x21, 0xe4, 0x19, 0xac, 0xec, 0x52, 0x5d, 0xe3, 0x1a, 0xa4, 0x59,
+ 0x5b, 0x55, 0x6a, 0xde, 0x56, 0x65, 0x91, 0x47, 0xf0, 0x7a, 0xa9, 0x8e, 0x96, 0xb2, 0x51, 0x80,
+ 0xb3, 0x32, 0xf5, 0x4a, 0xf2, 0x02, 0xfb, 0x70, 0x77, 0x97, 0x8a, 0x4a, 0xb3, 0xea, 0xf4, 0x98,
+ 0x33, 0xc7, 0xcc, 0x2a, 0x8f, 0x19, 0x79, 0x0e, 0xf6, 0x74, 0x39, 0x2d, 0xeb, 0x21, 0xb4, 0x46,
+ 0xe5, 0x80, 0x7e, 0xa7, 0xdb, 0xb5, 0x13, 0xe3, 0x57, 0xb1, 0xe4, 0x95, 0x01, 0xcb, 0x0a, 0xf0,
+ 0x3c, 0x10, 0x34, 0x8d, 0x82, 0xf4, 0xe4, 0x7f, 0xed, 0xc1, 0x2a, 0x34, 0x4f, 0xf3, 0x02, 0x52,
+ 0xbb, 0xe5, 0x8f, 0x1d, 0xd8, 0x01, 0x0c, 0x79, 0x41, 0xf0, 0x34, 0x0e, 0x0e, 0x07, 0xf4, 0x48,
+ 0x7e, 0x91, 0x1a, 0x7e, 0x4d, 0x84, 0xfc, 0x08, 0x4e, 0x71, 0xdd, 0x22, 0x58, 0x5c, 0xf8, 0x73,
+ 0x58, 0x1e, 0x55, 0x43, 0xba, 0x21, 0x77, 0xab, 0x57, 0x1e, 0x67, 0x4e, 0xe2, 0xc9, 0xd7, 0x70,
+ 0xaf, 0x8e, 0xe0, 0x5a, 0x03, 0x53, 0x77, 0x7f, 0x72, 0x0c, 0x6f, 0xee, 0x52, 0x71, 0xa0, 0x61,
+ 0x45, 0xc9, 0x71, 0x97, 0x76, 0x01, 0x93, 0xa9, 0xa8, 0x6e, 0xd5, 0x4c, 0xdd, 0x35, 0x29, 0x64,
+ 0x1b, 0x56, 0x67, 0x30, 0x5d, 0xa9, 0x7e, 0xf3, 0xf7, 0x05, 0x68, 0x7d, 0x21, 0xa9, 0xbe, 0xa1,
+ 0xe9, 0x28, 0xec, 0x52, 0x14, 0xb0, 0x54, 0xda, 0x3a, 0x74, 0x72, 0x25, 0xd3, 0xcb, 0xeb, 0xdc,
+ 0xaf, 0x8d, 0xa9, 0xeb, 0x91, 0x07, 0x2f, 0xff, 0xfc, 0xe7, 0x95, 0xf9, 0x1e, 0xae, 0xcb, 0xff,
+ 0x87, 0xa3, 0x8f, 0xbd, 0x9c, 0x93, 0x7b, 0x2f, 0xf2, 0xe3, 0xb9, 0xa7, 0xd7, 0x14, 0x4f, 0xa1,
+ 0x59, 0xac, 0x17, 0xda, 0x79, 0xdd, 0xc9, 0xcd, 0x75, 0xee, 0xd5, 0x44, 0x34, 0xdf, 0x27, 0x92,
+ 0xcf, 0xc3, 0x0f, 0xaf, 0xc3, 0xe7, 0xbd, 0x50, 0x87, 0x73, 0xbc, 0x30, 0xe4, 0x07, 0xa2, 0xfa,
+ 0xdd, 0x7f, 0xab, 0x44, 0x53, 0xb7, 0xb1, 0x8e, 0x3b, 0x1b, 0xa0, 0xe5, 0x7c, 0x26, 0xe5, 0x6c,
+ 0xe1, 0xa7, 0x73, 0xe5, 0x64, 0xb3, 0x12, 0x76, 0x33, 0x9f, 0x9a, 0x9a, 0x73, 0x2f, 0xd2, 0x12,
+ 0x7e, 0x35, 0x00, 0xa7, 0x07, 0x12, 0xdf, 0x9e, 0x22, 0x9e, 0x1c, 0x56, 0x87, 0xcc, 0x83, 0x68,
+ 0x75, 0x8f, 0xa5, 0xba, 0x6d, 0xdc, 0xfa, 0x8f, 0xea, 0xc6, 0x0b, 0x7c, 0x61, 0xc0, 0xed, 0xda,
+ 0xa9, 0xc3, 0xf5, 0x12, 0xff, 0xcc, 0xa1, 0x74, 0xde, 0xbd, 0x02, 0xa5, 0x85, 0x7a, 0x52, 0xe8,
+ 0x07, 0xf8, 0xfe, 0x5c, 0xa1, 0x85, 0x2c, 0xbe, 0xf3, 0xf8, 0xb7, 0xcb, 0x35, 0xe3, 0x8f, 0xcb,
+ 0x35, 0xe3, 0xef, 0xcb, 0x35, 0xe3, 0x87, 0xcd, 0x7e, 0x28, 0x8e, 0x87, 0x87, 0x9d, 0x2e, 0x8b,
+ 0xbc, 0x78, 0x18, 0x05, 0x49, 0xca, 0x7e, 0x92, 0x87, 0xde, 0x80, 0x9d, 0x7a, 0xb5, 0xbf, 0xed,
+ 0xfe, 0x0d, 0x00, 0x00, 0xff, 0xff, 0xe4, 0x90, 0xc1, 0x21, 0xf3, 0x09, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -1594,12 +1594,12 @@ func (m *VertexWatermark) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i--
dAtA[i] = 0x20
}
- if len(m.Watermarks) > 0 {
- for iNdEx := len(m.Watermarks) - 1; iNdEx >= 0; iNdEx-- {
- i = encodeVarintDaemon(dAtA, i, uint64(m.Watermarks[iNdEx]))
- i--
- dAtA[i] = 0x18
- }
+ if m.Watermark == nil {
+ return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("watermark")
+ } else {
+ i = encodeVarintDaemon(dAtA, i, uint64(*m.Watermark))
+ i--
+ dAtA[i] = 0x18
}
if m.Vertex == nil {
return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("vertex")
@@ -2003,10 +2003,8 @@ func (m *VertexWatermark) Size() (n int) {
l = len(*m.Vertex)
n += 1 + l + sovDaemon(uint64(l))
}
- if len(m.Watermarks) > 0 {
- for _, e := range m.Watermarks {
- n += 1 + sovDaemon(uint64(e))
- }
+ if m.Watermark != nil {
+ n += 1 + sovDaemon(uint64(*m.Watermark))
}
if m.IsWatermarkEnabled != nil {
n += 2
@@ -3493,81 +3491,26 @@ func (m *VertexWatermark) Unmarshal(dAtA []byte) error {
iNdEx = postIndex
hasFields[0] |= uint64(0x00000002)
case 3:
- if wireType == 0 {
- var v int64
- for shift := uint(0); ; shift += 7 {
- if shift >= 64 {
- return ErrIntOverflowDaemon
- }
- if iNdEx >= l {
- return io.ErrUnexpectedEOF
- }
- b := dAtA[iNdEx]
- iNdEx++
- v |= int64(b&0x7F) << shift
- if b < 0x80 {
- break
- }
- }
- m.Watermarks = append(m.Watermarks, v)
- } else if wireType == 2 {
- var packedLen int
- for shift := uint(0); ; shift += 7 {
- if shift >= 64 {
- return ErrIntOverflowDaemon
- }
- if iNdEx >= l {
- return io.ErrUnexpectedEOF
- }
- b := dAtA[iNdEx]
- iNdEx++
- packedLen |= int(b&0x7F) << shift
- if b < 0x80 {
- break
- }
- }
- if packedLen < 0 {
- return ErrInvalidLengthDaemon
- }
- postIndex := iNdEx + packedLen
- if postIndex < 0 {
- return ErrInvalidLengthDaemon
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Watermark", wireType)
+ }
+ var v int64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowDaemon
}
- if postIndex > l {
+ if iNdEx >= l {
return io.ErrUnexpectedEOF
}
- var elementCount int
- var count int
- for _, integer := range dAtA[iNdEx:postIndex] {
- if integer < 128 {
- count++
- }
- }
- elementCount = count
- if elementCount != 0 && len(m.Watermarks) == 0 {
- m.Watermarks = make([]int64, 0, elementCount)
- }
- for iNdEx < postIndex {
- var v int64
- for shift := uint(0); ; shift += 7 {
- if shift >= 64 {
- return ErrIntOverflowDaemon
- }
- if iNdEx >= l {
- return io.ErrUnexpectedEOF
- }
- b := dAtA[iNdEx]
- iNdEx++
- v |= int64(b&0x7F) << shift
- if b < 0x80 {
- break
- }
- }
- m.Watermarks = append(m.Watermarks, v)
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= int64(b&0x7F) << shift
+ if b < 0x80 {
+ break
}
- } else {
- return fmt.Errorf("proto: wrong wireType = %d for field Watermarks", wireType)
}
+ m.Watermark = &v
+ hasFields[0] |= uint64(0x00000004)
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field IsWatermarkEnabled", wireType)
@@ -3589,7 +3532,7 @@ func (m *VertexWatermark) Unmarshal(dAtA []byte) error {
}
b := bool(v != 0)
m.IsWatermarkEnabled = &b
- hasFields[0] |= uint64(0x00000004)
+ hasFields[0] |= uint64(0x00000008)
default:
iNdEx = preIndex
skippy, err := skipDaemon(dAtA[iNdEx:])
@@ -3613,6 +3556,9 @@ func (m *VertexWatermark) Unmarshal(dAtA []byte) error {
return github_com_gogo_protobuf_proto.NewRequiredNotSetError("vertex")
}
if hasFields[0]&uint64(0x00000004) == 0 {
+ return github_com_gogo_protobuf_proto.NewRequiredNotSetError("watermark")
+ }
+ if hasFields[0]&uint64(0x00000008) == 0 {
return github_com_gogo_protobuf_proto.NewRequiredNotSetError("isWatermarkEnabled")
}
diff --git a/pkg/apis/proto/daemon/daemon.proto b/pkg/apis/proto/daemon/daemon.proto
index ae7d64b4ef..44a2f5eba7 100644
--- a/pkg/apis/proto/daemon/daemon.proto
+++ b/pkg/apis/proto/daemon/daemon.proto
@@ -78,7 +78,7 @@ message GetVertexMetricsResponse {
message VertexWatermark {
required string pipeline = 1;
required string vertex = 2;
- repeated int64 watermarks = 3;
+ required int64 watermark = 3;
required bool isWatermarkEnabled = 4;
}
diff --git a/pkg/daemon/server/service/pipeline_watermark_query.go b/pkg/daemon/server/service/pipeline_watermark_query.go
index f4950b13e5..f204407eda 100644
--- a/pkg/daemon/server/service/pipeline_watermark_query.go
+++ b/pkg/daemon/server/service/pipeline_watermark_query.go
@@ -20,9 +20,6 @@ package service
import (
"context"
"fmt"
- "sort"
- "strconv"
- "strings"
"time"
"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
@@ -30,7 +27,6 @@ import (
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
- "github.com/numaproj/numaflow/pkg/watermark/processor"
)
// TODO - Write Unit Tests for this file
@@ -44,17 +40,13 @@ func GetVertexWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline
}
for _, vertex := range pipeline.Spec.Vertices {
- // key for fetcher map ~ vertexName/replicas
- replicas := int64(1)
if vertex.Sink != nil {
toBufferName := v1alpha1.GenerateSinkBufferName(pipeline.Namespace, pipeline.Name, vertex.Name)
wmFetcher, err := isbSvcClient.CreateWatermarkFetcher(ctx, toBufferName)
if err != nil {
return nil, fmt.Errorf("failed to create watermark fetcher, %w", err)
}
- // for now only reduce has parallelism so not updating replicas for sink for now as done below
- fetchersKey := vertex.Name + "/" + strconv.FormatInt(replicas, 10)
- wmFetchers[fetchersKey] = []fetch.Fetcher{wmFetcher}
+ wmFetchers[vertex.Name] = []fetch.Fetcher{wmFetcher}
} else {
// If the vertex is not a sink, to fetch the watermark, we consult all out edges and grab the latest watermark among them.
var wmFetcherList []fetch.Fetcher
@@ -68,16 +60,7 @@ func GetVertexWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline
wmFetcherList = append(wmFetcherList, fetchWatermark)
}
}
- // for now only reduce has parallelism might have to modify later
- // checking parallelism for a vertex to identify reduce vertex
- // replicas will have parallelism for reduce vertex else will be nil
- // parallelism indicates replica count ~ multiple pods for a vertex here
- obj := pipeline.GetFromEdges(vertex.Name)
- if len(obj) > 0 && obj[0].Parallelism != nil {
- replicas = int64(*obj[0].Parallelism)
- }
- fetchersKey := vertex.Name + "/" + strconv.FormatInt(replicas, 10)
- wmFetchers[fetchersKey] = wmFetcherList
+ wmFetchers[vertex.Name] = wmFetcherList
}
}
return wmFetchers, nil
@@ -89,27 +72,14 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request
resp := new(daemon.GetVertexWatermarkResponse)
vertexName := request.GetVertex()
isWatermarkEnabled := !ps.pipeline.Spec.Watermark.Disabled
- // for now only reduce has parallelism might have to modify later
- // checking parallelism for a vertex to identify reduce vertex
- // parallelism is only supported by reduce vertex for now else will be nil
- // parallelism indicates replica count ~ multiple pods for a vertex here
- replicas := int64(1)
- obj := ps.pipeline.GetFromEdges(vertexName)
- if len(obj) > 0 && obj[0].Parallelism != nil {
- replicas = int64(*obj[0].Parallelism)
- }
// If watermark is not enabled, return time zero
if ps.pipeline.Spec.Watermark.Disabled {
timeZero := time.Unix(0, 0).UnixMilli()
- watermarks := make([]int64, replicas)
- for idx := range watermarks {
- watermarks[idx] = timeZero
- }
v := &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: request.Vertex,
- Watermarks: watermarks,
+ Watermark: &timeZero,
IsWatermarkEnabled: &isWatermarkEnabled,
}
resp.VertexWatermark = v
@@ -117,8 +87,7 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request
}
// Watermark is enabled
- fetchersKey := vertexName + "/" + strconv.FormatInt(replicas, 10)
- vertexFetchers, ok := ps.watermarkFetchers[fetchersKey]
+ vertexFetchers, ok := ps.watermarkFetchers[vertexName]
// Vertex not found
if !ok {
@@ -127,24 +96,17 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request
}
var latestWatermark = int64(-1)
- var latestWatermarks []processor.Watermark
for _, fetcher := range vertexFetchers {
- watermarks := fetcher.GetHeadWatermarks()
- sort.Slice(watermarks, func(i, j int) bool { return watermarks[i].UnixMilli() > watermarks[j].UnixMilli() })
- watermark := watermarks[0].UnixMilli()
- if watermark >= latestWatermark {
+ watermark := fetcher.GetHeadWatermark().UnixMilli()
+ if watermark > latestWatermark {
latestWatermark = watermark
- latestWatermarks = watermarks
}
}
- var watermarks []int64
- for _, v := range latestWatermarks {
- watermarks = append(watermarks, v.UnixMilli())
- }
+
v := &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: request.Vertex,
- Watermarks: watermarks,
+ Watermark: &latestWatermark,
IsWatermarkEnabled: &isWatermarkEnabled,
}
resp.VertexWatermark = v
@@ -162,16 +124,11 @@ func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, requ
watermarkArr := make([]*daemon.VertexWatermark, len(ps.watermarkFetchers))
i := 0
for k := range ps.watermarkFetchers {
- vertexName := strings.Split(k, "/")[0]
- replicas, _ := strconv.ParseInt(strings.Split(k, "/")[1], 10, 64)
- watermarks := make([]int64, replicas)
- for idx := range watermarks {
- watermarks[idx] = timeZero
- }
+ vertexName := k
watermarkArr[i] = &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: &vertexName,
- Watermarks: watermarks,
+ Watermark: &timeZero,
IsWatermarkEnabled: &isWatermarkEnabled,
}
i++
@@ -185,25 +142,17 @@ func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, requ
i := 0
for k, vertexFetchers := range ps.watermarkFetchers {
var latestWatermark = int64(-1)
- var latestWatermarks []processor.Watermark
for _, fetcher := range vertexFetchers {
- watermarks := fetcher.GetHeadWatermarks()
- sort.Slice(watermarks, func(i, j int) bool { return watermarks[i].UnixMilli() > watermarks[j].UnixMilli() })
- watermark := watermarks[0].UnixMilli()
- if watermark >= latestWatermark {
+ watermark := fetcher.GetHeadWatermark().UnixMilli()
+ if watermark > latestWatermark {
latestWatermark = watermark
- latestWatermarks = watermarks
}
}
- vertexName := strings.Split(k, "/")[0]
- var watermarks []int64
- for _, v := range latestWatermarks {
- watermarks = append(watermarks, v.UnixMilli())
- }
+ vertexName := k
watermarkArr[i] = &daemon.VertexWatermark{
Pipeline: &ps.pipeline.Name,
Vertex: &vertexName,
- Watermarks: watermarks,
+ Watermark: &latestWatermark,
IsWatermarkEnabled: &isWatermarkEnabled,
}
i++
diff --git a/pkg/reduce/data_forward_test.go b/pkg/reduce/data_forward_test.go
index 885121b231..83ac2b26aa 100644
--- a/pkg/reduce/data_forward_test.go
+++ b/pkg/reduce/data_forward_test.go
@@ -91,8 +91,8 @@ func (e *EventTypeWMProgressor) GetWatermark(offset isb.Offset) processor.Waterm
return e.watermarks[offset.String()]
}
-func (e *EventTypeWMProgressor) GetHeadWatermarks() []processor.Watermark {
- return []processor.Watermark{}
+func (e *EventTypeWMProgressor) GetHeadWatermark() processor.Watermark {
+ return processor.Watermark{}
}
// PayloadForTest is a dummy payload for testing.
diff --git a/pkg/watermark/fetch/edge_fetcher.go b/pkg/watermark/fetch/edge_fetcher.go
index aabbf0ca38..950e67c6a0 100644
--- a/pkg/watermark/fetch/edge_fetcher.go
+++ b/pkg/watermark/fetch/edge_fetcher.go
@@ -53,16 +53,17 @@ func NewEdgeFetcher(ctx context.Context, bufferName string, storeWatcher store.W
}
}
-// GetHeadWatermarks returns the watermark using the HeadOffset (the latest offset among all processors). This
+// GetHeadWatermark returns the watermark using the HeadOffset (the latest offset among all processors). This
// can be used in showing the watermark progression for a vertex when not consuming the messages
// directly (eg. UX, tests)
// NOTE
// - We don't use this function in the regular pods in the vertex.
// - UX only uses GetHeadWatermark, so the `p.IsDeleted()` check in the GetWatermark never happens.
// Meaning, in the UX (daemon service) we never delete any processor.
-func (e *edgeFetcher) GetHeadWatermarks() []processor.Watermark {
+func (e *edgeFetcher) GetHeadWatermark() processor.Watermark {
var debugString strings.Builder
- var headWatermarks []processor.Watermark
+ var headOffset int64 = math.MinInt64
+ var epoch int64 = math.MaxInt64
var allProcessors = e.processorManager.GetAllProcessors()
// get the head offset of each processor
for _, p := range allProcessors {
@@ -70,20 +71,19 @@ func (e *edgeFetcher) GetHeadWatermarks() []processor.Watermark {
continue
}
var o = p.offsetTimeline.GetHeadOffset()
- var epoch int64 = math.MaxInt64
e.log.Debugf("Processor: %v (headoffset:%d)", p, o)
debugString.WriteString(fmt.Sprintf("[Processor:%v] (headoffset:%d) \n", p, o))
- if o != -1 {
+ if o != -1 && o > headOffset {
+ headOffset = o
epoch = p.offsetTimeline.GetEventTimeFromInt64(o)
}
- if epoch == math.MaxInt64 {
- // Use -1 as default watermark value to indicate there is no valid watermark yet.
- headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(-1)))
- } else {
- headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(epoch)))
- }
}
- return headWatermarks
+ e.log.Debugf("GetHeadWatermark: %s", debugString.String())
+ if epoch == math.MaxInt64 {
+ // Use -1 as default watermark value to indicate there is no valid watermark yet.
+ return processor.Watermark(time.UnixMilli(-1))
+ }
+ return processor.Watermark(time.UnixMilli(epoch))
}
// GetWatermark gets the smallest timestamp for the given offset
diff --git a/pkg/watermark/fetch/edge_fetcher_test.go b/pkg/watermark/fetch/edge_fetcher_test.go
index 4f52f8d210..85153d3ec9 100644
--- a/pkg/watermark/fetch/edge_fetcher_test.go
+++ b/pkg/watermark/fetch/edge_fetcher_test.go
@@ -143,7 +143,7 @@ func TestBuffer_GetWatermark(t *testing.T) {
t.Errorf("GetWatermark() = %v, want %v", got, processor.Watermark(time.UnixMilli(tt.want)))
}
// this will always be 14 because the timeline has been populated ahead of time
- assert.Equal(t, time.Time(b.GetHeadWatermarks()[len(b.GetHeadWatermarks())-1]).In(location), time.UnixMilli(14).In(location))
+ assert.Equal(t, time.Time(b.GetHeadWatermark()).In(location), time.UnixMilli(14).In(location))
})
}
}
diff --git a/pkg/watermark/fetch/interface.go b/pkg/watermark/fetch/interface.go
index 3ad0fa9e79..8aba4f3bd9 100644
--- a/pkg/watermark/fetch/interface.go
+++ b/pkg/watermark/fetch/interface.go
@@ -28,6 +28,6 @@ type Fetcher interface {
io.Closer
// GetWatermark returns the inorder monotonically increasing watermark of the edge connected to Vn-1.
GetWatermark(offset isb.Offset) processor.Watermark
- // GetHeadWatermarks returns the latest watermark based on the head offset
- GetHeadWatermarks() []processor.Watermark
+ // GetHeadWatermark returns the latest watermark based on the head offset
+ GetHeadWatermark() processor.Watermark
}
diff --git a/pkg/watermark/fetch/source_fetcher.go b/pkg/watermark/fetch/source_fetcher.go
index 029a027e79..92344dd186 100644
--- a/pkg/watermark/fetch/source_fetcher.go
+++ b/pkg/watermark/fetch/source_fetcher.go
@@ -52,25 +52,22 @@ func NewSourceFetcher(ctx context.Context, sourceBufferName string, storeWatcher
}
}
-// GetHeadWatermarks returns the latest watermark of all the processors.
-func (e *sourceFetcher) GetHeadWatermarks() []processor.Watermark {
- var headWatermarks []processor.Watermark
+// GetHeadWatermark returns the latest watermark of all the processors.
+func (e *sourceFetcher) GetHeadWatermark() processor.Watermark {
+ var epoch int64 = math.MinInt64
for _, p := range e.processorManager.GetAllProcessors() {
if !p.IsActive() {
continue
}
- var epoch int64 = math.MinInt64
- if p.offsetTimeline.GetHeadWatermark() != -1 {
+ if p.offsetTimeline.GetHeadWatermark() > epoch {
epoch = p.offsetTimeline.GetHeadWatermark()
}
- if epoch == math.MinInt64 {
- // Use -1 as default watermark value to indicate there is no valid watermark yet.
- headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(-1)))
- } else {
- headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(epoch)))
- }
}
- return headWatermarks
+ if epoch == math.MinInt64 {
+ // Use -1 as default watermark value to indicate there is no valid watermark yet.
+ return processor.Watermark(time.UnixMilli(-1))
+ }
+ return processor.Watermark(time.UnixMilli(epoch))
}
// GetWatermark returns the lowest of the latest watermark of all the processors,
diff --git a/pkg/watermark/generic/noop.go b/pkg/watermark/generic/noop.go
index f35ecf6640..efc6c9c4b2 100644
--- a/pkg/watermark/generic/noop.go
+++ b/pkg/watermark/generic/noop.go
@@ -51,9 +51,9 @@ func (n NoOpWMProgressor) GetLatestWatermark() processor.Watermark {
return processor.Watermark{}
}
-// GetHeadWatermarks returns the default head watermark.
-func (n NoOpWMProgressor) GetHeadWatermarks() []processor.Watermark {
- return []processor.Watermark{}
+// GetHeadWatermark returns the default head watermark.
+func (n NoOpWMProgressor) GetHeadWatermark() processor.Watermark {
+ return processor.Watermark{}
}
// Close stops the no-op progressor.
diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go
index 79750f8ea9..3977eb745d 100644
--- a/test/e2e/functional_test.go
+++ b/test/e2e/functional_test.go
@@ -266,7 +266,7 @@ func isWatermarkProgressing(ctx context.Context, client *daemonclient.DaemonClie
pipelineWatermarks := make([]int64, len(vertexList))
idx := 0
for _, v := range wm {
- pipelineWatermarks[idx] = v.Watermarks[0]
+ pipelineWatermarks[idx] = *v.Watermark
idx++
}
currentWatermark = pipelineWatermarks
diff --git a/ui/src/components/pipeline/Pipeline.tsx b/ui/src/components/pipeline/Pipeline.tsx
index 782a63d736..6d55bda723 100644
--- a/ui/src/components/pipeline/Pipeline.tsx
+++ b/ui/src/components/pipeline/Pipeline.tsx
@@ -140,11 +140,10 @@ export function Pipeline() {
json.map((vertex) => {
const vertexWatermark = {} as VertexWatermark;
vertexWatermark.isWaterMarkEnabled = vertex["isWatermarkEnabled"];
- vertexWatermark.watermark = vertex["watermarks"][0];
+ vertexWatermark.watermark = vertex["watermark"];
vertexWatermark.watermarkLocalTime = new Date(
vertexWatermark.watermark
).toISOString();
- vertexWatermark.podWatermarks = vertex["watermarks"];
vertexToWatermarkMap.set(vertex.vertex, vertexWatermark);
})
})
diff --git a/ui/src/components/pipeline/nodeinfo/NodeInfo.tsx b/ui/src/components/pipeline/nodeinfo/NodeInfo.tsx
index 619011a214..19ea79520a 100644
--- a/ui/src/components/pipeline/nodeinfo/NodeInfo.tsx
+++ b/ui/src/components/pipeline/nodeinfo/NodeInfo.tsx
@@ -54,20 +54,12 @@ export default function NodeInfo(props: NodeInfoProps) {
{...a11yProps(1)}
/>
)}
- {node?.data?.vertexWatermark && (
-
- )}
{node?.data?.vertexMetrics && (
)}
@@ -124,32 +116,6 @@ export default function NodeInfo(props: NodeInfoProps) {
)}
- {node?.data?.vertexWatermark && (
-
-
-
-
- Pod
- Watermark
-
-
-
- {node?.data?.vertexWatermark?.podWatermarks &&
- node.data.vertexWatermark.podWatermarks.map((podWatermark, idx) => (
-
- Pod - {idx}
- {podWatermark}
-
- ))}
-
-
-
- )}
-
-
{node?.data?.vertexMetrics && (