Skip to content

Commit

Permalink
Merge branch 'master' into rustin-patch-sink-hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Mar 7, 2022
2 parents e5ba10a + 643ac18 commit d65308c
Show file tree
Hide file tree
Showing 146 changed files with 2,931 additions and 1,183 deletions.
7 changes: 4 additions & 3 deletions cdc/api/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
"github.com/gin-gonic/gin"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/client-go/v2/oracle"
"go.etcd.io/etcd/client/v3/concurrency"
"go.uber.org/zap"

"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/tikv/client-go/v2/oracle"
"go.etcd.io/etcd/clientv3/concurrency"
"go.uber.org/zap"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/client/v3/concurrency"
)

func TestHTTPStatus(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/version"
"go.etcd.io/etcd/clientv3"
clientv3 "go.etcd.io/etcd/client/v3"
)

// status of cdc server
Expand Down
13 changes: 7 additions & 6 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"

"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
Expand All @@ -39,12 +46,6 @@ import (
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
Expand Down
25 changes: 18 additions & 7 deletions cdc/model/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ func DispatchTableTopic(changefeedID ChangeFeedID) p2p.Topic {

// DispatchTableMessage is the message body for dispatching a table.
type DispatchTableMessage struct {
OwnerRev int64 `json:"owner-rev"`
ID TableID `json:"id"`
IsDelete bool `json:"is-delete"`
OwnerRev int64 `json:"owner-rev"`
Epoch ProcessorEpoch `json:"epoch"`
ID TableID `json:"id"`
IsDelete bool `json:"is-delete"`
}

// DispatchTableResponseTopic returns a message topic for the result of
Expand All @@ -44,7 +45,8 @@ func DispatchTableResponseTopic(changefeedID ChangeFeedID) p2p.Topic {

// DispatchTableResponseMessage is the message body for the result of dispatching a table.
type DispatchTableResponseMessage struct {
ID TableID `json:"id"`
ID TableID `json:"id"`
Epoch ProcessorEpoch `json:"epoch"`
}

// AnnounceTopic returns a message topic for announcing an ownership change.
Expand All @@ -64,14 +66,23 @@ func SyncTopic(changefeedID ChangeFeedID) p2p.Topic {
return fmt.Sprintf("send-status-resp/%s", changefeedID)
}

// ProcessorEpoch designates a continuous period of the processor working normally.
type ProcessorEpoch = string

// SyncMessage is the message body for syncing the current states of a processor.
// MsgPack serialization has been implemented to minimize the size of the message.
type SyncMessage struct {
// Sends the processor's version for compatibility check
ProcessorVersion string
Running []TableID
Adding []TableID
Removing []TableID

// Epoch is reset to a unique value when the processor has
// encountered an internal error or other events so that
// it has to re-sync its states with the Owner.
Epoch ProcessorEpoch

Running []TableID
Adding []TableID
Removing []TableID
}

// Marshal serializes the message into MsgPack format.
Expand Down
8 changes: 5 additions & 3 deletions cdc/model/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,23 @@ func makeVeryLargeSyncMessage() *SyncMessage {
func TestMarshalDispatchTableMessage(t *testing.T) {
msg := &DispatchTableMessage{
OwnerRev: 1,
Epoch: "test-epoch",
ID: TableID(1),
IsDelete: true,
}
bytes, err := json.Marshal(msg)
require.NoError(t, err)
require.Equal(t, `{"owner-rev":1,"id":1,"is-delete":true}`, string(bytes))
require.Equal(t, `{"owner-rev":1,"epoch":"test-epoch","id":1,"is-delete":true}`, string(bytes))
}

func TestMarshalDispatchTableResponseMessage(t *testing.T) {
msg := &DispatchTableResponseMessage{
ID: TableID(1),
ID: TableID(1),
Epoch: "test-epoch",
}
bytes, err := json.Marshal(msg)
require.NoError(t, err)
require.Equal(t, `{"id":1}`, string(bytes))
require.Equal(t, `{"id":1,"epoch":"test-epoch"}`, string(bytes))
}

func TestMarshalAnnounceMessage(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
Subsystem: "owner",
Name: "maintain_table_num",
Help: "number of replicated tables maintained in owner",
}, []string{"changefeed", "type"})
}, []string{"changefeed", "capture", "type"})
changefeedStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Expand Down
20 changes: 13 additions & 7 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,23 +352,29 @@ func (o *ownerImpl) updateMetrics(state *orchestrator.GlobalReactorState) {
pendingCounts := infoProvider.GetPendingTableCounts()

for captureID, info := range o.captures {
ownerMaintainTableNumGauge.WithLabelValues(
cfID, info.AdvertiseAddr, maintainTableTypeTotal).Set(float64(totalCounts[captureID]))
ownerMaintainTableNumGauge.WithLabelValues(
cfID, info.AdvertiseAddr, maintainTableTypeWip).Set(float64(pendingCounts[captureID]))
ownerMaintainTableNumGauge.
WithLabelValues(cfID, info.AdvertiseAddr, maintainTableTypeTotal).
Set(float64(totalCounts[captureID]))
ownerMaintainTableNumGauge.
WithLabelValues(cfID, info.AdvertiseAddr, maintainTableTypeWip).
Set(float64(pendingCounts[captureID]))
}
}
return
}

for changefeedID, changefeedState := range state.Changefeeds {
for captureID := range state.Captures {
for captureID, captureInfo := range state.Captures {
taskStatus, exist := changefeedState.TaskStatuses[captureID]
if !exist {
continue
}
ownerMaintainTableNumGauge.WithLabelValues(changefeedID, maintainTableTypeTotal).Set(float64(len(taskStatus.Tables)))
ownerMaintainTableNumGauge.WithLabelValues(changefeedID, maintainTableTypeWip).Set(float64(len(taskStatus.Operation)))
ownerMaintainTableNumGauge.
WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeTotal).
Set(float64(len(taskStatus.Tables)))
ownerMaintainTableNumGauge.
WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeWip).
Set(float64(len(taskStatus.Operation)))
if changefeedState.Info != nil {
changefeedStatusGauge.WithLabelValues(changefeedID).Set(float64(changefeedState.Info.State.ToInt()))
}
Expand Down
29 changes: 27 additions & 2 deletions cdc/owner/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,27 @@ func (s *schedulerV2) DispatchTable(
tableID model.TableID,
captureID model.CaptureID,
isDelete bool,
epoch model.ProcessorEpoch,
) (done bool, err error) {
topic := model.DispatchTableTopic(changeFeedID)
message := &model.DispatchTableMessage{
OwnerRev: ctx.GlobalVars().OwnerRevision,
ID: tableID,
IsDelete: isDelete,
Epoch: epoch,
}

defer func() {
if err != nil {
return
}
log.Info("schedulerV2: DispatchTable",
zap.Any("message", message),
zap.Any("successful", done),
zap.String("changefeedID", changeFeedID),
zap.String("captureID", captureID))
}()

ok, err := s.trySendMessage(ctx, captureID, topic, message)
if err != nil {
return false, errors.Trace(err)
Expand All @@ -155,13 +168,24 @@ func (s *schedulerV2) Announce(
ctx context.Context,
changeFeedID model.ChangeFeedID,
captureID model.CaptureID,
) (bool, error) {
) (done bool, err error) {
topic := model.AnnounceTopic(changeFeedID)
message := &model.AnnounceMessage{
OwnerRev: ctx.GlobalVars().OwnerRevision,
OwnerVersion: version.ReleaseSemver(),
}

defer func() {
if err != nil {
return
}
log.Info("schedulerV2: Announce",
zap.Any("message", message),
zap.Any("successful", done),
zap.String("changefeedID", changeFeedID),
zap.String("captureID", captureID))
}()

ok, err := s.trySendMessage(ctx, captureID, topic, message)
if err != nil {
return false, errors.Trace(err)
Expand Down Expand Up @@ -239,7 +263,7 @@ func (s *schedulerV2) registerPeerMessageHandlers(ctx context.Context) (ret erro
func(sender string, messageI interface{}) error {
message := messageI.(*model.DispatchTableResponseMessage)
s.stats.RecordDispatchResponse()
s.OnAgentFinishedTableOperation(sender, message.ID)
s.OnAgentFinishedTableOperation(sender, message.ID, message.Epoch)
return nil
})
if err != nil {
Expand All @@ -256,6 +280,7 @@ func (s *schedulerV2) registerPeerMessageHandlers(ctx context.Context) (ret erro
s.stats.RecordSync()
s.OnAgentSyncTaskStatuses(
sender,
message.Epoch,
message.Running,
message.Adding,
message.Removing)
Expand Down
Loading

0 comments on commit d65308c

Please sign in to comment.