Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-enable PingSource TLS E2E test #7082

Merged
merged 10 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/adapter/mtping/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,19 @@ func (a *cronJobsRunner) newPingSourceClient(source *sourcesv1.PingSource) (adap
env.Sink = source.Status.SinkURI.String()
env.CACerts = source.Status.SinkCACerts

a.Logger.Debugw("Creating client",
"namespace", source.Namespace,
"name", source.Name,
"env", env,
"source", source,
)

cfg := adapter.ClientConfig{
Env: &env,
CeOverrides: source.Spec.CloudEventOverrides,
Reporter: a.clientConfig.Reporter,
CrStatusEventClient: a.clientConfig.CrStatusEventClient,
Options: a.clientConfig.Options,
Client: a.clientConfig.Client,
}

return adapter.NewClient(cfg)
Expand Down
98 changes: 54 additions & 44 deletions pkg/adapter/mtping/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@ import (
"bytes"
"context"
"encoding/base64"
"net/http"
"net/http/httptest"
"reflect"
"sync"
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
bindingshttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/pointer"

"knative.dev/pkg/apis"
Expand All @@ -38,7 +43,6 @@ import (
rectesting "knative.dev/pkg/reconciler/testing"

"knative.dev/eventing/pkg/adapter/v2"
adaptertesting "knative.dev/eventing/pkg/adapter/v2/test"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
"knative.dev/eventing/pkg/eventingtls/eventingtlstesting"
)
Expand Down Expand Up @@ -77,11 +81,6 @@ func TestAddRunRemoveSchedules(t *testing.T) {
ContentType: cloudevents.TextPlain,
Data: sampleData,
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a sink"},
},
},
},
wantData: []byte(sampleData),
wantContentType: cloudevents.TextPlain,
Expand All @@ -101,11 +100,6 @@ func TestAddRunRemoveSchedules(t *testing.T) {
ContentType: cloudevents.TextPlain,
Data: sampleData,
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a sink"},
},
},
},
wantData: []byte(sampleData),
wantContentType: cloudevents.TextPlain,
Expand All @@ -123,11 +117,6 @@ func TestAddRunRemoveSchedules(t *testing.T) {
ContentType: cloudevents.TextPlain,
DataBase64: sampleDataBase64,
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a sink"},
},
},
},
wantData: decodeBase64(sampleDataBase64),
wantContentType: cloudevents.TextPlain,
Expand All @@ -145,11 +134,6 @@ func TestAddRunRemoveSchedules(t *testing.T) {
Data: sampleJSONData,
ContentType: cloudevents.ApplicationJSON,
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a sink"},
},
},
},
wantData: []byte(sampleJSONData),
wantContentType: cloudevents.ApplicationJSON,
Expand All @@ -167,11 +151,6 @@ func TestAddRunRemoveSchedules(t *testing.T) {
DataBase64: sampleJSONDataBase64,
ContentType: cloudevents.ApplicationJSON,
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a sink"},
},
},
},
wantData: decodeBase64(sampleJSONDataBase64),
wantContentType: cloudevents.ApplicationJSON,
Expand All @@ -189,11 +168,6 @@ func TestAddRunRemoveSchedules(t *testing.T) {
Data: sampleXmlData,
ContentType: cloudevents.ApplicationXML,
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a sink"},
},
},
},
wantData: []byte(sampleXmlData),
wantContentType: cloudevents.ApplicationXML,
Expand All @@ -203,9 +177,15 @@ func TestAddRunRemoveSchedules(t *testing.T) {
t.Run(n, func(t *testing.T) {
ctx, _ := rectesting.SetupFakeContext(t)
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClient()

runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)
h, events := eventsAccumulator()

s := httptest.NewServer(h)
defer s.Close()
url, _ := apis.ParseURL(s.URL)

runner := NewCronJobsRunner(adapter.ClientConfig{}, kubeclient.Get(ctx), logger)
tc.src.Status.SinkURI = url
entryId := runner.AddSchedule(tc.src)

entry := runner.cron.Entry(entryId)
Expand All @@ -215,7 +195,7 @@ func TestAddRunRemoveSchedules(t *testing.T) {

entry.Job.Run()

validateSent(t, ce, tc.wantData, tc.wantContentType, tc.src.Spec.CloudEventOverrides.Extensions)
validateSent(t, *events, tc.wantData, tc.wantContentType, tc.src.Spec.CloudEventOverrides.Extensions)

runner.RemoveSchedule(entryId)

Expand Down Expand Up @@ -331,9 +311,8 @@ func TestSendEventsTLS(t *testing.T) {
func TestStartStopCron(t *testing.T) {
ctx, _ := rectesting.SetupFakeContext(t)
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClient()

runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)
runner := NewCronJobsRunner(adapter.ClientConfig{}, kubeclient.Get(ctx), logger)

ctx, cancel := context.WithCancel(context.Background())
wctx, wcancel := context.WithCancel(context.Background())
Expand All @@ -360,9 +339,17 @@ func TestStartStopCronDelayWait(t *testing.T) {
}
ctx, _ := rectesting.SetupFakeContext(t)
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClientWithDelay(time.Second * 5)

runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)
h, events := eventsAccumulator()

s := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
time.Sleep(5 * time.Second)
h.ServeHTTP(writer, request)
}))
defer s.Close()
url, _ := apis.ParseURL(s.URL)

runner := NewCronJobsRunner(adapter.ClientConfig{}, kubeclient.Get(ctx), logger)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -384,7 +371,7 @@ func TestStartStopCronDelayWait(t *testing.T) {
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a delayed sink"},
SinkURI: url,
},
},
})
Expand All @@ -398,15 +385,18 @@ func TestStartStopCronDelayWait(t *testing.T) {

runner.Stop() // cron job because of delay is still running.

validateSent(t, ce, []byte("some delayed data"), cloudevents.TextPlain, nil)
validateSent(t, *events, []byte("some delayed data"), cloudevents.TextPlain, nil)
}

func validateSent(t *testing.T, ce *adaptertesting.TestCloudEventsClient, wantData []byte, wantContentType string, extensions map[string]string) {
if got := len(ce.Sent()); got != 1 {
t.Error("Expected 1 event to be sent, got", got)
func validateSent(t *testing.T, events []cloudevents.Event, wantData []byte, wantContentType string, extensions map[string]string) {
err := wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) {
return len(events) == 1, nil
})
if err != nil {
t.Fatal("Expected 1 event to be sent, got", len(events))
}

event := ce.Sent()[0]
event := events[0]

if gotContentType := event.DataContentType(); gotContentType != wantContentType {
t.Errorf("Expected event with contentType=%q to be sent, got %q", wantContentType, gotContentType)
Expand Down Expand Up @@ -436,3 +426,23 @@ func validateSent(t *testing.T, ce *adaptertesting.TestCloudEventsClient, wantDa
}
}
}

func eventsAccumulator() (http.Handler, *[]cloudevents.Event) {
var mu sync.Mutex
events := make([]cloudevents.Event, 0, 8)

return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
m := bindingshttp.NewMessageFromHttpRequest(request)
event, err := binding.ToEvent(request.Context(), m)
if err != nil {
writer.WriteHeader(http.StatusBadRequest)
return
}

mu.Lock()
defer mu.Unlock()

events = append(events, *event)
writer.WriteHeader(http.StatusOK)
}), &events
}
pierDipi marked this conversation as resolved.
Show resolved Hide resolved
50 changes: 35 additions & 15 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import (
"net/url"
"time"

obshttp "github.com/cloudevents/sdk-go/observability/opencensus/v2/http"
cloudevents "github.com/cloudevents/sdk-go/v2"
ceclient "github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/cloudevents/sdk-go/v2/protocol/http"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/tracing/propagation/tracecontextb3"

Expand All @@ -53,7 +53,9 @@ type Client interface {
var newClientHTTPObserved = NewClientHTTPObserved

func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (Client, error) {
t, err := obshttp.NewObservedHTTP(topt...)
t, err := http.New(append(topt,
http.WithMiddleware(tracecontextMiddleware),
)...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -108,8 +110,6 @@ type ClientConfig struct {
Reporter source.StatsReporter
CrStatusEventClient *crstatusevent.CRStatusEventClient
Options []http.Option

Client Client
}

type clientConfigKey struct{}
Expand All @@ -127,16 +127,12 @@ func GetClientConfig(ctx context.Context) ClientConfig {
}

func NewClient(cfg ClientConfig) (Client, error) {
if cfg.Client != nil {
return cfg.Client, nil
}

transport := &ochttp.Transport{
Base: nethttp.DefaultTransport.(*nethttp.Transport),
Propagation: tracecontextb3.TraceContextEgress,
}

pOpts := make([]http.Option, 0)
var closeIdler closeIdler = nethttp.DefaultTransport.(*nethttp.Transport)

ceOverrides := cfg.CeOverrides
if cfg.Env != nil {
Expand All @@ -152,16 +148,16 @@ func NewClient(cfg ClientConfig) (Client, error) {
clientConfig := eventingtls.NewDefaultClientConfig()
clientConfig.CACerts = cfg.Env.GetCACerts()

httpTransport := nethttp.DefaultTransport.(*nethttp.Transport).Clone()
httpTransport.TLSClientConfig, err = eventingtls.GetTLSClientConfig(clientConfig)
tlsConfig, err := eventingtls.GetTLSClientConfig(clientConfig)
if err != nil {
return nil, err
}

closeIdler = httpTransport
httpsTransport := transport.Base.(*nethttp.Transport).Clone()
httpsTransport.TLSClientConfig = tlsConfig

transport = &ochttp.Transport{
Base: httpTransport,
Base: httpsTransport,
Propagation: tracecontextb3.TraceContextEgress,
}
}
Expand All @@ -176,7 +172,11 @@ func NewClient(cfg ClientConfig) (Client, error) {
pOpts = append(pOpts, http.WithHeader(apis.KnNamespaceHeader, cfg.Env.GetNamespace()))
}

pOpts = append(pOpts, http.WithRoundTripper(transport))
httpClient := nethttp.Client{Transport: roundTripperDecorator(transport)}

// Important: prepend HTTP client option to make sure that other options are applied to this
// client and not to the default client.
pOpts = append([]http.Option{http.WithClient(httpClient)}, pOpts...)

// Make sure that explicitly set options have priority
opts := append(pOpts, cfg.Options...)
Expand All @@ -191,7 +191,7 @@ func NewClient(cfg ClientConfig) (Client, error) {
}
return &client{
ceClient: ceClient,
closeIdler: closeIdler,
closeIdler: transport.Base.(*nethttp.Transport),
ceOverrides: ceOverrides,
reporter: cfg.Reporter,
crStatusEventClient: cfg.CrStatusEventClient,
Expand Down Expand Up @@ -341,3 +341,23 @@ func MetricTagFromContext(ctx context.Context) *MetricTag {
ResourceGroup: "unknown",
}
}

func roundTripperDecorator(roundTripper nethttp.RoundTripper) nethttp.RoundTripper {
return &ochttp.Transport{
Propagation: &tracecontext.HTTPFormat{},
Base: roundTripper,
FormatSpanName: formatSpanName,
}
}

func formatSpanName(r *nethttp.Request) string {
return "cloudevents.http." + r.URL.Path
}

func tracecontextMiddleware(h nethttp.Handler) nethttp.Handler {
return &ochttp.Handler{
Propagation: &tracecontext.HTTPFormat{},
Handler: h,
FormatSpanName: formatSpanName,
}
}
4 changes: 4 additions & 0 deletions pkg/adapter/v2/cloudevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func TestNewCloudEventsClient_request(t *testing.T) {
}

func TestTLS(t *testing.T) {
t.Parallel()

ctx, _ := SetupFakeContext(t)
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, 20*time.Millisecond, 5)
Expand Down Expand Up @@ -344,7 +345,10 @@ func TestTLS(t *testing.T) {
}

for _, tc := range tt {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

reporter, err := source.NewStatsReporter()
assert.Nil(t, err)

Expand Down
4 changes: 1 addition & 3 deletions test/e2e-common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ readonly TMP_DIR
readonly KNATIVE_DEFAULT_NAMESPACE="knative-eventing"

# This the namespace used to install and test Knative Eventing.
export SYSTEM_NAMESPACE
SYSTEM_NAMESPACE="${SYSTEM_NAMESPACE:-"knative-eventing-"$(head -c 128 < \
/dev/urandom | LC_CTYPE=C tr -dc 'a-z0-9' | fold -w 10 | head -n 1)}"
export SYSTEM_NAMESPACE=${SYSTEM_NAMESPACE:-"knative-eventing"}

CERT_MANAGER_NAMESPACE="cert-manager"
export CERT_MANAGER_NAMESPACE
Expand Down
1 change: 0 additions & 1 deletion test/rekt/pingsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func TestPingSourceWithSinkRef(t *testing.T) {
}

func TestPingSourceTLS(t *testing.T) {
t.Skip("seems flaky")
t.Parallel()

ctx, env := global.Environment(
Expand Down
Loading