From c9e57d03e68044a59cd19dfb12ff00eb5bebd086 Mon Sep 17 00:00:00 2001 From: veds-g Date: Wed, 1 Feb 2023 15:31:36 +0530 Subject: [PATCH] modifying Watermark object in proto file Signed-off-by: veds-g --- pkg/apis/proto/daemon/daemon.pb.go | 241 +++++++----------- pkg/apis/proto/daemon/daemon.proto | 3 +- .../server/service/pipeline_metrics_query.go | 3 + .../service/pipeline_watermark_query.go | 85 ++++-- pkg/reconciler/vertex/scaling/scaling.go | 23 +- pkg/reduce/data_forward_test.go | 6 +- pkg/watermark/fetch/edge_fetcher.go | 38 +-- pkg/watermark/fetch/edge_fetcher_test.go | 5 +- pkg/watermark/fetch/interface.go | 6 +- pkg/watermark/fetch/source_fetcher.go | 30 +-- pkg/watermark/generic/noop.go | 8 +- ui/src/components/pipeline/Pipeline.tsx | 4 +- .../components/pipeline/nodeinfo/NodeInfo.tsx | 7 - 13 files changed, 190 insertions(+), 269 deletions(-) diff --git a/pkg/apis/proto/daemon/daemon.pb.go b/pkg/apis/proto/daemon/daemon.pb.go index 8f5bdd6d92..650faa9445 100644 --- a/pkg/apis/proto/daemon/daemon.pb.go +++ b/pkg/apis/proto/daemon/daemon.pb.go @@ -534,9 +534,8 @@ func (m *GetVertexMetricsResponse) GetVertexMetrics() []*VertexMetrics { type VertexWatermark struct { Pipeline *string `protobuf:"bytes,1,req,name=pipeline" json:"pipeline,omitempty"` Vertex *string `protobuf:"bytes,2,req,name=vertex" json:"vertex,omitempty"` - Watermark *int64 `protobuf:"varint,3,req,name=watermark" json:"watermark,omitempty"` + Watermarks []int64 `protobuf:"varint,3,rep,name=watermarks" json:"watermarks,omitempty"` IsWatermarkEnabled *bool `protobuf:"varint,4,req,name=isWatermarkEnabled" json:"isWatermarkEnabled,omitempty"` - PodWatermarks []int64 `protobuf:"varint,5,rep,name=podWatermarks" json:"podWatermarks,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -589,11 +588,11 @@ func (m *VertexWatermark) GetVertex() string { return "" } -func (m *VertexWatermark) GetWatermark() int64 { - if m != nil && m.Watermark != nil { - return *m.Watermark +func (m *VertexWatermark) GetWatermarks() []int64 { + if m != nil { + return m.Watermarks } - return 0 + return nil } func (m *VertexWatermark) GetIsWatermarkEnabled() bool { @@ -603,13 +602,6 @@ func (m *VertexWatermark) GetIsWatermarkEnabled() bool { return false } -func (m *VertexWatermark) GetPodWatermarks() []int64 { - if m != nil { - return m.PodWatermarks - } - return nil -} - type GetVertexWatermarkResponse struct { VertexWatermark *VertexWatermark `protobuf:"bytes,1,req,name=vertexWatermark" json:"vertexWatermark,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -831,64 +823,63 @@ func init() { } var fileDescriptor_93e327fd0d673221 = []byte{ - // 910 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xdd, 0x6e, 0xe3, 0x44, - 0x14, 0x96, 0xed, 0x6d, 0x37, 0x39, 0x25, 0x6a, 0x39, 0xec, 0xcf, 0xac, 0xb7, 0x14, 0x63, 0x0a, - 0x84, 0x6a, 0x89, 0xa1, 0x12, 0xa8, 0xea, 0x4a, 0xec, 0xd2, 0x65, 0xb7, 0x42, 0x6a, 0xa1, 0x32, - 0x3f, 0x2b, 0x71, 0x83, 0xdc, 0x74, 0x92, 0x9a, 0xc6, 0x1e, 0xe3, 0x99, 0xa4, 0x54, 0xab, 0xde, - 0xec, 0x2b, 0xa0, 0x5e, 0xf2, 0x20, 0xbc, 0x01, 0x57, 0x08, 0x89, 0x4b, 0x6e, 0x50, 0xc5, 0x83, - 0x20, 0xcf, 0x8c, 0x1d, 0x3b, 0x71, 0xd2, 0xc2, 0x55, 0x7d, 0xce, 0xf9, 0xce, 0xf9, 0xbe, 0x9c, - 0x1f, 0xbb, 0xe0, 0x26, 0x27, 0x7d, 0x2f, 0x48, 0x42, 0xee, 0x25, 0x29, 0x13, 0xcc, 0x3b, 0x0a, - 0x68, 0xc4, 0x62, 0xfd, 0xa7, 0x23, 0x7d, 0xb8, 0xa8, 0x2c, 0x7b, 0xb5, 0xcf, 0x58, 0x7f, 0x40, - 0x33, 0xb8, 0x17, 0xc4, 0x31, 0x13, 0x81, 0x08, 0x59, 0xcc, 0x15, 0xca, 0xbe, 0xaf, 0xa3, 0xd2, - 0x3a, 0x1c, 0xf6, 0x3c, 0x1a, 0x25, 0xe2, 0x4c, 0x05, 0xdd, 0x97, 0x16, 0xc0, 0xce, 0xb0, 0xd7, - 0xa3, 0xe9, 0xe7, 0x71, 0x8f, 0xa1, 0x0d, 0x8d, 0x24, 0x4c, 0xe8, 0x20, 0x8c, 0x29, 0x31, 0x1c, - 0xb3, 0xdd, 0xf4, 0x0b, 0x1b, 0xd7, 0x00, 0x7a, 0x29, 0x8b, 0xbe, 0xa5, 0xa9, 0xa0, 0x3f, 0x11, - 0x53, 0x46, 0x4b, 0x9e, 0x2c, 0x57, 0x30, 0x1d, 0xb5, 0x54, 0x6e, 0x6e, 0x67, 0xb9, 0x87, 0x92, - 0xe5, 0x8b, 0x20, 0xa2, 0xe4, 0x86, 0xca, 0x1d, 0x7b, 0xd0, 0x85, 0x57, 0x12, 0x1a, 0x1f, 0x85, - 0x71, 0xff, 0x09, 0x1b, 0xc6, 0x82, 0x2c, 0x38, 0x66, 0xdb, 0xf2, 0x2b, 0x3e, 0x6c, 0xc3, 0x72, - 0xd0, 0x3d, 0x39, 0x28, 0xc3, 0x16, 0x25, 0x6c, 0xd2, 0x8d, 0xeb, 0xd0, 0x12, 0x4c, 0x04, 0x83, - 0x7d, 0xca, 0x79, 0xd0, 0xa7, 0x9c, 0xdc, 0x94, 0xb8, 0xaa, 0x33, 0xe3, 0x54, 0x0a, 0xf6, 0x68, - 0xdc, 0x17, 0xc7, 0xa4, 0xa1, 0x38, 0xcb, 0x3e, 0xdc, 0x80, 0x15, 0x65, 0x7f, 0x93, 0xe5, 0xec, - 0x85, 0x51, 0x28, 0x48, 0xd3, 0x31, 0xdb, 0x86, 0x3f, 0xe5, 0x47, 0x07, 0x96, 0x4a, 0x3e, 0x02, - 0x12, 0x56, 0x76, 0xe1, 0x1d, 0x58, 0x0c, 0xf9, 0xb3, 0xe1, 0x60, 0x40, 0x96, 0x1c, 0xb3, 0xdd, - 0xf0, 0xb5, 0xe5, 0xfe, 0x65, 0x42, 0x4b, 0x35, 0x6a, 0x9f, 0x8a, 0x34, 0xec, 0xf2, 0xb9, 0x73, - 0xb8, 0x03, 0x8b, 0xa3, 0xf2, 0x0c, 0xb4, 0x85, 0x5f, 0xc3, 0x72, 0x92, 0xb2, 0x2e, 0xe5, 0x3c, - 0x8c, 0xfb, 0x7e, 0x20, 0x28, 0x27, 0x96, 0x63, 0xb5, 0x97, 0x36, 0x37, 0x3a, 0x7a, 0x6b, 0x2a, - 0x1c, 0x9d, 0x83, 0x2a, 0xf8, 0x69, 0x2c, 0xd2, 0x33, 0x7f, 0xb2, 0x04, 0x3e, 0x82, 0x86, 0x9e, - 0x02, 0x27, 0x37, 0x64, 0xb9, 0xb7, 0x66, 0x94, 0xd3, 0x28, 0x55, 0xa7, 0x48, 0xb2, 0x77, 0xe0, - 0x56, 0x1d, 0x13, 0xae, 0x80, 0x75, 0x42, 0xcf, 0x88, 0xe1, 0x18, 0xed, 0xa6, 0x9f, 0x3d, 0xe2, - 0x2d, 0x58, 0x18, 0x05, 0x83, 0x21, 0x25, 0xa6, 0x63, 0xb4, 0x0d, 0x5f, 0x19, 0xdb, 0xe6, 0x96, - 0x61, 0x3f, 0x84, 0x56, 0xa5, 0xfc, 0x55, 0xc9, 0x56, 0x29, 0xd9, 0xfd, 0x00, 0x70, 0x2f, 0xe4, - 0x42, 0x6d, 0x39, 0xf7, 0xe9, 0x8f, 0x43, 0xca, 0xc5, 0xbc, 0x0e, 0xbb, 0x4f, 0xe0, 0xb5, 0x4a, - 0x06, 0x4f, 0x58, 0xcc, 0x29, 0x3e, 0x80, 0x9b, 0x6a, 0x9a, 0x9c, 0x18, 0xb2, 0x13, 0x98, 0x77, - 0x62, 0x7c, 0x41, 0x7e, 0x0e, 0x71, 0x9f, 0xc1, 0xca, 0x2e, 0xd5, 0x35, 0xae, 0x41, 0x9a, 0x8d, - 0x55, 0xa5, 0xe6, 0x63, 0x55, 0x96, 0xfb, 0x08, 0x5e, 0x2d, 0xd5, 0xd1, 0x52, 0x36, 0x0a, 0x70, - 0x56, 0xa6, 0x5e, 0x49, 0x5e, 0x60, 0x1f, 0xee, 0xee, 0x52, 0x51, 0x19, 0x56, 0x9d, 0x1e, 0x73, - 0xe6, 0x9a, 0x59, 0xe5, 0x35, 0x73, 0x9f, 0x03, 0x99, 0x2e, 0xa7, 0x65, 0x3d, 0x84, 0xd6, 0xa8, - 0x1c, 0xd0, 0x7d, 0xba, 0x5d, 0xbb, 0x31, 0x7e, 0x15, 0xeb, 0xfe, 0x6a, 0xc0, 0xb2, 0x02, 0x3c, - 0x0f, 0x04, 0x4d, 0xa3, 0x20, 0x3d, 0xf9, 0x5f, 0x77, 0xb0, 0x0a, 0xcd, 0xd3, 0xbc, 0x80, 0xd4, - 0x6e, 0xf9, 0x63, 0x07, 0x76, 0x00, 0x43, 0x5e, 0x10, 0x3c, 0x8d, 0x83, 0xc3, 0x01, 0x3d, 0x92, - 0x6f, 0xa4, 0x86, 0x5f, 0x13, 0xc9, 0xde, 0x25, 0x09, 0x3b, 0x2a, 0xdc, 0x9c, 0x2c, 0x38, 0x56, - 0xf6, 0x2e, 0xa9, 0x38, 0xdd, 0xef, 0xc1, 0x2e, 0x9a, 0x52, 0xb8, 0x8b, 0xb6, 0x7c, 0x0a, 0xcb, - 0xa3, 0x6a, 0x48, 0x8f, 0xed, 0x6e, 0xb5, 0x31, 0xe3, 0xcc, 0x49, 0xbc, 0xfb, 0x25, 0xdc, 0xab, - 0x23, 0xb8, 0xd6, 0x5a, 0xd5, 0x75, 0xc9, 0x3d, 0x86, 0xd7, 0x77, 0xa9, 0x38, 0xd0, 0xb0, 0xf1, - 0x4f, 0x29, 0x44, 0xef, 0x02, 0x26, 0x53, 0x51, 0x3d, 0xd0, 0x99, 0xba, 0x6b, 0x52, 0xdc, 0x6d, - 0x58, 0x9d, 0xc1, 0x74, 0xa5, 0xfa, 0xcd, 0xdf, 0x17, 0xa0, 0xf5, 0x99, 0xa4, 0xfa, 0x8a, 0xa6, - 0xa3, 0xb0, 0x4b, 0x51, 0xc0, 0x52, 0xe9, 0x36, 0xd1, 0xce, 0x95, 0x4c, 0x9f, 0xb8, 0x7d, 0xbf, - 0x36, 0xa6, 0x7e, 0x9e, 0xfb, 0xe0, 0xe5, 0x9f, 0xff, 0xfc, 0x6c, 0xbe, 0x83, 0xeb, 0xf2, 0xab, - 0x39, 0xfa, 0xd0, 0xcb, 0x39, 0xb9, 0xf7, 0x22, 0x7f, 0x3c, 0xf7, 0xf4, 0x31, 0xe3, 0x29, 0x34, - 0x8b, 0x23, 0x44, 0x92, 0xd7, 0x9d, 0xbc, 0x6f, 0xfb, 0x5e, 0x4d, 0x44, 0xf3, 0x7d, 0x24, 0xf9, - 0x3c, 0x7c, 0xff, 0x3a, 0x7c, 0xde, 0x0b, 0xf5, 0x70, 0x8e, 0x17, 0x86, 0x7c, 0x8d, 0x54, 0xbf, - 0x0e, 0x6f, 0x94, 0x68, 0xea, 0xee, 0xda, 0x76, 0x66, 0x03, 0xb4, 0x9c, 0x4f, 0xa4, 0x9c, 0x2d, - 0xfc, 0x78, 0xae, 0x9c, 0x6c, 0x57, 0xc2, 0x6e, 0xe6, 0x53, 0x5b, 0x73, 0xee, 0x45, 0x5a, 0xc2, - 0x2f, 0x06, 0xe0, 0xf4, 0x42, 0xe2, 0x9b, 0x53, 0xc4, 0x93, 0xcb, 0x6a, 0xbb, 0xf3, 0x20, 0x5a, - 0xdd, 0x63, 0xa9, 0x6e, 0x1b, 0xb7, 0xfe, 0xa3, 0xba, 0xf1, 0x99, 0x5f, 0x18, 0x70, 0xbb, 0x76, - 0xeb, 0x70, 0xbd, 0xc4, 0x3f, 0x73, 0x29, 0xed, 0xb7, 0xaf, 0x40, 0x69, 0xa1, 0x9e, 0x14, 0xfa, - 0x1e, 0xbe, 0x3b, 0x57, 0x68, 0x21, 0x8b, 0xef, 0x3c, 0xfe, 0xed, 0x72, 0xcd, 0xf8, 0xe3, 0x72, - 0xcd, 0xf8, 0xfb, 0x72, 0xcd, 0xf8, 0x6e, 0xb3, 0x1f, 0x8a, 0xe3, 0xe1, 0x61, 0xa7, 0xcb, 0x22, - 0x2f, 0x1e, 0x46, 0x41, 0x92, 0xb2, 0x1f, 0xe4, 0x43, 0x6f, 0xc0, 0x4e, 0xbd, 0xda, 0xff, 0x00, - 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x7a, 0xb4, 0xd7, 0x74, 0x19, 0x0a, 0x00, 0x00, + // 894 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x5f, 0x6f, 0xdc, 0x44, + 0x10, 0x97, 0xed, 0x26, 0xbd, 0x4c, 0x38, 0x25, 0x0c, 0xfd, 0xe3, 0xba, 0xe5, 0x30, 0x4b, 0x80, + 0x23, 0x2a, 0x67, 0x88, 0x04, 0x8a, 0x52, 0x89, 0x96, 0x94, 0x36, 0x42, 0x4a, 0x20, 0x32, 0x7f, + 0x2a, 0xf1, 0x82, 0x9c, 0xcb, 0xde, 0xc5, 0xdc, 0xd9, 0x6b, 0xbc, 0x7b, 0x17, 0xa2, 0x2a, 0x2f, + 0xfd, 0x0a, 0xa8, 0x8f, 0x7c, 0x1f, 0x9e, 0x10, 0x12, 0x8f, 0xbc, 0xa0, 0x88, 0x0f, 0x82, 0xbc, + 0xbb, 0xf6, 0xd9, 0x77, 0xbe, 0x4b, 0xe8, 0x53, 0x3c, 0x33, 0xbf, 0x99, 0xdf, 0x6f, 0x77, 0x66, + 0x36, 0x07, 0x24, 0x19, 0xf4, 0xbd, 0x20, 0x09, 0xb9, 0x97, 0xa4, 0x4c, 0x30, 0xef, 0x38, 0xa0, + 0x11, 0x8b, 0xf5, 0x9f, 0x8e, 0xf4, 0xe1, 0xb2, 0xb2, 0x9c, 0x7b, 0x7d, 0xc6, 0xfa, 0x43, 0x9a, + 0xc1, 0xbd, 0x20, 0x8e, 0x99, 0x08, 0x44, 0xc8, 0x62, 0xae, 0x50, 0xce, 0x5d, 0x1d, 0x95, 0xd6, + 0xd1, 0xa8, 0xe7, 0xd1, 0x28, 0x11, 0x67, 0x2a, 0x48, 0x5e, 0x58, 0x00, 0xbb, 0xa3, 0x5e, 0x8f, + 0xa6, 0x5f, 0xc6, 0x3d, 0x86, 0x0e, 0x34, 0x92, 0x30, 0xa1, 0xc3, 0x30, 0xa6, 0xb6, 0xe1, 0x9a, + 0xed, 0x15, 0xbf, 0xb0, 0xb1, 0x05, 0xd0, 0x4b, 0x59, 0xf4, 0x3d, 0x4d, 0x05, 0xfd, 0xc5, 0x36, + 0x65, 0xb4, 0xe4, 0xc9, 0x72, 0x05, 0xd3, 0x51, 0x4b, 0xe5, 0xe6, 0x76, 0x96, 0x7b, 0x24, 0x59, + 0xbe, 0x0a, 0x22, 0x6a, 0x5f, 0x53, 0xb9, 0x13, 0x0f, 0x12, 0x78, 0x2d, 0xa1, 0xf1, 0x71, 0x18, + 0xf7, 0x1f, 0xb3, 0x51, 0x2c, 0xec, 0x25, 0xd7, 0x6c, 0x5b, 0x7e, 0xc5, 0x87, 0x6d, 0x58, 0x0b, + 0xba, 0x83, 0xc3, 0x32, 0x6c, 0x59, 0xc2, 0xa6, 0xdd, 0xb8, 0x01, 0x4d, 0xc1, 0x44, 0x30, 0x3c, + 0xa0, 0x9c, 0x07, 0x7d, 0xca, 0xed, 0xeb, 0x12, 0x57, 0x75, 0x66, 0x9c, 0x4a, 0xc1, 0x3e, 0x8d, + 0xfb, 0xe2, 0xc4, 0x6e, 0x28, 0xce, 0xb2, 0x0f, 0x37, 0x61, 0x5d, 0xd9, 0xdf, 0x65, 0x39, 0xfb, + 0x61, 0x14, 0x0a, 0x7b, 0xc5, 0x35, 0xdb, 0x86, 0x3f, 0xe3, 0x47, 0x17, 0x56, 0x4b, 0x3e, 0x1b, + 0x24, 0xac, 0xec, 0xc2, 0x5b, 0xb0, 0x1c, 0xf2, 0xa7, 0xa3, 0xe1, 0xd0, 0x5e, 0x75, 0xcd, 0x76, + 0xc3, 0xd7, 0x16, 0xf9, 0xdb, 0x84, 0xa6, 0xba, 0xa8, 0x03, 0x2a, 0xd2, 0xb0, 0xcb, 0x17, 0xf6, + 0xe1, 0x16, 0x2c, 0x8f, 0xcb, 0x3d, 0xd0, 0x16, 0x7e, 0x0b, 0x6b, 0x49, 0xca, 0xba, 0x94, 0xf3, + 0x30, 0xee, 0xfb, 0x81, 0xa0, 0xdc, 0xb6, 0x5c, 0xab, 0xbd, 0xba, 0xb5, 0xd9, 0xd1, 0x53, 0x53, + 0xe1, 0xe8, 0x1c, 0x56, 0xc1, 0x4f, 0x62, 0x91, 0x9e, 0xf9, 0xd3, 0x25, 0xf0, 0x21, 0x34, 0x74, + 0x17, 0xb8, 0x7d, 0x4d, 0x96, 0x7b, 0x67, 0x4e, 0x39, 0x8d, 0x52, 0x75, 0x8a, 0x24, 0x67, 0x17, + 0x6e, 0xd4, 0x31, 0xe1, 0x3a, 0x58, 0x03, 0x7a, 0x66, 0x1b, 0xae, 0xd1, 0x5e, 0xf1, 0xb3, 0x4f, + 0xbc, 0x01, 0x4b, 0xe3, 0x60, 0x38, 0xa2, 0xb6, 0xe9, 0x1a, 0x6d, 0xc3, 0x57, 0xc6, 0x8e, 0xb9, + 0x6d, 0x38, 0x0f, 0xa0, 0x59, 0x29, 0x7f, 0x59, 0xb2, 0x55, 0x4a, 0x26, 0x1f, 0x01, 0xee, 0x87, + 0x5c, 0xa8, 0x29, 0xe7, 0x3e, 0xfd, 0x79, 0x44, 0xb9, 0x58, 0x74, 0xc3, 0xe4, 0x31, 0xbc, 0x51, + 0xc9, 0xe0, 0x09, 0x8b, 0x39, 0xc5, 0xfb, 0x70, 0x5d, 0x75, 0x93, 0xdb, 0x86, 0xbc, 0x09, 0xcc, + 0x6f, 0x62, 0xb2, 0x41, 0x7e, 0x0e, 0x21, 0x4f, 0x61, 0x7d, 0x8f, 0xea, 0x1a, 0x57, 0x20, 0xcd, + 0xda, 0xaa, 0x52, 0xf3, 0xb6, 0x2a, 0x8b, 0x3c, 0x84, 0xd7, 0x4b, 0x75, 0xb4, 0x94, 0xcd, 0x02, + 0x9c, 0x95, 0xa9, 0x57, 0x92, 0x17, 0x38, 0x80, 0xdb, 0x7b, 0x54, 0x54, 0x9a, 0x55, 0xa7, 0xc7, + 0x9c, 0x3b, 0x66, 0x56, 0x79, 0xcc, 0xc8, 0x33, 0xb0, 0x67, 0xcb, 0x69, 0x59, 0x0f, 0xa0, 0x39, + 0x2e, 0x07, 0xf4, 0x3d, 0xdd, 0xac, 0x9d, 0x18, 0xbf, 0x8a, 0x25, 0x2f, 0x0d, 0x58, 0x53, 0x80, + 0x67, 0x81, 0xa0, 0x69, 0x14, 0xa4, 0x83, 0x57, 0xda, 0x83, 0x16, 0xc0, 0x69, 0x5e, 0x40, 0xad, + 0x80, 0xe5, 0x97, 0x3c, 0xd8, 0x01, 0x0c, 0x79, 0x41, 0xf1, 0x24, 0x0e, 0x8e, 0x86, 0xf4, 0x58, + 0xbe, 0x49, 0x0d, 0xbf, 0x26, 0x42, 0x7e, 0x04, 0xa7, 0x38, 0x70, 0x11, 0x2c, 0x8e, 0xfc, 0x39, + 0xac, 0x8d, 0xab, 0x21, 0xdd, 0x92, 0xdb, 0xd5, 0x43, 0x4f, 0x32, 0xa7, 0xf1, 0xe4, 0x6b, 0xb8, + 0x53, 0x47, 0x70, 0xa5, 0x91, 0xa9, 0xbb, 0x01, 0x72, 0x02, 0x6f, 0xee, 0x51, 0x71, 0xa8, 0x61, + 0x45, 0xc9, 0x49, 0x9f, 0xf6, 0x00, 0x93, 0x99, 0xa8, 0x6e, 0xd6, 0x5c, 0xdd, 0x35, 0x29, 0x64, + 0x07, 0xee, 0xcd, 0x61, 0xba, 0x54, 0xfd, 0xd6, 0x1f, 0x4b, 0xd0, 0xfc, 0x42, 0x52, 0x7d, 0x43, + 0xd3, 0x71, 0xd8, 0xa5, 0x28, 0x60, 0xb5, 0xb4, 0x77, 0xe8, 0xe4, 0x4a, 0x66, 0xd7, 0xd7, 0xb9, + 0x5b, 0x1b, 0x53, 0xc7, 0x23, 0xf7, 0x5f, 0xfc, 0xf5, 0xef, 0xaf, 0xe6, 0x7b, 0xb8, 0x21, 0xff, + 0x23, 0x8e, 0x3f, 0xf6, 0x72, 0x4e, 0xee, 0x3d, 0xcf, 0x3f, 0xcf, 0x3d, 0xbd, 0xa8, 0x78, 0x0a, + 0x2b, 0xc5, 0x82, 0xa1, 0x9d, 0xd7, 0x9d, 0xde, 0x5d, 0xe7, 0x4e, 0x4d, 0x44, 0xf3, 0x7d, 0x22, + 0xf9, 0x3c, 0xfc, 0xf0, 0x2a, 0x7c, 0xde, 0x73, 0xf5, 0x71, 0x8e, 0x2f, 0x0d, 0xf9, 0x44, 0x54, + 0x5f, 0xfe, 0xb7, 0x4a, 0x34, 0x75, 0x3b, 0xeb, 0xb8, 0xf3, 0x01, 0x5a, 0xce, 0x67, 0x52, 0xce, + 0x36, 0x7e, 0xba, 0x50, 0x4e, 0x36, 0x2b, 0x61, 0x37, 0xf3, 0xa9, 0xa9, 0x39, 0xf7, 0x22, 0x2d, + 0xe1, 0x37, 0x03, 0x70, 0x76, 0x20, 0xf1, 0xed, 0x19, 0xe2, 0xe9, 0x61, 0x75, 0xc8, 0x22, 0x88, + 0x56, 0xf7, 0x48, 0xaa, 0xdb, 0xc1, 0xed, 0xff, 0xa9, 0xae, 0xd8, 0xe0, 0xec, 0xde, 0x6e, 0xd6, + 0x4e, 0x1d, 0x6e, 0x94, 0xf8, 0xe7, 0x0e, 0xa5, 0xf3, 0xee, 0x25, 0x28, 0x2d, 0xd4, 0x93, 0x42, + 0x3f, 0xc0, 0xf7, 0x17, 0x0a, 0x9d, 0x3c, 0x2c, 0xbb, 0x8f, 0x7e, 0xbf, 0x68, 0x19, 0x7f, 0x5e, + 0xb4, 0x8c, 0x7f, 0x2e, 0x5a, 0xc6, 0x0f, 0x5b, 0xfd, 0x50, 0x9c, 0x8c, 0x8e, 0x3a, 0x5d, 0x16, + 0x79, 0xf1, 0x28, 0x0a, 0x92, 0x94, 0xfd, 0x24, 0x3f, 0x7a, 0x43, 0x76, 0xea, 0xd5, 0xfe, 0xba, + 0xfb, 0x2f, 0x00, 0x00, 0xff, 0xff, 0xdb, 0xe7, 0xd0, 0x32, 0xf5, 0x09, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1591,13 +1582,6 @@ func (m *VertexWatermark) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } - if len(m.PodWatermarks) > 0 { - for iNdEx := len(m.PodWatermarks) - 1; iNdEx >= 0; iNdEx-- { - i = encodeVarintDaemon(dAtA, i, uint64(m.PodWatermarks[iNdEx])) - i-- - dAtA[i] = 0x28 - } - } if m.IsWatermarkEnabled == nil { return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("isWatermarkEnabled") } else { @@ -1610,12 +1594,12 @@ func (m *VertexWatermark) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x20 } - if m.Watermark == nil { - return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("watermark") - } else { - i = encodeVarintDaemon(dAtA, i, uint64(*m.Watermark)) - i-- - dAtA[i] = 0x18 + if len(m.Watermarks) > 0 { + for iNdEx := len(m.Watermarks) - 1; iNdEx >= 0; iNdEx-- { + i = encodeVarintDaemon(dAtA, i, uint64(m.Watermarks[iNdEx])) + i-- + dAtA[i] = 0x18 + } } if m.Vertex == nil { return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("vertex") @@ -2019,17 +2003,14 @@ func (m *VertexWatermark) Size() (n int) { l = len(*m.Vertex) n += 1 + l + sovDaemon(uint64(l)) } - if m.Watermark != nil { - n += 1 + sovDaemon(uint64(*m.Watermark)) + if len(m.Watermarks) > 0 { + for _, e := range m.Watermarks { + n += 1 + sovDaemon(uint64(e)) + } } if m.IsWatermarkEnabled != nil { n += 2 } - if len(m.PodWatermarks) > 0 { - for _, e := range m.PodWatermarks { - n += 1 + sovDaemon(uint64(e)) - } - } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -3512,49 +3493,6 @@ func (m *VertexWatermark) Unmarshal(dAtA []byte) error { iNdEx = postIndex hasFields[0] |= uint64(0x00000002) case 3: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Watermark", wireType) - } - var v int64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowDaemon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - m.Watermark = &v - hasFields[0] |= uint64(0x00000004) - case 4: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field IsWatermarkEnabled", wireType) - } - var v int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowDaemon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - b := bool(v != 0) - m.IsWatermarkEnabled = &b - hasFields[0] |= uint64(0x00000008) - case 5: if wireType == 0 { var v int64 for shift := uint(0); ; shift += 7 { @@ -3571,7 +3509,7 @@ func (m *VertexWatermark) Unmarshal(dAtA []byte) error { break } } - m.PodWatermarks = append(m.PodWatermarks, v) + m.Watermarks = append(m.Watermarks, v) } else if wireType == 2 { var packedLen int for shift := uint(0); ; shift += 7 { @@ -3606,8 +3544,8 @@ func (m *VertexWatermark) Unmarshal(dAtA []byte) error { } } elementCount = count - if elementCount != 0 && len(m.PodWatermarks) == 0 { - m.PodWatermarks = make([]int64, 0, elementCount) + if elementCount != 0 && len(m.Watermarks) == 0 { + m.Watermarks = make([]int64, 0, elementCount) } for iNdEx < postIndex { var v int64 @@ -3625,11 +3563,33 @@ func (m *VertexWatermark) Unmarshal(dAtA []byte) error { break } } - m.PodWatermarks = append(m.PodWatermarks, v) + m.Watermarks = append(m.Watermarks, v) } } else { - return fmt.Errorf("proto: wrong wireType = %d for field PodWatermarks", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Watermarks", wireType) + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field IsWatermarkEnabled", wireType) } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDaemon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + b := bool(v != 0) + m.IsWatermarkEnabled = &b + hasFields[0] |= uint64(0x00000004) default: iNdEx = preIndex skippy, err := skipDaemon(dAtA[iNdEx:]) @@ -3653,9 +3613,6 @@ func (m *VertexWatermark) Unmarshal(dAtA []byte) error { return github_com_gogo_protobuf_proto.NewRequiredNotSetError("vertex") } if hasFields[0]&uint64(0x00000004) == 0 { - return github_com_gogo_protobuf_proto.NewRequiredNotSetError("watermark") - } - if hasFields[0]&uint64(0x00000008) == 0 { return github_com_gogo_protobuf_proto.NewRequiredNotSetError("isWatermarkEnabled") } diff --git a/pkg/apis/proto/daemon/daemon.proto b/pkg/apis/proto/daemon/daemon.proto index 6850fc43b7..ae7d64b4ef 100644 --- a/pkg/apis/proto/daemon/daemon.proto +++ b/pkg/apis/proto/daemon/daemon.proto @@ -78,9 +78,8 @@ message GetVertexMetricsResponse { message VertexWatermark { required string pipeline = 1; required string vertex = 2; - required int64 watermark = 3; + repeated int64 watermarks = 3; required bool isWatermarkEnabled = 4; - repeated int64 podWatermarks = 5; } message GetVertexWatermarkResponse { diff --git a/pkg/daemon/server/service/pipeline_metrics_query.go b/pkg/daemon/server/service/pipeline_metrics_query.go index 19a6e6a930..71caf2a770 100644 --- a/pkg/daemon/server/service/pipeline_metrics_query.go +++ b/pkg/daemon/server/service/pipeline_metrics_query.go @@ -158,6 +158,9 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem } podNum := int64(1) // for now only reduce has parallelism might have to modify later + // checking parallelism for a vertex to identify reduce vertex + // replicas will have parallelism for reduce vertex else will be nil + // parallelism indicates replica count ~ multiple pods for a vertex here obj := ps.pipeline.GetFromEdges(req.GetVertex()) if len(obj) > 0 && obj[0].Parallelism != nil { podNum = int64(*obj[0].Parallelism) diff --git a/pkg/daemon/server/service/pipeline_watermark_query.go b/pkg/daemon/server/service/pipeline_watermark_query.go index 9afb2f5eb0..f4950b13e5 100644 --- a/pkg/daemon/server/service/pipeline_watermark_query.go +++ b/pkg/daemon/server/service/pipeline_watermark_query.go @@ -20,6 +20,9 @@ package service import ( "context" "fmt" + "sort" + "strconv" + "strings" "time" "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" @@ -41,13 +44,17 @@ func GetVertexWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline } for _, vertex := range pipeline.Spec.Vertices { + // key for fetcher map ~ vertexName/replicas + replicas := int64(1) if vertex.Sink != nil { toBufferName := v1alpha1.GenerateSinkBufferName(pipeline.Namespace, pipeline.Name, vertex.Name) wmFetcher, err := isbSvcClient.CreateWatermarkFetcher(ctx, toBufferName) if err != nil { return nil, fmt.Errorf("failed to create watermark fetcher, %w", err) } - wmFetchers[vertex.Name] = []fetch.Fetcher{wmFetcher} + // for now only reduce has parallelism so not updating replicas for sink for now as done below + fetchersKey := vertex.Name + "/" + strconv.FormatInt(replicas, 10) + wmFetchers[fetchersKey] = []fetch.Fetcher{wmFetcher} } else { // If the vertex is not a sink, to fetch the watermark, we consult all out edges and grab the latest watermark among them. var wmFetcherList []fetch.Fetcher @@ -61,7 +68,16 @@ func GetVertexWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline wmFetcherList = append(wmFetcherList, fetchWatermark) } } - wmFetchers[vertex.Name] = wmFetcherList + // for now only reduce has parallelism might have to modify later + // checking parallelism for a vertex to identify reduce vertex + // replicas will have parallelism for reduce vertex else will be nil + // parallelism indicates replica count ~ multiple pods for a vertex here + obj := pipeline.GetFromEdges(vertex.Name) + if len(obj) > 0 && obj[0].Parallelism != nil { + replicas = int64(*obj[0].Parallelism) + } + fetchersKey := vertex.Name + "/" + strconv.FormatInt(replicas, 10) + wmFetchers[fetchersKey] = wmFetcherList } } return wmFetchers, nil @@ -73,14 +89,27 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request resp := new(daemon.GetVertexWatermarkResponse) vertexName := request.GetVertex() isWatermarkEnabled := !ps.pipeline.Spec.Watermark.Disabled + // for now only reduce has parallelism might have to modify later + // checking parallelism for a vertex to identify reduce vertex + // parallelism is only supported by reduce vertex for now else will be nil + // parallelism indicates replica count ~ multiple pods for a vertex here + replicas := int64(1) + obj := ps.pipeline.GetFromEdges(vertexName) + if len(obj) > 0 && obj[0].Parallelism != nil { + replicas = int64(*obj[0].Parallelism) + } // If watermark is not enabled, return time zero if ps.pipeline.Spec.Watermark.Disabled { timeZero := time.Unix(0, 0).UnixMilli() + watermarks := make([]int64, replicas) + for idx := range watermarks { + watermarks[idx] = timeZero + } v := &daemon.VertexWatermark{ Pipeline: &ps.pipeline.Name, Vertex: request.Vertex, - Watermark: &timeZero, + Watermarks: watermarks, IsWatermarkEnabled: &isWatermarkEnabled, } resp.VertexWatermark = v @@ -88,7 +117,8 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request } // Watermark is enabled - vertexFetchers, ok := ps.watermarkFetchers[vertexName] + fetchersKey := vertexName + "/" + strconv.FormatInt(replicas, 10) + vertexFetchers, ok := ps.watermarkFetchers[fetchersKey] // Vertex not found if !ok { @@ -97,17 +127,24 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request } var latestWatermark = int64(-1) + var latestWatermarks []processor.Watermark for _, fetcher := range vertexFetchers { - watermark := fetcher.GetHeadWatermark().UnixMilli() - if watermark > latestWatermark { + watermarks := fetcher.GetHeadWatermarks() + sort.Slice(watermarks, func(i, j int) bool { return watermarks[i].UnixMilli() > watermarks[j].UnixMilli() }) + watermark := watermarks[0].UnixMilli() + if watermark >= latestWatermark { latestWatermark = watermark + latestWatermarks = watermarks } } - + var watermarks []int64 + for _, v := range latestWatermarks { + watermarks = append(watermarks, v.UnixMilli()) + } v := &daemon.VertexWatermark{ Pipeline: &ps.pipeline.Name, Vertex: request.Vertex, - Watermark: &latestWatermark, + Watermarks: watermarks, IsWatermarkEnabled: &isWatermarkEnabled, } resp.VertexWatermark = v @@ -125,13 +162,17 @@ func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, requ watermarkArr := make([]*daemon.VertexWatermark, len(ps.watermarkFetchers)) i := 0 for k := range ps.watermarkFetchers { - vertexName := k + vertexName := strings.Split(k, "/")[0] + replicas, _ := strconv.ParseInt(strings.Split(k, "/")[1], 10, 64) + watermarks := make([]int64, replicas) + for idx := range watermarks { + watermarks[idx] = timeZero + } watermarkArr[i] = &daemon.VertexWatermark{ Pipeline: &ps.pipeline.Name, Vertex: &vertexName, - Watermark: &timeZero, + Watermarks: watermarks, IsWatermarkEnabled: &isWatermarkEnabled, - PodWatermarks: nil, } i++ } @@ -144,26 +185,26 @@ func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, requ i := 0 for k, vertexFetchers := range ps.watermarkFetchers { var latestWatermark = int64(-1) - var latestpodWatermarks []processor.Watermark + var latestWatermarks []processor.Watermark for _, fetcher := range vertexFetchers { - watermark := fetcher.GetHeadWatermark().UnixMilli() - podWatermark := fetcher.GetPodWatermarks() - if watermark > latestWatermark { + watermarks := fetcher.GetHeadWatermarks() + sort.Slice(watermarks, func(i, j int) bool { return watermarks[i].UnixMilli() > watermarks[j].UnixMilli() }) + watermark := watermarks[0].UnixMilli() + if watermark >= latestWatermark { latestWatermark = watermark - latestpodWatermarks = podWatermark + latestWatermarks = watermarks } } - vertexName := k - var podWatermarks []int64 - for _, v := range latestpodWatermarks { - podWatermarks = append(podWatermarks, v.UnixMilli()) + vertexName := strings.Split(k, "/")[0] + var watermarks []int64 + for _, v := range latestWatermarks { + watermarks = append(watermarks, v.UnixMilli()) } watermarkArr[i] = &daemon.VertexWatermark{ Pipeline: &ps.pipeline.Name, Vertex: &vertexName, - Watermark: &latestWatermark, + Watermarks: watermarks, IsWatermarkEnabled: &isWatermarkEnabled, - PodWatermarks: podWatermarks, } i++ } diff --git a/pkg/reconciler/vertex/scaling/scaling.go b/pkg/reconciler/vertex/scaling/scaling.go index 1c26da5897..daa7e2acd5 100644 --- a/pkg/reconciler/vertex/scaling/scaling.go +++ b/pkg/reconciler/vertex/scaling/scaling.go @@ -215,31 +215,12 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err return fmt.Errorf("failed to get metrics of vertex key %q, %w", key, err) } // Avg rate and pending for autoscaling are both in the map with key "default", see "pkg/metrics/metrics.go". - var rate float64 - existing := true - // looping over all pods to perform summation of rate values - // also marking as non-existent even when missing in any one pod - for _, v := range vMetrics { - val, exist := v.ProcessingRates["default"] - if !exist { - existing = false - } - rate += val - } + rate, existing := vMetrics[0].ProcessingRates["default"] if !existing || rate < 0 || rate == isb.RateNotAvailable { // Rate not available log.Debugf("Vertex %s has no rate information, skip scaling", vertex.Name) return nil } - - var pending int64 - existing = true - for _, v := range vMetrics { - val, exist := v.Pendings["default"] - if !exist { - existing = false - } - pending += val - } + pending, existing := vMetrics[0].Pendings["default"] if !existing || pending < 0 || pending == isb.PendingNotAvailable { // Pending not available, we don't do anything log.Debugf("Vertex %s has no pending messages information, skip scaling", vertex.Name) diff --git a/pkg/reduce/data_forward_test.go b/pkg/reduce/data_forward_test.go index 6f4d7a95b3..885121b231 100644 --- a/pkg/reduce/data_forward_test.go +++ b/pkg/reduce/data_forward_test.go @@ -91,11 +91,7 @@ func (e *EventTypeWMProgressor) GetWatermark(offset isb.Offset) processor.Waterm return e.watermarks[offset.String()] } -func (e *EventTypeWMProgressor) GetHeadWatermark() processor.Watermark { - return processor.Watermark{} -} - -func (e *EventTypeWMProgressor) GetPodWatermarks() []processor.Watermark { +func (e *EventTypeWMProgressor) GetHeadWatermarks() []processor.Watermark { return []processor.Watermark{} } diff --git a/pkg/watermark/fetch/edge_fetcher.go b/pkg/watermark/fetch/edge_fetcher.go index 034379a330..aabbf0ca38 100644 --- a/pkg/watermark/fetch/edge_fetcher.go +++ b/pkg/watermark/fetch/edge_fetcher.go @@ -53,44 +53,18 @@ func NewEdgeFetcher(ctx context.Context, bufferName string, storeWatcher store.W } } -// GetHeadWatermark returns the watermark using the HeadOffset (the latest offset among all processors). This +// GetHeadWatermarks returns the watermark using the HeadOffset (the latest offset among all processors). This // can be used in showing the watermark progression for a vertex when not consuming the messages // directly (eg. UX, tests) // NOTE // - We don't use this function in the regular pods in the vertex. // - UX only uses GetHeadWatermark, so the `p.IsDeleted()` check in the GetWatermark never happens. // Meaning, in the UX (daemon service) we never delete any processor. -func (e *edgeFetcher) GetHeadWatermark() processor.Watermark { +func (e *edgeFetcher) GetHeadWatermarks() []processor.Watermark { var debugString strings.Builder - var headOffset int64 = math.MinInt64 - var epoch int64 = math.MaxInt64 + var headWatermarks []processor.Watermark var allProcessors = e.processorManager.GetAllProcessors() // get the head offset of each processor - for _, p := range allProcessors { - if !p.IsActive() { - continue - } - var o = p.offsetTimeline.GetHeadOffset() - e.log.Debugf("Processor: %v (headoffset:%d)", p, o) - debugString.WriteString(fmt.Sprintf("[Processor:%v] (headoffset:%d) \n", p, o)) - if o != -1 && o > headOffset { - headOffset = o - epoch = p.offsetTimeline.GetEventTimeFromInt64(o) - } - } - e.log.Debugf("GetHeadWatermark: %s", debugString.String()) - if epoch == math.MaxInt64 { - // Use -1 as default watermark value to indicate there is no valid watermark yet. - return processor.Watermark(time.UnixMilli(-1)) - } - return processor.Watermark(time.UnixMilli(epoch)) -} - -// GetPodWatermarks gets the watermarks for all pods of a vertex -func (e *edgeFetcher) GetPodWatermarks() []processor.Watermark { - var debugString strings.Builder - var allProcessors = e.processorManager.GetAllProcessors() - var podWatermarks []processor.Watermark for _, p := range allProcessors { if !p.IsActive() { continue @@ -104,12 +78,12 @@ func (e *edgeFetcher) GetPodWatermarks() []processor.Watermark { } if epoch == math.MaxInt64 { // Use -1 as default watermark value to indicate there is no valid watermark yet. - podWatermarks = append(podWatermarks, processor.Watermark(time.UnixMilli(-1))) + headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(-1))) } else { - podWatermarks = append(podWatermarks, processor.Watermark(time.UnixMilli(epoch))) + headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(epoch))) } } - return podWatermarks + return headWatermarks } // GetWatermark gets the smallest timestamp for the given offset diff --git a/pkg/watermark/fetch/edge_fetcher_test.go b/pkg/watermark/fetch/edge_fetcher_test.go index a3307d9d08..4f52f8d210 100644 --- a/pkg/watermark/fetch/edge_fetcher_test.go +++ b/pkg/watermark/fetch/edge_fetcher_test.go @@ -22,10 +22,11 @@ import ( "testing" "time" - "github.com/numaproj/numaflow/pkg/watermark/store/noop" "github.com/stretchr/testify/assert" "go.uber.org/zap/zaptest" + "github.com/numaproj/numaflow/pkg/watermark/store/noop" + "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/watermark/processor" "github.com/numaproj/numaflow/pkg/watermark/store" @@ -142,7 +143,7 @@ func TestBuffer_GetWatermark(t *testing.T) { t.Errorf("GetWatermark() = %v, want %v", got, processor.Watermark(time.UnixMilli(tt.want))) } // this will always be 14 because the timeline has been populated ahead of time - assert.Equal(t, time.Time(b.GetHeadWatermark()).In(location), time.UnixMilli(14).In(location)) + assert.Equal(t, time.Time(b.GetHeadWatermarks()[len(b.GetHeadWatermarks())-1]).In(location), time.UnixMilli(14).In(location)) }) } } diff --git a/pkg/watermark/fetch/interface.go b/pkg/watermark/fetch/interface.go index 2ea227e062..3ad0fa9e79 100644 --- a/pkg/watermark/fetch/interface.go +++ b/pkg/watermark/fetch/interface.go @@ -28,8 +28,6 @@ type Fetcher interface { io.Closer // GetWatermark returns the inorder monotonically increasing watermark of the edge connected to Vn-1. GetWatermark(offset isb.Offset) processor.Watermark - // GetHeadWatermark returns the latest watermark based on the head offset - GetHeadWatermark() processor.Watermark - // GetPodWatermarks returns the watermarks for all the pods in a vertex - GetPodWatermarks() []processor.Watermark + // GetHeadWatermarks returns the latest watermark based on the head offset + GetHeadWatermarks() []processor.Watermark } diff --git a/pkg/watermark/fetch/source_fetcher.go b/pkg/watermark/fetch/source_fetcher.go index 58dcfb9163..029a027e79 100644 --- a/pkg/watermark/fetch/source_fetcher.go +++ b/pkg/watermark/fetch/source_fetcher.go @@ -52,27 +52,9 @@ func NewSourceFetcher(ctx context.Context, sourceBufferName string, storeWatcher } } -// GetHeadWatermark returns the latest watermark of all the processors. -func (e *sourceFetcher) GetHeadWatermark() processor.Watermark { - var epoch int64 = math.MinInt64 - for _, p := range e.processorManager.GetAllProcessors() { - if !p.IsActive() { - continue - } - if p.offsetTimeline.GetHeadWatermark() > epoch { - epoch = p.offsetTimeline.GetHeadWatermark() - } - } - if epoch == math.MinInt64 { - // Use -1 as default watermark value to indicate there is no valid watermark yet. - return processor.Watermark(time.UnixMilli(-1)) - } - return processor.Watermark(time.UnixMilli(epoch)) -} - -// GetPodWatermarks returns the list of watermarks of all pods in a vertex -func (e *sourceFetcher) GetPodWatermarks() []processor.Watermark { - var podWatermarks []processor.Watermark +// GetHeadWatermarks returns the latest watermark of all the processors. +func (e *sourceFetcher) GetHeadWatermarks() []processor.Watermark { + var headWatermarks []processor.Watermark for _, p := range e.processorManager.GetAllProcessors() { if !p.IsActive() { continue @@ -83,12 +65,12 @@ func (e *sourceFetcher) GetPodWatermarks() []processor.Watermark { } if epoch == math.MinInt64 { // Use -1 as default watermark value to indicate there is no valid watermark yet. - podWatermarks = append(podWatermarks, processor.Watermark(time.UnixMilli(-1))) + headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(-1))) } else { - podWatermarks = append(podWatermarks, processor.Watermark(time.UnixMilli(epoch))) + headWatermarks = append(headWatermarks, processor.Watermark(time.UnixMilli(epoch))) } } - return podWatermarks + return headWatermarks } // GetWatermark returns the lowest of the latest watermark of all the processors, diff --git a/pkg/watermark/generic/noop.go b/pkg/watermark/generic/noop.go index 222d602c25..f35ecf6640 100644 --- a/pkg/watermark/generic/noop.go +++ b/pkg/watermark/generic/noop.go @@ -51,12 +51,8 @@ func (n NoOpWMProgressor) GetLatestWatermark() processor.Watermark { return processor.Watermark{} } -// GetHeadWatermark returns the default head watermark. -func (n NoOpWMProgressor) GetHeadWatermark() processor.Watermark { - return processor.Watermark{} -} - -func (n NoOpWMProgressor) GetPodWatermarks() []processor.Watermark { +// GetHeadWatermarks returns the default head watermark. +func (n NoOpWMProgressor) GetHeadWatermarks() []processor.Watermark { return []processor.Watermark{} } diff --git a/ui/src/components/pipeline/Pipeline.tsx b/ui/src/components/pipeline/Pipeline.tsx index ffebf52fb1..782a63d736 100644 --- a/ui/src/components/pipeline/Pipeline.tsx +++ b/ui/src/components/pipeline/Pipeline.tsx @@ -140,11 +140,11 @@ export function Pipeline() { json.map((vertex) => { const vertexWatermark = {} as VertexWatermark; vertexWatermark.isWaterMarkEnabled = vertex["isWatermarkEnabled"]; - vertexWatermark.watermark = vertex["watermark"]; + vertexWatermark.watermark = vertex["watermarks"][0]; vertexWatermark.watermarkLocalTime = new Date( vertexWatermark.watermark ).toISOString(); - vertexWatermark.podWatermarks = vertex["podWatermarks"]; + vertexWatermark.podWatermarks = vertex["watermarks"]; vertexToWatermarkMap.set(vertex.vertex, vertexWatermark); }) }) diff --git a/ui/src/components/pipeline/nodeinfo/NodeInfo.tsx b/ui/src/components/pipeline/nodeinfo/NodeInfo.tsx index 64b0fdb2f0..619011a214 100644 --- a/ui/src/components/pipeline/nodeinfo/NodeInfo.tsx +++ b/ui/src/components/pipeline/nodeinfo/NodeInfo.tsx @@ -144,13 +144,6 @@ export default function NodeInfo(props: NodeInfoProps) { {podWatermark} ))} - {!node?.data?.vertexWatermark?.podWatermarks && - Array(node?.data?.podnum).fill(0).map((_, idx) => ( - - Pod - {idx} - -1 - - ))}