Skip to content

Commit

Permalink
p2p: implement pkg/p2p (Part I) (#3156)
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix committed Nov 2, 2021
1 parent 61c7a4e commit b501e9c
Show file tree
Hide file tree
Showing 13 changed files with 2,313 additions and 0 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,11 @@ error = '''
etcd api call error
'''

["CDC:ErrPeerMessageIllegalMeta"]
error = '''
peer-to-peer message server received an RPC call with illegal metadata
'''

["CDC:ErrPendingRegionCancel"]
error = '''
pending region cancelled due to stream disconnecting
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ require (
github.com/mattn/go-colorable v0.1.11 // indirect
github.com/mattn/go-shellwords v1.0.3
github.com/mattn/go-sqlite3 v2.0.2+incompatible // indirect
github.com/modern-go/reflect2 v1.0.1
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/philhofer/fwd v1.0.0 // indirect
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
Expand Down Expand Up @@ -77,6 +78,7 @@ require (
github.com/tinylib/msgp v1.1.0
github.com/uber-go/atomic v1.4.0
github.com/unrolled/render v1.0.1
github.com/vmihailenco/msgpack/v5 v5.3.5
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.etcd.io/etcd v0.5.0-alpha.5.0.20210512015243-d19fbe541bf9
go.uber.org/atomic v1.9.0
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,12 @@ github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/vmihailenco/msgpack/v4 v4.3.11/go.mod h1:gborTTJjAo/GWTqqRjrLCn9pgNN+NXzzngzBKDPIqw4=
github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1/go.mod h1:xlngVLeyQ/Qi05oQxhQ+oTuqa03RjMwMfk/7/TCs+QI=
github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU=
github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
github.com/vmihailenco/tagparser v0.1.1 h1:quXMXlA39OCbd2wAdTsGDlK9RkOk6Wuw+x37wVyIuWY=
github.com/vmihailenco/tagparser v0.1.1/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f h1:9DDCDwOyEy/gId+IEMrFHLuQ5R/WV0KNxWLler8X2OY=
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f/go.mod h1:8sdOQnirw1PrcnTJYkmW1iOHtUmblMmGdUOHyWYycLI=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
Expand Down
3 changes: 3 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,7 @@ var (

// tcp server error
ErrTCPServerClosed = errors.Normalize("The TCP server has been closed", errors.RFCCodeText("CDC:ErrTCPServerClosed"))

// p2p error
ErrPeerMessageIllegalMeta = errors.Normalize("peer-to-peer message server received an RPC call with illegal metadata", errors.RFCCodeText("CDC:ErrPeerMessageIllegalMeta"))
)
24 changes: 24 additions & 0 deletions pkg/p2p/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package p2p

import (
"testing"

"github.com/pingcap/ticdc/pkg/leakutil"
)

func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
28 changes: 28 additions & 0 deletions pkg/p2p/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package p2p

import "github.com/pingcap/ticdc/proto/p2p"

type (
// NodeID represents the identifier of a sender node.
// Using IP address is not enough because of possible restarts.
NodeID = string
// Topic represents the topic for a peer-to-peer message
Topic = string
// Seq represents the serial number of a message for a given topic.
Seq = int64
// MessageServerStream is an alias for the protobuf-generated interface for the message service.
MessageServerStream = p2p.CDCPeerToPeer_SendMessageServer
)
37 changes: 37 additions & 0 deletions pkg/p2p/serializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package p2p

import "encoding/json"

// Serializable is an interface for defining custom serialization methods
// for peer messages.
type Serializable interface {
Marshal() ([]byte, error)
Unmarshal(data []byte) error
}

func marshalMessage(value interface{}) ([]byte, error) {
if value, ok := value.(Serializable); ok {
return value.Marshal()
}
return json.Marshal(value)
}

func unmarshalMessage(data []byte, value interface{}) error {
if value, ok := value.(Serializable); ok {
return value.Unmarshal(data)
}
return json.Unmarshal(data, value)
}
80 changes: 80 additions & 0 deletions pkg/p2p/serializer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package p2p

import (
"reflect"
"testing"

"github.com/stretchr/testify/require"
"github.com/vmihailenco/msgpack/v5"
)

type jsonSerializableMessage struct {
A int
B float64
C string
}

type msgpackSerializableMessage struct {
A int
B float64
C string
D []int
}

func (m *msgpackSerializableMessage) Marshal() ([]byte, error) {
return msgpack.Marshal(m)
}

func (m *msgpackSerializableMessage) Unmarshal(data []byte) error {
return msgpack.Unmarshal(data, m)
}

func TestJsonSerializable(t *testing.T) {
msg := &jsonSerializableMessage{
A: 1,
B: 2,
C: "test",
}

data, err := marshalMessage(msg)
require.NoError(t, err)

msg1 := &jsonSerializableMessage{}
err = unmarshalMessage(data, msg1)
require.NoError(t, err)

require.True(t, reflect.DeepEqual(msg, msg1))
}

func TestMsgpackSerializable(t *testing.T) {
msg := &msgpackSerializableMessage{
A: 1,
B: 2,
C: "test",
D: []int{1, 2, 3, 4, 5, 6},
}
data, err := marshalMessage(msg)
require.NoError(t, err)

data1, err := msgpack.Marshal(msg)
require.NoError(t, err)
require.Equal(t, data1, data)

msg1 := &msgpackSerializableMessage{}
err = unmarshalMessage(data, msg1)
require.NoError(t, err)
require.True(t, reflect.DeepEqual(msg, msg1))
}
120 changes: 120 additions & 0 deletions pkg/p2p/server_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package p2p

import (
"context"
"sync"

"github.com/modern-go/reflect2"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/proto/p2p"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
gRPCPeer "google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)

type streamWrapper struct {
MessageServerStream
ctx context.Context
cancel context.CancelFunc
}

func wrapStream(stream MessageServerStream) *streamWrapper {
ctx, cancel := context.WithCancel(stream.Context())
return &streamWrapper{
MessageServerStream: stream,
ctx: ctx,
cancel: cancel,
}
}

func (w *streamWrapper) Context() context.Context {
return w.ctx
}

// ServerWrapper implements a CDCPeerToPeerServer, and it
// maintains an inner CDCPeerToPeerServer instance that can
// be replaced as needed.
type ServerWrapper struct {
rwMu sync.RWMutex
innerServer p2p.CDCPeerToPeerServer

wrappedStreamsMu sync.Mutex
wrappedStreams map[*streamWrapper]struct{}
}

// NewServerWrapper creates a new ServerWrapper
func NewServerWrapper() *ServerWrapper {
return &ServerWrapper{
wrappedStreams: map[*streamWrapper]struct{}{},
}
}

// SendMessage implements p2p.CDCPeerToPeerServer
func (s *ServerWrapper) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error {
s.rwMu.RLock()
innerServer := s.innerServer
s.rwMu.RUnlock()

if innerServer == nil {
var addr string
peer, ok := gRPCPeer.FromContext(stream.Context())
if ok {
addr = peer.Addr.String()
}
log.Debug("gRPC server received request while CDC capture is not running.", zap.String("addr", addr))
return status.New(codes.Unavailable, "CDC capture is not running").Err()
}

wrappedStream := wrapStream(stream)
s.wrappedStreamsMu.Lock()
s.wrappedStreams[wrappedStream] = struct{}{}
s.wrappedStreamsMu.Unlock()
defer func() {
s.wrappedStreamsMu.Lock()
delete(s.wrappedStreams, wrappedStream)
s.wrappedStreamsMu.Unlock()
wrappedStream.cancel()
}()

// Used in unit tests to simulate a race situation between `SendMessage` and `Reset`.
// TODO think of another way to make tests parallelizable.
failpoint.Inject("ServerWrapperSendMessageDelay", func() {})
return innerServer.SendMessage(wrappedStream)
}

// Reset resets the inner server object in the ServerWrapper
func (s *ServerWrapper) Reset(inner p2p.CDCPeerToPeerServer) {
s.rwMu.Lock()
defer s.rwMu.Unlock()

s.wrappedStreamsMu.Lock()
defer s.wrappedStreamsMu.Unlock()

for wrappedStream := range s.wrappedStreams {
wrappedStream.cancel()
}

// reflect2.IsNil handles two cases for us:
// 1) null value
// 2) an interface with a null value but a not-null type info.
if reflect2.IsNil(inner) {
s.innerServer = nil
return
}
s.innerServer = inner
}
Loading

0 comments on commit b501e9c

Please sign in to comment.