Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache: add default logger #483

Merged
merged 5 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ require (
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/proto/otlp v0.7.0
golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/grpc v1.36.0
google.golang.org/protobuf v1.25.0
Expand Down
11 changes: 8 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -74,10 +75,14 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
Expand Down
55 changes: 20 additions & 35 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache
}

func newSnapshotCache(ads bool, hash NodeHash, logger log.Logger) *snapshotCache {
if logger == nil {
logger = log.NewDefaultLogger()
}

cache := &snapshotCache{
log: logger,
ads: ads,
Expand Down Expand Up @@ -170,12 +174,12 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) {
if len(resourcesWithTTL) == 0 {
continue
}
if cache.log != nil {
cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.ResourceNames, version)
cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.ResourceNames, version)
err := cache.respond(ctx, watch.Request, watch.Response, resourcesWithTTL, version, true)
if err != nil {
cache.log.Errorf("received error when attempting to respond to watches: %v", err)
}

_ = cache.respond(ctx, watch.Request, watch.Response, resourcesWithTTL, version, true)

// The watch must be deleted and we must rely on the client to ack this response to create a new watch.
delete(info.watches, id)
}
Expand All @@ -198,9 +202,8 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
for id, watch := range info.watches {
version := snapshot.GetVersion(watch.Request.TypeUrl)
if version != watch.Request.VersionInfo {
if cache.log != nil {
cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version)
}
cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version)

resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)
err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false)
if err != nil {
Expand Down Expand Up @@ -309,10 +312,8 @@ func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) f
// if the requested version is up-to-date or missing a response, leave an open watch
if !exists || request.VersionInfo == version {
watchID := cache.nextWatchID()
if cache.log != nil {
cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID,
request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo)
}
cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo)

info.mu.Lock()
info.watches[watchID] = ResponseWatch{Request: request, Response: value}
info.mu.Unlock()
Expand Down Expand Up @@ -351,16 +352,12 @@ func (cache *snapshotCache) respond(ctx context.Context, request *Request, value
// if they do not, then the watch is never responded, and it is expected that envoy makes another request
if len(request.ResourceNames) != 0 && cache.ads {
if err := superset(nameSet(request.ResourceNames), resources); err != nil {
if cache.log != nil {
cache.log.Warnf("ADS mode: not responding to request: %v", err)
}
cache.log.Warnf("ADS mode: not responding to request: %v", err)
return nil
}
}
if cache.log != nil {
cache.log.Debugf("respond %s%v version %q with version %q",
request.TypeUrl, request.ResourceNames, request.VersionInfo, version)
}

cache.log.Debugf("respond %s%v version %q with version %q", request.TypeUrl, request.ResourceNames, request.VersionInfo, version)

select {
case value <- createResponse(ctx, request, resources, version, heartbeat):
Expand Down Expand Up @@ -426,27 +423,19 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream
if exists {
err := snapshot.ConstructVersionMap()
if err != nil {
if cache.log != nil {
cache.log.Errorf("failed to compute version for snapshot resources inline, waiting for next snapshot update")
}
cache.log.Errorf("failed to compute version for snapshot resources inline, waiting for next snapshot update")
}
response, err := cache.respondDelta(context.Background(), &snapshot, request, value, state)
if err != nil {
if cache.log != nil {
cache.log.Errorf("failed to respond with delta response, waiting for next snapshot update: %s", err)
}
cache.log.Errorf("failed to respond with delta response, waiting for next snapshot update: %s", err)
}

delayedResponse = response == nil
}

if delayedResponse {
watchID := cache.nextDeltaWatchID()
if cache.log != nil {
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, system version %q", watchID,
t, state.GetResourceVersions(), nodeID, snapshot.GetVersion(t))
}

cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, system version %q", watchID, t, state.GetResourceVersions(), nodeID, snapshot.GetVersion(t))
info.SetDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state})

return cache.cancelDeltaWatch(nodeID, watchID)
Expand Down Expand Up @@ -511,9 +500,7 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Respon
// It might be beneficial to hold the request since Envoy will re-attempt the refresh.
version := snapshot.GetVersion(request.TypeUrl)
if request.VersionInfo == version {
if cache.log != nil {
cache.log.Warnf("skip fetch: version up to date")
}
cache.log.Warnf("skip fetch: version up to date")
return nil, &types.SkipFetchError{}
}

Expand All @@ -532,9 +519,7 @@ func (cache *snapshotCache) GetStatusInfo(node string) StatusInfo {

info, exists := cache.status[node]
if !exists {
if cache.log != nil {
cache.log.Warnf("node does not exist")
}
cache.log.Warnf("node does not exist")
return nil
}

Expand Down
27 changes: 27 additions & 0 deletions pkg/log/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package log

// DefaultLogger is enabled when no consuming clients provide
// a logger to the server/cache subsystem
type DefaultLogger struct {
}

// NewDefaultLogger creates a DefaultLogger which is a no-op to maintain current functionality
func NewDefaultLogger() *DefaultLogger {
return &DefaultLogger{}
}

// Debugf logs a message at level debug on the standard logger.
func (l *DefaultLogger) Debugf(format string, args ...interface{}) {
}

// Infof logs a message at level info on the standard logger.
func (l *DefaultLogger) Infof(format string, args ...interface{}) {
}

// Warnf logs a message at level warn on the standard logger.
func (l *DefaultLogger) Warnf(format string, args ...interface{}) {
}

// Errorf logs a message at level error on the standard logger.
func (l *DefaultLogger) Errorf(format string, args ...interface{}) {
}
5 changes: 5 additions & 0 deletions pkg/log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,8 @@ func TestNilLoggerFuncs(t *testing.T) {
xdsLogger.Warnf("warn")
xdsLogger.Errorf("error")
}

func TestDefaultLogger(t *testing.T) {
logger := NewDefaultLogger()
assert.NotNil(t, logger)
}
4 changes: 0 additions & 4 deletions pkg/server/v3/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,12 @@ import (

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/log"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
)

// HTTPGateway is a custom implementation of [gRPC gateway](https://github.com/grpc-ecosystem/grpc-gateway)
// specialized to Envoy xDS API.
type HTTPGateway struct {
// Log is an optional log for errors in response write
Log log.Logger

// Server is the underlying gRPC server
Server Server
}
Expand Down
11 changes: 1 addition & 10 deletions pkg/server/v3/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
)

type logger struct {
t *testing.T
}

func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) }
func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format, args...) }
func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) }
func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) }

func TestGateway(t *testing.T) {
config := makeMockConfigWatcher()
config.responses = map[string][]cache.Response{
Expand All @@ -63,7 +54,7 @@ func TestGateway(t *testing.T) {
},
},
}
gtw := server.HTTPGateway{Log: logger{t: t}, Server: server.NewServer(context.Background(), config, nil)}
gtw := server.HTTPGateway{Server: server.NewServer(context.Background(), config, nil)}

failCases := []struct {
path string
Expand Down
27 changes: 3 additions & 24 deletions pkg/test/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ func main() {
cb := &testv3.Callbacks{Signal: signal, Debug: debug}

// mux integration
config := cache.NewSnapshotCache(mode == resource.Ads, cache.IDHash{}, logger{})
// nil for logger uses default logger
config := cache.NewSnapshotCache(mode == resource.Ads, cache.IDHash{}, nil)
var configCache cache.Cache = config
typeURL := "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"
eds := cache.NewLinearCache(typeURL)
Expand Down Expand Up @@ -207,7 +208,7 @@ func main() {
// start the xDS server
go test.RunAccessLogServer(ctx, als, alsPort)
go test.RunManagementServer(ctx, srv, port)
go test.RunManagementGateway(ctx, srv, gatewayPort, logger{})
go test.RunManagementGateway(ctx, srv, gatewayPort)

log.Println("waiting for the first request...")
select {
Expand Down Expand Up @@ -345,25 +346,3 @@ func callEcho() (int, int) {
}
}
}

type logger struct{}

func (logger logger) Debugf(format string, args ...interface{}) {
if debug {
log.Printf(format+"\n", args...)
}
}

func (logger logger) Infof(format string, args ...interface{}) {
if debug {
log.Printf(format+"\n", args...)
}
}

func (logger logger) Warnf(format string, args ...interface{}) {
log.Printf(format+"\n", args...)
}

func (logger logger) Errorf(format string, args ...interface{}) {
log.Printf(format+"\n", args...)
}
5 changes: 2 additions & 3 deletions pkg/test/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,12 @@ func RunManagementServer(ctx context.Context, srv server.Server, port uint) {
}

// RunManagementGateway starts an HTTP gateway to an xDS server.
func RunManagementGateway(ctx context.Context, srv server.Server, port uint, lg gcplogger.Logger) {
func RunManagementGateway(ctx context.Context, srv server.Server, port uint) {
log.Printf("gateway listening HTTP/1.1 on %d\n", port)
server := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: &HTTPGateway{
Gateway: server.HTTPGateway{Log: lg, Server: srv},
Log: lg,
Gateway: server.HTTPGateway{Server: srv},
},
}
go func() {
Expand Down