Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdutil: fix ctx in watch loop #6445

Merged
merged 3 commits into from
May 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,19 +417,19 @@ func (lw *LoopWatcher) StartWatchLoop() {
defer logutil.LogPanic()
defer lw.wg.Done()

ctx, cancel := context.WithTimeout(lw.ctx, lw.loadTimeout)
ctx, cancel := context.WithCancel(lw.ctx)
defer cancel()
watchStartRevision := lw.initFromEtcd(ctx)

log.Info("start to watch loop", zap.String("name", lw.name), zap.String("key", lw.key))
for {
select {
case <-lw.ctx.Done():
case <-ctx.Done():
log.Info("server is closed, exit watch loop", zap.String("name", lw.name), zap.String("key", lw.key))
return
default:
}
nextRevision, err := lw.watch(lw.ctx, watchStartRevision)
nextRevision, err := lw.watch(ctx, watchStartRevision)
if err != nil {
log.Error("watcher canceled unexpectedly and a new watcher will start after a while for watch loop",
zap.String("name", lw.name),
Expand All @@ -453,6 +453,8 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 {
)
ticker := time.NewTicker(defaultLoadFromEtcdRetryInterval)
defer ticker.Stop()
ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout)
defer cancel()

for i := 0; i < lw.loadRetryTimes; i++ {
failpoint.Inject("loadTemporaryFail", func(val failpoint.Value) {
Expand All @@ -479,6 +481,8 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 {
}
if err != nil {
log.Warn("meet error when loading in watch loop", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err))
} else {
log.Info("load finished in watch loop", zap.String("name", lw.name), zap.String("key", lw.key))
}
lw.isLoadedCh <- err
return watchStartRevision
Expand All @@ -490,8 +494,12 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision

for {
WatchChan:
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watchChanCtx, watchChanCancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
defer watchChanCancel()
opts := append(lw.opts, clientv3.WithRev(revision))
watchChan := watcher.Watch(ctx, lw.key, opts...)
watchChan := watcher.Watch(watchChanCtx, lw.key, opts...)
select {
case <-ctx.Done():
return revision, nil
Expand All @@ -501,13 +509,15 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
log.Warn("force load key failed in watch loop", zap.String("name", lw.name),
zap.String("key", lw.key), zap.Error(err))
}
watchChanCancel()
goto WatchChan
case wresp := <-watchChan:
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision in watch loop",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
watchChanCancel()
goto WatchChan
} else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0
log.Error("watcher is canceled in watch loop",
Expand All @@ -522,11 +532,16 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
log.Error("put failed in watch loop", zap.String("name", lw.name),
zap.String("key", lw.key), zap.Error(err))
}
log.Debug("put in watch loop", zap.String("name", lw.name),
zap.ByteString("key", event.Kv.Key),
rleungx marked this conversation as resolved.
Show resolved Hide resolved
zap.ByteString("value", event.Kv.Value))
case clientv3.EventTypeDelete:
if err := lw.deleteFn(event.Kv); err != nil {
log.Error("delete failed in watch loop", zap.String("name", lw.name),
zap.String("key", lw.key), zap.Error(err))
}
log.Debug("delete in watch loop", zap.String("name", lw.name),
zap.ByteString("key", event.Kv.Key))
}
}
if err := lw.postEventFn(); err != nil {
Expand All @@ -535,6 +550,7 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
}
revision = wresp.Header.Revision + 1
}
watchChanCancel()
}
}

Expand Down Expand Up @@ -570,6 +586,7 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error)
log.Error("put failed in watch loop when loading", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err))
}
}
// Note: if there are no keys in etcd, the resp.More is false. It also means the load is finished.
if !resp.More {
if err := lw.postEventFn(); err != nil {
log.Error("run post event failed in watch loop", zap.String("name", lw.name),
Expand Down