failed rows are skipped due to checkpoint flush #5279

Closed
D3Hunter opened this issue Apr 26, 2022 · 6 comments · Fixed by #5295
Labels
affects-6.0 · area/dm (Issues or PRs related to DM.) · severity/critical · type/bug (The issue is confirmed as a bug.)


D3Hunter commented Apr 26, 2022

What did you do?

Most steps can reuse #5272, but you need to set checkpoint-flush-interval: 1 to make it easier to reproduce. Other cases may reproduce this bug too; just make sure the failed row triggers a table checkpoint flush.

  • create a source in pos mode
  • create test.test1 in the upstream
  • start-task --remove-meta
  • alter table test1 add column c4 int; in the upstream
  • create table test2 (c int primary key); in the upstream, or wait 30s, to flush checkpoints
  • wait 1 more second, so the next insert will trigger a checkpoint flush (this causes the position of the failed row to be flushed into the table point)
  • insert into test1. Now the task reports an error because the downstream doesn't have column c4, but before the error is detected, the checkpoint is flushed:
    return s.flushIfOutdated()
  • the task is auto-resumed, and the failed row is skipped because its position is smaller than the current table checkpoint (see the sketch below):
    if s.checkpoint.IsOlderThanTablePoint(sourceTable, *ec.currentLocation, false) {
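For illustration, a minimal sketch of the skip semantics. binlogPoint and isOlderThan below are simplified stand-ins I am assuming for DM's real checkpoint types; only the comparison idea mirrors IsOlderThanTablePoint:

```go
// Minimal sketch of the skip check after auto resume: a row whose
// location is strictly before the saved table point is treated as
// already replicated and dropped.
package main

import "fmt"

type binlogPoint struct {
	name string // binlog file name, e.g. "mysql-bin.000001"
	pos  uint32 // offset within that file
}

// isOlderThan reports whether p is strictly before the saved table point.
func (p binlogPoint) isOlderThan(table binlogPoint) bool {
	if p.name != table.name {
		return p.name < table.name
	}
	return p.pos < table.pos
}

func main() {
	// The premature flush saved the failed row's end location, so the
	// table point is now ahead of the row itself.
	tablePoint := binlogPoint{name: "mysql-bin.000001", pos: 735}
	failedRow := binlogPoint{name: "mysql-bin.000001", pos: 700}

	if failedRow.isOlderThan(tablePoint) {
		fmt.Println("row skipped on resume: data is silently lost")
	}
}
```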

This case may also cause the global point to become larger than the table points:

+------------+-----------+----------+------------------+------------+
| id         | cp_schema | cp_table | binlog_name      | binlog_pos |
+------------+-----------+----------+------------------+------------+
| mysql-3306 |           |          | mysql-bin.000001 |        766 |
| mysql-3306 | test      | test2    | mysql-bin.000001 |        505 |
| mysql-3306 | test      | test1    | mysql-bin.000001 |        735 |
+------------+-----------+----------+------------------+------------+
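Assuming the syncer resumes streaming from the global point (my reading of the usual DM resume flow, not stated in this issue), any table point behind the global point marks a range of events that is never re-read. A small sketch using the positions above, with simplified stand-ins for DM's checkpoint structures:

```go
// Sketch of why global point > table point is dangerous: if resume
// streams from the global point, events for a table between its table
// point and the global point are never replayed. Positions are taken
// from the checkpoint table above.
package main

import "fmt"

func main() {
	globalPos := uint32(766)
	tablePoints := map[string]uint32{
		"test.test1": 735,
		"test.test2": 505,
	}
	for table, pos := range tablePoints {
		if pos < globalPos {
			fmt.Printf("%s: events in (%d, %d] would never be replayed\n", table, pos, globalPos)
		}
	}
}
```

The task config used for the reproduction: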
name: foo-task
task-mode: all


target-database:
  host: "127.0.0.1"
  port: 4000
  user: "root"
  password: "" # 如果密码不为空,则推荐使用经过 dmctl 加密的密文

mysql-instances:
  - source-id: "mysql-3306"
    block-allow-list:  "ba-rule1"
    syncer:
      checkpoint-flush-interval: 1
      enable-gtid: true
    filter-rules: ["filter-rule-1"]

block-allow-list:
  ba-rule1:
    do-dbs: ["test"]

filters:
  filter-rule-1:
    schema-pattern: "test"
    table-pattern: "test1"
    events: ["all ddl"]
    action: Ignore

What did you expect to see?

For this case, the task should fail again on the failed row after being auto-resumed.

What did you see instead?

The failed row is skipped.

Versions of the cluster

current master

Release Version: v6.1.0-alpha-101-ga7bcd922a
Git Commit Hash: a7bcd922ae761e20525d847f46a4377047c46cef
Git Branch: HEAD
UTC Build Time: 2022-04-26 09:26:31
Go Version: go version go1.18 darwin/arm64

current status of DM cluster (execute query-status <task-name> in dmctl)

(paste current status of DM cluster here)
D3Hunter added the type/bug and area/dm labels on Apr 26, 2022

GMHDBJD commented Apr 26, 2022

> this causes the position of the failed row to be flushed into the table point

Why would the table point be flushed? 🤔 If a row failed, the checkpoint should not be flushed.

tiflow/dm/syncer/syncer.go

Lines 1143 to 1153 in 967060a

err := s.execError.Load()
// TODO: for now, if any error occurred (including user canceled), checkpoint won't be updated. But if we have put
// optimistic shard info, DM-master may resolved the optimistic lock and let other worker execute DDL. So after this
// worker resume, it can not execute the DML/DDL in old binlog because of downstream table structure mismatching.
// We should find a way to (compensating) implement a transaction containing interaction with both etcd and SQL.
if err != nil && (terror.ErrDBExecuteFailed.Equal(err) || terror.ErrDBUnExpect.Equal(err)) {
    s.tctx.L().Warn("error detected when executing SQL job, skip sync flush checkpoints",
        zap.Stringer("checkpoint", s.checkpoint),
        zap.Error(err))
    return nil
}

D3Hunter (Author) commented

> Why would the table point be flushed? 🤔 If a row failed, the checkpoint should not be flushed.

After the job is sent, but before the row fails, the checkpoint is flushed.
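A minimal sketch of that window. The goroutine layout and names below are illustrative assumptions; the real syncer's interleaving is more involved, but the ordering is the point:

```go
// Sketch of the race: the DML job is handed to a worker, the
// interval-based flush runs, and only afterwards does the worker
// record the execution error -- so the execError guard sees nil and
// the flush proceeds with the failed row's location.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var execError atomic.Value

	jobs := make(chan string, 1)
	go func() { // DML worker goroutine
		<-jobs
		time.Sleep(10 * time.Millisecond) // executing downstream...
		execError.Store(fmt.Errorf("unknown column c4"))
	}()

	jobs <- "INSERT INTO test1 ..." // 1. job sent; its location already saved into the table point

	// 2. checkpoint-flush-interval has elapsed, so the flush runs now.
	//    The guard sees no error yet, and the failed row's location is
	//    flushed into the table checkpoint.
	if execError.Load() == nil {
		fmt.Println("flushing checkpoint, including the failed row's position")
	}

	time.Sleep(20 * time.Millisecond) // 3. only now does the row fail
	fmt.Println("exec error:", execError.Load())
}
```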

lance6716 commented

Can you provide a modification to the test script, so we can reproduce this issue more deterministically?

D3Hunter (Author) commented

> Can you provide a modification to the test script, so we can reproduce this issue more deterministically?

Use the task config in the description; it reproduces every time.

lance6716 commented

Maybe I should call s.flushJobs() instead of s.jobWg.Wait() and s.flushCheckPoints(). Will take a look tomorrow.

tiflow/dm/syncer/syncer.go

Lines 989 to 1004 in be428b7

func (s *Syncer) flushIfOutdated() error {
    if !s.checkpoint.LastFlushOutdated() {
        return nil
    }
    if s.cfg.Experimental.AsyncCheckpointFlush {
        jobSeq := s.getFlushSeq()
        s.tctx.L().Info("Start to async flush current checkpoint to downstream based on flush interval", zap.Int64("job sequence", jobSeq))
        j := newAsyncFlushJob(s.cfg.WorkerCount, jobSeq)
        s.addJob(j)
        s.flushCheckPointsAsync(j)
        return nil
    }
    s.jobWg.Wait()
    return s.flushCheckPoints()
}

In the AsyncCheckpointFlush branch, we add the job and call flushCheckPointsAsync. But in the sync flushing branch, it seems we should only add the flush job and let handleJob call flushCheckPoints. I also hope we can unify the calling style 🤔
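A rough sketch of that unified style, under the assumption that flush jobs travel through the same ordered queue as DML jobs. The job struct and dispatcher below are simplified illustrations, not DM's real types:

```go
// Sketch of the suggested fix: flushIfOutdated enqueues a flush job
// instead of waiting and flushing inline, so the flush is serialized
// behind any pending DML and the execError guard fires reliably.
package main

import "fmt"

type opType int

const (
	dmlOp opType = iota
	flushOp
)

type job struct {
	tp  opType
	sql string
}

// handleJob processes jobs strictly in order, so a flush job can never
// overtake a DML job queued before it.
func handleJob(j job, execError *error) {
	switch j.tp {
	case dmlOp:
		// Pretend this row fails downstream (missing column c4).
		*execError = fmt.Errorf("unknown column 'c4'")
	case flushOp:
		if *execError != nil {
			fmt.Println("skip checkpoint flush: error detected")
			return
		}
		fmt.Println("flush checkpoints")
	}
}

func main() {
	var execError error
	queue := []job{
		{tp: dmlOp, sql: "INSERT INTO test1 ..."},
		{tp: flushOp}, // enqueued by flushIfOutdated instead of flushing inline
	}
	for _, j := range queue {
		handleJob(j, &execError)
	}
}
```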


niubell commented Apr 27, 2022

/assign D3Hunter
