diff --git a/go.mod b/go.mod index 70f6b2072e..8053ffe35c 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,8 @@ require ( github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 github.com/stretchr/testify v1.7.0 go.opentelemetry.io/proto/otlp v0.7.0 + golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect + golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 google.golang.org/grpc v1.36.0 google.golang.org/protobuf v1.25.0 diff --git a/go.sum b/go.sum index bf03e1a6ca..c5777cec80 100644 --- a/go.sum +++ b/go.sum @@ -63,8 +63,9 @@ golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -74,10 +75,14 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index e6560f5ea2..10da0e3234 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -102,6 +102,10 @@ func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache } func newSnapshotCache(ads bool, hash NodeHash, logger log.Logger) *snapshotCache { + if logger == nil { + logger = log.NewDefaultLogger() + } + cache := &snapshotCache{ log: logger, ads: ads, @@ -170,12 +174,12 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { if len(resourcesWithTTL) == 0 { continue } - if cache.log != nil { - cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.ResourceNames, version) + cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.ResourceNames, version) + err := cache.respond(ctx, watch.Request, watch.Response, resourcesWithTTL, version, true) + if err != nil { + cache.log.Errorf("received error when attempting to respond to watches: %v", err) } - _ = cache.respond(ctx, watch.Request, watch.Response, resourcesWithTTL, version, true) - // The watch must be deleted and we must rely on the client to ack this response to create a new watch. delete(info.watches, id) } @@ -198,9 +202,8 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh for id, watch := range info.watches { version := snapshot.GetVersion(watch.Request.TypeUrl) if version != watch.Request.VersionInfo { - if cache.log != nil { - cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version) - } + cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version) + resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl) err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false) if err != nil { @@ -309,10 +312,8 @@ func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) f // if the requested version is up-to-date or missing a response, leave an open watch if !exists || request.VersionInfo == version { watchID := cache.nextWatchID() - if cache.log != nil { - cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, - request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo) - } + cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo) + info.mu.Lock() info.watches[watchID] = ResponseWatch{Request: request, Response: value} info.mu.Unlock() @@ -351,16 +352,12 @@ func (cache *snapshotCache) respond(ctx context.Context, request *Request, value // if they do not, then the watch is never responded, and it is expected that envoy makes another request if len(request.ResourceNames) != 0 && cache.ads { if err := superset(nameSet(request.ResourceNames), resources); err != nil { - if cache.log != nil { - cache.log.Warnf("ADS mode: not responding to request: %v", err) - } + cache.log.Warnf("ADS mode: not responding to request: %v", err) return nil } } - if cache.log != nil { - cache.log.Debugf("respond %s%v version %q with version %q", - request.TypeUrl, request.ResourceNames, request.VersionInfo, version) - } + + cache.log.Debugf("respond %s%v version %q with version %q", request.TypeUrl, request.ResourceNames, request.VersionInfo, version) select { case value <- createResponse(ctx, request, resources, version, heartbeat): @@ -426,15 +423,11 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream if exists { err := snapshot.ConstructVersionMap() if err != nil { - if cache.log != nil { - cache.log.Errorf("failed to compute version for snapshot resources inline, waiting for next snapshot update") - } + cache.log.Errorf("failed to compute version for snapshot resources inline, waiting for next snapshot update") } response, err := cache.respondDelta(context.Background(), &snapshot, request, value, state) if err != nil { - if cache.log != nil { - cache.log.Errorf("failed to respond with delta response, waiting for next snapshot update: %s", err) - } + cache.log.Errorf("failed to respond with delta response, waiting for next snapshot update: %s", err) } delayedResponse = response == nil @@ -442,11 +435,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream if delayedResponse { watchID := cache.nextDeltaWatchID() - if cache.log != nil { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, system version %q", watchID, - t, state.GetResourceVersions(), nodeID, snapshot.GetVersion(t)) - } - + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, system version %q", watchID, t, state.GetResourceVersions(), nodeID, snapshot.GetVersion(t)) info.SetDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state}) return cache.cancelDeltaWatch(nodeID, watchID) @@ -511,9 +500,7 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Respon // It might be beneficial to hold the request since Envoy will re-attempt the refresh. version := snapshot.GetVersion(request.TypeUrl) if request.VersionInfo == version { - if cache.log != nil { - cache.log.Warnf("skip fetch: version up to date") - } + cache.log.Warnf("skip fetch: version up to date") return nil, &types.SkipFetchError{} } @@ -532,9 +519,7 @@ func (cache *snapshotCache) GetStatusInfo(node string) StatusInfo { info, exists := cache.status[node] if !exists { - if cache.log != nil { - cache.log.Warnf("node does not exist") - } + cache.log.Warnf("node does not exist") return nil } diff --git a/pkg/log/default.go b/pkg/log/default.go new file mode 100644 index 0000000000..05b2bdfc2f --- /dev/null +++ b/pkg/log/default.go @@ -0,0 +1,27 @@ +package log + +// DefaultLogger is enabled when no consuming clients provide +// a logger to the server/cache subsystem +type DefaultLogger struct { +} + +// NewDefaultLogger creates a DefaultLogger which is a no-op to maintain current functionality +func NewDefaultLogger() *DefaultLogger { + return &DefaultLogger{} +} + +// Debugf logs a message at level debug on the standard logger. +func (l *DefaultLogger) Debugf(format string, args ...interface{}) { +} + +// Infof logs a message at level info on the standard logger. +func (l *DefaultLogger) Infof(format string, args ...interface{}) { +} + +// Warnf logs a message at level warn on the standard logger. +func (l *DefaultLogger) Warnf(format string, args ...interface{}) { +} + +// Errorf logs a message at level error on the standard logger. +func (l *DefaultLogger) Errorf(format string, args ...interface{}) { +} diff --git a/pkg/log/log_test.go b/pkg/log/log_test.go index 562d9534bc..eaf2fa8f06 100644 --- a/pkg/log/log_test.go +++ b/pkg/log/log_test.go @@ -69,3 +69,8 @@ func TestNilLoggerFuncs(t *testing.T) { xdsLogger.Warnf("warn") xdsLogger.Errorf("error") } + +func TestDefaultLogger(t *testing.T) { + logger := NewDefaultLogger() + assert.NotNil(t, logger) +} diff --git a/pkg/server/v3/gateway.go b/pkg/server/v3/gateway.go index 0d82cc4611..7b3e56a807 100644 --- a/pkg/server/v3/gateway.go +++ b/pkg/server/v3/gateway.go @@ -25,16 +25,12 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/log" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" ) // HTTPGateway is a custom implementation of [gRPC gateway](https://github.com/grpc-ecosystem/grpc-gateway) // specialized to Envoy xDS API. type HTTPGateway struct { - // Log is an optional log for errors in response write - Log log.Logger - // Server is the underlying gRPC server Server Server } diff --git a/pkg/server/v3/gateway_test.go b/pkg/server/v3/gateway_test.go index a7c7094627..b9884eb2e4 100644 --- a/pkg/server/v3/gateway_test.go +++ b/pkg/server/v3/gateway_test.go @@ -29,15 +29,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/server/v3" ) -type logger struct { - t *testing.T -} - -func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) } - func TestGateway(t *testing.T) { config := makeMockConfigWatcher() config.responses = map[string][]cache.Response{ @@ -63,7 +54,7 @@ func TestGateway(t *testing.T) { }, }, } - gtw := server.HTTPGateway{Log: logger{t: t}, Server: server.NewServer(context.Background(), config, nil)} + gtw := server.HTTPGateway{Server: server.NewServer(context.Background(), config, nil)} failCases := []struct { path string diff --git a/pkg/test/main/main.go b/pkg/test/main/main.go index 51b217893d..55cc692296 100644 --- a/pkg/test/main/main.go +++ b/pkg/test/main/main.go @@ -170,7 +170,8 @@ func main() { cb := &testv3.Callbacks{Signal: signal, Debug: debug} // mux integration - config := cache.NewSnapshotCache(mode == resource.Ads, cache.IDHash{}, logger{}) + // nil for logger uses default logger + config := cache.NewSnapshotCache(mode == resource.Ads, cache.IDHash{}, nil) var configCache cache.Cache = config typeURL := "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment" eds := cache.NewLinearCache(typeURL) @@ -207,7 +208,7 @@ func main() { // start the xDS server go test.RunAccessLogServer(ctx, als, alsPort) go test.RunManagementServer(ctx, srv, port) - go test.RunManagementGateway(ctx, srv, gatewayPort, logger{}) + go test.RunManagementGateway(ctx, srv, gatewayPort) log.Println("waiting for the first request...") select { @@ -345,25 +346,3 @@ func callEcho() (int, int) { } } } - -type logger struct{} - -func (logger logger) Debugf(format string, args ...interface{}) { - if debug { - log.Printf(format+"\n", args...) - } -} - -func (logger logger) Infof(format string, args ...interface{}) { - if debug { - log.Printf(format+"\n", args...) - } -} - -func (logger logger) Warnf(format string, args ...interface{}) { - log.Printf(format+"\n", args...) -} - -func (logger logger) Errorf(format string, args ...interface{}) { - log.Printf(format+"\n", args...) -} diff --git a/pkg/test/server.go b/pkg/test/server.go index 490e3a5063..61223e0d21 100644 --- a/pkg/test/server.go +++ b/pkg/test/server.go @@ -94,13 +94,12 @@ func RunManagementServer(ctx context.Context, srv server.Server, port uint) { } // RunManagementGateway starts an HTTP gateway to an xDS server. -func RunManagementGateway(ctx context.Context, srv server.Server, port uint, lg gcplogger.Logger) { +func RunManagementGateway(ctx context.Context, srv server.Server, port uint) { log.Printf("gateway listening HTTP/1.1 on %d\n", port) server := &http.Server{ Addr: fmt.Sprintf(":%d", port), Handler: &HTTPGateway{ - Gateway: server.HTTPGateway{Log: lg, Server: srv}, - Log: lg, + Gateway: server.HTTPGateway{Server: srv}, }, } go func() {