Skip to content

Commit

Permalink
Cache SSH connections
Browse files Browse the repository at this point in the history
The underlying SSH connections are kept open and are reused
across several SSH sessions. This is due to upstream issues in
which concurrent/parallel SSH connections may lead to instability.

golang/go#51926
golang/go#27140
Signed-off-by: Paulo Gomes <[email protected]>
  • Loading branch information
Paulo Gomes committed Mar 25, 2022
1 parent c2b58f0 commit 9effa90
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 43 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/onsi/gomega v1.18.1
github.com/otiai10/copy v1.7.0
github.com/spf13/pflag v1.0.5
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd
golang.org/x/crypto v0.0.0-20220321153916-2c7772ba3064
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/api v0.73.0
gotest.tools v2.2.0+incompatible
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,8 @@ golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220321153916-2c7772ba3064 h1:S25/rfnfsMVgORT4/J61MJ7rdyseOZOyvLIrZEZ7s6s=
golang.org/x/crypto v0.0.0-20220321153916-2c7772ba3064/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func main() {
}()

if managed.Enabled() {
managed.InitManagedTransport()
managed.InitManagedTransport(ctrl.Log.WithName("managed-transport"))
}

setupLog.Info("starting manager")
Expand Down
6 changes: 6 additions & 0 deletions pkg/git/libgit2/managed/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,12 @@ func createClientRequest(targetUrl string, action git2go.SmartServiceAction, t *
}

func (t *httpSmartSubtransport) Close() error {
traceLog.Info("[http]: httpSmartSubtransport.Close()")
return nil
}

func (t *httpSmartSubtransport) Free() {
traceLog.Info("[http]: httpSmartSubtransport.Free()")
}

type httpSmartSubtransportStream struct {
Expand Down Expand Up @@ -277,6 +279,8 @@ func (self *httpSmartSubtransportStream) Write(buf []byte) (int, error) {

func (self *httpSmartSubtransportStream) Free() {
if self.resp != nil {
traceLog.Info("[http]: httpSmartSubtransportStream.Free()")

// ensure body is fully processed and closed
// for increased likelihood of transport reuse in HTTP/1.x.
// it should not be a problem to do this more than once.
Expand Down Expand Up @@ -344,6 +348,7 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
}

req.SetBasicAuth(userName, password)
traceLog.Info("[http]: new request", "method", req.Method, "URL", req.URL)
resp, err = self.client.Do(req)
if err != nil {
return err
Expand All @@ -363,6 +368,7 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
return err
}

traceLog.Info("[http]: POST redirect", "URL", self.req.URL)
continue
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/git/libgit2/managed/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package managed
import (
"sync"
"time"

"github.com/fluxcd/pkg/runtime/logger"
"github.com/go-logr/logr"
)

var (
Expand All @@ -34,6 +37,9 @@ var (
// regardless of the current operation (i.e. connection,
// handshake, put/get).
fullHttpClientTimeOut time.Duration = 10 * time.Minute

debugLog logr.Logger
traceLog logr.Logger
)

// InitManagedTransport initialises HTTP(S) and SSH managed transport
Expand All @@ -47,9 +53,14 @@ var (
//
// This function will only register managed transports once, subsequent calls
// leads to no-op.
func InitManagedTransport() error {
func InitManagedTransport(log logr.Logger) error {
var err error

once.Do(func() {
log.Info("Enabling experimental managed transport")
debugLog = log.V(logger.DebugLevel)
traceLog = log.V(logger.TraceLevel)

if err = registerManagedHTTP(); err != nil {
return
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/git/libgit2/managed/managed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/fluxcd/pkg/gittestserver"
"github.com/fluxcd/pkg/ssh"
"github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"

git2go "github.com/libgit2/git2go/v33"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -247,7 +248,7 @@ func TestManagedTransport_E2E(t *testing.T) {
defer server.StopSSH()

// Force managed transport to be enabled
InitManagedTransport()
InitManagedTransport(logr.Discard())

repoPath := "test.git"
err = server.InitRepo("../testdata/git/repo", git.DefaultBranch, repoPath)
Expand Down Expand Up @@ -312,7 +313,7 @@ func TestManagedTransport_HandleRedirect(t *testing.T) {
defer os.RemoveAll(tmpDir)

// Force managed transport to be enabled
InitManagedTransport()
InitManagedTransport(logr.Discard())

// GitHub will cause a 301 and redirect to https
repo, err := git2go.Clone("http://github.com/stefanprodan/podinfo", tmpDir, &git2go.CloneOptions{
Expand Down
151 changes: 113 additions & 38 deletions pkg/git/libgit2/managed/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"net/url"
"runtime"
"strings"
"sync"
"time"

"golang.org/x/crypto/ssh"
Expand All @@ -63,6 +64,17 @@ import (
// registerManagedSSH registers a Go-native implementation of
// SSH transport that doesn't rely on any lower-level libraries
// such as libssh2.
//
// The underlying SSH connections are kept open and are reused
// across several SSH sessions. This is due to upstream issues in
// which concurrent/parallel SSH connections may lead to instability.
//
// Connections are created on first attempt to use a given remote. The
// connection is removed from the cache on the first failed session related
// operation.
//
// https://github.com/golang/go/issues/51926
// https://github.com/golang/go/issues/27140
func registerManagedSSH() error {
for _, protocol := range []string{"ssh", "ssh+git", "git+ssh"} {
_, err := git2go.NewRegisteredSmartTransport(protocol, false, sshSmartSubtransportFactory)
Expand Down Expand Up @@ -90,6 +102,9 @@ type sshSmartSubtransport struct {
currentStream *sshSmartSubtransportStream
}

var aMux sync.RWMutex
var sshClients map[string]*ssh.Client = make(map[string]*ssh.Client)

func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
Expand Down Expand Up @@ -136,7 +151,14 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
}
defer cred.Free()

sshConfig, err := getSSHConfigFromCredential(cred)
var addr string
if u.Port() != "" {
addr = fmt.Sprintf("%s:%s", u.Hostname(), u.Port())
} else {
addr = fmt.Sprintf("%s:22", u.Hostname())
}

ckey, sshConfig, err := cacheKeyAndConfig(addr, cred)
if err != nil {
return nil, err
}
Expand All @@ -157,52 +179,66 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
return t.transport.SmartCertificateCheck(cert, true, hostname)
}

var addr string
if u.Port() != "" {
addr = fmt.Sprintf("%s:%s", u.Hostname(), u.Port())
} else {
addr = fmt.Sprintf("%s:22", u.Hostname())
aMux.RLock()
if c, ok := sshClients[ckey]; ok {
traceLog.Info("[ssh]: cache hit", "remoteAddress", addr)
t.client = c
}
aMux.RUnlock()

if t.client == nil {
traceLog.Info("[ssh]: cache miss", "remoteAddress", addr)

aMux.Lock()
defer aMux.Unlock()

// In some scenarios the ssh handshake can hang indefinitely at
// golang.org/x/crypto/ssh.(*handshakeTransport).kexLoop.
//
// xref: https://github.com/golang/go/issues/51926
done := make(chan error, 1)
go func() {
t.client, err = ssh.Dial("tcp", addr, sshConfig)
done <- err
}()

dialTimeout := sshConfig.Timeout + (30 * time.Second)

// In some scenarios the ssh handshake can hang indefinitely at
// golang.org/x/crypto/ssh.(*handshakeTransport).kexLoop.
//
// xref: https://github.com/golang/go/issues/51926
done := make(chan error, 1)
go func() {
t.client, err = ssh.Dial("tcp", addr, sshConfig)
done <- err
}()

select {
case doneErr := <-done:
if doneErr != nil {
err = fmt.Errorf("ssh.Dial: %w", doneErr)
select {
case doneErr := <-done:
if doneErr != nil {
err = fmt.Errorf("ssh.Dial: %w", doneErr)
}
case <-time.After(dialTimeout):
err = fmt.Errorf("timed out waiting for ssh.Dial after %s", dialTimeout)
}
case <-time.After(sshConfig.Timeout + (5 * time.Second)):
err = fmt.Errorf("timed out waiting for ssh.Dial")
}

if err != nil {
return nil, err
if err != nil {
return nil, err
}

sshClients[ckey] = t.client
}

t.session, err = t.client.NewSession()
if err != nil {
traceLog.Info("[ssh]: creating new ssh session")
if t.session, err = t.client.NewSession(); err != nil {
discardCachedSshClient(ckey)
return nil, err
}

t.stdin, err = t.session.StdinPipe()
if err != nil {
if t.stdin, err = t.session.StdinPipe(); err != nil {
discardCachedSshClient(ckey)
return nil, err
}

t.stdout, err = t.session.StdoutPipe()
if err != nil {
if t.stdout, err = t.session.StdoutPipe(); err != nil {
discardCachedSshClient(ckey)
return nil, err
}

traceLog.Info("[ssh]: run on remote", "cmd", cmd)
if err := t.session.Start(cmd); err != nil {
discardCachedSshClient(ckey)
return nil, err
}

Expand All @@ -216,6 +252,8 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi

func (t *sshSmartSubtransport) Close() error {
var returnErr error

traceLog.Info("[ssh]: sshSmartSubtransport.Close()")
t.currentStream = nil
if t.client != nil {
if err := t.stdin.Close(); err != nil {
Expand All @@ -224,12 +262,12 @@ func (t *sshSmartSubtransport) Close() error {
t.client = nil
}
if t.session != nil {

// In some scenarios session.Wait() can hang indefinitely.
// Here we force a timeout to skip the wait and continue
// with closing the session.
done := make(chan error, 1)
go func() {
traceLog.Info("[ssh]: session.Wait()")
err := t.session.Wait()
done <- err
}()
Expand All @@ -243,6 +281,7 @@ func (t *sshSmartSubtransport) Close() error {
returnErr = fmt.Errorf("timed out waiting for session.Wait()")
}

traceLog.Info("[ssh]: session.Close()")
if err := t.session.Close(); err != nil {
returnErr = fmt.Errorf("cannot close session: %w", err)
}
Expand All @@ -252,6 +291,7 @@ func (t *sshSmartSubtransport) Close() error {
}

func (t *sshSmartSubtransport) Free() {
traceLog.Info("[ssh]: sshSmartSubtransport.Free()")
}

type sshSmartSubtransportStream struct {
Expand All @@ -267,19 +307,20 @@ func (stream *sshSmartSubtransportStream) Write(buf []byte) (int, error) {
}

func (stream *sshSmartSubtransportStream) Free() {
traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()")
}

func getSSHConfigFromCredential(cred *git2go.Credential) (*ssh.ClientConfig, error) {
func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, *ssh.ClientConfig, error) {
username, _, privatekey, passphrase, err := cred.GetSSHKey()
if err != nil {
return nil, err
return "", nil, err
}

var pemBytes []byte
if cred.Type() == git2go.CredentialTypeSSHMemory {
pemBytes = []byte(privatekey)
} else {
return nil, fmt.Errorf("file based SSH credential is not supported")
return "", nil, fmt.Errorf("file based SSH credential is not supported")
}

var key ssh.Signer
Expand All @@ -290,12 +331,46 @@ func getSSHConfigFromCredential(cred *git2go.Credential) (*ssh.ClientConfig, err
}

if err != nil {
return nil, err
return "", nil, err
}

return &ssh.ClientConfig{
ck := cacheKey(remoteAddress, username, key.PublicKey().Marshal())
cfg := &ssh.ClientConfig{
User: username,
Auth: []ssh.AuthMethod{ssh.PublicKeys(key)},
Timeout: sshConnectionTimeOut,
}, nil
}

return ck, cfg, nil
}

// cacheKey generates a cache key that is multi-tenancy safe.
//
// Stablishing multiple and concurrent ssh connections leads to stability
// issues documented above. However, the caching/sharing of already stablished
// connections could represent a vector for users to bypass the ssh authentication
// mechanism.
//
// cacheKey tries to ensure that connections are only shared by users that
// have the exact same remoteAddress and credentials. An important assumption
// here is that public key is always derived from the private key in cacheKeyAndConfig.
func cacheKey(remoteAddress, userName string, pubKey []byte) string {
h := sha256.New()

v := fmt.Sprintf("%s-%s-%v", remoteAddress, userName, pubKey)

h.Write([]byte(v))
return fmt.Sprintf("%x", h.Sum(nil))
}

// discardCachedSshClient discards the cached ssh client, forcing the next git operation
// to create a new one via ssh.Dial.
func discardCachedSshClient(key string) {
aMux.Lock()
defer aMux.Unlock()

if _, found := sshClients[key]; found {
traceLog.Info("[ssh]: discard cached ssh client")
delete(sshClients, key)
}
}
Loading

0 comments on commit 9effa90

Please sign in to comment.