Skip to content

Commit

Permalink
metrics(cdc): fix mq sink write row count metrics. (pingcap#4192) (pi…
Browse files Browse the repository at this point in the history
…ngcap#4323)

* fix the txn_batch_size metric inaccuracy bug when the sink target is MQ

* address comments

* add comments for exported functions

* fix the compiling problem

* workerpool: limit the rate to output deadlock warning (pingcap#3775) (pingcap#3795)

* tests(ticdc): set up the sync diff output directory correctly (pingcap#3725) (pingcap#3741)

* relay(dm): use binlog name comparison (pingcap#3710) (pingcap#3712)

* dm/load: fix concurrent call Loader.Status (pingcap#3459) (pingcap#3468)

* cdc/sorter: make unified sorter cgroup aware (pingcap#3436) (pingcap#3439)

* tz (ticdc): fix timezone error (pingcap#3887) (pingcap#3906)

* pkg,cdc: do not use log package (pingcap#3902) (pingcap#3940)

* *: rename repo from pingcap/ticdc to pingcap/tiflow (pingcap#3959)

* http_*: add log for http api and refine the err handle logic (pingcap#2997) (pingcap#3307)

* etcd_worker: batch etcd patch (pingcap#3277) (pingcap#3389)

* http_api (ticdc): check --cert-allowed-cn before add server common name (pingcap#3628) (pingcap#3882)

* kvclient(ticdc): fix kvclient takes too long time to recover (pingcap#3612) (pingcap#3663)

* owner: fix owner tick block http request (pingcap#3490) (pingcap#3530)

* dm/syncer: use downstream PK/UK to generate DML (pingcap#3168) (pingcap#3256)

* dep(dm): update go-mysql (pingcap#3914) (pingcap#3934)

* dm/syncer: multiple rows use downstream schema (pingcap#3308) (pingcap#3953)

* errorutil,sink,syncer: add errorutil to handle ignorable error (pingcap#3264) (pingcap#3995)

* dm/worker: don't exit when failed to read checkpoint in relay (pingcap#3345) (pingcap#4005)

* syncer(dm): use an early location to reset binlog and open safemode (pingcap#3860)

* ticdc/owner: Fix ddl special comment syntax error (pingcap#3845) (pingcap#3978)

* dm/scheduler: fix inconsistent of relay status (pingcap#3474) (pingcap#4009)

* owner,scheduler(cdc): fix nil pointer panic in owner scheduler (pingcap#2980) (pingcap#4007) (pingcap#4016)

* config(ticdc): Fix old value configuration check for maxwell protocol (pingcap#3747) (pingcap#3783)

* sink(ticdc): cherry pick sink bug fix to release 5.3 (pingcap#4083)

* master(dm): clean and treat invalid load task (pingcap#4004) (pingcap#4145)

* loader: fix wrong progress in query-status for loader (pingcap#4093) (pingcap#4143)

close pingcap#3252

* ticdc/processor: Fix backoff base delay misconfiguration (pingcap#3992) (pingcap#4028)

* dm: load table structure from dump files (pingcap#3295) (pingcap#4163)

* compactor: fix duplicate entry in safemode (pingcap#3432) (pingcap#3434) (pingcap#4088)

* kv(ticdc): reduce eventfeed rate limited log (pingcap#4072) (pingcap#4111)

close pingcap#4006

* metrics(ticdc): add resolved ts and add changefeed to dataflow (pingcap#4038) (pingcap#4104)

* This is an automated cherry-pick of pingcap#4192

Signed-off-by: ti-chi-bot <[email protected]>

* retry(dm): align with tidb latest error message (pingcap#4172) (pingcap#4254)

close pingcap#4159, close pingcap#4246

* owner(ticdc): Add bootstrap and try to fix the meta information in it (pingcap#3838) (pingcap#3865)

* redolog: add a precleanup process when s3 enable (pingcap#3525) (pingcap#3878)

* ddl(dm): make skipped ddl pass `SplitDDL()` (pingcap#4176) (pingcap#4227)

close pingcap#4173

* cdc/sink: remove Initialize method from the sink interface (pingcap#3682) (pingcap#3765)

Co-authored-by: Ling Jin <[email protected]>

* http_api (ticdc): fix http api 'get processor' panic. (pingcap#4117) (pingcap#4123)

close pingcap#3840

* sink (ticdc): fix a deadlock due to checkpointTs fall back in sinkNode (pingcap#4084) (pingcap#4099)

close pingcap#4055

* cdc/sink: adjust kafka initialization logic (pingcap#3192) (pingcap#4162)

* try fix conflicts.

* This is an automated cherry-pick of pingcap#4192

Signed-off-by: ti-chi-bot <[email protected]>

* fix conflicts.

* fix conflicts.

Co-authored-by: zhaoxinyu <[email protected]>
Co-authored-by: amyangfei <[email protected]>
Co-authored-by: lance6716 <[email protected]>
Co-authored-by: sdojjy <[email protected]>
Co-authored-by: Ling Jin <[email protected]>
Co-authored-by: 3AceShowHand <[email protected]>
  • Loading branch information
7 people authored Jan 18, 2022
1 parent 6c434c2 commit ae04769
Show file tree
Hide file tree
Showing 8 changed files with 13 additions and 2 deletions.
1 change: 1 addition & 0 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
}

mqMessage.Key = evlp
mqMessage.IncRowsCount()
a.resultBuf = append(a.resultBuf, mqMessage)

return EncoderNeedAsyncWrite, nil
Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,8 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage,

// Build implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) Build() []*MQMessage {
if len(d.messages.Messages) == 0 {
rowCount := len(d.messages.Messages)
if rowCount == 0 {
return nil
}

Expand All @@ -392,6 +393,7 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage {
log.Panic("Error when serializing Canal packet", zap.Error(err))
}
ret := NewMQMessage(ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil)
ret.SetRowsCount(rowCount)
d.messages.Reset()
d.resetPacket()
return []*MQMessage{ret}
Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
log.Panic("CanalFlatEventBatchEncoder", zap.Error(err))
return nil
}
ret[i] = NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table)
m := NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table)
m.IncRowsCount()
ret[i] = m
}
c.resolvedBuf = c.resolvedBuf[0:0]
return ret
Expand Down
2 changes: 2 additions & 0 deletions cdc/sink/codec/canal_flat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (s *canalFlatSuite) TestBatching(c *check.C) {
c.Assert(msgs, check.HasLen, int(resolvedTs-lastResolved))

for j := range msgs {
c.Assert(msgs[j].GetRowsCount(), check.Equals, 1)

var msg canalFlatMessage
err := json.Unmarshal(msgs[j].Value, &msg)
c.Assert(err, check.IsNil)
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *canalBatchSuite) TestCanalEventBatchEncoder(c *check.C) {
c.Assert(res, check.HasLen, 1)
c.Assert(res[0].Key, check.IsNil)
c.Assert(len(res[0].Value), check.Equals, size)
c.Assert(res[0].GetRowsCount(), check.Equals, len(cs))

packet := &canal.Packet{}
err := proto.Unmarshal(res[0].Value, packet)
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco
if len(cs) > 0 {
res := encoder.Build()
c.Assert(res, check.HasLen, 1)
c.Assert(res[0].GetRowsCount(), check.Equals, len(cs))
decoder, err := newDecoder(res[0].Key, res[0].Value)
c.Assert(err, check.IsNil)
checkRowDecoder(decoder, cs)
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/maxwell.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage {
}

ret := NewMQMessage(ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil)
ret.SetRowsCount(d.batchSize)
d.Reset()
return []*MQMessage{ret}
}
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/maxwell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (s *maxwellbatchSuite) testmaxwellBatchCodec(c *check.C, newEncoder func()
continue
}
c.Assert(messages, check.HasLen, 1)
c.Assert(messages[0].GetRowsCount(), check.Equals, len(cs))
c.Assert(len(messages[0].Key)+len(messages[0].Value), check.Equals, size)
}

Expand Down

0 comments on commit ae04769

Please sign in to comment.