Skip to content

Commit

Permalink
NET-4657/add resource service client (#18053)
Browse files Browse the repository at this point in the history
### Description

<!-- Please describe why you're making this change, in plain English.
-->
Dan had already started on this
[task](#17849) which is needed
to start building the HTTP APIs. This just needed some cleanup to get it
ready for review.

Overview:

- Rename `internalResourceServiceClient` to
`insecureResourceServiceClient` for name consistency
- Configure a `secureResourceServiceClient` with auth enabled

### PR Checklist

* [ ] ~updated test coverage~
* [ ] ~external facing docs updated~
* [x] appropriate backport labels added
* [ ] ~not a security concern~
  • Loading branch information
JadhavPoonam committed Jul 14, 2023
1 parent ad6364a commit 5208ea9
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 16 deletions.
4 changes: 4 additions & 0 deletions agent/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types"

Expand Down Expand Up @@ -163,6 +164,9 @@ func (a *TestACLAgent) Stats() map[string]map[string]string {
func (a *TestACLAgent) ReloadConfig(_ consul.ReloadableConfig) error {
return fmt.Errorf("Unimplemented")
}
func (a *TestACLAgent) ResourceServiceClient() pbresource.ResourceServiceClient {
return nil
}

func TestACL_Version8EnabledByDefault(t *testing.T) {
t.Parallel()
Expand Down
4 changes: 4 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
"github.com/hashicorp/consul/lib/mutex"
"github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/pboperator"
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/hashicorp/consul/tlsutil"
Expand Down Expand Up @@ -198,6 +199,9 @@ type delegate interface {

RPC(ctx context.Context, method string, args interface{}, reply interface{}) error

// ResourceServiceClient is a client for the gRPC Resource Service.
ResourceServiceClient() pbresource.ResourceServiceClient

SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error
Shutdown() error
Stats() map[string]map[string]string
Expand Down
15 changes: 15 additions & 0 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
)
Expand Down Expand Up @@ -93,6 +94,9 @@ type Client struct {
EnterpriseClient

tlsConfigurator *tlsutil.Configurator

// resourceServiceClient is a client for the gRPC Resource Service.
resourceServiceClient pbresource.ResourceServiceClient
}

// NewClient creates and returns a Client
Expand Down Expand Up @@ -151,6 +155,13 @@ func NewClient(config *Config, deps Deps) (*Client, error) {
}
c.router = deps.Router

conn, err := deps.GRPCConnPool.ClientConn(deps.ConnPool.Datacenter)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("Failed to get gRPC client connection: %w", err)
}
c.resourceServiceClient = pbresource.NewResourceServiceClient(conn)

// Start LAN event handlers after the router is complete since the event
// handlers depend on the router and the router depends on Serf.
go c.lanEventHandler()
Expand Down Expand Up @@ -451,3 +462,7 @@ func (c *Client) AgentEnterpriseMeta() *acl.EnterpriseMeta {
func (c *Client) agentSegmentName() string {
return c.config.Segment
}

func (c *Client) ResourceServiceClient() pbresource.ResourceServiceClient {
return c.resourceServiceClient
}
78 changes: 62 additions & 16 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,20 @@ type Server struct {
// typeRegistry contains Consul's registered resource types.
typeRegistry resource.Registry

// internalResourceServiceClient is a client that can be used to communicate
// with the Resource Service in-process (i.e. not via the network) without auth.
// It should only be used for purely-internal workloads, such as controllers.
internalResourceServiceClient pbresource.ResourceServiceClient
// resourceServiceServer implements the Resource Service.
resourceServiceServer *resourcegrpc.Server

// insecureResourceServiceClient is a client that can be used to communicate
// with the Resource Service in-process (i.e. not via the network) *without*
// auth. It should only be used for purely-internal workloads, such as
// controllers.
insecureResourceServiceClient pbresource.ResourceServiceClient

// secureResourceServiceClient is a client that can be used to communicate
// with the Resource Service in-process (i.e. not via the network) *with* auth.
// It can be used to make requests to the Resource Service on behalf of the user
// (e.g. from the HTTP API).
secureResourceServiceClient pbresource.ResourceServiceClient

// controllerManager schedules the execution of controllers.
controllerManager *controller.Manager
Expand Down Expand Up @@ -803,11 +813,16 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s)
s.grpcLeaderForwarder = flat.LeaderForwarder

if err := s.setupInternalResourceService(logger); err != nil {
if err := s.setupSecureResourceServiceClient(); err != nil {
return nil, err
}

if err := s.setupInsecureResourceServiceClient(logger); err != nil {
return nil, err
}

s.controllerManager = controller.NewManager(
s.internalResourceServiceClient,
s.insecureResourceServiceClient,
logger.Named(logging.ControllerRuntime),
)
s.registerResources(flat)
Expand Down Expand Up @@ -929,6 +944,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
s.peerStreamServer.Register(srv)
s.externalACLServer.Register(srv)
s.externalConnectCAServer.Register(srv)
s.resourceServiceServer.Register(srv)
}

return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register, nil, s.incomingRPCLimiter)
Expand Down Expand Up @@ -1334,23 +1350,50 @@ func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
})
s.peerStreamServer.Register(s.externalGRPCServer)

resourcegrpc.NewServer(resourcegrpc.Config{
s.resourceServiceServer = resourcegrpc.NewServer(resourcegrpc.Config{
Registry: s.typeRegistry,
Backend: s.raftStorageBackend,
ACLResolver: s.ACLResolver,
Logger: logger.Named("grpc-api.resource"),
}).Register(s.externalGRPCServer)
})
s.resourceServiceServer.Register(s.externalGRPCServer)
}

func (s *Server) setupInternalResourceService(logger hclog.Logger) error {
server := grpc.NewServer()

resourcegrpc.NewServer(resourcegrpc.Config{
func (s *Server) setupInsecureResourceServiceClient(logger hclog.Logger) error {
server := resourcegrpc.NewServer(resourcegrpc.Config{
Registry: s.typeRegistry,
Backend: s.raftStorageBackend,
ACLResolver: resolver.DANGER_NO_AUTH{},
Logger: logger.Named("grpc-api.resource"),
}).Register(server)
})

conn, err := s.runInProcessGRPCServer(server.Register)
if err != nil {
return err
}
s.insecureResourceServiceClient = pbresource.NewResourceServiceClient(conn)

return nil
}

func (s *Server) setupSecureResourceServiceClient() error {
conn, err := s.runInProcessGRPCServer(s.resourceServiceServer.Register)
if err != nil {
return err
}
s.secureResourceServiceClient = pbresource.NewResourceServiceClient(conn)

return nil
}

// runInProcessGRPCServer runs a gRPC server that can only be accessed in the
// same process, rather than over the network, using a pipe listener.
func (s *Server) runInProcessGRPCServer(registerFn ...func(*grpc.Server)) (*grpc.ClientConn, error) {
server := grpc.NewServer()

for _, fn := range registerFn {
fn(server)
}

pipe := agentgrpc.NewPipeListener()
go server.Serve(pipe)
Expand All @@ -1367,15 +1410,14 @@ func (s *Server) setupInternalResourceService(logger hclog.Logger) error {
)
if err != nil {
server.Stop()
return err
return nil, err
}
go func() {
<-s.shutdownCh
conn.Close()
}()
s.internalResourceServiceClient = pbresource.NewResourceServiceClient(conn)

return nil
return conn, nil
}

// Shutdown is used to shutdown the server
Expand Down Expand Up @@ -2095,6 +2137,10 @@ func (s *Server) hcpServerStatus(deps Deps) hcp.StatusCallback {
}
}

func (s *Server) ResourceServiceClient() pbresource.ResourceServiceClient {
return s.secureResourceServiceClient
}

func fileExists(name string) (bool, error) {
_, err := os.Stat(name)
if err == nil {
Expand Down

0 comments on commit 5208ea9

Please sign in to comment.