Skip to content

Commit

Permalink
use fallback
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrato committed Oct 4, 2024
1 parent 5cc10d4 commit 17998d9
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 131 deletions.
35 changes: 21 additions & 14 deletions lib/kube/proxy/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,9 @@ current-context: foo
},
kubeCluster: mustCreateKubernetesClusterV3(t, "foo"),
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
Major: "1",
Minor: "20",
GitVersion: "1.20.0",
},
rbacSupportedTypes: rbacSupportedTypes,
},
Expand All @@ -228,8 +229,9 @@ current-context: foo
clientRestCfg: &rest.Config{},
},
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
Major: "1",
Minor: "20",
GitVersion: "1.20.0",
},
kubeCluster: mustCreateKubernetesClusterV3(t, "bar"),
rbacSupportedTypes: rbacSupportedTypes,
Expand All @@ -242,8 +244,9 @@ current-context: foo
clientRestCfg: &rest.Config{},
},
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
Major: "1",
Minor: "20",
GitVersion: "1.20.0",
},
kubeCluster: mustCreateKubernetesClusterV3(t, "baz"),
rbacSupportedTypes: rbacSupportedTypes,
Expand Down Expand Up @@ -271,8 +274,9 @@ current-context: foo
clientRestCfg: &rest.Config{},
},
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
Major: "1",
Minor: "20",
GitVersion: "1.20.0",
},
kubeCluster: mustCreateKubernetesClusterV3(t, teleClusterName),
rbacSupportedTypes: rbacSupportedTypes,
Expand All @@ -293,8 +297,9 @@ current-context: foo
clientRestCfg: &rest.Config{},
},
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
Major: "1",
Minor: "20",
GitVersion: "1.20.0",
},
kubeCluster: mustCreateKubernetesClusterV3(t, "foo"),
rbacSupportedTypes: rbacSupportedTypes,
Expand All @@ -307,8 +312,9 @@ current-context: foo
clientRestCfg: &rest.Config{},
},
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
Major: "1",
Minor: "20",
GitVersion: "1.20.0",
},
kubeCluster: mustCreateKubernetesClusterV3(t, "bar"),
rbacSupportedTypes: rbacSupportedTypes,
Expand All @@ -321,8 +327,9 @@ current-context: foo
clientRestCfg: &rest.Config{},
},
kubeClusterVersion: &apimachineryversion.Info{
Major: "1",
Minor: "20",
Major: "1",
Minor: "20",
GitVersion: "1.20.0",
},
kubeCluster: mustCreateKubernetesClusterV3(t, "baz"),
rbacSupportedTypes: rbacSupportedTypes,
Expand Down
32 changes: 15 additions & 17 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2175,27 +2175,25 @@ func (f *Forwarder) getSPDYExecutor(sess *clusterSession, req *http.Request) (re
}

func (f *Forwarder) getPortForwardDialer(sess *clusterSession, req *http.Request) (httpstream.Dialer, error) {
isWSSupported := false
if !f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
// We're forwarding it to another Teleport kube_service,
// which supports the websocket protocol.
isWSSupported = f.allServersSupportTunneledSPDY(sess)
} else {
if details, err := f.findKubeDetailsByClusterName(sess.kubeClusterName); err == nil {
// We're accessing the Kubernetes cluster directly, check if it is version that supports new protocol.
details.rwMu.RLock()
isWSSupported = kubernetesSupportsPortTunnedledSPDY(details.kubeClusterVersion)
details.rwMu.RUnlock()
}
}

if isWSSupported {
wsDialer, err := f.getWebsocketDialer(sess, req)
return wsDialer, trace.Wrap(err)
wsDialer, err := f.getWebsocketDialer(sess, req)
if err != nil {
return nil, trace.Wrap(err)
}

spdyDialer, err := f.getSPDYDialer(sess, req)
return spdyDialer, trace.Wrap(err)
if err != nil {
return nil, trace.Wrap(err)
}

return portforward.NewFallbackDialer(wsDialer, spdyDialer, func(err error) bool {
result := httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err) || kubeerrors.IsForbidden(err) || isTeleportUpgradeFailure(err)
if result {
// If the error is a known upgrade failure, we can retry with the other protocol.
sess.connCtx, sess.connMonitorCancel = context.WithCancelCause(req.Context())
}
return result
}), nil
}

// getSPDYDialer returns a dialer that can be used to upgrade the connection
Expand Down
24 changes: 22 additions & 2 deletions lib/kube/proxy/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -289,11 +290,30 @@ func extractKubeAPIStatusFromReq(rsp *http.Response) error {
} else {
if obj, _, err := statusCodecs.UniversalDecoder().Decode(responseErrorBytes, nil, &metav1.Status{}); err == nil {
if status, ok := obj.(*metav1.Status); ok {
return &apierrors.StatusError{ErrStatus: *status}
return &upgradeFailureError{Cause: &apierrors.StatusError{ErrStatus: *status}}
}
}
responseError = string(responseErrorBytes)
responseError = strings.TrimSpace(responseError)
}
return fmt.Errorf("unable to upgrade connection: %s", responseError)
return &upgradeFailureError{Cause: fmt.Errorf("unable to upgrade connection: %s", responseError)}
}

// upgradeFailureError encapsulates the cause for why the streaming
// upgrade request failed. Implements error interface.
type upgradeFailureError struct {
Cause error
}

func (u *upgradeFailureError) Error() string {
return u.Cause.Error()
}

func (u *upgradeFailureError) Unwrap() error {
return u.Cause
}

func isTeleportUpgradeFailure(err error) bool {
var upgradeErr *upgradeFailureError
return errors.As(err, &upgradeErr)
}
80 changes: 0 additions & 80 deletions lib/kube/proxy/roundtrip_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"net/http"

"github.com/coreos/go-semver/semver"
gwebsocket "github.com/gorilla/websocket"
"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/util/httpstream"
Expand All @@ -31,9 +30,7 @@ import (
"k8s.io/apimachinery/pkg/version"
kwebsocket "k8s.io/client-go/transport/websocket"

"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/utils"
)

// WebsocketRoundTripper knows how to upgrade an HTTP request to one that supports
Expand Down Expand Up @@ -133,80 +130,3 @@ func kubernetesSupportsExecSubprotocolV5(serverVersion *version.Info) bool {

return parsedVersion.AtLeast(kubeExecSubprotocolV5MinVersion)
}

var kubePortforwardSPDYOverWebsocket = func() *versionUtil.Version {
const kubePortforwardSPDYOverWebsocketVersion = "v1.31.0"
return versionUtil.MustParse(kubePortforwardSPDYOverWebsocketVersion)
}()

func kubernetesSupportsPortTunnedledSPDY(serverVersion *version.Info) bool {
if serverVersion == nil {
return false
}
parsedVersion, err := versionUtil.ParseSemantic(serverVersion.GitVersion)
if err != nil {
return false
}
return parsedVersion.AtLeast(kubePortforwardSPDYOverWebsocket)
}

// versionWithTunneledSPDY is the version of Teleport that starts
// supporting SPDY over Websockets for portforward.
var versionWithTunneledSPDY = semver.New(utils.VersionBeforeAlpha("17.0.0"))

// teleportVersionInterface is an interface that allows to get the Teleport version of
// a kube server.
// TODO(tigrato): DELETE IN 18.0.0
type teleportVersionInterface interface {
GetTeleportVersion() string
}

// allServersSupportTunneledSPDY checks if all paths for this sessions support
// SPDY over websocket subprotocol.
// If all of them do and target kubernetes cluster supports it as well
// we can use websocket dialer, otherwise we'll use the SPDY dialer.
func (f *Forwarder) allServersSupportTunneledSPDY(sess *clusterSession) bool {
// If the cluster is remote, we need to check if all remote proxies
// support SPDY over websocket
if sess.teleportCluster.isRemote {
proxies, err := f.getRemoteClusterProxies(sess.teleportCluster.name)
return err == nil && allServersSupportTunneledSPDY(proxies)
}
// If the cluster is not remote, validate the kube services support of
// SPDY over websocket
return allServersSupportTunneledSPDY(sess.kubeServers)
}

// allServersSupportExecSubprotocolV5 returns true if all servers in the list
// support SPDY over websockets.
// TODO(tigrato): DELETE IN 18.0.0
func allServersSupportTunneledSPDY[T teleportVersionInterface](servers []T) bool {
if len(servers) == 0 {
return false
}

for _, server := range servers {
serverVersion := server.GetTeleportVersion()
semVer, err := semver.NewVersion(serverVersion)
if err != nil || semVer.LessThan(*versionWithTunneledSPDY) {
return false
}
}
return true
}

// getRemoteClusterProxies returns a list of proxies registered at the remote cluster.
// It's used to determine whether the remote cluster supports SPDY over websocket.
func (f *Forwarder) getRemoteClusterProxies(clusterName string) ([]types.Server, error) {
targetCluster, err := f.cfg.ReverseTunnelSrv.GetSite(clusterName)
if err != nil {
return nil, trace.Wrap(err)
}
// Get the remote cluster's cache.
caching, err := targetCluster.CachingAccessPoint()
if err != nil {
return nil, trace.Wrap(err)
}
proxies, err := caching.GetProxies()
return proxies, trace.Wrap(err)
}
52 changes: 34 additions & 18 deletions lib/kube/proxy/testing/kube_server/kube_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
portforwardconstants "k8s.io/apimachinery/pkg/util/portforward"
apiremotecommand "k8s.io/apimachinery/pkg/util/remotecommand"
versionUtil "k8s.io/apimachinery/pkg/util/version"
apimachineryversion "k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/client-go/tools/portforward"
Expand Down Expand Up @@ -149,21 +150,22 @@ type KubeUpgradeRequests struct {
}

type KubeMockServer struct {
router *httprouter.Router
log *log.Entry
server *httptest.Server
TLS *tls.Config
URL string
Address string
CA []byte
deletedResources map[deletedResource][]string
getPodError *metav1.Status
execPodError *metav1.Status
portforwardError *metav1.Status
mu sync.Mutex
version *apimachineryversion.Info
KubeExecRequests KubeUpgradeRequests
KubePortforward KubeUpgradeRequests
router *httprouter.Router
log *log.Entry
server *httptest.Server
TLS *tls.Config
URL string
Address string
CA []byte
deletedResources map[deletedResource][]string
getPodError *metav1.Status
execPodError *metav1.Status
portforwardError *metav1.Status
mu sync.Mutex
version *apimachineryversion.Info
KubeExecRequests KubeUpgradeRequests
KubePortforward KubeUpgradeRequests
supportsTunneledSPDY bool
}

// NewKubeAPIMock creates Kubernetes API server for handling exec calls.
Expand All @@ -179,8 +181,9 @@ func NewKubeAPIMock(opts ...Option) (*KubeMockServer, error) {
log: log.NewEntry(log.New()),
deletedResources: make(map[deletedResource][]string),
version: &apimachineryversion.Info{
Major: "1",
Minor: "20",
Major: "1",
Minor: "20",
GitVersion: "1.20.0",
},
}

Expand All @@ -196,6 +199,14 @@ func NewKubeAPIMock(opts ...Option) (*KubeMockServer, error) {
s.TLS = s.server.TLS
s.Address = strings.TrimPrefix(s.server.URL, "https://")
s.URL = s.server.URL

parsedVersion, err := versionUtil.ParseSemantic(s.version.GitVersion)
if err != nil {
return nil, trace.Wrap(err, "failed to parse version")
}
const minSupportVersion = "v1.31.0"
s.supportsTunneledSPDY = parsedVersion.AtLeast(versionUtil.MustParse(minSupportVersion))

return s, nil
}

Expand Down Expand Up @@ -758,6 +769,11 @@ func (s *KubeMockServer) portforward(w http.ResponseWriter, req *http.Request, p
var err error
var conn httpstream.Connection
if wsstream.IsWebSocketRequestWithTunnelingProtocol(req) {
if !s.supportsTunneledSPDY {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("server does not support tunneled SPDY"))
return nil, nil
}
s.KubePortforward.Websocket.Add(1)
// Try to upgrade the websocket connection.
// Beyond this point, we don't need to write errors to the response.
Expand Down Expand Up @@ -792,7 +808,7 @@ func (s *KubeMockServer) portforward(w http.ResponseWriter, req *http.Request, p
}

if conn == nil {
err = trace.ConnectionProblem(nil, "unable to upgrade SPDY connection")
err = trace.ConnectionProblem(nil, "unable to upgrade connection")
return nil, err
}
defer conn.Close()
Expand Down

0 comments on commit 17998d9

Please sign in to comment.