From b501e9c48f33e1f78f19fc9fb8998110d50063ad Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Tue, 2 Nov 2021 11:34:52 +0800 Subject: [PATCH] p2p: implement pkg/p2p (Part I) (#3156) --- errors.toml | 5 + go.mod | 2 + go.sum | 5 + pkg/errors/errors.go | 3 + pkg/p2p/main_test.go | 24 + pkg/p2p/model.go | 28 + pkg/p2p/serializer.go | 37 + pkg/p2p/serializer_test.go | 80 ++ pkg/p2p/server_wrapper.go | 120 +++ pkg/p2p/server_wrapper_test.go | 186 ++++ proto/CDCPeerToPeer.proto | 88 ++ proto/generate-proto.sh | 1 + proto/p2p/CDCPeerToPeer.pb.go | 1734 ++++++++++++++++++++++++++++++++ 13 files changed, 2313 insertions(+) create mode 100644 pkg/p2p/main_test.go create mode 100644 pkg/p2p/model.go create mode 100644 pkg/p2p/serializer.go create mode 100644 pkg/p2p/serializer_test.go create mode 100644 pkg/p2p/server_wrapper.go create mode 100644 pkg/p2p/server_wrapper_test.go create mode 100644 proto/CDCPeerToPeer.proto create mode 100644 proto/p2p/CDCPeerToPeer.pb.go diff --git a/errors.toml b/errors.toml index d35160a82b3..57860d8a95a 100755 --- a/errors.toml +++ b/errors.toml @@ -626,6 +626,11 @@ error = ''' etcd api call error ''' +["CDC:ErrPeerMessageIllegalMeta"] +error = ''' +peer-to-peer message server received an RPC call with illegal metadata +''' + ["CDC:ErrPendingRegionCancel"] error = ''' pending region cancelled due to stream disconnecting diff --git a/go.mod b/go.mod index 11577d097d0..eeacc6091c8 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( github.com/mattn/go-colorable v0.1.11 // indirect github.com/mattn/go-shellwords v1.0.3 github.com/mattn/go-sqlite3 v2.0.2+incompatible // indirect + github.com/modern-go/reflect2 v1.0.1 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 github.com/philhofer/fwd v1.0.0 // indirect github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 @@ -77,6 +78,7 @@ require ( github.com/tinylib/msgp v1.1.0 github.com/uber-go/atomic v1.4.0 github.com/unrolled/render v1.0.1 + github.com/vmihailenco/msgpack/v5 v5.3.5 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c go.etcd.io/etcd v0.5.0-alpha.5.0.20210512015243-d19fbe541bf9 go.uber.org/atomic v1.9.0 diff --git a/go.sum b/go.sum index 9b3d0f125d8..b6b572356d4 100644 --- a/go.sum +++ b/go.sum @@ -985,7 +985,12 @@ github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+ github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/vmihailenco/msgpack/v4 v4.3.11/go.mod h1:gborTTJjAo/GWTqqRjrLCn9pgNN+NXzzngzBKDPIqw4= github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1/go.mod h1:xlngVLeyQ/Qi05oQxhQ+oTuqa03RjMwMfk/7/TCs+QI= +github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= +github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/tagparser v0.1.1 h1:quXMXlA39OCbd2wAdTsGDlK9RkOk6Wuw+x37wVyIuWY= github.com/vmihailenco/tagparser v0.1.1/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f h1:9DDCDwOyEy/gId+IEMrFHLuQ5R/WV0KNxWLler8X2OY= github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f/go.mod h1:8sdOQnirw1PrcnTJYkmW1iOHtUmblMmGdUOHyWYycLI= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index ef42086d356..1d6abe860a6 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -261,4 +261,7 @@ var ( // tcp server error ErrTCPServerClosed = errors.Normalize("The TCP server has been closed", errors.RFCCodeText("CDC:ErrTCPServerClosed")) + + // p2p error + ErrPeerMessageIllegalMeta = errors.Normalize("peer-to-peer message server received an RPC call with illegal metadata", errors.RFCCodeText("CDC:ErrPeerMessageIllegalMeta")) ) diff --git a/pkg/p2p/main_test.go b/pkg/p2p/main_test.go new file mode 100644 index 00000000000..85660dde59c --- /dev/null +++ b/pkg/p2p/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package p2p + +import ( + "testing" + + "github.com/pingcap/ticdc/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/pkg/p2p/model.go b/pkg/p2p/model.go new file mode 100644 index 00000000000..f701e27964f --- /dev/null +++ b/pkg/p2p/model.go @@ -0,0 +1,28 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package p2p + +import "github.com/pingcap/ticdc/proto/p2p" + +type ( + // NodeID represents the identifier of a sender node. + // Using IP address is not enough because of possible restarts. + NodeID = string + // Topic represents the topic for a peer-to-peer message + Topic = string + // Seq represents the serial number of a message for a given topic. + Seq = int64 + // MessageServerStream is an alias for the protobuf-generated interface for the message service. + MessageServerStream = p2p.CDCPeerToPeer_SendMessageServer +) diff --git a/pkg/p2p/serializer.go b/pkg/p2p/serializer.go new file mode 100644 index 00000000000..bf787c6fad2 --- /dev/null +++ b/pkg/p2p/serializer.go @@ -0,0 +1,37 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package p2p + +import "encoding/json" + +// Serializable is an interface for defining custom serialization methods +// for peer messages. +type Serializable interface { + Marshal() ([]byte, error) + Unmarshal(data []byte) error +} + +func marshalMessage(value interface{}) ([]byte, error) { + if value, ok := value.(Serializable); ok { + return value.Marshal() + } + return json.Marshal(value) +} + +func unmarshalMessage(data []byte, value interface{}) error { + if value, ok := value.(Serializable); ok { + return value.Unmarshal(data) + } + return json.Unmarshal(data, value) +} diff --git a/pkg/p2p/serializer_test.go b/pkg/p2p/serializer_test.go new file mode 100644 index 00000000000..a16543f33ac --- /dev/null +++ b/pkg/p2p/serializer_test.go @@ -0,0 +1,80 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package p2p + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/require" + "github.com/vmihailenco/msgpack/v5" +) + +type jsonSerializableMessage struct { + A int + B float64 + C string +} + +type msgpackSerializableMessage struct { + A int + B float64 + C string + D []int +} + +func (m *msgpackSerializableMessage) Marshal() ([]byte, error) { + return msgpack.Marshal(m) +} + +func (m *msgpackSerializableMessage) Unmarshal(data []byte) error { + return msgpack.Unmarshal(data, m) +} + +func TestJsonSerializable(t *testing.T) { + msg := &jsonSerializableMessage{ + A: 1, + B: 2, + C: "test", + } + + data, err := marshalMessage(msg) + require.NoError(t, err) + + msg1 := &jsonSerializableMessage{} + err = unmarshalMessage(data, msg1) + require.NoError(t, err) + + require.True(t, reflect.DeepEqual(msg, msg1)) +} + +func TestMsgpackSerializable(t *testing.T) { + msg := &msgpackSerializableMessage{ + A: 1, + B: 2, + C: "test", + D: []int{1, 2, 3, 4, 5, 6}, + } + data, err := marshalMessage(msg) + require.NoError(t, err) + + data1, err := msgpack.Marshal(msg) + require.NoError(t, err) + require.Equal(t, data1, data) + + msg1 := &msgpackSerializableMessage{} + err = unmarshalMessage(data, msg1) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(msg, msg1)) +} diff --git a/pkg/p2p/server_wrapper.go b/pkg/p2p/server_wrapper.go new file mode 100644 index 00000000000..6f0f21ac6b6 --- /dev/null +++ b/pkg/p2p/server_wrapper.go @@ -0,0 +1,120 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package p2p + +import ( + "context" + "sync" + + "github.com/modern-go/reflect2" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/proto/p2p" + "go.uber.org/zap" + "google.golang.org/grpc/codes" + gRPCPeer "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" +) + +type streamWrapper struct { + MessageServerStream + ctx context.Context + cancel context.CancelFunc +} + +func wrapStream(stream MessageServerStream) *streamWrapper { + ctx, cancel := context.WithCancel(stream.Context()) + return &streamWrapper{ + MessageServerStream: stream, + ctx: ctx, + cancel: cancel, + } +} + +func (w *streamWrapper) Context() context.Context { + return w.ctx +} + +// ServerWrapper implements a CDCPeerToPeerServer, and it +// maintains an inner CDCPeerToPeerServer instance that can +// be replaced as needed. +type ServerWrapper struct { + rwMu sync.RWMutex + innerServer p2p.CDCPeerToPeerServer + + wrappedStreamsMu sync.Mutex + wrappedStreams map[*streamWrapper]struct{} +} + +// NewServerWrapper creates a new ServerWrapper +func NewServerWrapper() *ServerWrapper { + return &ServerWrapper{ + wrappedStreams: map[*streamWrapper]struct{}{}, + } +} + +// SendMessage implements p2p.CDCPeerToPeerServer +func (s *ServerWrapper) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error { + s.rwMu.RLock() + innerServer := s.innerServer + s.rwMu.RUnlock() + + if innerServer == nil { + var addr string + peer, ok := gRPCPeer.FromContext(stream.Context()) + if ok { + addr = peer.Addr.String() + } + log.Debug("gRPC server received request while CDC capture is not running.", zap.String("addr", addr)) + return status.New(codes.Unavailable, "CDC capture is not running").Err() + } + + wrappedStream := wrapStream(stream) + s.wrappedStreamsMu.Lock() + s.wrappedStreams[wrappedStream] = struct{}{} + s.wrappedStreamsMu.Unlock() + defer func() { + s.wrappedStreamsMu.Lock() + delete(s.wrappedStreams, wrappedStream) + s.wrappedStreamsMu.Unlock() + wrappedStream.cancel() + }() + + // Used in unit tests to simulate a race situation between `SendMessage` and `Reset`. + // TODO think of another way to make tests parallelizable. + failpoint.Inject("ServerWrapperSendMessageDelay", func() {}) + return innerServer.SendMessage(wrappedStream) +} + +// Reset resets the inner server object in the ServerWrapper +func (s *ServerWrapper) Reset(inner p2p.CDCPeerToPeerServer) { + s.rwMu.Lock() + defer s.rwMu.Unlock() + + s.wrappedStreamsMu.Lock() + defer s.wrappedStreamsMu.Unlock() + + for wrappedStream := range s.wrappedStreams { + wrappedStream.cancel() + } + + // reflect2.IsNil handles two cases for us: + // 1) null value + // 2) an interface with a null value but a not-null type info. + if reflect2.IsNil(inner) { + s.innerServer = nil + return + } + s.innerServer = inner +} diff --git a/pkg/p2p/server_wrapper_test.go b/pkg/p2p/server_wrapper_test.go new file mode 100644 index 00000000000..ddb55686118 --- /dev/null +++ b/pkg/p2p/server_wrapper_test.go @@ -0,0 +1,186 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package p2p + +import ( + "context" + "net" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/proto/p2p" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type mockGrpcService struct { + mock.Mock + t *testing.T + streamCount int64 +} + +func (s *mockGrpcService) SendMessage(stream MessageServerStream) error { + atomic.AddInt64(&s.streamCount, 1) + defer atomic.AddInt64(&s.streamCount, -1) + + go func() { + for { + msg, err := stream.Recv() + if err != nil { + log.Info("error received", zap.Error(err)) + return + } + s.Mock.MethodCalled("OnNewMessage", msg) + } + }() + + <-stream.Context().Done() + return status.Error(codes.Canceled, stream.Context().Err().Error()) +} + +func newServerWrapperForTesting(t *testing.T) (server *ServerWrapper, newClient func() (p2p.CDCPeerToPeerClient, func()), cancel func()) { + addr := t.TempDir() + "/p2p-testing.sock" + lis, err := net.Listen("unix", addr) + require.NoError(t, err) + + var opts []grpc.ServerOption + grpcServer := grpc.NewServer(opts...) + + server = NewServerWrapper() + p2p.RegisterCDCPeerToPeerServer(grpcServer, server) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _ = grpcServer.Serve(lis) + }() + + cancel = func() { + grpcServer.Stop() + wg.Wait() + } + + newClient = func() (p2p.CDCPeerToPeerClient, func()) { + conn, err := grpc.Dial( + addr, + grpc.WithInsecure(), + grpc.WithContextDialer(func(_ context.Context, s string) (net.Conn, error) { + return net.Dial("unix", addr) + })) + require.NoError(t, err) + + cancel2 := func() { + _ = conn.Close() + } + return p2p.NewCDCPeerToPeerClient(conn), cancel2 + } + return +} + +func TestServerWrapperBasics(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + serverWrapper, newClient, cancelServer := newServerWrapperForTesting(t) + defer cancelServer() + + client, closeClient := newClient() + defer closeClient() + + // initiates a stream to an empty server + clientStream, err := client.SendMessage(ctx) + require.NoError(t, err) + + _, err = clientStream.Recv() + require.Error(t, err) + + st, ok := status.FromError(err) + require.True(t, ok) + require.Equal(t, codes.Unavailable, st.Code()) + + innerServer := &mockGrpcService{t: t} + + serverWrapper.Reset(innerServer) + + clientStream, err = client.SendMessage(ctx) + require.NoError(t, err) + + innerServer.On("OnNewMessage", &p2p.MessagePacket{}) + err = clientStream.Send(&p2p.MessagePacket{}) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + innerServer.AssertExpectations(t) + + require.Equal(t, int64(1), atomic.LoadInt64(&innerServer.streamCount)) + + serverWrapper.Reset(nil) + _, err = clientStream.Recv() + require.Error(t, err) + + st, ok = status.FromError(err) + require.True(t, ok) + require.Equal(t, codes.Canceled, st.Code()) +} + +func TestServerWrapperDelayed(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + serverWrapper, newClient, cancelServer := newServerWrapperForTesting(t) + defer cancelServer() + + client, closeClient := newClient() + defer closeClient() + + err := failpoint.Enable("github.com/pingcap/ticdc/pkg/p2p/ServerWrapperSendMessageDelay", "pause") + require.NoError(t, err) + defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/pkg/p2p/ServerWrapperSendMessageDelay") + }() + + innerServer := &mockGrpcService{t: t} + + serverWrapper.Reset(innerServer) + innerServer.On("OnNewMessage", &p2p.MessagePacket{}) + + // initiates a stream to an empty server + clientStream, err := client.SendMessage(ctx) + require.NoError(t, err) + + err = clientStream.Send(&p2p.MessagePacket{}) + require.NoError(t, err) + + require.Eventually(t, func() bool { + serverWrapper.wrappedStreamsMu.Lock() + defer serverWrapper.wrappedStreamsMu.Unlock() + return len(serverWrapper.wrappedStreams) > 0 + }, time.Second*1, time.Millisecond*20) + + serverWrapper.Reset(nil) + + _ = failpoint.Disable("github.com/pingcap/ticdc/pkg/p2p/ServerWrapperSendMessageDelay") + + time.Sleep(100 * time.Millisecond) + innerServer.AssertExpectations(t) +} diff --git a/proto/CDCPeerToPeer.proto b/proto/CDCPeerToPeer.proto new file mode 100644 index 00000000000..8e68bf187d7 --- /dev/null +++ b/proto/CDCPeerToPeer.proto @@ -0,0 +1,88 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package p2p; + +import "gogoproto/gogo.proto"; + +option(gogoproto.sizer_all) = true; +// Use generated code to lower performance overhead. +option(gogoproto.marshaler_all) = true; +option(gogoproto.unmarshaler_all) = true; + +service CDCPeerToPeer { + // A bidirectional stream from the sender (client) to the receiver (server) + // The send direction is used to carry the serialized payload, and the + // reply direction is used to receive ACKs (progress information) from the server. + rpc SendMessage(stream MessagePacket) returns (stream SendMessageResponse); +} + +// MessageEntry represents a single message. +message MessageEntry { + // topic is used to separate messages into order-guaranteed logical streams. + string topic = 1; + + // serialized payload. The format and schema is defined by the business logic + // using the peer-to-peer mechanism. + bytes content = 2; + + // monotonically increase. + int64 sequence = 3; +} + +// Metadata associated with one client-server bidirectional stream. +message StreamMeta { + // fields required for correctness + string sender_id = 1; + string receiver_id = 2; + int64 epoch = 3; // monotonically increasing between two given node processes. + + // fields required for compatibility check + string client_version = 50; + + // fields for metrics, logging, debugging, etc. + string sender_advertised_addr = 100; +} + +message MessagePacket { + StreamMeta meta = 1; + + // multiple messages can be batched. + repeated MessageEntry entries = 2; +} + +message Ack { + string topic = 1; + + // the sequence of an already processed message. + // Must be monotonically increasing for a given topic and two given node processes. + int64 last_seq = 2; +} + +enum ExitReason { + UNKNOWN = 0; + OK = 1; + CONGESTED = 2; + CAPTURE_SUICIDE = 3; + STALE_CONNECTION = 4; + DUPLICATE_CONNECTION = 5; + CAPTURE_ID_MISMATCH = 6; +} + +message SendMessageResponse { + repeated Ack ack = 1; + ExitReason exit_reason = 2; + string error_message = 3; +} diff --git a/proto/generate-proto.sh b/proto/generate-proto.sh index ad2ebc21f09..80a438b083f 100755 --- a/proto/generate-proto.sh +++ b/proto/generate-proto.sh @@ -9,3 +9,4 @@ echo "generate canal & craft benchmark protocol code..." protoc --gofast_out=./canal EntryProtocol.proto protoc --gofast_out=./canal CanalProtocol.proto protoc --gofast_out=./benchmark CraftBenchmark.proto +protoc --gofast_out=plugins=grpc:./p2p CDCPeerToPeer.proto diff --git a/proto/p2p/CDCPeerToPeer.pb.go b/proto/p2p/CDCPeerToPeer.pb.go new file mode 100644 index 00000000000..f33e0245405 --- /dev/null +++ b/proto/p2p/CDCPeerToPeer.pb.go @@ -0,0 +1,1734 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: CDCPeerToPeer.proto + +package p2p + +import ( + context "context" + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type ExitReason int32 + +const ( + ExitReason_UNKNOWN ExitReason = 0 + ExitReason_OK ExitReason = 1 + ExitReason_CONGESTED ExitReason = 2 + ExitReason_CAPTURE_SUICIDE ExitReason = 3 + ExitReason_STALE_CONNECTION ExitReason = 4 + ExitReason_DUPLICATE_CONNECTION ExitReason = 5 + ExitReason_CAPTURE_ID_MISMATCH ExitReason = 6 +) + +var ExitReason_name = map[int32]string{ + 0: "UNKNOWN", + 1: "OK", + 2: "CONGESTED", + 3: "CAPTURE_SUICIDE", + 4: "STALE_CONNECTION", + 5: "DUPLICATE_CONNECTION", + 6: "CAPTURE_ID_MISMATCH", +} + +var ExitReason_value = map[string]int32{ + "UNKNOWN": 0, + "OK": 1, + "CONGESTED": 2, + "CAPTURE_SUICIDE": 3, + "STALE_CONNECTION": 4, + "DUPLICATE_CONNECTION": 5, + "CAPTURE_ID_MISMATCH": 6, +} + +func (x ExitReason) String() string { + return proto.EnumName(ExitReason_name, int32(x)) +} + +func (ExitReason) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_6560df28dddfd2cc, []int{0} +} + +// MessageEntry represents a single message. +type MessageEntry struct { + // topic is used to separate messages into order-guaranteed logical streams. + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + // serialized payload. The format and schema is defined by the business logic + // using the peer-to-peer mechanism. + Content []byte `protobuf:"bytes,2,opt,name=content,proto3" json:"content,omitempty"` + // monotonically increase. + Sequence int64 `protobuf:"varint,3,opt,name=sequence,proto3" json:"sequence,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *MessageEntry) Reset() { *m = MessageEntry{} } +func (m *MessageEntry) String() string { return proto.CompactTextString(m) } +func (*MessageEntry) ProtoMessage() {} +func (*MessageEntry) Descriptor() ([]byte, []int) { + return fileDescriptor_6560df28dddfd2cc, []int{0} +} +func (m *MessageEntry) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *MessageEntry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_MessageEntry.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *MessageEntry) XXX_Merge(src proto.Message) { + xxx_messageInfo_MessageEntry.Merge(m, src) +} +func (m *MessageEntry) XXX_Size() int { + return m.Size() +} +func (m *MessageEntry) XXX_DiscardUnknown() { + xxx_messageInfo_MessageEntry.DiscardUnknown(m) +} + +var xxx_messageInfo_MessageEntry proto.InternalMessageInfo + +func (m *MessageEntry) GetTopic() string { + if m != nil { + return m.Topic + } + return "" +} + +func (m *MessageEntry) GetContent() []byte { + if m != nil { + return m.Content + } + return nil +} + +func (m *MessageEntry) GetSequence() int64 { + if m != nil { + return m.Sequence + } + return 0 +} + +// Metadata associated with one client-server bidirectional stream. +type StreamMeta struct { + // fields required for correctness + SenderId string `protobuf:"bytes,1,opt,name=sender_id,json=senderId,proto3" json:"sender_id,omitempty"` + ReceiverId string `protobuf:"bytes,2,opt,name=receiver_id,json=receiverId,proto3" json:"receiver_id,omitempty"` + Epoch int64 `protobuf:"varint,3,opt,name=epoch,proto3" json:"epoch,omitempty"` + // fields required for compatibility check + ClientVersion string `protobuf:"bytes,50,opt,name=client_version,json=clientVersion,proto3" json:"client_version,omitempty"` + // fields for metrics, logging, debugging, etc. + SenderAdvertisedAddr string `protobuf:"bytes,100,opt,name=sender_advertised_addr,json=senderAdvertisedAddr,proto3" json:"sender_advertised_addr,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StreamMeta) Reset() { *m = StreamMeta{} } +func (m *StreamMeta) String() string { return proto.CompactTextString(m) } +func (*StreamMeta) ProtoMessage() {} +func (*StreamMeta) Descriptor() ([]byte, []int) { + return fileDescriptor_6560df28dddfd2cc, []int{1} +} +func (m *StreamMeta) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StreamMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StreamMeta.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StreamMeta) XXX_Merge(src proto.Message) { + xxx_messageInfo_StreamMeta.Merge(m, src) +} +func (m *StreamMeta) XXX_Size() int { + return m.Size() +} +func (m *StreamMeta) XXX_DiscardUnknown() { + xxx_messageInfo_StreamMeta.DiscardUnknown(m) +} + +var xxx_messageInfo_StreamMeta proto.InternalMessageInfo + +func (m *StreamMeta) GetSenderId() string { + if m != nil { + return m.SenderId + } + return "" +} + +func (m *StreamMeta) GetReceiverId() string { + if m != nil { + return m.ReceiverId + } + return "" +} + +func (m *StreamMeta) GetEpoch() int64 { + if m != nil { + return m.Epoch + } + return 0 +} + +func (m *StreamMeta) GetClientVersion() string { + if m != nil { + return m.ClientVersion + } + return "" +} + +func (m *StreamMeta) GetSenderAdvertisedAddr() string { + if m != nil { + return m.SenderAdvertisedAddr + } + return "" +} + +type MessagePacket struct { + Meta *StreamMeta `protobuf:"bytes,1,opt,name=meta,proto3" json:"meta,omitempty"` + // multiple messages can be batched. + Entries []*MessageEntry `protobuf:"bytes,2,rep,name=entries,proto3" json:"entries,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *MessagePacket) Reset() { *m = MessagePacket{} } +func (m *MessagePacket) String() string { return proto.CompactTextString(m) } +func (*MessagePacket) ProtoMessage() {} +func (*MessagePacket) Descriptor() ([]byte, []int) { + return fileDescriptor_6560df28dddfd2cc, []int{2} +} +func (m *MessagePacket) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *MessagePacket) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_MessagePacket.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *MessagePacket) XXX_Merge(src proto.Message) { + xxx_messageInfo_MessagePacket.Merge(m, src) +} +func (m *MessagePacket) XXX_Size() int { + return m.Size() +} +func (m *MessagePacket) XXX_DiscardUnknown() { + xxx_messageInfo_MessagePacket.DiscardUnknown(m) +} + +var xxx_messageInfo_MessagePacket proto.InternalMessageInfo + +func (m *MessagePacket) GetMeta() *StreamMeta { + if m != nil { + return m.Meta + } + return nil +} + +func (m *MessagePacket) GetEntries() []*MessageEntry { + if m != nil { + return m.Entries + } + return nil +} + +type Ack struct { + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + // the sequence of an already processed message. + // Must be monotonically increasing for a given topic and two given node processes. + LastSeq int64 `protobuf:"varint,2,opt,name=last_seq,json=lastSeq,proto3" json:"last_seq,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Ack) Reset() { *m = Ack{} } +func (m *Ack) String() string { return proto.CompactTextString(m) } +func (*Ack) ProtoMessage() {} +func (*Ack) Descriptor() ([]byte, []int) { + return fileDescriptor_6560df28dddfd2cc, []int{3} +} +func (m *Ack) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Ack) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Ack.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Ack) XXX_Merge(src proto.Message) { + xxx_messageInfo_Ack.Merge(m, src) +} +func (m *Ack) XXX_Size() int { + return m.Size() +} +func (m *Ack) XXX_DiscardUnknown() { + xxx_messageInfo_Ack.DiscardUnknown(m) +} + +var xxx_messageInfo_Ack proto.InternalMessageInfo + +func (m *Ack) GetTopic() string { + if m != nil { + return m.Topic + } + return "" +} + +func (m *Ack) GetLastSeq() int64 { + if m != nil { + return m.LastSeq + } + return 0 +} + +type SendMessageResponse struct { + Ack []*Ack `protobuf:"bytes,1,rep,name=ack,proto3" json:"ack,omitempty"` + ExitReason ExitReason `protobuf:"varint,2,opt,name=exit_reason,json=exitReason,proto3,enum=p2p.ExitReason" json:"exit_reason,omitempty"` + ErrorMessage string `protobuf:"bytes,3,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SendMessageResponse) Reset() { *m = SendMessageResponse{} } +func (m *SendMessageResponse) String() string { return proto.CompactTextString(m) } +func (*SendMessageResponse) ProtoMessage() {} +func (*SendMessageResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_6560df28dddfd2cc, []int{4} +} +func (m *SendMessageResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SendMessageResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SendMessageResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SendMessageResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SendMessageResponse.Merge(m, src) +} +func (m *SendMessageResponse) XXX_Size() int { + return m.Size() +} +func (m *SendMessageResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SendMessageResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_SendMessageResponse proto.InternalMessageInfo + +func (m *SendMessageResponse) GetAck() []*Ack { + if m != nil { + return m.Ack + } + return nil +} + +func (m *SendMessageResponse) GetExitReason() ExitReason { + if m != nil { + return m.ExitReason + } + return ExitReason_UNKNOWN +} + +func (m *SendMessageResponse) GetErrorMessage() string { + if m != nil { + return m.ErrorMessage + } + return "" +} + +func init() { + proto.RegisterEnum("p2p.ExitReason", ExitReason_name, ExitReason_value) + proto.RegisterType((*MessageEntry)(nil), "p2p.MessageEntry") + proto.RegisterType((*StreamMeta)(nil), "p2p.StreamMeta") + proto.RegisterType((*MessagePacket)(nil), "p2p.MessagePacket") + proto.RegisterType((*Ack)(nil), "p2p.Ack") + proto.RegisterType((*SendMessageResponse)(nil), "p2p.SendMessageResponse") +} + +func init() { proto.RegisterFile("CDCPeerToPeer.proto", fileDescriptor_6560df28dddfd2cc) } + +var fileDescriptor_6560df28dddfd2cc = []byte{ + // 563 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x53, 0xd1, 0x6e, 0xd3, 0x30, + 0x14, 0xc5, 0xcd, 0xb6, 0xb6, 0xb7, 0xed, 0x16, 0xdc, 0x0a, 0x42, 0x91, 0x46, 0xd5, 0x09, 0xa9, + 0x02, 0x69, 0x4c, 0x05, 0xf1, 0x8a, 0x42, 0x1a, 0x41, 0xb4, 0x35, 0xad, 0x92, 0x14, 0x24, 0x5e, + 0xa2, 0x10, 0x5f, 0x8d, 0xa8, 0x5b, 0x9c, 0xd9, 0x66, 0x1a, 0x7f, 0xc0, 0x13, 0x7f, 0xc4, 0x3b, + 0x8f, 0x7c, 0x02, 0xda, 0x97, 0xa0, 0x3a, 0xe9, 0xd6, 0x09, 0xf1, 0x62, 0xf9, 0x9c, 0x73, 0x7d, + 0x8f, 0x7d, 0xae, 0x0c, 0x5d, 0x67, 0xe2, 0xcc, 0x11, 0x45, 0xc4, 0x57, 0xeb, 0x61, 0x21, 0xb8, + 0xe2, 0xd4, 0x28, 0xc6, 0x45, 0xbf, 0x77, 0xca, 0x4f, 0xb9, 0xc6, 0x2f, 0x56, 0xbb, 0x52, 0x1a, + 0x7e, 0x82, 0xf6, 0x14, 0xa5, 0x4c, 0x4e, 0xd1, 0xcd, 0x95, 0xf8, 0x46, 0x7b, 0xb0, 0xad, 0x78, + 0x91, 0xa5, 0x16, 0x19, 0x90, 0x51, 0x33, 0x28, 0x01, 0xb5, 0xa0, 0x9e, 0xf2, 0x5c, 0x61, 0xae, + 0xac, 0xda, 0x80, 0x8c, 0xda, 0xc1, 0x1a, 0xd2, 0x3e, 0x34, 0x24, 0x5e, 0x7c, 0xc5, 0x3c, 0x45, + 0xcb, 0x18, 0x90, 0x91, 0x11, 0xdc, 0xe0, 0xe1, 0x4f, 0x02, 0x10, 0x2a, 0x81, 0xc9, 0xf9, 0x14, + 0x55, 0x42, 0x1f, 0x43, 0x53, 0x62, 0xce, 0x50, 0xc4, 0x19, 0xab, 0xda, 0x37, 0x4a, 0xc2, 0x63, + 0xf4, 0x09, 0xb4, 0x04, 0xa6, 0x98, 0x5d, 0x96, 0x72, 0x4d, 0xcb, 0xb0, 0xa6, 0x3c, 0xb6, 0xba, + 0x18, 0x16, 0x3c, 0xfd, 0x52, 0xb9, 0x94, 0x80, 0x3e, 0x85, 0xdd, 0xf4, 0x2c, 0xc3, 0x5c, 0xc5, + 0x97, 0x28, 0x64, 0xc6, 0x73, 0x6b, 0xac, 0x4f, 0x76, 0x4a, 0xf6, 0x43, 0x49, 0xd2, 0x57, 0xf0, + 0xa0, 0xb2, 0x4e, 0xd8, 0x25, 0x0a, 0x95, 0x49, 0x64, 0x71, 0xc2, 0x98, 0xb0, 0x98, 0x2e, 0xef, + 0x95, 0xaa, 0x7d, 0x23, 0xda, 0x8c, 0x89, 0x61, 0x02, 0x9d, 0x2a, 0x9b, 0x79, 0x92, 0x2e, 0x51, + 0xd1, 0x03, 0xd8, 0x3a, 0x47, 0x95, 0xe8, 0xcb, 0xb7, 0xc6, 0x7b, 0x87, 0xc5, 0xb8, 0x38, 0xbc, + 0x7d, 0x60, 0xa0, 0x45, 0xfa, 0x1c, 0xea, 0x98, 0x2b, 0x91, 0xa1, 0xb4, 0x6a, 0x03, 0x63, 0xd4, + 0x1a, 0xdf, 0xd7, 0x75, 0x9b, 0x29, 0x07, 0xeb, 0x8a, 0xe1, 0x6b, 0x30, 0xec, 0x74, 0xf9, 0x9f, + 0xd4, 0x1f, 0x41, 0xe3, 0x2c, 0x91, 0x2a, 0x96, 0x78, 0xa1, 0x03, 0x31, 0x82, 0xfa, 0x0a, 0x87, + 0x78, 0x31, 0xfc, 0x4e, 0xa0, 0x1b, 0x62, 0xce, 0xaa, 0xae, 0x01, 0xca, 0x82, 0xe7, 0x12, 0x69, + 0x1f, 0x8c, 0x24, 0x5d, 0x5a, 0x44, 0x1b, 0x37, 0xb4, 0xb1, 0x9d, 0x2e, 0x83, 0x15, 0x49, 0x8f, + 0xa0, 0x85, 0x57, 0x99, 0x8a, 0x05, 0x26, 0x92, 0xe7, 0xba, 0xe3, 0x6e, 0xf5, 0x08, 0xf7, 0x2a, + 0x53, 0x81, 0xa6, 0x03, 0xc0, 0x9b, 0x3d, 0x3d, 0x80, 0x0e, 0x0a, 0xc1, 0x45, 0x7c, 0x5e, 0xda, + 0xe8, 0xec, 0x9b, 0x41, 0x5b, 0x93, 0x95, 0xf5, 0xb3, 0x1f, 0x04, 0xe0, 0xf6, 0x3c, 0x6d, 0x41, + 0x7d, 0xe1, 0x1f, 0xfb, 0xb3, 0x8f, 0xbe, 0x79, 0x8f, 0xee, 0x40, 0x6d, 0x76, 0x6c, 0x12, 0xda, + 0x81, 0xa6, 0x33, 0xf3, 0xdf, 0xb9, 0x61, 0xe4, 0x4e, 0xcc, 0x1a, 0xed, 0xc2, 0x9e, 0x63, 0xcf, + 0xa3, 0x45, 0xe0, 0xc6, 0xe1, 0xc2, 0x73, 0xbc, 0x89, 0x6b, 0x1a, 0xb4, 0x07, 0x66, 0x18, 0xd9, + 0x27, 0x6e, 0xec, 0xcc, 0x7c, 0xdf, 0x75, 0x22, 0x6f, 0xe6, 0x9b, 0x5b, 0xd4, 0x82, 0xde, 0x64, + 0x31, 0x3f, 0xf1, 0x1c, 0x3b, 0xba, 0xa3, 0x6c, 0xd3, 0x87, 0xd0, 0x5d, 0x37, 0xf1, 0x26, 0xf1, + 0xd4, 0x0b, 0xa7, 0x76, 0xe4, 0xbc, 0x37, 0x77, 0xc6, 0x73, 0xe8, 0xdc, 0xf9, 0x04, 0xf4, 0x0d, + 0xb4, 0x36, 0xb2, 0xa2, 0x74, 0x73, 0x1e, 0xe5, 0x64, 0xfb, 0x56, 0x39, 0xcb, 0x7f, 0x13, 0x1d, + 0x91, 0x23, 0xf2, 0xb6, 0xfd, 0xeb, 0x7a, 0x9f, 0xfc, 0xbe, 0xde, 0x27, 0x7f, 0xae, 0xf7, 0xc9, + 0xe7, 0x1d, 0xfd, 0x73, 0x5e, 0xfe, 0x0d, 0x00, 0x00, 0xff, 0xff, 0xf4, 0xf4, 0xd3, 0xf5, 0x6b, + 0x03, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// CDCPeerToPeerClient is the client API for CDCPeerToPeer service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type CDCPeerToPeerClient interface { + // A bidirectional stream from the sender (client) to the receiver (server) + // The send direction is used to carry the serialized payload, and the + // reply direction is used to receive ACKs (progress information) from the server. + SendMessage(ctx context.Context, opts ...grpc.CallOption) (CDCPeerToPeer_SendMessageClient, error) +} + +type cDCPeerToPeerClient struct { + cc *grpc.ClientConn +} + +func NewCDCPeerToPeerClient(cc *grpc.ClientConn) CDCPeerToPeerClient { + return &cDCPeerToPeerClient{cc} +} + +func (c *cDCPeerToPeerClient) SendMessage(ctx context.Context, opts ...grpc.CallOption) (CDCPeerToPeer_SendMessageClient, error) { + stream, err := c.cc.NewStream(ctx, &_CDCPeerToPeer_serviceDesc.Streams[0], "/p2p.CDCPeerToPeer/SendMessage", opts...) + if err != nil { + return nil, err + } + x := &cDCPeerToPeerSendMessageClient{stream} + return x, nil +} + +type CDCPeerToPeer_SendMessageClient interface { + Send(*MessagePacket) error + Recv() (*SendMessageResponse, error) + grpc.ClientStream +} + +type cDCPeerToPeerSendMessageClient struct { + grpc.ClientStream +} + +func (x *cDCPeerToPeerSendMessageClient) Send(m *MessagePacket) error { + return x.ClientStream.SendMsg(m) +} + +func (x *cDCPeerToPeerSendMessageClient) Recv() (*SendMessageResponse, error) { + m := new(SendMessageResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// CDCPeerToPeerServer is the server API for CDCPeerToPeer service. +type CDCPeerToPeerServer interface { + // A bidirectional stream from the sender (client) to the receiver (server) + // The send direction is used to carry the serialized payload, and the + // reply direction is used to receive ACKs (progress information) from the server. + SendMessage(CDCPeerToPeer_SendMessageServer) error +} + +// UnimplementedCDCPeerToPeerServer can be embedded to have forward compatible implementations. +type UnimplementedCDCPeerToPeerServer struct { +} + +func (*UnimplementedCDCPeerToPeerServer) SendMessage(srv CDCPeerToPeer_SendMessageServer) error { + return status.Errorf(codes.Unimplemented, "method SendMessage not implemented") +} + +func RegisterCDCPeerToPeerServer(s *grpc.Server, srv CDCPeerToPeerServer) { + s.RegisterService(&_CDCPeerToPeer_serviceDesc, srv) +} + +func _CDCPeerToPeer_SendMessage_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(CDCPeerToPeerServer).SendMessage(&cDCPeerToPeerSendMessageServer{stream}) +} + +type CDCPeerToPeer_SendMessageServer interface { + Send(*SendMessageResponse) error + Recv() (*MessagePacket, error) + grpc.ServerStream +} + +type cDCPeerToPeerSendMessageServer struct { + grpc.ServerStream +} + +func (x *cDCPeerToPeerSendMessageServer) Send(m *SendMessageResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *cDCPeerToPeerSendMessageServer) Recv() (*MessagePacket, error) { + m := new(MessagePacket) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _CDCPeerToPeer_serviceDesc = grpc.ServiceDesc{ + ServiceName: "p2p.CDCPeerToPeer", + HandlerType: (*CDCPeerToPeerServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "SendMessage", + Handler: _CDCPeerToPeer_SendMessage_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "CDCPeerToPeer.proto", +} + +func (m *MessageEntry) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *MessageEntry) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *MessageEntry) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.Sequence != 0 { + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(m.Sequence)) + i-- + dAtA[i] = 0x18 + } + if len(m.Content) > 0 { + i -= len(m.Content) + copy(dAtA[i:], m.Content) + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.Content))) + i-- + dAtA[i] = 0x12 + } + if len(m.Topic) > 0 { + i -= len(m.Topic) + copy(dAtA[i:], m.Topic) + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.Topic))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *StreamMeta) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *StreamMeta) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StreamMeta) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.SenderAdvertisedAddr) > 0 { + i -= len(m.SenderAdvertisedAddr) + copy(dAtA[i:], m.SenderAdvertisedAddr) + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.SenderAdvertisedAddr))) + i-- + dAtA[i] = 0x6 + i-- + dAtA[i] = 0xa2 + } + if len(m.ClientVersion) > 0 { + i -= len(m.ClientVersion) + copy(dAtA[i:], m.ClientVersion) + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.ClientVersion))) + i-- + dAtA[i] = 0x3 + i-- + dAtA[i] = 0x92 + } + if m.Epoch != 0 { + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(m.Epoch)) + i-- + dAtA[i] = 0x18 + } + if len(m.ReceiverId) > 0 { + i -= len(m.ReceiverId) + copy(dAtA[i:], m.ReceiverId) + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.ReceiverId))) + i-- + dAtA[i] = 0x12 + } + if len(m.SenderId) > 0 { + i -= len(m.SenderId) + copy(dAtA[i:], m.SenderId) + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.SenderId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *MessagePacket) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *MessagePacket) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *MessagePacket) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.Entries) > 0 { + for iNdEx := len(m.Entries) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Entries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if m.Meta != nil { + { + size, err := m.Meta.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Ack) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Ack) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Ack) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.LastSeq != 0 { + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(m.LastSeq)) + i-- + dAtA[i] = 0x10 + } + if len(m.Topic) > 0 { + i -= len(m.Topic) + copy(dAtA[i:], m.Topic) + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.Topic))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SendMessageResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SendMessageResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SendMessageResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.ErrorMessage) > 0 { + i -= len(m.ErrorMessage) + copy(dAtA[i:], m.ErrorMessage) + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.ErrorMessage))) + i-- + dAtA[i] = 0x1a + } + if m.ExitReason != 0 { + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(m.ExitReason)) + i-- + dAtA[i] = 0x10 + } + if len(m.Ack) > 0 { + for iNdEx := len(m.Ack) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Ack[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func encodeVarintCDCPeerToPeer(dAtA []byte, offset int, v uint64) int { + offset -= sovCDCPeerToPeer(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *MessageEntry) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Topic) + if l > 0 { + n += 1 + l + sovCDCPeerToPeer(uint64(l)) + } + l = len(m.Content) + if l > 0 { + n += 1 + l + sovCDCPeerToPeer(uint64(l)) + } + if m.Sequence != 0 { + n += 1 + sovCDCPeerToPeer(uint64(m.Sequence)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *StreamMeta) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.SenderId) + if l > 0 { + n += 1 + l + sovCDCPeerToPeer(uint64(l)) + } + l = len(m.ReceiverId) + if l > 0 { + n += 1 + l + sovCDCPeerToPeer(uint64(l)) + } + if m.Epoch != 0 { + n += 1 + sovCDCPeerToPeer(uint64(m.Epoch)) + } + l = len(m.ClientVersion) + if l > 0 { + n += 2 + l + sovCDCPeerToPeer(uint64(l)) + } + l = len(m.SenderAdvertisedAddr) + if l > 0 { + n += 2 + l + sovCDCPeerToPeer(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *MessagePacket) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Meta != nil { + l = m.Meta.Size() + n += 1 + l + sovCDCPeerToPeer(uint64(l)) + } + if len(m.Entries) > 0 { + for _, e := range m.Entries { + l = e.Size() + n += 1 + l + sovCDCPeerToPeer(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *Ack) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Topic) + if l > 0 { + n += 1 + l + sovCDCPeerToPeer(uint64(l)) + } + if m.LastSeq != 0 { + n += 1 + sovCDCPeerToPeer(uint64(m.LastSeq)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *SendMessageResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Ack) > 0 { + for _, e := range m.Ack { + l = e.Size() + n += 1 + l + sovCDCPeerToPeer(uint64(l)) + } + } + if m.ExitReason != 0 { + n += 1 + sovCDCPeerToPeer(uint64(m.ExitReason)) + } + l = len(m.ErrorMessage) + if l > 0 { + n += 1 + l + sovCDCPeerToPeer(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovCDCPeerToPeer(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozCDCPeerToPeer(x uint64) (n int) { + return sovCDCPeerToPeer(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *MessageEntry) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: MessageEntry: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: MessageEntry: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Topic = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Content", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Content = append(m.Content[:0], dAtA[iNdEx:postIndex]...) + if m.Content == nil { + m.Content = []byte{} + } + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Sequence", wireType) + } + m.Sequence = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Sequence |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipCDCPeerToPeer(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *StreamMeta) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StreamMeta: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StreamMeta: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SenderId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SenderId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ReceiverId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ReceiverId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Epoch", wireType) + } + m.Epoch = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Epoch |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 50: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ClientVersion", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ClientVersion = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 100: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SenderAdvertisedAddr", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SenderAdvertisedAddr = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCDCPeerToPeer(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *MessagePacket) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: MessagePacket: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: MessagePacket: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Meta", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Meta == nil { + m.Meta = &StreamMeta{} + } + if err := m.Meta.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Entries = append(m.Entries, &MessageEntry{}) + if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCDCPeerToPeer(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Ack) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Ack: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Ack: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Topic = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field LastSeq", wireType) + } + m.LastSeq = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.LastSeq |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipCDCPeerToPeer(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SendMessageResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SendMessageResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SendMessageResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Ack", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Ack = append(m.Ack, &Ack{}) + if err := m.Ack[len(m.Ack)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ExitReason", wireType) + } + m.ExitReason = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ExitReason |= ExitReason(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ErrorMessage", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ErrorMessage = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCDCPeerToPeer(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthCDCPeerToPeer + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipCDCPeerToPeer(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCDCPeerToPeer + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthCDCPeerToPeer + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupCDCPeerToPeer + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthCDCPeerToPeer + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthCDCPeerToPeer = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowCDCPeerToPeer = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupCDCPeerToPeer = fmt.Errorf("proto: unexpected end of group") +)