Skip to content

Commit

Permalink
Merge pull request #13375 from serathius/authority-3.5
Browse files Browse the repository at this point in the history
Cherry pick "Fix http2 authority header in single endpoint scenario" to release-3.5
  • Loading branch information
ptabor authored Sep 30, 2021
2 parents 4312298 + 79f9a45 commit edb3b5a
Show file tree
Hide file tree
Showing 61 changed files with 983 additions and 317 deletions.
18 changes: 15 additions & 3 deletions client/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,28 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
}

initialEndpoints := strings.Join(c.cfg.Endpoints, ";")
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.Endpoints()[0]))
conn, err := grpc.DialContext(dctx, target, opts...)
if err != nil {
return nil, err
}
return conn, nil
}

func authority(endpoint string) string {
spl := strings.SplitN(endpoint, "://", 2)
if len(spl) < 2 {
if strings.HasPrefix(endpoint, "unix:") {
return endpoint[len("unix:"):]
}
if strings.HasPrefix(endpoint, "unixs:") {
return endpoint[len("unixs:"):]
}
return endpoint
}
return spl[1]
}

func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
r := endpoint.RequiresCredentials(ep)
switch r {
Expand Down
69 changes: 69 additions & 0 deletions pkg/grpc_testing/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc_testing

import (
"context"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type GrpcRecorder struct {
mux sync.RWMutex
requests []RequestInfo
}

type RequestInfo struct {
FullMethod string
Authority string
}

func (ri *GrpcRecorder) UnaryInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ri.record(toRequestInfo(ctx, info))
resp, err := handler(ctx, req)
return resp, err
}
}

func (ri *GrpcRecorder) RecordedRequests() []RequestInfo {
ri.mux.RLock()
defer ri.mux.RUnlock()
reqs := make([]RequestInfo, len(ri.requests))
copy(reqs, ri.requests)
return reqs
}

func toRequestInfo(ctx context.Context, info *grpc.UnaryServerInfo) RequestInfo {
req := RequestInfo{
FullMethod: info.FullMethod,
}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
as := md.Get(":authority")
if len(as) != 0 {
req.Authority = as[0]
}
}
return req
}

func (ri *GrpcRecorder) record(r RequestInfo) {
ri.mux.Lock()
defer ri.mux.Unlock()
ri.requests = append(ri.requests, r)
}
2 changes: 1 addition & 1 deletion server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func (e *Etcd) servePeers() (err error) {

for _, p := range e.Peers {
u := p.Listener.Addr().String()
gs := v3rpc.Server(e.Server, peerTLScfg)
gs := v3rpc.Server(e.Server, peerTLScfg, nil)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Expand Down
4 changes: 2 additions & 2 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (sctx *serveCtx) serve(
}()

if sctx.insecure {
gs = v3rpc.Server(s, nil, gopts...)
gs = v3rpc.Server(s, nil, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down Expand Up @@ -148,7 +148,7 @@ func (sctx *serveCtx) serve(
if tlsErr != nil {
return tlsErr
}
gs = v3rpc.Server(s, tlscfg, gopts...)
gs = v3rpc.Server(s, tlscfg, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down
6 changes: 4 additions & 2 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,21 @@ const (
maxSendBytes = math.MaxInt32
)

func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
if tls != nil {
bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls})
opts = append(opts, grpc.Creds(bundle.TransportCredentials()))
}

chainUnaryInterceptors := []grpc.UnaryServerInterceptor{
newLogUnaryInterceptor(s),
newUnaryInterceptor(s),
grpc_prometheus.UnaryServerInterceptor,
}
if interceptor != nil {
chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor)
}

chainStreamInterceptors := []grpc.StreamServerInterceptor{
newStreamInterceptor(s),
Expand Down
6 changes: 5 additions & 1 deletion tests/e2e/cluster_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
return p.etcdProc.WithStopSignal(sig)
}

func (p *proxyEtcdProcess) Logs() logsExpect {
return p.etcdProc.Logs()
}

type proxyProc struct {
lg *zap.Logger
execPath string
Expand All @@ -132,7 +136,7 @@ func (pp *proxyProc) start() error {
if pp.proc != nil {
panic("already started")
}
proc, err := spawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...))
proc, err := spawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), nil)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ type etcdProcessClusterConfig struct {
execPath string
dataDirPath string
keepDataDir bool
envVars map[string]string

clusterSize int

Expand Down Expand Up @@ -318,6 +319,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
lg: lg,
execPath: cfg.execPath,
args: args,
envVars: cfg.envVars,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func etcdctlBackup(t testing.TB, clus *etcdProcessCluster, dataDir, backupDir st
cmdArgs = append(cmdArgs, "--with-v3=false")
}
t.Logf("Running: %v", cmdArgs)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_alarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,5 @@ func alarmTest(cx ctlCtx) {

func ctlV3Alarm(cx ctlCtx, cmd string, as ...string) error {
cmdArgs := append(cx.PrefixArgs(), "alarm", cmd)
return spawnWithExpects(cmdArgs, as...)
return spawnWithExpects(cmdArgs, cx.envMap, as...)
}
38 changes: 19 additions & 19 deletions tests/e2e/ctl_v3_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func authEnable(cx ctlCtx) error {

func ctlV3AuthEnable(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "auth", "enable")
return spawnWithExpect(cmdArgs, "Authentication Enabled")
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "Authentication Enabled")
}

func authDisableTest(cx ctlCtx) {
Expand Down Expand Up @@ -139,12 +139,12 @@ func authDisableTest(cx ctlCtx) {

func ctlV3AuthDisable(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "auth", "disable")
return spawnWithExpect(cmdArgs, "Authentication Disabled")
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "Authentication Disabled")
}

func authStatusTest(cx ctlCtx) {
cmdArgs := append(cx.PrefixArgs(), "auth", "status")
if err := spawnWithExpects(cmdArgs, "Authentication Status: false", "AuthRevision:"); err != nil {
if err := spawnWithExpects(cmdArgs, cx.envMap, "Authentication Status: false", "AuthRevision:"); err != nil {
cx.t.Fatal(err)
}

Expand All @@ -155,15 +155,15 @@ func authStatusTest(cx ctlCtx) {
cx.user, cx.pass = "root", "root"
cmdArgs = append(cx.PrefixArgs(), "auth", "status")

if err := spawnWithExpects(cmdArgs, "Authentication Status: true", "AuthRevision:"); err != nil {
if err := spawnWithExpects(cmdArgs, cx.envMap, "Authentication Status: true", "AuthRevision:"); err != nil {
cx.t.Fatal(err)
}

cmdArgs = append(cx.PrefixArgs(), "auth", "status", "--write-out", "json")
if err := spawnWithExpect(cmdArgs, "enabled"); err != nil {
if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, "enabled"); err != nil {
cx.t.Fatal(err)
}
if err := spawnWithExpect(cmdArgs, "authRevision"); err != nil {
if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, "authRevision"); err != nil {
cx.t.Fatal(err)
}
}
Expand Down Expand Up @@ -381,25 +381,25 @@ func authRoleRevokeDuringOpsTest(cx ctlCtx) {
}

func ctlV3PutFailAuth(cx ctlCtx, key, val string) error {
return spawnWithExpect(append(cx.PrefixArgs(), "put", key, val), "authentication failed")
return spawnWithExpectWithEnv(append(cx.PrefixArgs(), "put", key, val), cx.envMap, "authentication failed")
}

func ctlV3PutFailPerm(cx ctlCtx, key, val string) error {
return spawnWithExpect(append(cx.PrefixArgs(), "put", key, val), "permission denied")
return spawnWithExpectWithEnv(append(cx.PrefixArgs(), "put", key, val), cx.envMap, "permission denied")
}

func authSetupTestUser(cx ctlCtx) {
if err := ctlV3User(cx, []string{"add", "test-user", "--interactive=false"}, "User test-user created", []string{"pass"}); err != nil {
cx.t.Fatal(err)
}
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "add", "test-role"), "Role test-role created"); err != nil {
if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "add", "test-role"), cx.envMap, "Role test-role created"); err != nil {
cx.t.Fatal(err)
}
if err := ctlV3User(cx, []string{"grant-role", "test-user", "test-role"}, "Role test-role is granted to user test-user", nil); err != nil {
cx.t.Fatal(err)
}
cmd := append(cx.PrefixArgs(), "role", "grant-permission", "test-role", "readwrite", "foo")
if err := spawnWithExpect(cmd, "Role test-role updated"); err != nil {
if err := spawnWithExpectWithEnv(cmd, cx.envMap, "Role test-role updated"); err != nil {
cx.t.Fatal(err)
}
}
Expand Down Expand Up @@ -611,7 +611,7 @@ func authTestCertCN(cx ctlCtx) {
if err := ctlV3User(cx, []string{"add", "example.com", "--interactive=false"}, "User example.com created", []string{""}); err != nil {
cx.t.Fatal(err)
}
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "add", "test-role"), "Role test-role created"); err != nil {
if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "add", "test-role"), cx.envMap, "Role test-role created"); err != nil {
cx.t.Fatal(err)
}
if err := ctlV3User(cx, []string{"grant-role", "example.com", "test-role"}, "Role test-role is granted to user example.com", nil); err != nil {
Expand Down Expand Up @@ -921,21 +921,21 @@ func authTestRoleGet(cx ctlCtx) {
"KV Read:", "foo",
"KV Write:", "foo",
}
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}

// test-user can get the information of test-role because it belongs to the role
cx.user, cx.pass = "test-user", "pass"
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}

// test-user cannot get the information of root because it doesn't belong to the role
expected = []string{
"Error: etcdserver: permission denied",
}
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "root"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "root"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}
}
Expand All @@ -952,21 +952,21 @@ func authTestUserGet(cx ctlCtx) {
"Roles: test-role",
}

if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}

// test-user can get the information of test-user itself
cx.user, cx.pass = "test-user", "pass"
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}

// test-user cannot get the information of root
expected = []string{
"Error: etcdserver: permission denied",
}
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "root"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "root"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}
}
Expand All @@ -977,7 +977,7 @@ func authTestRoleList(cx ctlCtx) {
}
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "list"), "test-role"); err != nil {
if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "list"), cx.envMap, "test-role"); err != nil {
cx.t.Fatal(err)
}
}
Expand Down Expand Up @@ -1088,7 +1088,7 @@ func certCNAndUsername(cx ctlCtx, noPassword bool) {
cx.t.Fatal(err)
}
}
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "add", "test-role-cn"), "Role test-role-cn created"); err != nil {
if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "add", "test-role-cn"), cx.envMap, "Role test-role-cn created"); err != nil {
cx.t.Fatal(err)
}
if err := ctlV3User(cx, []string{"grant-role", "example.com", "test-role-cn"}, "Role test-role-cn is granted to user example.com", nil); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,5 @@ func ctlV3Compact(cx ctlCtx, rev int64, physical bool) error {
if physical {
cmdArgs = append(cmdArgs, "--physical")
}
return spawnWithExpect(cmdArgs, "compacted revision "+rs)
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "compacted revision "+rs)
}
4 changes: 2 additions & 2 deletions tests/e2e/ctl_v3_defrag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ func ctlV3OnlineDefrag(cx ctlCtx) error {
for i := range lines {
lines[i] = "Finished defragmenting etcd member"
}
return spawnWithExpects(cmdArgs, lines...)
return spawnWithExpects(cmdArgs, cx.envMap, lines...)
}

func ctlV3OfflineDefrag(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgsUtl(), "defrag", "--data-dir", cx.dataDir)
lines := []string{"finished defragmenting directory"}
return spawnWithExpects(cmdArgs, lines...)
return spawnWithExpects(cmdArgs, cx.envMap, lines...)
}

func defragOfflineTest(cx ctlCtx) {
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_elect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func testElect(cx ctlCtx) {
// ctlV3Elect creates a elect process with a channel listening for when it wins the election.
func ctlV3Elect(cx ctlCtx, name, proposal string) (*expect.ExpectProcess, <-chan string, error) {
cmdArgs := append(cx.PrefixArgs(), "elect", name, proposal)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
outc := make(chan string, 1)
if err != nil {
close(outc)
Expand Down
Loading

0 comments on commit edb3b5a

Please sign in to comment.