diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go
index 2363e206282..b989ad9adf9 100644
--- a/cdc/processor/pipeline/sorter.go
+++ b/cdc/processor/pipeline/sorter.go
@@ -108,8 +108,8 @@ func (n *sorterNode) start(ctx pipeline.NodeContext, isTableActorMode bool, eg *
 	if config.GetGlobalServerConfig().Debug.EnableDBSorter {
 		startTs := ctx.ChangefeedVars().Info.StartTs
-		actorID := ctx.GlobalVars().SorterSystem.ActorID(uint64(n.tableID))
-		router := ctx.GlobalVars().SorterSystem.Router()
+		actorID := ctx.GlobalVars().SorterSystem.DBActorID(uint64(n.tableID))
+		router := ctx.GlobalVars().SorterSystem.DBRouter
 		compactScheduler := ctx.GlobalVars().SorterSystem.CompactScheduler()
 		levelSorter := leveldb.NewSorter(
 			ctx, n.tableID, startTs, router, actorID, compactScheduler,
diff --git a/cdc/sorter/leveldb/message/task.go b/cdc/sorter/leveldb/message/task.go
index cf99fa7d321..dc5569b6c15 100644
--- a/cdc/sorter/leveldb/message/task.go
+++ b/cdc/sorter/leveldb/message/task.go
@@ -17,6 +17,7 @@ import (
 	"fmt"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/sorter/encoding"
 	"github.com/pingcap/tiflow/pkg/db"
 	"golang.org/x/sync/semaphore"
@@ -27,11 +28,20 @@ type Task struct {
 	UID     uint32
 	TableID uint64
 
+	// Input unsorted event for writers.
+	// Sorter.AddEntry -> writer.
+	InputEvent *model.PolymorphicEvent
+	// Latest resolved ts / commit ts for readers.
+	// writer -> reader
+	ReadTs ReadTs
 	// A batch of events (bytes encoded) need to be wrote.
+	// writer -> leveldb
 	WriteReq map[Key][]byte
 	// Requests an iterator when it is not nil.
+	// reader -> leveldb
 	IterReq *IterRequest
 	// Deletes all of the key-values in the range.
+	// reader -> leveldb and leveldb -> compactor
 	DeleteReq *DeleteRequest
 }
 
@@ -42,6 +52,12 @@ type DeleteRequest struct {
 	Count int
 }
 
+// ReadTs wraps the latest resolved ts and commit ts.
+type ReadTs struct {
+	MaxCommitTs   uint64
+	MaxResolvedTs uint64
+}
+
 // IterRequest contains parameters that necessary to build an iterator.
 type IterRequest struct {
 	UID uint32
diff --git a/cdc/sorter/leveldb/system/system.go b/cdc/sorter/leveldb/system/system.go
index 739be94e44c..5c0de5cd754 100644
--- a/cdc/sorter/leveldb/system/system.go
+++ b/cdc/sorter/leveldb/system/system.go
@@ -49,6 +49,8 @@ type System struct {
 	dbs           []db.DB
 	dbSystem      *actor.System
 	DBRouter      *actor.Router
+	WriterSystem  *actor.System
+	WriterRouter  *actor.Router
 	compactSystem *actor.System
 	compactRouter *actor.Router
 	compactSched  *lsorter.CompactScheduler
@@ -64,14 +66,22 @@ type System struct {
 
 // NewSystem returns a system.
 func NewSystem(dir string, memPercentage float64, cfg *config.DBConfig) *System {
+	// A system that polls actors which read and write leveldb.
 	dbSystem, dbRouter := actor.NewSystemBuilder("sorter-db").
 		WorkerNumber(cfg.Count).Build()
+	// A system that polls actors which compact leveldb, i.e. garbage collection.
 	compactSystem, compactRouter := actor.NewSystemBuilder("sorter-compactor").
 		WorkerNumber(cfg.Count).Build()
+	// A system that polls actors which receive events from the Puller and
+	// send batched writes to leveldb.
+	writerSystem, writerRouter := actor.NewSystemBuilder("sorter-writer").
+		WorkerNumber(cfg.Count).Throughput(4, 64).Build()
 	compactSched := lsorter.NewCompactScheduler(compactRouter)
 	return &System{
 		dbSystem:      dbSystem,
 		DBRouter:      dbRouter,
+		WriterSystem:  writerSystem,
+		WriterRouter:  writerRouter,
 		compactSystem: compactSystem,
 		compactRouter: compactRouter,
 		compactSched:  compactSched,
@@ -85,8 +95,8 @@
 	}
 }
 
-// ActorID returns an ActorID correspond with tableID.
-func (s *System) ActorID(tableID uint64) actor.ID {
+// DBActorID returns a DBActorID corresponding to tableID.
+func (s *System) DBActorID(tableID uint64) actor.ID {
 	h := fnv.New64()
 	b := [8]byte{}
 	binary.LittleEndian.PutUint64(b[:], tableID)
@@ -94,11 +104,6 @@ func (s *System) ActorID(tableID uint64) actor.ID {
 	return actor.ID(h.Sum64() % uint64(s.cfg.Count))
 }
 
-// Router returns db actors router.
-func (s *System) Router() *actor.Router {
-	return s.DBRouter
-}
-
 // CompactScheduler returns compaction scheduler.
 func (s *System) CompactScheduler() *lsorter.CompactScheduler {
 	return s.compactSched
@@ -131,6 +136,7 @@ func (s *System) Start(ctx context.Context) error {
 
 	s.compactSystem.Start(ctx)
 	s.dbSystem.Start(ctx)
+	s.WriterSystem.Start(ctx)
 	captureAddr := config.GetGlobalServerConfig().AdvertiseAddr
 	totalMemory, err := memory.MemTotal()
 	if err != nil {
@@ -205,6 +211,7 @@ func (s *System) Stop() error {
 	defer cancel()
 	// Close actors
 	s.broadcast(ctx, s.DBRouter, message.StopMessage())
+	s.broadcast(ctx, s.WriterRouter, message.StopMessage())
 	s.broadcast(ctx, s.compactRouter, message.StopMessage())
 	// Close metrics goroutine.
 	close(s.closedCh)
@@ -216,6 +223,10 @@
 	if err != nil {
 		return errors.Trace(err)
 	}
+	err = s.WriterSystem.Stop()
+	if err != nil {
+		return errors.Trace(err)
+	}
 	err = s.compactSystem.Stop()
 	if err != nil {
 		return errors.Trace(err)
diff --git a/cdc/sorter/leveldb/system/system_test.go b/cdc/sorter/leveldb/system/system_test.go
index a5c53e2a0a1..f857fd9e939 100644
--- a/cdc/sorter/leveldb/system/system_test.go
+++ b/cdc/sorter/leveldb/system/system_test.go
@@ -66,8 +66,8 @@ func TestDBActorID(t *testing.T) {
 	sys := NewSystem(t.TempDir(), 1, cfg)
 	require.Nil(t, sys.Start(ctx))
-	id1 := sys.ActorID(1)
-	id2 := sys.ActorID(1)
+	id1 := sys.DBActorID(1)
+	id2 := sys.DBActorID(1)
 	// tableID to actor ID must be deterministic.
 	require.Equal(t, id1, id2)
 	require.Nil(t, sys.Stop())
diff --git a/cdc/sorter/leveldb/table_sorter.go b/cdc/sorter/leveldb/table_sorter.go
index 4aae9271fd5..eadebfca53d 100644
--- a/cdc/sorter/leveldb/table_sorter.go
+++ b/cdc/sorter/leveldb/table_sorter.go
@@ -47,14 +47,34 @@ func allocID() uint32 {
 	return atomic.AddUint32(&levelDBSorterIDAlloc, 1)
 }
 
-// Sorter accepts out-of-order raw kv entries and output sorted entries
-type Sorter struct {
-	actorID actor.ID
-	router  *actor.Router
-	compact *CompactScheduler
+type common struct {
+	dbActorID actor.ID
+	dbRouter  *actor.Router
+
 	uid     uint32
 	tableID uint64
 	serde   *encoding.MsgPackGenSerde
+	errCh   chan error
+}
+
+// reportError notifies Sorter to return an error and close.
+func (c *common) reportError(msg string, err error) {
+	if errors.Cause(err) != context.Canceled {
+		log.L().WithOptions(zap.AddCallerSkip(1)).
+			Warn(msg, zap.Uint64("tableID", c.tableID), zap.Error(err))
+	}
+	select {
+	case c.errCh <- err:
+	default:
+		// It means there is an error already.
+	}
+}
+
+// Sorter accepts out-of-order raw kv entries and outputs sorted entries
+type Sorter struct {
+	common
+
+	compact *CompactScheduler
 
 	iterMaxAliveDuration  time.Duration
 	iterFirstSlowDuration time.Duration
@@ -85,13 +105,16 @@
 	metricIterDuration := sorterIterReadDurationHistogram.MustCurryWith(
 		prometheus.Labels{"capture": captureAddr, "id": changefeedID})
 	return &Sorter{
-		actorID: actorID,
-		router:  router,
+		common: common{
+			dbActorID: actorID,
+			dbRouter:  router,
+			uid:       allocID(),
+			tableID:   uint64(tableID),
+			serde:     &encoding.MsgPackGenSerde{},
+			errCh:     make(chan error, 1),
+		},
 		compact:            compact,
-		uid:                allocID(),
-		tableID:            uint64(tableID),
 		lastSentResolvedTs: startTs,
-		serde:              &encoding.MsgPackGenSerde{},
 		iterMaxAliveDuration:  time.Duration(cfg.IteratorMaxAliveDuration) * time.Millisecond,
 		iterFirstSlowDuration: time.Duration(cfg.IteratorSlowReadDuration) * time.Millisecond,
 
@@ -111,6 +134,8 @@ func (ls *Sorter) waitInput(ctx context.Context) (*model.PolymorphicEvent, error
 	select {
 	case <-ctx.Done():
 		return nil, errors.Trace(ctx.Err())
+	case err := <-ls.errCh:
+		return nil, errors.Trace(err)
 	case ev := <-ls.inputCh:
 		return ev, nil
 	}
@@ -129,6 +154,8 @@ func (ls *Sorter) waitInputOutput(
 	select {
 	case <-ctx.Done():
 		return nil, errors.Trace(ctx.Err())
+	case err := <-ls.errCh:
+		return nil, errors.Trace(err)
 	case ev := <-ls.inputCh:
 		return ev, nil
 	case ls.outputCh <- dummyEvent:
@@ -187,7 +214,7 @@ func (ls *Sorter) wait(
 		if err != nil {
 			atomic.StoreInt32(&ls.closed, 1)
 			close(ls.outputCh)
-			return 0, 0, 0, errors.Trace(ctx.Err())
+			return 0, 0, 0, errors.Trace(err)
 		}
 		if ev == nil {
 			// No input event and output is available.
@@ -200,7 +227,7 @@
 		if err != nil {
 			atomic.StoreInt32(&ls.closed, 1)
 			close(ls.outputCh)
-			return 0, 0, 0, errors.Trace(ctx.Err())
+			return 0, 0, 0, errors.Trace(err)
 		}
 		appendInputEvent(ev)
 	}
@@ -629,13 +656,13 @@ func (ls *Sorter) poll(ctx context.Context, state *pollState) error {
 			return errors.Trace(err)
 		}
 		// Send write task to leveldb.
-		return ls.router.SendB(ctx, ls.actorID, actormsg.SorterMessage(task))
+		return ls.dbRouter.SendB(ctx, ls.dbActorID, actormsg.SorterMessage(task))
 	}
 	var hasIter bool
 	task.IterReq, hasIter = state.tryGetIterator(ls.uid, ls.tableID)
 	// Send write/read task to leveldb.
-	err = ls.router.SendB(ctx, ls.actorID, actormsg.SorterMessage(task))
+	err = ls.dbRouter.SendB(ctx, ls.dbActorID, actormsg.SorterMessage(task))
 	if err != nil {
 		// Skip read iterator if send fails.
 		return errors.Trace(err)
 	}
@@ -668,7 +695,7 @@ func (ls *Sorter) Run(ctx context.Context) error {
 		maxResolvedTs:       uint64(0),
 		exhaustedResolvedTs: uint64(0),
 
-		actorID: ls.actorID,
+		actorID: ls.dbActorID,
 		compact: ls.compact,
 		iterFirstSlowDuration: ls.iterFirstSlowDuration,
 		iterMaxAliveDuration:  ls.iterMaxAliveDuration,
@@ -729,6 +756,6 @@ func (ls *Sorter) CleanupFunc() func(context.Context) error {
 				encoding.EncodeTsKey(ls.uid, ls.tableID+1, 0),
 			},
 		}
-		return ls.router.SendB(ctx, ls.actorID, actormsg.SorterMessage(task))
+		return ls.dbRouter.SendB(ctx, ls.dbActorID, actormsg.SorterMessage(task))
 	}
 }
diff --git a/cdc/sorter/leveldb/table_sorter_test.go b/cdc/sorter/leveldb/table_sorter_test.go
index 420ed58a303..4e2d1a5845c 100644
--- a/cdc/sorter/leveldb/table_sorter_test.go
+++ b/cdc/sorter/leveldb/table_sorter_test.go
@@ -26,6 +26,7 @@ import (
 	actormsg "github.com/pingcap/tiflow/pkg/actor/message"
 	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/pingcap/tiflow/pkg/db"
+	"github.com/pingcap/tiflow/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/semaphore"
@@ -44,6 +45,26 @@ func newTestSorter(
 	return ls, mb
 }
 
+func TestRunAndReportError(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	s, _ := newTestSorter(ctx, 2)
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		s.common.reportError(
+			"test", errors.ErrLevelDBSorterError.GenWithStackByArgs())
+	}()
+	require.Error(t, s.Run(ctx))
+
+	// Must be non-blocking.
+	s.common.reportError(
+		"test", errors.ErrLevelDBSorterError.GenWithStackByArgs())
+	s.common.reportError(
+		"test", errors.ErrLevelDBSorterError.GenWithStackByArgs())
+}
+
 func TestInputOutOfOrder(t *testing.T) {
 	t.Parallel()
 
@@ -471,15 +492,6 @@ func TestOutputBufferedResolvedEvents(t *testing.T) {
 	}
 }
 
-func newTestEvent(crts, startTs uint64, key int) *model.PolymorphicEvent {
-	return model.NewPolymorphicEvent(&model.RawKVEntry{
-		OpType:  model.OpTypePut,
-		Key:     []byte{byte(key)},
-		StartTs: startTs,
-		CRTs:    crts,
-	})
-}
-
 func prepareTxnData(
 	t *testing.T, ls *Sorter, txnCount, txnSize int,
 ) db.DB {
@@ -501,22 +513,6 @@
 	return db
 }
 
-func receiveOutputEvents(
-	outputCh chan *model.PolymorphicEvent,
-) []*model.PolymorphicEvent {
-	outputEvents := []*model.PolymorphicEvent{}
-RECV:
-	for {
-		select {
-		case ev := <-outputCh:
-			outputEvents = append(outputEvents, ev)
-		default:
-			break RECV
-		}
-	}
-	return outputEvents
-}
-
 func TestOutputIterEvents(t *testing.T) {
 	t.Parallel()
 
diff --git a/cdc/sorter/leveldb/writer.go b/cdc/sorter/leveldb/writer.go
new file mode 100644
index 00000000000..7b8f6fcedea
--- /dev/null
+++ b/cdc/sorter/leveldb/writer.go
@@ -0,0 +1,110 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package leveldb
+
+import (
+	"context"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sorter/encoding"
+	"github.com/pingcap/tiflow/cdc/sorter/leveldb/message"
+	"github.com/pingcap/tiflow/pkg/actor"
+	actormsg "github.com/pingcap/tiflow/pkg/actor/message"
+	"github.com/prometheus/client_golang/prometheus"
+	"go.uber.org/zap"
+)
+
+// writer is a thin shim that batches and translates events into key-value
+// pairs, and writes them to leveldb.
+type writer struct {
+	common
+
+	readerRouter  *actor.Router
+	readerActorID actor.ID
+
+	metricTotalEventsKV         prometheus.Counter
+	metricTotalEventsResolvedTs prometheus.Counter
+}
+
+var _ actor.Actor = (*writer)(nil)
+
+func (w *writer) Poll(ctx context.Context, msgs []actormsg.Message) (running bool) {
+	maxCommitTs, maxResolvedTs := uint64(0), uint64(0)
+	kvEventCount, resolvedEventCount := 0, 0
+	writes := make(map[message.Key][]byte)
+	for i := range msgs {
+		switch msgs[i].Tp {
+		case actormsg.TypeSorterTask:
+		case actormsg.TypeStop:
+			return false
+		default:
+			log.Panic("unexpected message", zap.Any("message", msgs[i]))
+		}
+
+		ev := msgs[i].SorterTask.InputEvent
+		if ev.RawKV.OpType == model.OpTypeResolved {
+			if maxResolvedTs < ev.CRTs {
+				maxResolvedTs = ev.CRTs
+			}
+			resolvedEventCount++
+			continue
+		}
+		if maxCommitTs < ev.CRTs {
+			maxCommitTs = ev.CRTs
+		}
+		kvEventCount++
+
+		key := encoding.EncodeKey(w.uid, w.tableID, ev)
+		value := []byte{}
+		var err error
+		value, err = w.serde.Marshal(ev, value)
+		if err != nil {
+			log.Panic("failed to marshal events", zap.Error(err))
+		}
+		writes[message.Key(key)] = value
+	}
+	w.metricTotalEventsKV.Add(float64(kvEventCount))
+	w.metricTotalEventsResolvedTs.Add(float64(resolvedEventCount))
+
+	if len(writes) != 0 {
+		// Send write task to leveldb.
+		task := message.Task{UID: w.uid, TableID: w.tableID, WriteReq: writes}
+		err := w.dbRouter.SendB(ctx, w.dbActorID, actormsg.SorterMessage(task))
+		if err != nil {
+			w.reportError("failed to send write request", err)
+			return false
+		}
+	}
+
+	// Notify the reader that there is something to read.
+	//
+	// It's OK to notify the reader immediately without waiting for the
+	// writes to finish, because the reader will still see these writes:
+	// 1. reader/writer send tasks to the same leveldb, so tasks are ordered.
+	// 2. ReadTs triggers the reader to take an iterator from leveldb,
+	//    which happens after the writer sends its writes to leveldb.
+	// 3. Before leveldb hands out an iterator, it flushes all buffered writes.
+	msg := actormsg.SorterMessage(message.Task{
+		UID:     w.uid,
+		TableID: w.tableID,
+		ReadTs: message.ReadTs{
+			MaxCommitTs:   maxCommitTs,
+			MaxResolvedTs: maxResolvedTs,
+		},
+	})
+	// It's OK if send fails, as resolved ts events are received periodically.
+	_ = w.readerRouter.Send(w.readerActorID, msg)
+	return true
+}
diff --git a/cdc/sorter/leveldb/writer_test.go b/cdc/sorter/leveldb/writer_test.go
new file mode 100644
index 00000000000..21c84321f8d
--- /dev/null
+++ b/cdc/sorter/leveldb/writer_test.go
@@ -0,0 +1,166 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package leveldb
+
+import (
+	"context"
+	"testing"
+
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sorter"
+	"github.com/pingcap/tiflow/cdc/sorter/encoding"
+	"github.com/pingcap/tiflow/cdc/sorter/leveldb/message"
+	"github.com/pingcap/tiflow/pkg/actor"
+	actormsg "github.com/pingcap/tiflow/pkg/actor/message"
+	"github.com/stretchr/testify/require"
+)
+
+func newTestWriter(
+	c common, readerRouter *actor.Router, readerActorID actor.ID,
+) *writer {
+	return &writer{
+		common:        c,
+		readerRouter:  readerRouter,
+		readerActorID: readerActorID,
+
+		metricTotalEventsKV:         sorter.EventCount.WithLabelValues("test", "test", "kv"),
+		metricTotalEventsResolvedTs: sorter.EventCount.WithLabelValues("test", "test", "resolved"),
+	}
+}
+
+func newTestEvent(crts, startTs uint64, key int) *model.PolymorphicEvent {
+	return model.NewPolymorphicEvent(&model.RawKVEntry{
+		OpType:  model.OpTypePut,
+		Key:     []byte{byte(key)},
+		StartTs: startTs,
+		CRTs:    crts,
+	})
+}
+
+func receiveOutputEvents(
+	outputCh chan *model.PolymorphicEvent,
+) []*model.PolymorphicEvent {
+	outputEvents := []*model.PolymorphicEvent{}
+RECV:
+	for {
+		select {
+		case ev := <-outputCh:
+			outputEvents = append(outputEvents, ev)
+		default:
+			break RECV
+		}
+	}
+	return outputEvents
+}
+
+func TestWriterPoll(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	capacity := 4
+	router := actor.NewRouter(t.Name())
+	readerID := actor.ID(1)
+	readerMB := actor.NewMailbox(readerID, capacity)
+	router.InsertMailbox4Test(readerID, readerMB)
+	dbID := actor.ID(2)
+	dbMB := actor.NewMailbox(dbID, capacity)
+	router.InsertMailbox4Test(dbID, dbMB)
+	c := common{dbActorID: dbID, dbRouter: router}
+	writer := newTestWriter(c, router, readerID)
+
+	// The writer is polled once per case, so the input events are split
+	// into a slice of cases.
+	cases := []struct {
+		inputEvents []*model.PolymorphicEvent
+
+		expectWrites        [][]byte
+		expectMaxCommitTs   uint64
+		expectMaxResolvedTs uint64
+	}{{
+		// Only resolved ts events.
+		inputEvents: []*model.PolymorphicEvent{
+			model.NewResolvedPolymorphicEvent(0, 1),
+			model.NewResolvedPolymorphicEvent(0, 2),
+		},
+
+		expectWrites:        [][]byte{},
+		expectMaxCommitTs:   0,
+		expectMaxResolvedTs: 2,
+	}, {
+		// Only rawkv events.
+		inputEvents: []*model.PolymorphicEvent{
+			newTestEvent(3, 1, 0), // crts 3, startts 1, key 0
+			newTestEvent(3, 1, 1), // crts 3, startts 1, key 1
+		},
+
+		expectWrites: [][]byte{
+			encoding.EncodeKey(c.uid, c.tableID, newTestEvent(3, 1, 0)),
+			encoding.EncodeKey(c.uid, c.tableID, newTestEvent(3, 1, 1)),
+		},
+		expectMaxCommitTs:   3,
+		expectMaxResolvedTs: 0,
+	}, {
+		// A mix of rawkv events and resolved ts events.
+		inputEvents: []*model.PolymorphicEvent{
+			newTestEvent(4, 2, 0), // crts 4, startts 2
+			model.NewResolvedPolymorphicEvent(0, 4),
+			newTestEvent(5, 3, 0), // crts 5, startts 3
+			model.NewResolvedPolymorphicEvent(0, 6),
+		},
+
+		expectWrites: [][]byte{
+			encoding.EncodeKey(c.uid, c.tableID, newTestEvent(4, 2, 0)),
+			encoding.EncodeKey(c.uid, c.tableID, newTestEvent(5, 3, 0)),
+		},
+		expectMaxCommitTs:   5,
+		expectMaxResolvedTs: 6,
+	}}
+
+	for i, cs := range cases {
+		msgs := make([]actormsg.Message, 0, len(cs.inputEvents))
+		for i := range cs.inputEvents {
+			msgs = append(msgs, actormsg.SorterMessage(message.Task{
+				InputEvent: cs.inputEvents[i],
+			}))
+		}
+		t.Logf("test case #%d, %v", i, cs)
+		require.True(t, writer.Poll(ctx, msgs), "case #%d, %v", i, cs)
+		if len(cs.expectWrites) != 0 {
+			msg, ok := dbMB.Receive()
+			require.True(t, ok, "case #%d, %v", i, cs)
+			writeReq := msg.SorterTask.WriteReq
+			require.EqualValues(t, len(cs.expectWrites), len(writeReq))
+			for _, k := range cs.expectWrites {
+				_, ok := writeReq[message.Key(k)]
+				require.True(t, ok, "case #%d, %v, %v", i, cs, writeReq)
+			}
+		} else {
+			_, ok := dbMB.Receive()
+			require.False(t, ok, "case #%d, %v", i, cs)
+		}
+		msg, ok := readerMB.Receive()
+		require.True(t, ok, "case #%d, %v", i, cs)
+		require.EqualValues(t,
+			cs.expectMaxCommitTs, msg.SorterTask.ReadTs.MaxCommitTs,
+			"case #%d, %v", i, cs)
+		require.EqualValues(t,
+			cs.expectMaxResolvedTs, msg.SorterTask.ReadTs.MaxResolvedTs,
+			"case #%d, %v", i, cs)
+	}
+
+	// The writer should stop once it receives a Stop message.
+	require.False(t, writer.Poll(ctx, []actormsg.Message{actormsg.StopMessage()}))
+}
diff --git a/tests/integration_tests/_utils/start_tidb_cluster_impl b/tests/integration_tests/_utils/start_tidb_cluster_impl
index 6d121376b50..e127bfa290b 100755
--- a/tests/integration_tests/_utils/start_tidb_cluster_impl
+++ b/tests/integration_tests/_utils/start_tidb_cluster_impl
@@ -125,7 +125,7 @@ for idx in $(seq 1 $pd_count); do
 done
 
 i=0
-while [ -z "$(curl http://${!host}:${!port}/pd/health 2>/dev/null | grep 'health' | grep 'true')" ]; do
+while [ -z "$(curl http://${!host}:${!port}/pd/api/v1/health 2>/dev/null | grep 'health' | grep 'true')" ]; do
 	i=$((i + 1))
 	if [ "$i" -gt 60 ]; then
 		echo 'Failed to start upstream PD'
@@ -148,7 +148,7 @@ while ! curl -o /dev/null -sf http://${DOWN_PD_HOST}:${DOWN_PD_PORT}/pd/api/v1/v
 done
 
 i=0
-while [ -z "$(curl http://${DOWN_PD_HOST}:${DOWN_PD_PORT}/pd/health 2>/dev/null | grep 'health' | grep 'true')" ]; do
+while [ -z "$(curl http://${DOWN_PD_HOST}:${DOWN_PD_PORT}/pd/api/v1/health 2>/dev/null | grep 'health' | grep 'true')" ]; do
 	i=$((i + 1))
 	if [ "$i" -gt 60 ]; then
 		echo 'Failed to start downstream PD'
diff --git a/tests/integration_tests/_utils/start_tls_tidb_cluster_impl b/tests/integration_tests/_utils/start_tls_tidb_cluster_impl
index da31694e5a8..c64d81f1bcb 100755
--- a/tests/integration_tests/_utils/start_tls_tidb_cluster_impl
+++ b/tests/integration_tests/_utils/start_tls_tidb_cluster_impl
@@ -64,7 +64,7 @@ done
 while [ -z "$(curl --cacert $TLS_DIR/ca.pem \
 	--cert $TLS_DIR/client.pem \
 	--key $TLS_DIR/client-key.pem \
-	https://${TLS_PD_HOST}:${TLS_PD_PORT}/pd/health 2>/dev/null | grep 'health' | grep 'true')" ]; do
+	https://${TLS_PD_HOST}:${TLS_PD_PORT}/pd/api/v1/health 2>/dev/null | grep 'health' | grep 'true')" ]; do
 	sleep 1
 done
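
Note: the ordering comment in writer.go's Poll is the crux of this change. Below is a condensed, illustrative sketch of just the two sends; the helper function is hypothetical (not part of the patch) and assumes the writer, message.Task, and message.ReadTs definitions introduced above.

	// flushThenNotify is a hypothetical distillation of the tail of
	// writer.Poll: a blocking, ordered send of the batched writes to the
	// db actor, followed by a best-effort nudge to the reader.
	func flushThenNotify(
		ctx context.Context, w *writer,
		writes map[message.Key][]byte, maxCommitTs, maxResolvedTs uint64,
	) error {
		// SendB blocks until the db actor accepts the task. Because the
		// writer and reader target the same db actor, a later IterReq from
		// the reader cannot overtake this write.
		task := message.Task{UID: w.uid, TableID: w.tableID, WriteReq: writes}
		if err := w.dbRouter.SendB(ctx, w.dbActorID, actormsg.SorterMessage(task)); err != nil {
			return err
		}
		// Send is non-blocking and may drop the message; that is tolerable
		// because periodic resolved ts events re-trigger the reader anyway.
		readTs := message.ReadTs{MaxCommitTs: maxCommitTs, MaxResolvedTs: maxResolvedTs}
		_ = w.readerRouter.Send(w.readerActorID, actormsg.SorterMessage(
			message.Task{UID: w.uid, TableID: w.tableID, ReadTs: readTs}))
		return nil
	}

The design choice worth noting is that correctness rests entirely on per-mailbox ordering plus leveldb flushing buffered writes before handing out an iterator, so no extra synchronization between writer and reader is needed.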