Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay: make relay log high available #1291

Merged
merged 86 commits into from
Dec 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
86 commits
Select commit Hold shift + click to select a range
24e959c
support TLS in relay
GMHDBJD Nov 4, 2020
c62da6b
support gtid in relay
GMHDBJD Nov 9, 2020
ec050ee
add relay hub for reader
GMHDBJD Nov 9, 2020
b8b9c7c
temp commit
GMHDBJD Nov 11, 2020
0dec4d9
fix nil previous gtid event
GMHDBJD Nov 11, 2020
78fa598
fix nil panic for relay_test
GMHDBJD Nov 12, 2020
34d85df
add ut from relay gtid
GMHDBJD Nov 12, 2020
d788eef
change source config for test
GMHDBJD Nov 12, 2020
3923800
change error order
GMHDBJD Nov 12, 2020
0f5433b
revert full-mode config
GMHDBJD Nov 13, 2020
52bfe73
Merge remote-tracking branch 'upstream/master' into relayV2
GMHDBJD Nov 13, 2020
83457ca
fix ut
GMHDBJD Nov 13, 2020
f0e7e8c
address comment
GMHDBJD Nov 16, 2020
664b70f
fix sync gtid from oldest file
GMHDBJD Nov 16, 2020
57c83e1
Update pkg/streamer/reader_test.go
GMHDBJD Nov 16, 2020
e4b6b77
remove oldest check
GMHDBJD Nov 17, 2020
cf32bc6
address comment
GMHDBJD Nov 17, 2020
241e278
support source switch between worker in relay
GMHDBJD Nov 19, 2020
5c6ea46
use random server-id for relay reader
GMHDBJD Nov 19, 2020
3bff78b
use relay log for ha_cases
GMHDBJD Nov 19, 2020
d8a77a6
Merge branch 'master' into relayHA
GMHDBJD Nov 20, 2020
aae79f9
Merge branch 'master' into relayV2
GMHDBJD Nov 20, 2020
aa6cf8c
Merge remote-tracking branch 'upstream/master' into relayV2
GMHDBJD Nov 23, 2020
79cb20e
auto resume for relay
GMHDBJD Nov 23, 2020
c07ee7d
fix some bugs
GMHDBJD Nov 23, 2020
632cbf0
enable relay in integration test
GMHDBJD Nov 23, 2020
edeb3d8
Merge branch 'relayV2' into relayHA
GMHDBJD Nov 23, 2020
3ead31a
address comment
GMHDBJD Nov 23, 2020
5a69d8c
Merge branch 'relayV2' into relayHA
GMHDBJD Nov 23, 2020
8b024b7
return error if cannot compare gtid
GMHDBJD Nov 23, 2020
056ced5
Merge branch 'relayV2' into relayHA
GMHDBJD Nov 23, 2020
bf75c71
use fileReader instead
GMHDBJD Nov 23, 2020
e65307b
clear memory relay meta only when init
GMHDBJD Nov 23, 2020
a516d55
Merge branch 'relayV2' into relayHA
GMHDBJD Nov 23, 2020
513821e
minor change
GMHDBJD Nov 23, 2020
47d8da2
Merge branch 'relayV2' into relayHA
GMHDBJD Nov 23, 2020
2df59e4
clear meta when reset
GMHDBJD Nov 23, 2020
de66815
return error if reach end of file in reader
GMHDBJD Nov 25, 2020
bfbc5e2
Merge branch 'relayV2' into relayHA
GMHDBJD Nov 25, 2020
cdb3d1f
enable purge relay cmd
GMHDBJD Nov 25, 2020
55b8761
change help_cnt
GMHDBJD Nov 25, 2020
f6149da
Merge branch 'master' into relayV2
GMHDBJD Nov 25, 2020
d3c4689
remove error comment
GMHDBJD Nov 25, 2020
32de02e
fix some tests
GMHDBJD Nov 25, 2020
4fd723f
fix compatibility test
GMHDBJD Nov 25, 2020
7c863e5
Merge branch 'master' into relayHA
GMHDBJD Nov 25, 2020
c5992f3
Merge branch 'master' into relayV2
csuzhangxc Nov 26, 2020
79071d8
Merge remote-tracking branch 'upstream/master' into relayV2
GMHDBJD Nov 26, 2020
65764f1
Merge branch 'master' into relayV2
GMHDBJD Nov 26, 2020
7d662d7
Merge branch 'relayV2' into relayHA
GMHDBJD Nov 26, 2020
e444889
add test for resetup meta
GMHDBJD Nov 26, 2020
16a7558
add more comment
GMHDBJD Nov 26, 2020
5e875ab
fix fmt
GMHDBJD Nov 26, 2020
e54f231
fix test
GMHDBJD Nov 27, 2020
c7a54d7
Merge branch 'master' into relayHA
GMHDBJD Nov 30, 2020
151e97c
relay test for chaos
GMHDBJD Nov 30, 2020
67b928f
minor change
GMHDBJD Dec 2, 2020
76f338d
fix default relay-dir
GMHDBJD Dec 2, 2020
92ef267
fix error log
GMHDBJD Dec 2, 2020
f14659f
fix unsupported table option
GMHDBJD Dec 2, 2020
6733785
better arg name
GMHDBJD Dec 7, 2020
00d539d
Merge remote-tracking branch 'upstream/master' into relayHA
GMHDBJD Dec 7, 2020
5506bee
Merge remote-tracking branch 'upstream/master' into relayHA
GMHDBJD Dec 9, 2020
b6c19f9
address comment
GMHDBJD Dec 9, 2020
76b7841
fix data race
GMHDBJD Dec 9, 2020
8f7cf97
fix worker not exit
GMHDBJD Dec 9, 2020
c1513a6
Merge branch 'master' into relayHA
GMHDBJD Dec 14, 2020
eb1c976
Merge branch 'master' into relayHA
csuzhangxc Dec 14, 2020
faf371a
address comment
GMHDBJD Dec 17, 2020
c45ba2b
unresume some errors
GMHDBJD Dec 17, 2020
ae4ad25
Merge branch 'master' into relayHA
GMHDBJD Dec 17, 2020
3543055
fix fmt
GMHDBJD Dec 17, 2020
203310f
Merge branch 'master' into relayHA
GMHDBJD Dec 17, 2020
b91fc5f
Merge branch 'master' into relayHA
GMHDBJD Dec 18, 2020
0a58715
Merge remote-tracking branch 'upstream/master' into relayHA
GMHDBJD Dec 18, 2020
10bcb7f
Merge branch 'master' into relayHA
GMHDBJD Dec 21, 2020
ef4d21f
revert clone change
GMHDBJD Dec 21, 2020
3f2a346
Merge branch 'master' into relayHA
GMHDBJD Dec 21, 2020
db85f8f
revert change of integration test
GMHDBJD Dec 21, 2020
41ef863
fix ci
GMHDBJD Dec 21, 2020
2739495
fix ci
GMHDBJD Dec 21, 2020
61fb37b
add comment
GMHDBJD Dec 21, 2020
3ceabf8
fix ci
GMHDBJD Dec 21, 2020
c6bdfd3
fix ci
GMHDBJD Dec 21, 2020
b6876f9
fix chaos
GMHDBJD Dec 22, 2020
f9c1b1a
increase diff count
GMHDBJD Dec 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions chaos/cases/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
source-id: "mysql-replica-01"
enable-gtid: false
enable-relay: false

from:
host: "mysql-0.mysql" # same namespace with MySQL
Expand Down
2 changes: 2 additions & 0 deletions chaos/cases/conf/source2.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
source-id: "mysql-replica-02"
enable-gtid: true
enable-relay: true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may update the GitHub actions workflow and these config files to enable/disable both for enable-gtid and enable-relay later.


from:
host: "mysql-1.mysql" # same namespace with MySQL
Expand Down
4 changes: 2 additions & 2 deletions chaos/cases/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func createSources(ctx context.Context, cli pb.MasterClient, cfg *config) error
return err
}

var cfg1 config2.SourceConfig
var cfg2 config2.SourceConfig
cfg1 := config2.NewSourceConfig()
cfg2 := config2.NewSourceConfig()
if err = cfg1.ParseYaml(string(s1Content)); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
const (
tableCount = 10 // tables count in schema.
fullInsertCount = 100 // `INSERT INTO` count (not rows count) for each table in full stage.
diffCount = 20 // diff data check count
diffCount = 30 // diff data check count
diffInterval = 10 * time.Second // diff data check interval
incrRoundTime = 20 * time.Second // time to generate incremental data in one round
)
Expand Down
8 changes: 7 additions & 1 deletion dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
const (
// the default base(min) server id generated by random
defaultBaseServerID = math.MaxUint32 / 10
defaultRelayDir = "relay-dir"
)

var getAllServerIDFunc = utils.GetAllServerID
Expand Down Expand Up @@ -53,6 +54,8 @@ type SourceConfig struct {
// relay synchronous starting point (if specified)
RelayBinLogName string `yaml:"relay-binlog-name" toml:"relay-binlog-name" json:"relay-binlog-name"`
RelayBinlogGTID string `yaml:"relay-binlog-gtid" toml:"relay-binlog-gtid" json:"relay-binlog-gtid"`
// only use when worker bound source, do not marsh it
UUIDSuffix int `yaml:"-" toml:"-" json:"-"`

SourceID string `yaml:"source-id" toml:"source-id" json:"source-id"`
From DBConfig `yaml:"from" toml:"from" json:"from"`
Expand All @@ -73,7 +76,6 @@ type SourceConfig struct {
// NewSourceConfig creates a new base config for upstream MySQL/MariaDB source.
func NewSourceConfig() *SourceConfig {
c := &SourceConfig{
RelayDir: "relay-dir",
Purge: PurgeConfig{
Interval: 60 * 60,
Expires: 0,
Expand Down Expand Up @@ -243,6 +245,10 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) {
}
}

if c.EnableRelay && len(c.RelayDir) == 0 {
c.RelayDir = defaultRelayDir
}

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions dm/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func NewRootCmd() *cobra.Command {
master.NewQueryStatusCmd(),
master.NewShowDDLLocksCmd(),
master.NewUnlockDDLLockCmd(),
// master.NewPauseRelayCmd(),
// master.NewResumeRelayCmd(),
// master.NewPurgeRelayCmd(),
master.NewPauseRelayCmd(),
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
master.NewResumeRelayCmd(),
master.NewPurgeRelayCmd(),
master.NewOperateSourceCmd(),
master.NewOfflineMemberCmd(),
master.NewOperateLeaderCmd(),
Expand Down
8 changes: 8 additions & 0 deletions dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ func (d *DummyRelay) SaveMeta(pos mysql.Position, gset gtid.Set) error {
return nil
}

// ResetMeta implements Process interface
func (d *DummyRelay) ResetMeta() {}

// PurgeRelayDir implements Process interface
func (d *DummyRelay) PurgeRelayDir() error {
return nil
}

func (t *testRelay) TestRelay(c *C) {
originNewRelay := relay.NewRelay
relay.NewRelay = NewDummyRelay
Expand Down
45 changes: 26 additions & 19 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (
keepaliveTime = 3 * time.Second
retryConnectSleepTime = time.Second
syncMasterEndpointsTime = 3 * time.Second
getMinPosForSubTaskFunc = getMinPosForSubTask
getMinLocForSubTaskFunc = getMinLocForSubTask
)

// Server accepts RPC requests
Expand Down Expand Up @@ -546,16 +546,24 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
if cfg.EnableRelay {
dctx, dcancel := context.WithTimeout(s.etcdClient.Ctx(), time.Duration(len(subTaskCfgs))*3*time.Second)
defer dcancel()
minPos, err1 := getMinPosInAllSubTasks(dctx, subTaskCfgs)
minLoc, err1 := getMinLocInAllSubTasks(dctx, subTaskCfgs)
if err1 != nil {
return err1
}

// TODO: support GTID
// don't contain GTID information in checkpoint table, just set it to empty
if minPos != nil {
cfg.RelayBinLogName = binlog.AdjustPosition(*minPos).Name
cfg.RelayBinlogGTID = ""
if minLoc != nil {
log.L().Info("get min location in all subtasks", zap.Stringer("location", *minLoc))
cfg.RelayBinLogName = binlog.AdjustPosition(minLoc.Position).Name
cfg.RelayBinlogGTID = minLoc.GTIDSetStr()
// set UUIDSuffix when bound to a source
cfg.UUIDSuffix, err = binlog.ExtractSuffix(minLoc.Position.Name)
if err != nil {
return err
}
} else {
// set UUIDSuffix even not checkpoint exist
// so we will still remove relay dir
cfg.UUIDSuffix = binlog.MinUUIDSuffix
Copy link
Collaborator Author

@GMHDBJD GMHDBJD Dec 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add in 41ef863 and 61fb37b, PTAL

}
}

Expand Down Expand Up @@ -634,32 +642,31 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse {
}

// all subTask in subTaskCfgs should have same source
// this function return the min position in all subtasks, used for relay's position
// TODO: get min gtidSet
func getMinPosInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskConfig) (minPos *mysql.Position, err error) {
// this function return the min location in all subtasks, used for relay's location
func getMinLocInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskConfig) (minLoc *binlog.Location, err error) {
for _, subTaskCfg := range subTaskCfgs {
pos, err := getMinPosForSubTaskFunc(ctx, subTaskCfg)
loc, err := getMinLocForSubTaskFunc(ctx, subTaskCfg)
if err != nil {
return nil, err
}

if pos == nil {
if loc == nil {
continue
}

if minPos == nil {
minPos = pos
if minLoc == nil {
minLoc = loc
} else {
if minPos.Compare(*pos) >= 1 {
minPos = pos
if binlog.CompareLocation(*minLoc, *loc, subTaskCfg.EnableGTID) >= 1 {
minLoc = loc
}
}
}

return minPos, nil
return minLoc, nil
}

func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) {
func getMinLocForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minLoc *binlog.Location, err error) {
if subTaskCfg.Mode == config.ModeFull {
return nil, nil
}
Expand All @@ -682,7 +689,7 @@ func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig)
}

location := checkpoint.GlobalPoint()
return &location.Position, nil
return &location, nil
}

// unifyMasterBinlogPos eliminates different masterBinlog in one response
Expand Down
61 changes: 44 additions & 17 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/ha"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -60,11 +62,11 @@ func (t *testServer) SetUpSuite(c *C) {
err := log.InitLogger(&log.Config{})
c.Assert(err, IsNil)

getMinPosForSubTaskFunc = getFakePosForSubTask
getMinLocForSubTaskFunc = getFakeLocForSubTask
}

func (t *testServer) TearDownSuite(c *C) {
getMinPosForSubTaskFunc = getMinPosForSubTask
getMinLocForSubTaskFunc = getMinLocForSubTask
}

func createMockETCD(dir string, host string) (*embed.Etcd, error) {
Expand Down Expand Up @@ -404,7 +406,7 @@ func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, ETCD *embed.
c.Assert(s.getWorker(true), IsNil)
}

func (t *testServer) TestGetMinPosInAllSubTasks(c *C) {
func (t *testServer) TestGetMinLocInAllSubTasks(c *C) {
subTaskCfg := []*config.SubTaskConfig{
{
Name: "test2",
Expand All @@ -414,10 +416,19 @@ func (t *testServer) TestGetMinPosInAllSubTasks(c *C) {
Name: "test1",
},
}
minPos, err := getMinPosInAllSubTasks(context.Background(), subTaskCfg)
minLoc, err := getMinLocInAllSubTasks(context.Background(), subTaskCfg)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(err, IsNil)
c.Assert(minPos.Name, Equals, "mysql-binlog.00001")
c.Assert(minPos.Pos, Equals, uint32(12))
c.Assert(minLoc.Position.Name, Equals, "mysql-binlog.00001")
c.Assert(minLoc.Position.Pos, Equals, uint32(12))

for _, subtask := range subTaskCfg {
subtask.EnableGTID = true
}

minLoc, err = getMinLocInAllSubTasks(context.Background(), subTaskCfg)
c.Assert(err, IsNil)
c.Assert(minLoc.Position.Name, Equals, "mysql-binlog.00001")
c.Assert(minLoc.Position.Pos, Equals, uint32(123))
}

func (t *testServer) TestUnifyMasterBinlogPos(c *C) {
Expand Down Expand Up @@ -530,22 +541,38 @@ func (t *testServer) TestUnifyMasterBinlogPos(c *C) {
c.Assert(relay.RelayCatchUpMaster, IsTrue)
}

func getFakePosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) {
switch subTaskCfg.Name {
case "test1":
return &mysql.Position{
func getFakeLocForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minLoc *binlog.Location, err error) {
gset1, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-30")
gset2, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-50")
gset3, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-50,ba8f633f-1f15-11eb-b1c7-0242ac110002:1")
loc1 := binlog.InitLocation(
mysql.Position{
Name: "mysql-binlog.00001",
Pos: 123,
}, nil
case "test2":
return &mysql.Position{
},
gset1,
)
loc2 := binlog.InitLocation(
mysql.Position{
Name: "mysql-binlog.00001",
Pos: 12,
}, nil
case "test3":
return &mysql.Position{
},
gset2,
)
loc3 := binlog.InitLocation(
mysql.Position{
Name: "mysql-binlog.00003",
}, nil
},
gset3,
)

switch subTaskCfg.Name {
case "test1":
return &loc1, nil
case "test2":
return &loc2, nil
case "test3":
return &loc3, nil
default:
return nil, nil
}
Expand Down
80 changes: 79 additions & 1 deletion dm/worker/task_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/retry"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

//// Backoff related constants
Expand Down Expand Up @@ -116,6 +117,11 @@ type backoffController struct {

// task name -> the latest auto resume time
latestResumeTime map[string]time.Time

latestRelayPausedTime time.Time
latestRelayBlockTime time.Time
latestRelayResumeTime time.Time
relayBackoff *backoff.Backoff
}

// newBackoffController returns a new backoffController instance
Expand Down Expand Up @@ -273,7 +279,72 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus,
return ResumeDispatch
}

func (tsc *realTaskStatusChecker) check() {
func (tsc *realTaskStatusChecker) getRelayResumeStrategy(relayStatus *pb.RelayStatus, duration time.Duration) ResumeStrategy {
// relay that is not paused or paused manually, just ignore it
if relayStatus == nil || relayStatus.Stage != pb.Stage_Paused || relayStatus.Result == nil || relayStatus.Result.IsCanceled {
return ResumeIgnore
}

for _, err := range relayStatus.Result.Errors {
if _, ok := retry.UnresumableRelayErrCodes[err.ErrCode]; ok {
return ResumeNoSense
}
}

if time.Since(tsc.bc.latestRelayResumeTime) < duration {
return ResumeSkip
}

return ResumeDispatch
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
}

func (tsc *realTaskStatusChecker) checkRelayStatus() {
ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultDBTimeout)
defer cancel()

relayStatus := tsc.w.relayHolder.Status(ctx)
if tsc.bc.relayBackoff == nil {
tsc.bc.relayBackoff, _ = backoff.NewBackoff(tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration)
tsc.bc.latestRelayPausedTime = time.Now()
tsc.bc.latestRelayResumeTime = time.Now()
}
rbf := tsc.bc.relayBackoff
duration := rbf.Current()
strategy := tsc.getRelayResumeStrategy(relayStatus, duration)
switch strategy {
case ResumeIgnore:
if time.Since(tsc.bc.latestRelayPausedTime) > tsc.cfg.BackoffRollback.Duration {
rbf.Rollback()
// after each rollback, reset this timer
tsc.bc.latestRelayPausedTime = time.Now()
}
case ResumeNoSense:
// this strategy doesn't forward or rollback backoff
tsc.bc.latestRelayPausedTime = time.Now()
blockTime := tsc.bc.latestRelayBlockTime
if !blockTime.IsZero() {
tsc.l.Warn("relay can't auto resume", zap.Duration("paused duration", time.Since(blockTime)))
} else {
tsc.bc.latestRelayBlockTime = time.Now()
tsc.l.Warn("relay can't auto resume")
}
case ResumeSkip:
tsc.l.Warn("backoff skip auto resume relay", zap.Time("latestResumeTime", tsc.bc.latestRelayResumeTime), zap.Duration("duration", duration))
tsc.bc.latestRelayPausedTime = time.Now()
case ResumeDispatch:
tsc.bc.latestRelayPausedTime = time.Now()
err := tsc.w.operateRelay(tsc.ctx, pb.RelayOp_ResumeRelay)
if err != nil {
tsc.l.Error("dispatch auto resume relay failed", zap.Error(err))
} else {
tsc.l.Info("dispatch auto resume relay")
tsc.bc.latestRelayResumeTime = time.Now()
rbf.BoundaryForward()
}
}
}

func (tsc *realTaskStatusChecker) checkTaskStatus() {
allSubTaskStatus := tsc.w.getAllSubTaskStatus()

defer func() {
Expand Down Expand Up @@ -333,3 +404,10 @@ func (tsc *realTaskStatusChecker) check() {
}
}
}

func (tsc *realTaskStatusChecker) check() {
if tsc.w.cfg.EnableRelay {
tsc.checkRelayStatus()
}
tsc.checkTaskStatus()
}
Loading