diff --git a/clientv3/concurrency/session.go b/clientv3/concurrency/session.go index 2f3281d8e35..0cb5ea7cf1e 100644 --- a/clientv3/concurrency/session.go +++ b/clientv3/concurrency/session.go @@ -51,9 +51,12 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) { } ctx, cancel := context.WithCancel(ops.ctx) - keepAlive := client.KeepAlive(ctx, id) - donec := make(chan struct{}) + keepAlive, err := client.KeepAlive(ctx, id) + if err != nil || keepAlive == nil { + return nil, err + } + donec := make(chan struct{}) s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec} // keep the lease alive until client error or cancelled context diff --git a/clientv3/example_lease_test.go b/clientv3/example_lease_test.go index 2eeab77229c..e1bd57a15d7 100644 --- a/clientv3/example_lease_test.go +++ b/clientv3/example_lease_test.go @@ -100,13 +100,12 @@ func ExampleLease_keepAlive() { } // the key 'foo' will be kept forever - ch := cli.KeepAlive(context.TODO(), resp.ID) - - ka := <-ch - if ka.Err != nil { - log.Fatal(ka.Err) + ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID) + if kaerr != nil { + log.Fatal(kaerr) } + ka := <-ch fmt.Println("ttl:", ka.TTL) // Output: ttl: 5 } @@ -132,9 +131,9 @@ func ExampleLease_keepAliveOnce() { } // to renew the lease only once - ka := cli.KeepAliveOnce(context.TODO(), resp.ID) - if ka.Err != nil { - log.Fatal(ka.Err) + ka, kaerr := cli.KeepAliveOnce(context.TODO(), resp.ID) + if kaerr != nil { + log.Fatal(kaerr) } fmt.Println("ttl:", ka.TTL) diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index 544ef5b66b9..a0c41182697 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -104,14 +104,14 @@ func TestLeaseKeepAliveOnce(t *testing.T) { t.Errorf("failed to create lease %v", err) } - ka := lapi.KeepAliveOnce(context.Background(), resp.ID) - if ka.Err != nil { - t.Errorf("failed to keepalive lease %v", ka.Err) + _, err = lapi.KeepAliveOnce(context.Background(), resp.ID) + if err != nil { + t.Errorf("failed to keepalive lease %v", err) } - ka = lapi.KeepAliveOnce(context.Background(), clientv3.LeaseID(0)) - if ka.Err != rpctypes.ErrLeaseNotFound { - t.Errorf("expected %v, got %v", rpctypes.ErrLeaseNotFound, ka.Err) + _, err = lapi.KeepAliveOnce(context.Background(), clientv3.LeaseID(0)) + if err != rpctypes.ErrLeaseNotFound { + t.Errorf("expected %v, got %v", rpctypes.ErrLeaseNotFound, err) } } @@ -129,7 +129,10 @@ func TestLeaseKeepAlive(t *testing.T) { t.Errorf("failed to create lease %v", err) } - rc := lapi.KeepAlive(context.Background(), resp.ID) + rc, kerr := lapi.KeepAlive(context.Background(), resp.ID) + if kerr != nil { + t.Errorf("failed to keepalive lease %v", kerr) + } kresp, ok := <-rc if !ok { @@ -160,7 +163,11 @@ func TestLeaseKeepAliveOneSecond(t *testing.T) { if err != nil { t.Errorf("failed to create lease %v", err) } - rc := cli.KeepAlive(context.Background(), resp.ID) + rc, kerr := cli.KeepAlive(context.Background(), resp.ID) + if kerr != nil { + t.Errorf("failed to keepalive lease %v", kerr) + } + for i := 0; i < 3; i++ { if _, ok := <-rc; !ok { t.Errorf("chan is closed, want not closed") @@ -186,7 +193,10 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) { t.Errorf("failed to create lease %v", err) } - rc := lapi.KeepAlive(context.Background(), resp.ID) + rc, kerr := lapi.KeepAlive(context.Background(), resp.ID) + if kerr != nil { + t.Errorf("failed to keepalive lease %v", kerr) + } kresp := <-rc if kresp.ID != resp.ID { @@ -220,7 +230,7 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) { type leaseCh struct { lid clientv3.LeaseID - ch clientv3.LeaseKeepAliveChan + ch <-chan *clientv3.LeaseKeepAliveResponse } // TestLeaseKeepAliveNotFound ensures a revoked lease won't stop other keep alives @@ -237,7 +247,10 @@ func TestLeaseKeepAliveNotFound(t *testing.T) { if rerr != nil { t.Fatal(rerr) } - kach := cli.KeepAlive(context.Background(), resp.ID) + kach, kaerr := cli.KeepAlive(context.Background(), resp.ID) + if kaerr != nil { + t.Fatal(kaerr) + } lchs = append(lchs, leaseCh{resp.ID, kach}) } @@ -362,7 +375,10 @@ func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) { if err != nil { t.Fatal(err) } - rc := cli.KeepAlive(context.Background(), resp.ID) + rc, kerr := cli.KeepAlive(context.Background(), resp.ID) + if kerr != nil { + t.Fatal(kerr) + } kresp := <-rc if kresp.ID != resp.ID { t.Fatalf("ID = %x, want %x", kresp.ID, resp.ID) @@ -381,10 +397,9 @@ func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) { // some keep-alives may still be buffered; drain until close timer := time.After(time.Duration(kresp.TTL) * time.Second) - loop := true - for loop { + for kresp != nil { select { - case _, loop = <-rc: + case kresp = <-rc: case <-timer: t.Fatalf("keepalive channel did not close") } @@ -408,7 +423,10 @@ func TestLeaseKeepAliveInitTimeout(t *testing.T) { } // keep client disconnected clus.Members[0].Stop(t) - rc := cli.KeepAlive(context.Background(), resp.ID) + rc, kerr := cli.KeepAlive(context.Background(), resp.ID) + if kerr != nil { + t.Fatal(kerr) + } select { case ka, ok := <-rc: if ok { @@ -436,7 +454,10 @@ func TestLeaseKeepAliveTTLTimeout(t *testing.T) { if err != nil { t.Fatal(err) } - rc := cli.KeepAlive(context.Background(), resp.ID) + rc, kerr := cli.KeepAlive(context.Background(), resp.ID) + if kerr != nil { + t.Fatal(kerr) + } if kresp := <-rc; kresp.ID != resp.ID { t.Fatalf("ID = %x, want %x", kresp.ID, resp.ID) } @@ -559,7 +580,10 @@ func TestLeaseRenewLostQuorum(t *testing.T) { kctx, kcancel := context.WithCancel(context.Background()) defer kcancel() - ka := cli.KeepAlive(kctx, r.ID) + ka, err := cli.KeepAlive(kctx, r.ID) + if err != nil { + t.Fatal(err) + } // consume first keepalive so next message sends when cluster is down <-ka lastKa := time.Now() @@ -606,9 +630,9 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) { } cli.Close() - ka := cli.KeepAlive(ctx, resp.ID) - if resp, ok := <-ka; ok { - t.Fatalf("expected closed channel, got response %+v", resp) + _, err = cli.KeepAlive(ctx, resp.ID) + if _, ok := err.(clientv3.ErrKeepAliveHalted); !ok { + t.Fatalf("expected %T, got %v(%T)", clientv3.ErrKeepAliveHalted{}, err, err) } } @@ -683,9 +707,15 @@ func TestLeaseWithRequireLeader(t *testing.T) { t.Fatal(err2) } // kaReqLeader close if the leader is lost - kaReqLeader := c.KeepAlive(clientv3.WithRequireLeader(context.TODO()), lid1.ID) + kaReqLeader, kerr1 := c.KeepAlive(clientv3.WithRequireLeader(context.TODO()), lid1.ID) + if kerr1 != nil { + t.Fatal(kerr1) + } // kaWait will wait even if the leader is lost - kaWait := c.KeepAlive(context.TODO(), lid2.ID) + kaWait, kerr2 := c.KeepAlive(context.TODO(), lid2.ID) + if kerr2 != nil { + t.Fatal(kerr2) + } select { case <-kaReqLeader: diff --git a/clientv3/lease.go b/clientv3/lease.go index 34772315dce..a6494ceee45 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -41,10 +41,8 @@ type LeaseGrantResponse struct { // LeaseKeepAliveResponse is used to convert the protobuf keepalive response. type LeaseKeepAliveResponse struct { *pb.ResponseHeader - ID LeaseID - TTL int64 - Err error - Deadline time.Time + ID LeaseID + TTL int64 } // LeaseTimeToLiveResponse is used to convert the protobuf lease timetolive response. @@ -72,11 +70,23 @@ const ( NoLease LeaseID = 0 // retryConnWait is how long to wait before retrying on a lost leader - // or keep alive loop failure. retryConnWait = 500 * time.Millisecond ) -type LeaseKeepAliveChan <-chan LeaseKeepAliveResponse +// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error. +// +// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected. +type ErrKeepAliveHalted struct { + Reason error +} + +func (e ErrKeepAliveHalted) Error() string { + s := "etcdclient: leases keep alive halted" + if e.Reason != nil { + s += ": " + e.Reason.Error() + } + return s +} type Lease interface { // Grant creates a new lease. @@ -88,24 +98,12 @@ type Lease interface { // TimeToLive retrieves the lease information of the given lease ID. TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) - // KeepAlive keeps the given lease alive forever. If the keepalive response posted to - // the channel is not consumed immediately, the lease client will continue sending keep alive requests - // to the etcd server at least every second until latest response is consumed. - // - // The KeepAlive channel closes if the underlying keep alive stream is interrupted in some - // way the client cannot handle itself; the error will be posted in the last keep - // alive message before closing. If there is no keepalive response within the - // lease's time-out, the channel will close with no error. In most cases calling - // KeepAlive again will re-establish keepalives with the target lease if it has not - // expired. - KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan - - // KeepAliveOnce renews the lease once. The response corresponds to the - // first message from calling KeepAlive. If the response has a recoverable - // error, KeepAliveOnce will retry the RPC with a new keep alive message. - // - // In most of the cases, Keepalive should be used instead of KeepAliveOnce. - KeepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse + // KeepAlive keeps the given lease alive forever. + KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) + + // KeepAliveOnce renews the lease once. In most of the cases, Keepalive + // should be used instead of KeepAliveOnce. + KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) // Close releases all resources Lease keeps for efficient communication // with the etcd server. @@ -115,8 +113,9 @@ type Lease interface { type lessor struct { mu sync.Mutex // guards all fields - // donec is closed when all goroutines are torn down from Close() - donec chan struct{} + // donec is closed and loopErr is set when recvKeepAliveLoop stops + donec chan struct{} + loopErr error remote pb.LeaseClient @@ -138,7 +137,7 @@ type lessor struct { // keepAlive multiplexes a keepalive for a lease over multiple channels type keepAlive struct { - chs []chan<- LeaseKeepAliveResponse + chs []chan<- *LeaseKeepAliveResponse ctxs []context.Context // deadline is the time the keep alive channels close if no response deadline time.Time @@ -220,22 +219,24 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption } } -func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan { - ch := make(chan LeaseKeepAliveResponse, leaseResponseChSize) +func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) { + ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize) l.mu.Lock() // ensure that recvKeepAliveLoop is still running select { case <-l.donec: + err := l.loopErr + l.mu.Unlock() close(ch) - return ch + return ch, ErrKeepAliveHalted{Reason: err} default: } ka, ok := l.keepAlives[id] if !ok { // create fresh keep alive ka = &keepAlive{ - chs: []chan<- LeaseKeepAliveResponse{ch}, + chs: []chan<- *LeaseKeepAliveResponse{ch}, ctxs: []context.Context{ctx}, deadline: time.Now().Add(l.firstKeepAliveTimeout), nextKeepAlive: time.Now(), @@ -251,51 +252,24 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan { go l.keepAliveCtxCloser(id, ctx, ka.donec) l.firstKeepAliveOnce.Do(func() { - go func() { - defer func() { - l.mu.Lock() - for _, ka := range l.keepAlives { - ka.Close(nil) - } - close(l.donec) - l.mu.Unlock() - }() - - for l.stopCtx.Err() == nil { - err := l.recvKeepAliveLoop() - if err == context.Canceled { - // canceled by user; no error like WatchChan - err = nil - } - l.mu.Lock() - for _, ka := range l.keepAlives { - ka.Close(err) - } - l.keepAlives = make(map[LeaseID]*keepAlive) - l.mu.Unlock() - select { - case <-l.stopCtx.Done(): - case <-time.After(retryConnWait): - } - } - }() + go l.recvKeepAliveLoop() go l.deadlineLoop() }) - return ch + return ch, nil } -func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse { +func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) { for { - resp := l.keepAliveOnce(ctx, id) - if resp.Err == nil { + resp, err := l.keepAliveOnce(ctx, id) + if err == nil { if resp.TTL <= 0 { - resp.Err = rpctypes.ErrLeaseNotFound + err = rpctypes.ErrLeaseNotFound } - return resp + return resp, err } - if isHaltErr(ctx, resp.Err) { - return resp + if isHaltErr(ctx, err) { + return nil, toErr(ctx, err) } } } @@ -365,7 +339,7 @@ func (l *lessor) closeRequireLeader() { continue } // remove all channels that required a leader from keepalive - newChs := make([]chan<- LeaseKeepAliveResponse, len(ka.chs)-reqIdxs) + newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs) newCtxs := make([]context.Context, len(newChs)) newIdx := 0 for i := range ka.chs { @@ -379,34 +353,45 @@ func (l *lessor) closeRequireLeader() { } } -func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse { +func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) { cctx, cancel := context.WithCancel(ctx) defer cancel() stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false)) if err != nil { - return LeaseKeepAliveResponse{Err: toErr(ctx, err)} + return nil, toErr(ctx, err) } err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)}) if err != nil { - return LeaseKeepAliveResponse{Err: toErr(ctx, err)} + return nil, toErr(ctx, err) } resp, rerr := stream.Recv() if rerr != nil { - return LeaseKeepAliveResponse{Err: toErr(ctx, rerr)} + return nil, toErr(ctx, rerr) } - return LeaseKeepAliveResponse{ + karesp := &LeaseKeepAliveResponse{ ResponseHeader: resp.GetHeader(), ID: LeaseID(resp.ID), TTL: resp.TTL, - Deadline: time.Now().Add(time.Duration(resp.TTL) * time.Second), } + return karesp, nil } func (l *lessor) recvKeepAliveLoop() (gerr error) { + defer func() { + l.mu.Lock() + close(l.donec) + l.loopErr = gerr + for _, ka := range l.keepAlives { + ka.Close() + } + l.keepAlives = make(map[LeaseID]*keepAlive) + l.mu.Unlock() + }() + stream, serr := l.resetRecv() for serr == nil { resp, err := stream.Recv() @@ -458,7 +443,6 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) { ResponseHeader: resp.GetHeader(), ID: LeaseID(resp.ID), TTL: resp.TTL, - Deadline: time.Now().Add(time.Duration(resp.TTL) * time.Second), } l.mu.Lock() @@ -472,7 +456,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) { if karesp.TTL <= 0 { // lease expired; close all keep alive channels delete(l.keepAlives, karesp.ID) - ka.Close(nil) + ka.Close() return } @@ -481,7 +465,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) { ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second) for _, ch := range ka.chs { select { - case ch <- *karesp: + case ch <- karesp: ka.nextKeepAlive = nextKeepAlive default: } @@ -502,7 +486,7 @@ func (l *lessor) deadlineLoop() { for id, ka := range l.keepAlives { if ka.deadline.Before(now) { // waited too long for response; lease may be expired - ka.Close(nil) + ka.Close() delete(l.keepAlives, id) } } @@ -544,18 +528,9 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { } } -func (ka *keepAlive) Close(err error) { +func (ka *keepAlive) Close() { close(ka.donec) for _, ch := range ka.chs { - if err != nil { - // try to post error if buffer space available - select { - case ch <- LeaseKeepAliveResponse{Err: err}: - default: - } - } close(ch) } - // so keepAliveCtxClose doesn't double-close ka.chs - ka.chs, ka.ctxs = nil, nil } diff --git a/etcdctl/ctlv3/command/lease_command.go b/etcdctl/ctlv3/command/lease_command.go index 26b9c6cb6a7..0afb3d69c7c 100644 --- a/etcdctl/ctlv3/command/lease_command.go +++ b/etcdctl/ctlv3/command/lease_command.go @@ -148,12 +148,13 @@ func leaseKeepAliveCommandFunc(cmd *cobra.Command, args []string) { } id := leaseFromArgs(args[0]) - respc := mustClientFromCmd(cmd).KeepAlive(context.TODO(), id) + respc, kerr := mustClientFromCmd(cmd).KeepAlive(context.TODO(), id) + if kerr != nil { + ExitWithError(ExitBadConnection, kerr) + } + for resp := range respc { - if resp.Err != nil { - ExitWithError(ExitError, resp.Err) - } - display.KeepAlive(resp) + display.KeepAlive(*resp) } if _, ok := (display).(*simplePrinter); ok { diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index 8593b5064f1..0152a16c67c 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -75,7 +75,6 @@ type proxyCloser struct { clientv3.Watcher wdonec <-chan struct{} kvdonec <-chan struct{} - lclose func() lpdonec <-chan struct{} } @@ -84,7 +83,6 @@ func (pc *proxyCloser) Close() error { <-pc.kvdonec err := pc.Watcher.Close() <-pc.wdonec - pc.lclose() <-pc.lpdonec return err } @@ -97,13 +95,11 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { rpc := toGRPC(c) c.KV = clientv3.NewKVFromKVClient(rpc.KV) pmu.Lock() - lc := c.Lease c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout) c.Watcher = &proxyCloser{ Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch), wdonec: proxies[c].wdonec, kvdonec: proxies[c].kvdonec, - lclose: func() { lc.Close() }, lpdonec: proxies[c].lpdonec, } pmu.Unlock() diff --git a/proxy/grpcproxy/lease.go b/proxy/grpcproxy/lease.go index ba655d02d76..dd23425a281 100644 --- a/proxy/grpcproxy/lease.go +++ b/proxy/grpcproxy/lease.go @@ -255,7 +255,10 @@ func (lps *leaseProxyStream) recvLoop() error { func (lps *leaseProxyStream) keepAliveLoop(leaseID int64, neededResps *atomicCounter) error { cctx, ccancel := context.WithCancel(lps.ctx) defer ccancel() - respc := lps.lessor.KeepAlive(cctx, clientv3.LeaseID(leaseID)) + respc, err := lps.lessor.KeepAlive(cctx, clientv3.LeaseID(leaseID)) + if err != nil { + return err + } // ticker expires when loop hasn't received keepalive within TTL var ticker <-chan time.Time for { @@ -273,7 +276,7 @@ func (lps *leaseProxyStream) keepAliveLoop(leaseID int64, neededResps *atomicCou lps.mu.Unlock() return nil case rp, ok := <-respc: - if !ok || rp.Err != nil { + if !ok { lps.mu.Lock() delete(lps.keepAliveLeases, leaseID) lps.mu.Unlock() diff --git a/tools/benchmark/cmd/lease.go b/tools/benchmark/cmd/lease.go index bf59d075159..8743ed27ead 100644 --- a/tools/benchmark/cmd/lease.go +++ b/tools/benchmark/cmd/lease.go @@ -61,8 +61,8 @@ func leaseKeepaliveFunc(cmd *cobra.Command, args []string) { } for range requests { st := time.Now() - ka := c.KeepAliveOnce(context.TODO(), resp.ID) - r.Results() <- report.Result{Err: ka.Err, Start: st, End: time.Now()} + _, err := c.KeepAliveOnce(context.TODO(), resp.ID) + r.Results() <- report.Result{Err: err, Start: st, End: time.Now()} bar.Increment() } }(clients[i]) diff --git a/tools/functional-tester/etcd-runner/command/lease_renewer_command.go b/tools/functional-tester/etcd-runner/command/lease_renewer_command.go index 1602a8fa0d2..439e267d597 100644 --- a/tools/functional-tester/etcd-runner/command/lease_renewer_command.go +++ b/tools/functional-tester/etcd-runner/command/lease_renewer_command.go @@ -56,6 +56,7 @@ func runLeaseRenewerFunc(cmd *cobra.Command, args []string) { for { var ( l *clientv3.LeaseGrantResponse + lk *clientv3.LeaseKeepAliveResponse err error ) for { @@ -67,14 +68,13 @@ func runLeaseRenewerFunc(cmd *cobra.Command, args []string) { expire := time.Now().Add(time.Duration(l.TTL-1) * time.Second) for { - lk := c.Lease.KeepAliveOnce(ctx, l.ID) - err = lk.Err + lk, err = c.Lease.KeepAliveOnce(ctx, l.ID) if grpc.Code(err) == codes.NotFound { if time.Since(expire) < 0 { log.Fatalf("bad renew! exceeded: %v", time.Since(expire)) for { - lk = c.Lease.KeepAliveOnce(ctx, l.ID) - fmt.Println(lk) + lk, err = c.Lease.KeepAliveOnce(ctx, l.ID) + fmt.Println(lk, err) time.Sleep(time.Second) } }