Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: add Node to stream closed callbacks #572

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions docs/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"log"
"sync"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)

Expand All @@ -46,9 +47,9 @@ func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error
}
return nil
}
func (cb *Callbacks) OnStreamClosed(id int64) {
func (cb *Callbacks) OnStreamClosed(id int64, node *core.Node) {
if cb.Debug {
log.Printf("stream %d closed\n", id)
log.Printf("stream %d of node %s closed\n", id, node.Id)
}
}
func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error {
Expand All @@ -57,9 +58,9 @@ func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string)
}
return nil
}
func (cb *Callbacks) OnDeltaStreamClosed(id int64) {
func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *core.Node) {
if cb.Debug {
log.Printf("delta stream %d closed\n", id)
log.Printf("delta stream %d of node %s closed\n", id, node.Id)
}
}
func (cb *Callbacks) OnStreamRequest(int64, *discovery.DiscoveryRequest) error {
Expand Down
7 changes: 4 additions & 3 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Callbacks interface {
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnDeltaStreamOpen(context.Context, int64, string) error
// OnDeltaStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
OnDeltaStreamClosed(int64)
OnDeltaStreamClosed(int64, *core.Node)
// OnStreamDeltaRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryRequest) error
Expand Down Expand Up @@ -63,10 +63,12 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
// a collection of stack allocated watches per request type
watches := newWatches()

var node = &core.Node{}

defer func() {
watches.Cancel()
if s.callbacks != nil {
s.callbacks.OnDeltaStreamClosed(streamID)
s.callbacks.OnDeltaStreamClosed(streamID, node)
}
}()

Expand Down Expand Up @@ -96,7 +98,6 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
}
}

var node = &core.Node{}
for {
select {
case <-s.ctx.Done():
Expand Down
10 changes: 5 additions & 5 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Callbacks interface {
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnStreamOpen(context.Context, int64, string) error
// OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
OnStreamClosed(int64)
OnStreamClosed(int64, *core.Node)
// OnStreamRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnStreamRequest(int64, *discovery.DiscoveryRequest) error
Expand Down Expand Up @@ -87,10 +87,13 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
// a collection of stack allocated watches per request type
watches := newWatches()

// node may only be set on the first discovery request
var node = &core.Node{}

defer func() {
watches.close()
if s.callbacks != nil {
s.callbacks.OnStreamClosed(streamID)
s.callbacks.OnStreamClosed(streamID, node)
}
}()

Expand Down Expand Up @@ -130,9 +133,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
}
}

// node may only be set on the first discovery request
var node = &core.Node{}

// recompute dynamic channels for this stream
watches.recompute(s.ctx, reqCh)

Expand Down
13 changes: 7 additions & 6 deletions pkg/server/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
Expand Down Expand Up @@ -68,9 +69,9 @@ type Callbacks interface {
// CallbackFuncs is a convenience type for implementing the Callbacks interface.
type CallbackFuncs struct {
StreamOpenFunc func(context.Context, int64, string) error
StreamClosedFunc func(int64)
StreamClosedFunc func(int64, *core.Node)
DeltaStreamOpenFunc func(context.Context, int64, string) error
DeltaStreamClosedFunc func(int64)
DeltaStreamClosedFunc func(int64, *core.Node)
StreamRequestFunc func(int64, *discovery.DiscoveryRequest) error
StreamResponseFunc func(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
StreamDeltaRequestFunc func(int64, *discovery.DeltaDiscoveryRequest) error
Expand All @@ -91,9 +92,9 @@ func (c CallbackFuncs) OnStreamOpen(ctx context.Context, streamID int64, typeURL
}

// OnStreamClosed invokes StreamClosedFunc.
func (c CallbackFuncs) OnStreamClosed(streamID int64) {
func (c CallbackFuncs) OnStreamClosed(streamID int64, node *core.Node) {
if c.StreamClosedFunc != nil {
c.StreamClosedFunc(streamID)
c.StreamClosedFunc(streamID, node)
}
}

Expand All @@ -107,9 +108,9 @@ func (c CallbackFuncs) OnDeltaStreamOpen(ctx context.Context, streamID int64, ty
}

// OnDeltaStreamClosed invokes DeltaStreamClosedFunc.
func (c CallbackFuncs) OnDeltaStreamClosed(streamID int64) {
func (c CallbackFuncs) OnDeltaStreamClosed(streamID int64, node *core.Node) {
if c.DeltaStreamClosedFunc != nil {
c.DeltaStreamClosedFunc(streamID)
c.DeltaStreamClosedFunc(streamID, node)
}
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/test/v3/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"log"
"sync"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
)

type Callbacks struct {
Expand All @@ -18,6 +20,8 @@ type Callbacks struct {
mu sync.Mutex
}

var _ server.Callbacks = &Callbacks{}

func (cb *Callbacks) Report() {
cb.mu.Lock()
defer cb.mu.Unlock()
Expand All @@ -29,9 +33,9 @@ func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error
}
return nil
}
func (cb *Callbacks) OnStreamClosed(id int64) {
func (cb *Callbacks) OnStreamClosed(id int64, node *core.Node) {
if cb.Debug {
log.Printf("stream %d closed\n", id)
log.Printf("stream %d of node %s closed\n", id, node.Id)
}
}
func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error {
Expand All @@ -40,9 +44,9 @@ func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string)
}
return nil
}
func (cb *Callbacks) OnDeltaStreamClosed(id int64) {
func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *core.Node) {
if cb.Debug {
log.Printf("delta stream %d closed\n", id)
log.Printf("delta stream %d of node %s closed\n", id, node.Id)
}
}
func (cb *Callbacks) OnStreamRequest(int64, *discovery.DiscoveryRequest) error {
Expand Down