Skip to content

Commit

Permalink
All Hops Encrypted: TLS between activator and queue-Proxy (#12815)
Browse files Browse the repository at this point in the history
* All Hops Encrypted: TLS between activator and queue-Proxy

Fix #12502
Fix #12503

* Large capital

* Fix review comments

* Refactor loop

* Add TODO comment
  • Loading branch information
nak3 authored Apr 19, 2022
1 parent 98bb924 commit 6ec4509
Show file tree
Hide file tree
Showing 17 changed files with 285 additions and 33 deletions.
Binary file added cmd/activator/.main.go.swp
Binary file not shown.
33 changes: 32 additions & 1 deletion cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"log"
Expand Down Expand Up @@ -153,6 +154,36 @@ func main() {
logger.Fatalw("Failed to construct network config", zap.Error(err))
}

// Enable TLS against queue-proxy when the CA and SA are specified.
tlsEnabled := networkConfig.QueueProxyCA != "" && networkConfig.QueueProxySAN != ""

// Enable TLS client when queue-proxy-ca is specified.
// At this moment activator with TLS does not disable HTTP.
// See also https://github.com/knative/serving/issues/12808.
if tlsEnabled {
caSecret, err := kubeClient.CoreV1().Secrets(system.Namespace()).Get(ctx, networkConfig.QueueProxyCA, metav1.GetOptions{})
if err != nil {
logger.Fatalw("Failed to get secret", zap.Error(err))
}

pool, err := x509.SystemCertPool()
if err != nil {
pool = x509.NewCertPool()
}

if ok := pool.AppendCertsFromPEM(caSecret.Data["ca.crt"]); !ok {
logger.Fatalw("Failed to append ca cert to the RootCAs")
}

tlsConf := &tls.Config{
RootCAs: pool,
InsecureSkipVerify: false,
ServerName: networkConfig.QueueProxySAN,
MinVersion: tls.VersionTLS12,
}
transport = pkgnet.NewProxyAutoTLSTransport(env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost, tlsConf)
}

// Start throttler.
throttler := activatornet.NewThrottler(ctx, env.PodIP)
go throttler.Run(ctx, transport, networkConfig.EnableMeshPodAddressability, networkConfig.MeshCompatibilityMode)
Expand Down Expand Up @@ -188,7 +219,7 @@ func main() {

// Create activation handler chain
// Note: innermost handlers are specified first, ie. the last handler in the chain will be executed first
ah := activatorhandler.New(ctx, throttler, transport, networkConfig.EnableMeshPodAddressability, logger)
ah := activatorhandler.New(ctx, throttler, transport, networkConfig.EnableMeshPodAddressability, logger, tlsEnabled)
ah = concurrencyReporter.Handler(ah)
ah = activatorhandler.NewTracingHandler(ah)
reqLogHandler, err := pkghttp.NewRequestLogHandler(ah, logging.NewSyncFileWriter(os.Stdout), "",
Expand Down
75 changes: 63 additions & 12 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,18 @@ const (
// This is to give networking a little bit more time to remove the pod
// from its configuration and propagate that to all loadbalancers and nodes.
drainSleepDuration = 30 * time.Second

// certPath is the path for the server certificate mounted by queue-proxy.
certPath = queue.CertDirectory + "/tls.crt"

// keyPath is the path for the server certificate key mounted by queue-proxy.
keyPath = queue.CertDirectory + "/tls.key"
)

type config struct {
ContainerConcurrency int `split_words:"true" required:"true"`
QueueServingPort string `split_words:"true" required:"true"`
QueueServingTLSPort string `split_words:"true" required:"true"`
UserPort string `split_words:"true" required:"true"`
RevisionTimeoutSeconds int `split_words:"true" required:"true"`
MaxDurationSeconds int `split_words:"true"` // optional
Expand Down Expand Up @@ -162,25 +169,52 @@ func main() {
if env.ConcurrencyStateEndpoint != "" {
concurrencyendpoint = queue.NewConcurrencyEndpoint(env.ConcurrencyStateEndpoint, env.ConcurrencyStateTokenPath)
}
mainServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint)
servers := map[string]*http.Server{

// Enable TLS when certificate is mounted.
tlsEnabled := exists(logger, certPath) && exists(logger, keyPath)

mainServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, false)
httpServers := map[string]*http.Server{
"main": mainServer,
"admin": buildAdminServer(logger, drain),
"metrics": buildMetricsServer(promStatReporter, protoStatReporter),
"admin": buildAdminServer(logger, drain),
}
if env.EnableProfiling {
servers["profile"] = profiling.NewServer(profiling.NewHandler(logger, true))
httpServers["profile"] = profiling.NewServer(profiling.NewHandler(logger, true))
}

// Enable TLS server when activator server certs are mounted.
// At this moment activator with TLS does not disable HTTP.
// See also https://github.com/knative/serving/issues/12808.
var tlsServers map[string]*http.Server
if tlsEnabled {
mainTLSServer, drain := buildServer(ctx, env, probe, stats, logger, concurrencyendpoint, true /* enable TLS */)
tlsServers = map[string]*http.Server{
"tlsMain": mainTLSServer,
"tlsAdmin": buildAdminServer(logger, drain),
}
// Drop admin http server as we Use TLS for the admin server.
// TODO: The drain created with mainServer above is lost. Unify the two drain.
delete(httpServers, "admin")
}

errCh := make(chan error)
for name, server := range servers {
for name, server := range httpServers {
go func(name string, s *http.Server) {
// Don't forward ErrServerClosed as that indicates we're already shutting down.
if err := s.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- fmt.Errorf("%s server failed to serve: %w", name, err)
}
}(name, server)
}
for name, server := range tlsServers {
go func(name string, s *http.Server) {
// Don't forward ErrServerClosed as that indicates we're already shutting down.
if err := s.ListenAndServeTLS(certPath, keyPath); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- fmt.Errorf("%s server failed to serve: %w", name, err)
}
}(name, server)
}

// Blocks until we actually receive a TERM signal or one of the servers
// exits unexpectedly. We fold both signals together because we only want
Expand All @@ -200,9 +234,9 @@ func main() {
drain()

// Removing the main server from the shutdown logic as we've already shut it down.
delete(servers, "main")
delete(httpServers, "main")

for serverName, srv := range servers {
for serverName, srv := range httpServers {
logger.Info("Shutting down server: ", serverName)
if err := srv.Shutdown(context.Background()); err != nil {
logger.Errorw("Failed to shutdown server", zap.String("server", serverName), zap.Error(err))
Expand All @@ -212,6 +246,14 @@ func main() {
}
}

func exists(logger *zap.SugaredLogger, filename string) bool {
_, err := os.Stat(filename)
if err != nil && !os.IsNotExist(err) {
logger.Fatalw(fmt.Sprintf("Failed to verify the file path %q", filename), zap.Error(err))
}
return err == nil
}

func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2 bool) *readiness.Probe {
coreProbe, err := readiness.DecodeProbe(encodedProbe)
if err != nil {
Expand All @@ -224,18 +266,20 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2
}

func buildServer(ctx context.Context, env config, probeContainer func() bool, stats *network.RequestStats, logger *zap.SugaredLogger,
ce *queue.ConcurrencyEndpoint) (server *http.Server, drain func()) {
ce *queue.ConcurrencyEndpoint, enableTLS bool) (server *http.Server, drain func()) {
// TODO: If TLS is enabled, execute probes twice and tracking two different sets of container health.

target := net.JoinHostPort("127.0.0.1", env.UserPort)

httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders)
httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders, false /* use HTTP */)
httpProxy.Transport = buildTransport(env, logger)
httpProxy.ErrorHandler = pkghandler.Error(logger)
httpProxy.BufferPool = network.NewBufferPool()
httpProxy.FlushInterval = network.FlushInterval

// TODO: During HTTP and HTTPS transition, counting concurrency could not be accurate. Count accurately.
breaker := buildBreaker(logger, env)
metricsSupported := supportsMetrics(ctx, logger, env)
metricsSupported := supportsMetrics(ctx, logger, env, enableTLS)
tracingEnabled := env.TracingConfigBackend != tracingconfig.None
concurrencyStateEnabled := env.ConcurrencyStateEndpoint != ""
firstByteTimeout := time.Duration(env.RevisionTimeoutSeconds) * time.Second
Expand Down Expand Up @@ -287,6 +331,10 @@ func buildServer(ctx context.Context, env config, probeContainer func() bool, st
composedHandler = requestLogHandler(logger, composedHandler, env)
}

if enableTLS {
return pkgnet.NewServer(":"+env.QueueServingTLSPort, composedHandler), drainer.Drain
}

return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler), drainer.Drain
}

Expand Down Expand Up @@ -333,12 +381,15 @@ func buildBreaker(logger *zap.SugaredLogger, env config) *queue.Breaker {
return queue.NewBreaker(params)
}

func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config) bool {
func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config, enableTLS bool) bool {
// Keep it on HTTP because Metrics needs to be registered on either TLS server or non-TLS server.
if enableTLS {
return false
}
// Setup request metrics reporting for end-user metrics.
if env.ServingRequestMetricsBackend == "" {
return false
}

if err := setupMetricsExporter(ctx, logger, env.ServingRequestMetricsBackend, env.MetricsCollectorAddress); err != nil {
logger.Errorw("Error setting up request metrics exporter. Request metrics will be unavailable.", zap.Error(err))
return false
Expand Down
24 changes: 22 additions & 2 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"errors"
"net/http"
"net/http/httputil"
"strconv"
"strings"

"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/trace"
Expand All @@ -35,6 +37,7 @@ import (
"knative.dev/serving/pkg/activator"
activatorconfig "knative.dev/serving/pkg/activator/config"
pkghttp "knative.dev/serving/pkg/http"
"knative.dev/serving/pkg/networking"
"knative.dev/serving/pkg/queue"
"knative.dev/serving/pkg/reconciler/serverlessservice/resources/names"
)
Expand All @@ -53,10 +56,11 @@ type activationHandler struct {
throttler Throttler
bufferPool httputil.BufferPool
logger *zap.SugaredLogger
tls bool
}

// New constructs a new http.Handler that deals with revision activation.
func New(_ context.Context, t Throttler, transport http.RoundTripper, usePassthroughLb bool, logger *zap.SugaredLogger) http.Handler {
func New(_ context.Context, t Throttler, transport http.RoundTripper, usePassthroughLb bool, logger *zap.SugaredLogger, tlsEnabled bool) http.Handler {
return &activationHandler{
transport: transport,
tracingTransport: &ochttp.Transport{
Expand All @@ -67,6 +71,7 @@ func New(_ context.Context, t Throttler, transport http.RoundTripper, usePassthr
throttler: t,
bufferPool: network.NewBufferPool(),
logger: logger,
tls: tlsEnabled,
}
}

Expand Down Expand Up @@ -116,7 +121,14 @@ func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.Resp
if usePassthroughLb {
hostOverride = names.PrivateService(revID.Name) + "." + revID.Namespace
}
proxy := pkghttp.NewHeaderPruningReverseProxy(target, hostOverride, activator.RevisionHeaders)

var proxy *httputil.ReverseProxy
if a.tls {
proxy = pkghttp.NewHeaderPruningReverseProxy(useSecurePort(target), hostOverride, activator.RevisionHeaders, true /* uss HTTPS */)
} else {
proxy = pkghttp.NewHeaderPruningReverseProxy(target, hostOverride, activator.RevisionHeaders, false /* use HTTPS */)
}

proxy.BufferPool = a.bufferPool
proxy.Transport = a.transport
if tracingEnabled {
Expand All @@ -129,3 +141,11 @@ func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.Resp

proxy.ServeHTTP(w, r)
}

// useSecurePort replaces the default port with HTTPS port (8112).
// TODO: endpointsToDests() should support HTTPS instead of this overwrite but it needs metadata request to be encrypted.
// This code should be removed when https://github.com/knative/serving/issues/12821 was solved.
func useSecurePort(target string) string {
target = strings.Split(target, ":")[0]
return target + ":" + strconv.Itoa(networking.BackendHTTPSPort)
}
10 changes: 5 additions & 5 deletions pkg/activator/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestActivationHandler(t *testing.T) {

ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t)
defer cancel()
handler := New(ctx, test.throttler, rt, false /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, test.throttler, rt, false /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

resp := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestActivationHandlerProxyHeader(t *testing.T) {
ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t)
defer cancel()

handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

writer := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestActivationHandlerPassthroughLb(t *testing.T) {
ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t)
defer cancel()

handler := New(ctx, fakeThrottler{}, rt, true /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, fakeThrottler{}, rt, true /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

writer := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestActivationHandlerTraceSpans(t *testing.T) {
oct.Finish()
}()

handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

// Set up config store to populate context.
configStore := setupConfigStore(t, logging.FromContext(ctx))
Expand Down Expand Up @@ -345,7 +345,7 @@ func BenchmarkHandler(b *testing.B) {
}, nil
})

handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx))
handler := New(ctx, fakeThrottler{}, rt, false /*usePassthroughLb*/, logging.FromContext(ctx), false /* TLS */)

request := func() *http.Request {
req := httptest.NewRequest(http.MethodGet, "http://example.com", nil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/activator/handler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func BenchmarkHandlerChain(b *testing.B) {
})

// Make sure to update this if the activator's main file changes.
ah := New(ctx, fakeThrottler{}, rt, false, logger)
ah := New(ctx, fakeThrottler{}, rt, false, logger, false /* TLS */)
ah = concurrencyReporter.Handler(ah)
ah = NewTracingHandler(ah)
ah, _ = pkghttp.NewRequestLogHandler(ah, io.Discard, "", nil, false)
Expand Down
8 changes: 6 additions & 2 deletions pkg/http/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ const NoHostOverride = ""
// If hostOverride is not an empty string, the outgoing request's Host header will be
// replaced with that explicit value and the passthrough loadbalancing header will be
// set to enable pod-addressability.
func NewHeaderPruningReverseProxy(target, hostOverride string, headersToRemove []string) *httputil.ReverseProxy {
func NewHeaderPruningReverseProxy(target, hostOverride string, headersToRemove []string, useHTTPS bool) *httputil.ReverseProxy {
return &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = "http"
if useHTTPS {
req.URL.Scheme = "https"
} else {
req.URL.Scheme = "http"
}
req.URL.Host = target

if hostOverride != NoHostOverride {
Expand Down
Loading

0 comments on commit 6ec4509

Please sign in to comment.