Skip to content

Commit

Permalink
[fallback] Set xDS resources from the client
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Aug 7, 2024
1 parent 53d1d8e commit 6a118d7
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 319 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/psm-interop.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ jobs:
protos/grpc/testing/empty.proto
protos/grpc/testing/messages.proto
protos/grpc/testing/test.proto
protos/grpc/testing/xdsconfig/control.proto
protos/grpc/testing/xdsconfig/service.proto
protos/grpc/testing/xdsconfig/xdsconfig.proto
- name: "Run unit tests"
run: python -m tests.unit
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,5 @@ venv/
venv-*/
out/
protos/**/*_pb2*
# Generated Go files
docker/go-control-plane/grpc/interop/grpc_testing/xdsconfig/*.pb.go
9 changes: 9 additions & 0 deletions docker/go-control-plane/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,12 @@ the root of `grpc/psm-interop` checkout.
```
docker build . -f docker/go-control-plane/Dockerfile
```

## Local development

Run the following command from this repository root to generate protocol files:
```
protoc -I=. --go_out=docker/go-control-plane \
protos/grpc/testing/xdsconfig/*.proto \
--go-grpc_out=docker/go-control-plane/
```
124 changes: 0 additions & 124 deletions docker/go-control-plane/controlplane/resource.go

This file was deleted.

108 changes: 34 additions & 74 deletions docker/go-control-plane/fallback-control-plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,16 @@ import (
"log"
"net"
"os"
"strconv"
"sync"

"google.golang.org/grpc"
channelz "google.golang.org/grpc/channelz/service"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/grpc/psm-interop/docker/go-control-plane/controlplane"
xdsconfigpb "github.com/grpc/psm-interop/docker/go-control-plane/grpc/interop/grpc_testing/xdsconfig"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
Expand All @@ -52,12 +48,11 @@ import (
)

var (
port = flag.Uint("port", 3333, "Port to listen on")
nodeid = flag.String("nodeid", "test-id", "Node ID")
upstream = flag.String("upstream", "localhost:3000", "upstream server")
port = flag.Uint("port", 3333, "Port to listen on")
nodeid = flag.String("nodeid", "test-id", "Node ID")
)

type Filter struct {
type resourceKey struct {
ResourceType string
ResourceName string
}
Expand All @@ -68,8 +63,7 @@ type controlService struct {
xdsconfigpb.UnsafeXdsConfigControlServiceServer
version uint32
mu sync.Mutex // Guards access to all fields listed below
clusters map[string]*v3clusterpb.Cluster
listeners map[string]*v3listenerpb.Listener
resources map[resourceKey]*proto.Message
filters map[string]map[string]bool
cache cache.SnapshotCache
}
Expand All @@ -86,45 +80,48 @@ func (srv *controlService) StopOnRequest(_ context.Context, req *xdsconfigpb.Sto
}
res := xdsconfigpb.StopOnRequestResponse{}
for t, names := range srv.filters {
for name, _ := range names {
for name := range names {
res.Filters = append(res.Filters, &xdsconfigpb.StopOnRequestResponse_ResourceFilter{ResourceType: t, ResourceName: name})
}
}
return &res, nil
}

// UpsertResources allows the test to provide a new or replace existing xDS
// resource. Notification will be sent to any control plane clients watching
// SetResources allows the test to provide a new or replace existing xDS
// resources. Notification will be sent to any control plane clients watching
// the resource being updated.
func (srv *controlService) UpsertResources(_ context.Context, req *xdsconfigpb.UpsertResourcesRequest) (*xdsconfigpb.UpsertResourcesResponse, error) {
func (srv *controlService) SetResources(_ context.Context, req *xdsconfigpb.SetResourcesRequest) (*xdsconfigpb.SetResourcesResponse, error) {
srv.mu.Lock()
defer srv.mu.Unlock()
srv.version++
listener := controlplane.ListenerName
if req.Listener != nil {
listener = *req.Listener
if len(req.Resources) > 0 {
srv.version++
}
for _, resource := range req.Resources {
key := resourceKey{ResourceType: resource.Type, ResourceName: resource.Name}
contents := resource.Body
if contents != nil {
body, err := contents.UnmarshalNew()
if err != nil {
log.Printf("Failed to parse %s/%s: %v", key.ResourceType, key.ResourceName, err)
} else {
srv.resources[key] = &body
}
} else {
delete(srv.resources, key)
}
}
srv.clusters[req.Cluster] = controlplane.MakeCluster(req.Cluster, req.UpstreamHost, req.UpstreamPort)
srv.listeners[listener] = controlplane.MakeHTTPListener(listener, req.Cluster)
if err := srv.RefreshSnapshot(); err != nil {
return nil, err
}
res := &xdsconfigpb.UpsertResourcesResponse{}
for _, l := range srv.listeners {
a, err := anypb.New(l)
res := xdsconfigpb.SetResourcesResponse{}
for key, message := range srv.resources {
a, err := anypb.New(*message)
if err != nil {
log.Fatalf("Failed to convert listener %v to pb: %v\n", l, err)
log.Printf("Can not wrap resource %s/%s into any: %v", key.ResourceType, key.ResourceName, err)
}
res.Resource = append(res.Resource, a)
}
for _, c := range srv.clusters {
a, err := anypb.New(c)
if err != nil {
log.Fatalf("Failed to convert cluster %v to pb: %v\n", c, err)
}
res.Resource = append(res.Resource, a)
}
return res, nil
return &res, nil
}

// Abruptly stops the server when the client requests a resource that the test
Expand All @@ -146,31 +143,15 @@ func (srv *controlService) onStreamRequest(id int64, req *v3discoverypb.Discover
}

func (srv *controlService) RefreshSnapshot() error {
var listeners []types.Resource
for _, l := range srv.listeners {
listeners = append(listeners, l)
}
var clusters []types.Resource
for _, c := range srv.clusters {
clusters = append(clusters, c)
resources := map[resource.Type][]types.Resource{}
for k, resource := range srv.resources {
resources[k.ResourceType] = append(resources[k.ResourceType], *resource)
}
resources := map[resource.Type][]types.Resource{resource.ListenerType: listeners, resource.ClusterType: clusters}
// Create the snapshot that we'll serve to Envoy
snapshot, err := cache.NewSnapshot(fmt.Sprint(srv.version), resources)
if err != nil {
return err
}
log.Printf("Snapshot contents:\n")
for _, values := range snapshot.Resources {
for name, item := range values.Items {
text, err := protojson.MarshalOptions{Multiline: true}.Marshal(item.Resource)
if err != nil {
log.Printf("Resource %v, error: %v\n", name, err)
continue
}
log.Printf("%v => %v\n", name, string(text))
}
}
if err := snapshot.Consistent(); err != nil {
log.Printf("Snapshot inconsistency: %v\n", err)
return err
Expand All @@ -183,7 +164,6 @@ func (srv *controlService) RefreshSnapshot() error {
return nil
}


func (srv *controlService) RunServer(port uint) error {
if err := srv.RefreshSnapshot(); err != nil {
log.Fatalf("Failed to refresh snapshot: %v\n", err)
Expand All @@ -208,32 +188,12 @@ func (srv *controlService) RunServer(port uint) error {
return nil
}

func parseHostPort(host_port string) (string, uint32, error) {
host, upstreamPort, err := net.SplitHostPort(*upstream)
if err != nil {
return "", 0, fmt.Errorf("Incorrect upstream host name: %s: %v\n", host_port, err)
}
parsedUpstreamPort, err := strconv.Atoi(upstreamPort)
if err != nil || parsedUpstreamPort <= 0 {
return "", 0, fmt.Errorf("Not a valid port number: %d: %v\n", upstreamPort, err)
}
return host, uint32(parsedUpstreamPort), nil
}


// Main entry point. Configures and starts a gRPC server that serves xDS traffic
// and provides an interface for tests to manage control plane behavior.
func main() {
flag.Parse()
host, upstreamPort, err := parseHostPort(*upstream)
if err != nil {
log.Fatalf("Incorrect upstream host name: %s: %v\n", upstream, err)
}
initial_cds := controlplane.MakeCluster(controlplane.ClusterName, host, upstreamPort)
initial_lds := controlplane.MakeHTTPListener(controlplane.ListenerName, controlplane.ClusterName)
controlService := &controlService{version: 1,
clusters: map[string]*v3clusterpb.Cluster{controlplane.ListenerName: initial_cds},
listeners: map[string]*v3listenerpb.Listener{controlplane.ListenerName: initial_lds},
resources: map[resourceKey]*proto.Message{},
filters: map[string]map[string]bool{},
cache: cache.NewSnapshotCache(false, cache.IDHash{}, nil),
}
Expand Down
Loading

0 comments on commit 6a118d7

Please sign in to comment.