diff --git a/Makefile b/Makefile index 645a604a7..e66a6a736 100644 --- a/Makefile +++ b/Makefile @@ -114,6 +114,8 @@ $(PROTO_PBS): $(PROTO_SRCS) $(PROTOC) $(PROTO_INCS) \ --gogo_out=plugins=grpc,Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,paths=source_relative:. $$src ; \ done + $(MAKE) fmt + git apply -v proto/patches/*.patch proto-check: $(MAKE) proto diff --git a/internal/storagenode/replication_server.go b/internal/storagenode/replication_server.go index dfa9c776a..49b9f353e 100644 --- a/internal/storagenode/replication_server.go +++ b/internal/storagenode/replication_server.go @@ -93,15 +93,12 @@ type replicationServerTask struct { err error } -func newReplicationServerTask(req snpb.ReplicateRequest, err error) *replicationServerTask { - rst := replicationServerTaskPool.Get().(*replicationServerTask) - rst.req = req - rst.err = err - return rst +func newReplicationServerTask() *replicationServerTask { + return replicationServerTaskPool.Get().(*replicationServerTask) } func (rst *replicationServerTask) release() { - rst.req = snpb.ReplicateRequest{} + rst.req.ResetReuse() rst.err = nil replicationServerTaskPool.Put(rst) } @@ -113,11 +110,10 @@ func (rs *replicationServer) recv(ctx context.Context, stream snpb.Replicator_Re go func() { defer wg.Done() defer close(c) - req := &snpb.ReplicateRequest{} for { - req.Reset() - err := stream.RecvMsg(req) - rst := newReplicationServerTask(*req, err) + rst := newReplicationServerTask() + err := stream.RecvMsg(&rst.req) + rst.err = err select { case c <- rst: if err != nil { diff --git a/proto/patches/snpb_replicator.patch b/proto/patches/snpb_replicator.patch new file mode 100644 index 000000000..a87409c4f --- /dev/null +++ b/proto/patches/snpb_replicator.patch @@ -0,0 +1,545 @@ +diff --git a/proto/snpb/replicator.pb.go b/proto/snpb/replicator.pb.go +index 139a31eb..e5105c63 100644 +--- a/proto/snpb/replicator.pb.go ++++ b/proto/snpb/replicator.pb.go +@@ -10,6 +10,7 @@ import ( + io "io" + math "math" + math_bits "math/bits" ++ "slices" + strconv "strconv" + + _ "github.com/gogo/protobuf/gogoproto" +@@ -24,9 +25,11 @@ import ( + ) + + // Reference imports to suppress errors if they are not otherwise used. +-var _ = proto.Marshal +-var _ = fmt.Errorf +-var _ = math.Inf ++var ( ++ _ = proto.Marshal ++ _ = fmt.Errorf ++ _ = math.Inf ++) + + // This is a compile-time assertion to ensure that this generated file + // is compatible with the proto package it is being compiled against. +@@ -80,9 +83,11 @@ func (*ReplicateRequest) ProtoMessage() {} + func (*ReplicateRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{0} + } ++ + func (m *ReplicateRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *ReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReplicateRequest.Marshal(b, m, deterministic) +@@ -95,12 +100,15 @@ func (m *ReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, er + return b[:n], nil + } + } ++ + func (m *ReplicateRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReplicateRequest.Merge(m, src) + } ++ + func (m *ReplicateRequest) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *ReplicateRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ReplicateRequest.DiscardUnknown(m) + } +@@ -135,8 +143,7 @@ func (m *ReplicateRequest) GetData() [][]byte { + return nil + } + +-type ReplicateResponse struct { +-} ++type ReplicateResponse struct{} + + func (m *ReplicateResponse) Reset() { *m = ReplicateResponse{} } + func (m *ReplicateResponse) String() string { return proto.CompactTextString(m) } +@@ -144,9 +151,11 @@ func (*ReplicateResponse) ProtoMessage() {} + func (*ReplicateResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{1} + } ++ + func (m *ReplicateResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *ReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReplicateResponse.Marshal(b, m, deterministic) +@@ -159,12 +168,15 @@ func (m *ReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, e + return b[:n], nil + } + } ++ + func (m *ReplicateResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReplicateResponse.Merge(m, src) + } ++ + func (m *ReplicateResponse) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *ReplicateResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ReplicateResponse.DiscardUnknown(m) + } +@@ -182,9 +194,11 @@ func (*SyncPosition) ProtoMessage() {} + func (*SyncPosition) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{2} + } ++ + func (m *SyncPosition) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncPosition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncPosition.Marshal(b, m, deterministic) +@@ -197,12 +211,15 @@ func (m *SyncPosition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) + return b[:n], nil + } + } ++ + func (m *SyncPosition) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncPosition.Merge(m, src) + } ++ + func (m *SyncPosition) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncPosition) XXX_DiscardUnknown() { + xxx_messageInfo_SyncPosition.DiscardUnknown(m) + } +@@ -239,9 +256,11 @@ func (*SyncRange) ProtoMessage() {} + func (*SyncRange) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{3} + } ++ + func (m *SyncRange) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncRange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncRange.Marshal(b, m, deterministic) +@@ -254,12 +273,15 @@ func (m *SyncRange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return b[:n], nil + } + } ++ + func (m *SyncRange) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncRange.Merge(m, src) + } ++ + func (m *SyncRange) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncRange) XXX_DiscardUnknown() { + xxx_messageInfo_SyncRange.DiscardUnknown(m) + } +@@ -305,9 +327,11 @@ func (*SyncInitRequest) ProtoMessage() {} + func (*SyncInitRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{4} + } ++ + func (m *SyncInitRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncInitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncInitRequest.Marshal(b, m, deterministic) +@@ -320,12 +344,15 @@ func (m *SyncInitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, err + return b[:n], nil + } + } ++ + func (m *SyncInitRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncInitRequest.Merge(m, src) + } ++ + func (m *SyncInitRequest) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncInitRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SyncInitRequest.DiscardUnknown(m) + } +@@ -381,9 +408,11 @@ func (*SyncInitResponse) ProtoMessage() {} + func (*SyncInitResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{5} + } ++ + func (m *SyncInitResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncInitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncInitResponse.Marshal(b, m, deterministic) +@@ -396,12 +425,15 @@ func (m *SyncInitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, er + return b[:n], nil + } + } ++ + func (m *SyncInitResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncInitResponse.Merge(m, src) + } ++ + func (m *SyncInitResponse) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncInitResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SyncInitResponse.DiscardUnknown(m) + } +@@ -428,9 +460,11 @@ func (*SyncStatus) ProtoMessage() {} + func (*SyncStatus) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{6} + } ++ + func (m *SyncStatus) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncStatus.Marshal(b, m, deterministic) +@@ -443,12 +477,15 @@ func (m *SyncStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return b[:n], nil + } + } ++ + func (m *SyncStatus) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncStatus.Merge(m, src) + } ++ + func (m *SyncStatus) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncStatus) XXX_DiscardUnknown() { + xxx_messageInfo_SyncStatus.DiscardUnknown(m) + } +@@ -494,9 +531,11 @@ func (*SyncPayload) ProtoMessage() {} + func (*SyncPayload) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{7} + } ++ + func (m *SyncPayload) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncPayload.Marshal(b, m, deterministic) +@@ -509,12 +548,15 @@ func (m *SyncPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) + return b[:n], nil + } + } ++ + func (m *SyncPayload) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncPayload.Merge(m, src) + } ++ + func (m *SyncPayload) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncPayload) XXX_DiscardUnknown() { + xxx_messageInfo_SyncPayload.DiscardUnknown(m) + } +@@ -548,9 +590,11 @@ func (*SyncReplicateRequest) ProtoMessage() {} + func (*SyncReplicateRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{8} + } ++ + func (m *SyncReplicateRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncReplicateRequest.Marshal(b, m, deterministic) +@@ -563,12 +607,15 @@ func (m *SyncReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte + return b[:n], nil + } + } ++ + func (m *SyncReplicateRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncReplicateRequest.Merge(m, src) + } ++ + func (m *SyncReplicateRequest) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncReplicateRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SyncReplicateRequest.DiscardUnknown(m) + } +@@ -613,9 +660,11 @@ func (*SyncReplicateResponse) ProtoMessage() {} + func (*SyncReplicateResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_85705cb817486b63, []int{9} + } ++ + func (m *SyncReplicateResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) + } ++ + func (m *SyncReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncReplicateResponse.Marshal(b, m, deterministic) +@@ -628,12 +677,15 @@ func (m *SyncReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byt + return b[:n], nil + } + } ++ + func (m *SyncReplicateResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncReplicateResponse.Merge(m, src) + } ++ + func (m *SyncReplicateResponse) XXX_Size() int { + return m.ProtoSize() + } ++ + func (m *SyncReplicateResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SyncReplicateResponse.DiscardUnknown(m) + } +@@ -738,6 +790,7 @@ func (x SyncState) String() string { + } + return strconv.Itoa(int(x)) + } ++ + func (this *ReplicateRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil +@@ -781,6 +834,7 @@ func (this *ReplicateRequest) Equal(that interface{}) bool { + } + return true + } ++ + func (this *SyncPosition) Equal(that interface{}) bool { + if that == nil { + return this == nil +@@ -810,8 +864,10 @@ func (this *SyncPosition) Equal(that interface{}) bool { + } + + // Reference imports to suppress errors if they are not otherwise used. +-var _ context.Context +-var _ grpc.ClientConn ++var ( ++ _ context.Context ++ _ grpc.ClientConn ++) + + // This is a compile-time assertion to ensure that this generated file + // is compatible with the grpc package it is being compiled against. +@@ -1022,18 +1078,20 @@ type ReplicatorServer interface { + } + + // UnimplementedReplicatorServer can be embedded to have forward compatible implementations. +-type UnimplementedReplicatorServer struct { +-} ++type UnimplementedReplicatorServer struct{} + + func (*UnimplementedReplicatorServer) Replicate(srv Replicator_ReplicateServer) error { + return status.Errorf(codes.Unimplemented, "method Replicate not implemented") + } ++ + func (*UnimplementedReplicatorServer) SyncInit(ctx context.Context, req *SyncInitRequest) (*SyncInitResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SyncInit not implemented") + } ++ + func (*UnimplementedReplicatorServer) SyncReplicate(ctx context.Context, req *SyncReplicateRequest) (*SyncReplicateResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SyncReplicate not implemented") + } ++ + func (*UnimplementedReplicatorServer) SyncReplicateStream(srv Replicator_SyncReplicateStreamServer) error { + return status.Errorf(codes.Unimplemented, "method SyncReplicateStream not implemented") + } +@@ -1612,6 +1670,7 @@ func encodeVarintReplicator(dAtA []byte, offset int, v uint64) int { + dAtA[offset] = uint8(v) + return base + } ++ + func NewPopulatedReplicateRequest(r randyReplicator, easy bool) *ReplicateRequest { + this := &ReplicateRequest{} + this.TopicID = github_com_kakao_varlog_pkg_types.TopicID(r.Int31()) +@@ -1659,6 +1718,7 @@ func randUTF8RuneReplicator(r randyReplicator) rune { + } + return rune(ru + 61) + } ++ + func randStringReplicator(r randyReplicator) string { + v4 := r.Intn(100) + tmps := make([]rune, v4) +@@ -1667,6 +1727,7 @@ func randStringReplicator(r randyReplicator) string { + } + return string(tmps) + } ++ + func randUnrecognizedReplicator(r randyReplicator, maxFieldNumber int) (dAtA []byte) { + l := r.Intn(5) + for i := 0; i < l; i++ { +@@ -1679,6 +1740,7 @@ func randUnrecognizedReplicator(r randyReplicator, maxFieldNumber int) (dAtA []b + } + return dAtA + } ++ + func randFieldReplicator(dAtA []byte, r randyReplicator, fieldNumber int, wire int) []byte { + key := uint32(fieldNumber)<<3 | uint32(wire) + switch wire { +@@ -1705,6 +1767,7 @@ func randFieldReplicator(dAtA []byte, r randyReplicator, fieldNumber int, wire i + } + return dAtA + } ++ + func encodeVarintPopulateReplicator(dAtA []byte, v uint64) []byte { + for v >= 1<<7 { + dAtA = append(dAtA, uint8(uint64(v)&0x7f|0x80)) +@@ -1713,6 +1776,7 @@ func encodeVarintPopulateReplicator(dAtA []byte, v uint64) []byte { + dAtA = append(dAtA, uint8(v)) + return dAtA + } ++ + func (m *ReplicateRequest) ProtoSize() (n int) { + if m == nil { + return 0 +@@ -1881,9 +1945,11 @@ func (m *SyncReplicateResponse) ProtoSize() (n int) { + func sovReplicator(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 + } ++ + func sozReplicator(x uint64) (n int) { + return sovReplicator(uint64((x << 1) ^ uint64((int64(x) >> 63)))) + } ++ + func (this *SyncPayload) GetValue() interface{} { + if this.CommitContext != nil { + return this.CommitContext +@@ -1905,6 +1971,7 @@ func (this *SyncPayload) SetValue(value interface{}) bool { + } + return true + } ++ + func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2025,7 +2092,8 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { + } + elementCount = count + if elementCount != 0 && len(m.LLSN) == 0 { +- m.LLSN = make([]github_com_kakao_varlog_pkg_types.LLSN, 0, elementCount) ++ m.LLSN = slices.Grow(m.LLSN, elementCount) ++ m.LLSN = m.LLSN[:0] + } + for iNdEx < postIndex { + var v github_com_kakao_varlog_pkg_types.LLSN +@@ -2077,8 +2145,15 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { + if postIndex > l { + return io.ErrUnexpectedEOF + } +- m.Data = append(m.Data, make([]byte, postIndex-iNdEx)) +- copy(m.Data[len(m.Data)-1], dAtA[iNdEx:postIndex]) ++ if len(m.Data) < cap(m.Data) { ++ m.Data = m.Data[:len(m.Data)+1] ++ } else { ++ m.Data = append(m.Data, nil) ++ } ++ lastIndex := len(m.Data) - 1 ++ m.Data[lastIndex] = slices.Grow(m.Data[lastIndex], postIndex-iNdEx) ++ m.Data[lastIndex] = m.Data[lastIndex][:postIndex-iNdEx] ++ copy(m.Data[lastIndex], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex +@@ -2101,6 +2176,7 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *ReplicateResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2151,6 +2227,7 @@ func (m *ReplicateResponse) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncPosition) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2239,6 +2316,7 @@ func (m *SyncPosition) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncRange) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2327,6 +2405,7 @@ func (m *SyncRange) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncInitRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2514,6 +2593,7 @@ func (m *SyncInitRequest) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncInitResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2597,6 +2677,7 @@ func (m *SyncInitResponse) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncStatus) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2765,6 +2846,7 @@ func (m *SyncStatus) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncPayload) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -2887,6 +2969,7 @@ func (m *SyncPayload) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncReplicateRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -3055,6 +3138,7 @@ func (m *SyncReplicateRequest) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func (m *SyncReplicateResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 +@@ -3141,6 +3225,7 @@ func (m *SyncReplicateResponse) Unmarshal(dAtA []byte) error { + } + return nil + } ++ + func skipReplicator(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 diff --git a/proto/snpb/replicator.go b/proto/snpb/replicator.go index 6d9800cf8..2c28ca8f6 100644 --- a/proto/snpb/replicator.go +++ b/proto/snpb/replicator.go @@ -6,6 +6,17 @@ import ( "github.com/kakao/varlog/pkg/types" ) +func (m *ReplicateRequest) ResetReuse() { + llsnSlice := m.LLSN[:0] + for i := range m.Data { + m.Data[i] = m.Data[i][:0] + } + dataSlice := m.Data[:0] + m.Reset() + m.LLSN = llsnSlice + m.Data = dataSlice +} + func InvalidSyncPosition() SyncPosition { return SyncPosition{LLSN: types.InvalidLLSN, GLSN: types.InvalidGLSN} } diff --git a/proto/snpb/replicator.pb.go b/proto/snpb/replicator.pb.go index 139a31eb6..e5105c631 100644 --- a/proto/snpb/replicator.pb.go +++ b/proto/snpb/replicator.pb.go @@ -10,6 +10,7 @@ import ( io "io" math "math" math_bits "math/bits" + "slices" strconv "strconv" _ "github.com/gogo/protobuf/gogoproto" @@ -24,9 +25,11 @@ import ( ) // Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf +var ( + _ = proto.Marshal + _ = fmt.Errorf + _ = math.Inf +) // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. @@ -80,9 +83,11 @@ func (*ReplicateRequest) ProtoMessage() {} func (*ReplicateRequest) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{0} } + func (m *ReplicateRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *ReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_ReplicateRequest.Marshal(b, m, deterministic) @@ -95,12 +100,15 @@ func (m *ReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, er return b[:n], nil } } + func (m *ReplicateRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_ReplicateRequest.Merge(m, src) } + func (m *ReplicateRequest) XXX_Size() int { return m.ProtoSize() } + func (m *ReplicateRequest) XXX_DiscardUnknown() { xxx_messageInfo_ReplicateRequest.DiscardUnknown(m) } @@ -135,8 +143,7 @@ func (m *ReplicateRequest) GetData() [][]byte { return nil } -type ReplicateResponse struct { -} +type ReplicateResponse struct{} func (m *ReplicateResponse) Reset() { *m = ReplicateResponse{} } func (m *ReplicateResponse) String() string { return proto.CompactTextString(m) } @@ -144,9 +151,11 @@ func (*ReplicateResponse) ProtoMessage() {} func (*ReplicateResponse) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{1} } + func (m *ReplicateResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *ReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_ReplicateResponse.Marshal(b, m, deterministic) @@ -159,12 +168,15 @@ func (m *ReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, e return b[:n], nil } } + func (m *ReplicateResponse) XXX_Merge(src proto.Message) { xxx_messageInfo_ReplicateResponse.Merge(m, src) } + func (m *ReplicateResponse) XXX_Size() int { return m.ProtoSize() } + func (m *ReplicateResponse) XXX_DiscardUnknown() { xxx_messageInfo_ReplicateResponse.DiscardUnknown(m) } @@ -182,9 +194,11 @@ func (*SyncPosition) ProtoMessage() {} func (*SyncPosition) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{2} } + func (m *SyncPosition) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncPosition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncPosition.Marshal(b, m, deterministic) @@ -197,12 +211,15 @@ func (m *SyncPosition) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } + func (m *SyncPosition) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncPosition.Merge(m, src) } + func (m *SyncPosition) XXX_Size() int { return m.ProtoSize() } + func (m *SyncPosition) XXX_DiscardUnknown() { xxx_messageInfo_SyncPosition.DiscardUnknown(m) } @@ -239,9 +256,11 @@ func (*SyncRange) ProtoMessage() {} func (*SyncRange) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{3} } + func (m *SyncRange) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncRange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncRange.Marshal(b, m, deterministic) @@ -254,12 +273,15 @@ func (m *SyncRange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } + func (m *SyncRange) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncRange.Merge(m, src) } + func (m *SyncRange) XXX_Size() int { return m.ProtoSize() } + func (m *SyncRange) XXX_DiscardUnknown() { xxx_messageInfo_SyncRange.DiscardUnknown(m) } @@ -305,9 +327,11 @@ func (*SyncInitRequest) ProtoMessage() {} func (*SyncInitRequest) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{4} } + func (m *SyncInitRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncInitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncInitRequest.Marshal(b, m, deterministic) @@ -320,12 +344,15 @@ func (m *SyncInitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, err return b[:n], nil } } + func (m *SyncInitRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncInitRequest.Merge(m, src) } + func (m *SyncInitRequest) XXX_Size() int { return m.ProtoSize() } + func (m *SyncInitRequest) XXX_DiscardUnknown() { xxx_messageInfo_SyncInitRequest.DiscardUnknown(m) } @@ -381,9 +408,11 @@ func (*SyncInitResponse) ProtoMessage() {} func (*SyncInitResponse) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{5} } + func (m *SyncInitResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncInitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncInitResponse.Marshal(b, m, deterministic) @@ -396,12 +425,15 @@ func (m *SyncInitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, er return b[:n], nil } } + func (m *SyncInitResponse) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncInitResponse.Merge(m, src) } + func (m *SyncInitResponse) XXX_Size() int { return m.ProtoSize() } + func (m *SyncInitResponse) XXX_DiscardUnknown() { xxx_messageInfo_SyncInitResponse.DiscardUnknown(m) } @@ -428,9 +460,11 @@ func (*SyncStatus) ProtoMessage() {} func (*SyncStatus) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{6} } + func (m *SyncStatus) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncStatus.Marshal(b, m, deterministic) @@ -443,12 +477,15 @@ func (m *SyncStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } + func (m *SyncStatus) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncStatus.Merge(m, src) } + func (m *SyncStatus) XXX_Size() int { return m.ProtoSize() } + func (m *SyncStatus) XXX_DiscardUnknown() { xxx_messageInfo_SyncStatus.DiscardUnknown(m) } @@ -494,9 +531,11 @@ func (*SyncPayload) ProtoMessage() {} func (*SyncPayload) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{7} } + func (m *SyncPayload) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncPayload.Marshal(b, m, deterministic) @@ -509,12 +548,15 @@ func (m *SyncPayload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } + func (m *SyncPayload) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncPayload.Merge(m, src) } + func (m *SyncPayload) XXX_Size() int { return m.ProtoSize() } + func (m *SyncPayload) XXX_DiscardUnknown() { xxx_messageInfo_SyncPayload.DiscardUnknown(m) } @@ -548,9 +590,11 @@ func (*SyncReplicateRequest) ProtoMessage() {} func (*SyncReplicateRequest) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{8} } + func (m *SyncReplicateRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncReplicateRequest.Marshal(b, m, deterministic) @@ -563,12 +607,15 @@ func (m *SyncReplicateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte return b[:n], nil } } + func (m *SyncReplicateRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncReplicateRequest.Merge(m, src) } + func (m *SyncReplicateRequest) XXX_Size() int { return m.ProtoSize() } + func (m *SyncReplicateRequest) XXX_DiscardUnknown() { xxx_messageInfo_SyncReplicateRequest.DiscardUnknown(m) } @@ -613,9 +660,11 @@ func (*SyncReplicateResponse) ProtoMessage() {} func (*SyncReplicateResponse) Descriptor() ([]byte, []int) { return fileDescriptor_85705cb817486b63, []int{9} } + func (m *SyncReplicateResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } + func (m *SyncReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { return xxx_messageInfo_SyncReplicateResponse.Marshal(b, m, deterministic) @@ -628,12 +677,15 @@ func (m *SyncReplicateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byt return b[:n], nil } } + func (m *SyncReplicateResponse) XXX_Merge(src proto.Message) { xxx_messageInfo_SyncReplicateResponse.Merge(m, src) } + func (m *SyncReplicateResponse) XXX_Size() int { return m.ProtoSize() } + func (m *SyncReplicateResponse) XXX_DiscardUnknown() { xxx_messageInfo_SyncReplicateResponse.DiscardUnknown(m) } @@ -738,6 +790,7 @@ func (x SyncState) String() string { } return strconv.Itoa(int(x)) } + func (this *ReplicateRequest) Equal(that interface{}) bool { if that == nil { return this == nil @@ -781,6 +834,7 @@ func (this *ReplicateRequest) Equal(that interface{}) bool { } return true } + func (this *SyncPosition) Equal(that interface{}) bool { if that == nil { return this == nil @@ -810,8 +864,10 @@ func (this *SyncPosition) Equal(that interface{}) bool { } // Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn +var ( + _ context.Context + _ grpc.ClientConn +) // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. @@ -1022,18 +1078,20 @@ type ReplicatorServer interface { } // UnimplementedReplicatorServer can be embedded to have forward compatible implementations. -type UnimplementedReplicatorServer struct { -} +type UnimplementedReplicatorServer struct{} func (*UnimplementedReplicatorServer) Replicate(srv Replicator_ReplicateServer) error { return status.Errorf(codes.Unimplemented, "method Replicate not implemented") } + func (*UnimplementedReplicatorServer) SyncInit(ctx context.Context, req *SyncInitRequest) (*SyncInitResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SyncInit not implemented") } + func (*UnimplementedReplicatorServer) SyncReplicate(ctx context.Context, req *SyncReplicateRequest) (*SyncReplicateResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SyncReplicate not implemented") } + func (*UnimplementedReplicatorServer) SyncReplicateStream(srv Replicator_SyncReplicateStreamServer) error { return status.Errorf(codes.Unimplemented, "method SyncReplicateStream not implemented") } @@ -1612,6 +1670,7 @@ func encodeVarintReplicator(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } + func NewPopulatedReplicateRequest(r randyReplicator, easy bool) *ReplicateRequest { this := &ReplicateRequest{} this.TopicID = github_com_kakao_varlog_pkg_types.TopicID(r.Int31()) @@ -1659,6 +1718,7 @@ func randUTF8RuneReplicator(r randyReplicator) rune { } return rune(ru + 61) } + func randStringReplicator(r randyReplicator) string { v4 := r.Intn(100) tmps := make([]rune, v4) @@ -1667,6 +1727,7 @@ func randStringReplicator(r randyReplicator) string { } return string(tmps) } + func randUnrecognizedReplicator(r randyReplicator, maxFieldNumber int) (dAtA []byte) { l := r.Intn(5) for i := 0; i < l; i++ { @@ -1679,6 +1740,7 @@ func randUnrecognizedReplicator(r randyReplicator, maxFieldNumber int) (dAtA []b } return dAtA } + func randFieldReplicator(dAtA []byte, r randyReplicator, fieldNumber int, wire int) []byte { key := uint32(fieldNumber)<<3 | uint32(wire) switch wire { @@ -1705,6 +1767,7 @@ func randFieldReplicator(dAtA []byte, r randyReplicator, fieldNumber int, wire i } return dAtA } + func encodeVarintPopulateReplicator(dAtA []byte, v uint64) []byte { for v >= 1<<7 { dAtA = append(dAtA, uint8(uint64(v)&0x7f|0x80)) @@ -1713,6 +1776,7 @@ func encodeVarintPopulateReplicator(dAtA []byte, v uint64) []byte { dAtA = append(dAtA, uint8(v)) return dAtA } + func (m *ReplicateRequest) ProtoSize() (n int) { if m == nil { return 0 @@ -1881,9 +1945,11 @@ func (m *SyncReplicateResponse) ProtoSize() (n int) { func sovReplicator(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } + func sozReplicator(x uint64) (n int) { return sovReplicator(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } + func (this *SyncPayload) GetValue() interface{} { if this.CommitContext != nil { return this.CommitContext @@ -1905,6 +1971,7 @@ func (this *SyncPayload) SetValue(value interface{}) bool { } return true } + func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2025,7 +2092,8 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { } elementCount = count if elementCount != 0 && len(m.LLSN) == 0 { - m.LLSN = make([]github_com_kakao_varlog_pkg_types.LLSN, 0, elementCount) + m.LLSN = slices.Grow(m.LLSN, elementCount) + m.LLSN = m.LLSN[:0] } for iNdEx < postIndex { var v github_com_kakao_varlog_pkg_types.LLSN @@ -2077,8 +2145,15 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Data = append(m.Data, make([]byte, postIndex-iNdEx)) - copy(m.Data[len(m.Data)-1], dAtA[iNdEx:postIndex]) + if len(m.Data) < cap(m.Data) { + m.Data = m.Data[:len(m.Data)+1] + } else { + m.Data = append(m.Data, nil) + } + lastIndex := len(m.Data) - 1 + m.Data[lastIndex] = slices.Grow(m.Data[lastIndex], postIndex-iNdEx) + m.Data[lastIndex] = m.Data[lastIndex][:postIndex-iNdEx] + copy(m.Data[lastIndex], dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex @@ -2101,6 +2176,7 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { } return nil } + func (m *ReplicateResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2151,6 +2227,7 @@ func (m *ReplicateResponse) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncPosition) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2239,6 +2316,7 @@ func (m *SyncPosition) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncRange) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2327,6 +2405,7 @@ func (m *SyncRange) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncInitRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2514,6 +2593,7 @@ func (m *SyncInitRequest) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncInitResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2597,6 +2677,7 @@ func (m *SyncInitResponse) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncStatus) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2765,6 +2846,7 @@ func (m *SyncStatus) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncPayload) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2887,6 +2969,7 @@ func (m *SyncPayload) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncReplicateRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -3055,6 +3138,7 @@ func (m *SyncReplicateRequest) Unmarshal(dAtA []byte) error { } return nil } + func (m *SyncReplicateResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -3141,6 +3225,7 @@ func (m *SyncReplicateResponse) Unmarshal(dAtA []byte) error { } return nil } + func skipReplicator(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/proto/snpb/replicator_test.go b/proto/snpb/replicator_test.go index 437c06d5d..58766e4df 100644 --- a/proto/snpb/replicator_test.go +++ b/proto/snpb/replicator_test.go @@ -2,10 +2,171 @@ package snpb import ( "testing" + "unsafe" "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/pkg/types" ) +func getPointers[S ~[]E, E any](elems S) []uintptr { + ptrs := make([]uintptr, len(elems)) + for i := range elems { + p := uintptr(unsafe.Pointer(&elems[i])) + ptrs[i] = p + } + return ptrs +} + +func TestReplicateRequest(t *testing.T) { + smallRequest := ReplicateRequest{ + LLSN: []types.LLSN{1}, + Data: [][]byte{[]byte("I")}, + } + smallPayload, err := smallRequest.Marshal() + require.NoError(t, err) + + mediumRequest := ReplicateRequest{ + LLSN: []types.LLSN{11, 12, 13, 14, 15}, + Data: [][]byte{[]byte("XI"), []byte("XII"), []byte("XIII"), []byte("XIV"), []byte("XV")}, + } + mediumPayload, err := mediumRequest.Marshal() + require.NoError(t, err) + + largeRequest := ReplicateRequest{ + LLSN: []types.LLSN{21, 22, 23, 24, 25, 26, 27, 28, 29, 30}, + Data: [][]byte{[]byte("XXI"), []byte("XXII"), []byte("XXIII"), []byte("XXIV"), []byte("XXV"), []byte("XXVI"), []byte("XXVII"), []byte("XXVIII"), []byte("XXIX"), []byte("XXX")}, + } + laregePayload, err := largeRequest.Marshal() + require.NoError(t, err) + + t.Run("EqualLength", func(t *testing.T) { + tcs := []struct { + name string + req ReplicateRequest + payload []byte + }{ + { + name: "Small", + req: smallRequest, + payload: smallPayload, + }, + { + name: "Medium", + req: mediumRequest, + payload: mediumPayload, + }, + { + name: "Large", + req: largeRequest, + payload: laregePayload, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + var req ReplicateRequest + + err := req.Unmarshal(tc.payload) + require.NoError(t, err) + require.Equal(t, tc.req, req) + wantLLSNPtrs := getPointers(req.LLSN) + wantDataPtrs := getPointers(req.Data) + + req.ResetReuse() + err = req.Unmarshal(tc.payload) + require.NoError(t, err) + require.Equal(t, tc.req, req) + gotLLSNPtrs := getPointers(req.LLSN) + gotDataPtrs := getPointers(req.Data) + + require.Equal(t, wantLLSNPtrs, gotLLSNPtrs) + require.Equal(t, wantDataPtrs, gotDataPtrs) + }) + } + }) + + t.Run("GrowingLengthGrowingCapacity", func(t *testing.T) { + var req ReplicateRequest + + err := req.Unmarshal(smallPayload) + require.NoError(t, err) + require.Equal(t, smallRequest, req) + smallLLSNPtrs := getPointers(req.LLSN) + smallDataPtrs := getPointers(req.Data) + smallLLSNCap := cap(req.LLSN) + smallDataCap := cap(req.Data) + + req.ResetReuse() + err = req.Unmarshal(mediumPayload) + require.NoError(t, err) + require.Equal(t, mediumRequest, req) + mediumLLSNPtrs := getPointers(req.LLSN) + mediumDataPtrs := getPointers(req.Data) + mediumLLSNCap := cap(req.LLSN) + mediumDataCap := cap(req.Data) + + require.NotEqual(t, smallLLSNPtrs, mediumLLSNPtrs) + require.NotEqual(t, smallDataPtrs, mediumDataPtrs) + require.Less(t, smallLLSNCap, mediumLLSNCap) + require.Less(t, smallDataCap, mediumDataCap) + + req.ResetReuse() + err = req.Unmarshal(laregePayload) + require.NoError(t, err) + require.Equal(t, largeRequest, req) + largeLLSNPtrs := getPointers(req.LLSN) + largeDataPtrs := getPointers(req.Data) + largeLLSNCap := cap(req.LLSN) + largeDataCap := cap(req.Data) + + require.NotEqual(t, mediumLLSNPtrs, largeLLSNPtrs) + require.NotEqual(t, mediumDataPtrs, largeDataPtrs) + require.Less(t, mediumLLSNCap, largeLLSNCap) + require.Less(t, mediumDataCap, largeDataCap) + }) + + t.Run("ShrinkingLengthEqualCapacity", func(t *testing.T) { + var req ReplicateRequest + + err := req.Unmarshal(laregePayload) + require.NoError(t, err) + require.Equal(t, largeRequest, req) + largeLLSNPtrs := getPointers(req.LLSN) + largeDataPtrs := getPointers(req.Data) + largeLLSNCap := cap(req.LLSN) + largeDataCap := cap(req.Data) + + req.ResetReuse() + err = req.Unmarshal(mediumPayload) + require.NoError(t, err) + require.Equal(t, mediumRequest, req) + mediumLLSNPtrs := getPointers(req.LLSN) + mediumDataPtrs := getPointers(req.Data) + mediumLLSNCap := cap(req.LLSN) + mediumDataCap := cap(req.Data) + + require.NotEqual(t, largeLLSNPtrs, mediumLLSNPtrs) + require.NotEqual(t, largeDataPtrs, mediumDataPtrs) + require.Equal(t, largeLLSNCap, mediumLLSNCap) + require.Equal(t, largeDataCap, mediumDataCap) + + req.ResetReuse() + err = req.Unmarshal(smallPayload) + require.NoError(t, err) + require.Equal(t, smallRequest, req) + smallLLSNPtrs := getPointers(req.LLSN) + smallDataPtrs := getPointers(req.Data) + smallLLSNCap := cap(req.LLSN) + smallDataCap := cap(req.Data) + + require.NotEqual(t, mediumLLSNPtrs, smallLLSNPtrs) + require.NotEqual(t, mediumDataPtrs, smallDataPtrs) + require.Equal(t, mediumLLSNCap, smallLLSNCap) + require.Equal(t, mediumDataCap, smallDataCap) + }) +} + func TestSyncPositionInvalid(t *testing.T) { tcs := []struct { input SyncPosition