
etcdutil, leadership: make more high availability #6577

Merged Jul 20, 2023 (15 commits)

pkg/election/leadership.go (92 changes: 75 additions & 17 deletions)
@@ -18,6 +18,7 @@ import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pingcap/failpoint"
	"github.com/pingcap/kvproto/pkg/pdpb"
@@ -30,6 +31,11 @@ import (
	"go.uber.org/zap"
)

const (
	watchLoopUnhealthyTimeout = 60 * time.Second
	detectHealthyInterval     = 10 * time.Second
)

// GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
	leader := &pdpb.Member{}
@@ -182,26 +188,81 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
	if ls == nil {
		return
	}

	interval := detectHealthyInterval
	unhealthyTimeout := watchLoopUnhealthyTimeout
	failpoint.Inject("fastTick", func() {
		unhealthyTimeout = 5 * time.Second
		interval = 1 * time.Second
	})
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	lastHealthyTime := time.Now()

	watcher := clientv3.NewWatcher(ls.client)
	defer watcher.Close()
Review comment (Contributor):
How about adding a log when we exit the get-leader loop?

Suggested change:
	defer watcher.Close()
+	defer log.Info("exit ...")

	ctx, cancel := context.WithCancel(serverCtx)
	defer cancel()
	// The revision is the revision of the last modification on this key.
	// If that revision is compacted, the watcher will meet a "required revision has been compacted" error.
	// In this case, use the compact revision to re-watch the key.
	var watchChanCancel *context.CancelFunc
Review comment (Member):
Why use a pointer?

Reply (Contributor Author):
If we don't use a pointer, govet reports an error. (two screenshots attached)

Reply (Member):
How about just skipping the lint if you are sure about it? It seems to be a known problem of go analysis.

Reply (Contributor Author):
I think a pointer is acceptable.

Reply (Contributor):
Agree with rleungx. Probably just use something like //nolint to avoid the pointer. But I have no strong preference; either way looks good to me.
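As context for this thread, the pattern can be reduced to a few self-contained lines. The sketch below is hypothetical (not pd code): it shows the same rotate-and-cancel loop, with the cancel func stored behind a pointer, which is how the PR sidesteps govet's lostcancel warning ("the cancel function is not used on all paths (possible context leak)").

```go
package main

import (
	"context"
	"fmt"
)

// rotateWatchContexts mimics the loop in Watch: each iteration cancels the
// previous watch context before creating a new one. Storing the cancel func
// behind a *context.CancelFunc keeps govet's lostcancel check quiet.
func rotateWatchContexts(iterations int) (cancels int) {
	var watchChanCancel *context.CancelFunc
	defer func() {
		if watchChanCancel != nil {
			(*watchChanCancel)() // release the final context
		}
	}()
	for i := 0; i < iterations; i++ {
		if watchChanCancel != nil {
			(*watchChanCancel)() // cancel the previous watch context
			cancels++
		}
		_, cancel := context.WithCancel(context.Background())
		watchChanCancel = &cancel
	}
	return cancels
}

func main() {
	fmt.Println(rotateWatchContexts(3)) // cancels the previous context twice
}
```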

	defer func() {
		if watchChanCancel != nil {
			(*watchChanCancel)()
		}
	}()
	for {
		failpoint.Inject("delayWatcher", nil)
		rch := watcher.Watch(ctx, ls.leaderKey, clientv3.WithRev(revision))
		for wresp := range rch {
		if watchChanCancel != nil {
			(*watchChanCancel)()
		}
		// In order to prevent a watch stream being stuck in a partitioned node,
		// make sure to wrap context with "WithRequireLeader".
		watchChanCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
		watchChanCancel = &cancel

		// When etcd is not available, the watcher.Watch will block,
		// so we check the etcd availability first.
		if !etcdutil.IsHealthy(serverCtx, ls.client) {
			if time.Since(lastHealthyTime) > unhealthyTimeout {
				log.Error("the connect of leadership watcher is unhealthy",
					zap.Int64("revision", revision),
					zap.String("leader-key", ls.leaderKey),
					zap.String("purpose", ls.purpose))
				return
			}
			select {
			case <-serverCtx.Done():
				// server closed, return
				return
			case <-ticker.C:
				// continue to check the etcd availability
				continue
			}
		}

		watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision))
Review comment (Contributor):
Regarding #6554 ("TSO primary election took 14 minutes after PD(API) Pods being deleted at the same time and PD(API) leader being re-elected"), I have two questions:

1. If the watch being unable to exit when etcd is unavailable is the main reason, it only explains why the secondary, which watches the leader here, can't be re-elected as the new primary; it doesn't explain why the old primary can't be re-elected. What happened on the old primary? Did it also get stuck in some watch loop?
2. It seems to me that what we're thinking is: during the long gap between the PD(API) Pods being deleted at the same time and the PD(API) leader being re-elected, the watch here won't exit, and even after the PD(API) leader is back, the watch here somehow still can't catch the lease lost/expired event. Is my understanding correct? If yes, how long is the unsafe duration of the PD(API) leader being offline (once that duration is exceeded, the watcher here still can't catch the lease lost/expired event)? The value of "unhealthyTimeout" needs to be less than this unsafe duration.

Reply (Contributor):
I do think the current PR helps mitigate the situation even without a perfect solution or answers to 1 and 2, because with this PR the secondary can be elected as the new primary, which addresses question 1, and question 2 is a parameter-tuning problem.

Reply (Contributor Author):
It's not certain that the watch loop failing to exit is why it takes fifteen minutes to re-elect; there could be other reasons: getting stuck on a gRPC retry, on a TCP retry, or on the etcd watch itself.

	WatchChan:
		select {
		case <-serverCtx.Done():
			// server closed, return
			return
		case <-ticker.C:
			if !etcdutil.IsHealthy(serverCtx, ls.client) {
Review comment (Contributor):
To avoid temporary failures and false alarms, we'd better check the etcd health status multiple times: if it's unhealthy for a defined period (e.g., check health 5 times every 1 second and all checks return unhealthy), then we cancel the watch channel and re-watch.

Reply (Contributor Author):
OK, I have added it in:
	if !etcdutil.IsHealthy(serverCtx, ls.client) {

				if time.Since(lastHealthyTime) > unhealthyTimeout {
					log.Error("the connect of leadership watcher is unhealthy",
						zap.Int64("revision", revision),
						zap.String("leader-key", ls.leaderKey),
						zap.String("purpose", ls.purpose))
					return
				}
				goto WatchChan
			}
		case wresp := <-watchChan:
			// meet compacted error, use the compact revision.
			if wresp.CompactRevision != 0 {
				log.Warn("required revision has been compacted, use the compact revision",
					zap.Int64("required-revision", revision),
					zap.Int64("compact-revision", wresp.CompactRevision))
				revision = wresp.CompactRevision
				break
			}
			if wresp.Canceled {
				continue
Review comment (Contributor):
'continue' goes to the beginning of the for loop, correct? Do we need to reset lastHealthyTime here?

Reply (Contributor Author):
Yes, we need to recreate the watch channel with the new revision.

			} else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0
				log.Error("leadership watcher is canceled with",
					zap.Int64("revision", revision),
					zap.String("leader-key", ls.leaderKey),
@@ -213,19 +274,16 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
			for _, ev := range wresp.Events {
				if ev.Type == mvccpb.DELETE {
					log.Info("current leadership is deleted",
						zap.Int64("revision", wresp.Header.Revision),
						zap.String("leader-key", ls.leaderKey),
						zap.String("purpose", ls.purpose))
					return
				}
			}
			revision = wresp.Header.Revision + 1
		}

		select {
		case <-ctx.Done():
			// server closed, return
			return
		default:
		}
		lastHealthyTime = time.Now()
		goto WatchChan // use goto to avoid creating a new watchChan
	}
}


pkg/election/leadership_test.go (86 changes: 86 additions & 0 deletions)

@@ -19,8 +19,10 @@ import (
	"testing"
	"time"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
	"github.com/tikv/pd/pkg/utils/etcdutil"
	"github.com/tikv/pd/pkg/utils/testutil"
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/embed"
)
@@ -118,3 +120,87 @@ func TestLeadership(t *testing.T) {
	re.NoError(lease1.Close())
	re.NoError(lease2.Close())
}

func TestExitWatch(t *testing.T) {
	re := require.New(t)
	leaderKey := "/test_leader"
	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/fastTick", "return(true)"))
	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
	// Case1: close the client before the watch loop starts
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`))
		client.Close()
		re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayWatcher"))
	})
	// Case2: close the client when the watch loop is running
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		// Wait for the watch loop to start
		time.Sleep(500 * time.Millisecond)
		client.Close()
	})
	// Case3: delete the leader key
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		_, err := client.Delete(context.Background(), leaderKey)
		re.NoError(err)
	})
	// Case4: close the server before the watch loop starts
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`))
		server.Close()
		re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayWatcher"))
	})
	// Case5: close the server when the watch loop is running
	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
		// Wait for the watch loop to start
		time.Sleep(500 * time.Millisecond)
		server.Close()
	})
	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick"))
	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
}

func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client)) {
	re := require.New(t)
	cfg := etcdutil.NewTestSingleConfig(t)
	etcd, err := embed.StartEtcd(cfg)
	re.NoError(err)
	defer func() {
		etcd.Close()
	}()

	ep := cfg.LCUrls[0].String()
	client1, err := clientv3.New(clientv3.Config{
		Endpoints: []string{ep},
	})
	re.NoError(err)
	client2, err := clientv3.New(clientv3.Config{
		Endpoints: []string{ep},
	})
	re.NoError(err)

	<-etcd.Server.ReadyNotify()

	leadership1 := NewLeadership(client1, leaderKey, "test_leader_1")
	leadership2 := NewLeadership(client2, leaderKey, "test_leader_2")
	err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
	re.NoError(err)
	resp, err := client2.Get(context.Background(), leaderKey)
	re.NoError(err)
	done := make(chan struct{})
	go func() {
		leadership2.Watch(context.Background(), resp.Header.Revision)
		done <- struct{}{}
	}()

	injectFunc(etcd, client2)

	testutil.Eventually(re, func() bool {
		select {
		case <-done:
			return true
		default:
			return false
		}
	})
}
pkg/mcs/resourcemanager/server/server.go (2 changes: 1 addition & 1 deletion)

@@ -270,7 +270,7 @@ func (s *Server) initClient() error {
	if err != nil {
		return err
	}
-	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0])
+	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
	return err
}

pkg/mcs/tso/server/server.go (2 changes: 1 addition & 1 deletion)

@@ -380,7 +380,7 @@ func (s *Server) initClient() error {
	if err != nil {
		return err
	}
-	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls[0])
+	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls)
	return err
}