Skip to content

Commit

Permalink
Fix bug
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi committed Jul 17, 2023
1 parent 887cbc3 commit 6486546
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 10 deletions.
46 changes: 36 additions & 10 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import (
"net/url"
"time"

obshttp "github.com/cloudevents/sdk-go/observability/opencensus/v2/http"
cloudevents "github.com/cloudevents/sdk-go/v2"
ceclient "github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/cloudevents/sdk-go/v2/protocol/http"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/tracing/propagation/tracecontextb3"

Expand All @@ -53,7 +53,9 @@ type Client interface {
var newClientHTTPObserved = NewClientHTTPObserved

func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (Client, error) {
t, err := obshttp.NewObservedHTTP(topt...)
t, err := http.New(append(topt,
http.WithMiddleware(tracecontextMiddleware),
)...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -126,11 +128,11 @@ func GetClientConfig(ctx context.Context) ClientConfig {

func NewClient(cfg ClientConfig) (Client, error) {
transport := &ochttp.Transport{
Base: nethttp.DefaultTransport.(*nethttp.Transport),
Propagation: tracecontextb3.TraceContextEgress,
}

pOpts := make([]http.Option, 0)
var closeIdler closeIdler = nethttp.DefaultTransport.(*nethttp.Transport)

ceOverrides := cfg.CeOverrides
if cfg.Env != nil {
Expand All @@ -146,18 +148,18 @@ func NewClient(cfg ClientConfig) (Client, error) {
clientConfig := eventingtls.NewDefaultClientConfig()
clientConfig.CACerts = cfg.Env.GetCACerts()

httpTransport := nethttp.DefaultTransport.(*nethttp.Transport).Clone()
httpTransport.TLSClientConfig, err = eventingtls.GetTLSClientConfig(clientConfig)
tlsConfig, err := eventingtls.GetTLSClientConfig(clientConfig)
if err != nil {
return nil, err
}

cfg.Env.GetLogger().Debugw("Setting up TLS transport", "env", cfg.Env)
httpsTransport := transport.Base.(*nethttp.Transport).Clone()
httpsTransport.TLSClientConfig = tlsConfig

closeIdler = httpTransport
cfg.Env.GetLogger().Debugw("Setting up TLS transport", "env", cfg.Env)

transport = &ochttp.Transport{
Base: httpTransport,
Base: httpsTransport,
Propagation: tracecontextb3.TraceContextEgress,
}
}
Expand All @@ -172,7 +174,11 @@ func NewClient(cfg ClientConfig) (Client, error) {
pOpts = append(pOpts, http.WithHeader(apis.KnNamespaceHeader, cfg.Env.GetNamespace()))
}

pOpts = append(pOpts, http.WithRoundTripper(transport))
httpClient := nethttp.Client{Transport: roundTripperDecorator(transport)}

// Important: prepend HTTP client option to make sure that other options are applied to this
// client and not to the default client.
pOpts = append([]http.Option{http.WithClient(httpClient)}, pOpts...)

// Make sure that explicitly set options have priority
opts := append(pOpts, cfg.Options...)
Expand All @@ -187,7 +193,7 @@ func NewClient(cfg ClientConfig) (Client, error) {
}
return &client{
ceClient: ceClient,
closeIdler: closeIdler,
closeIdler: transport.Base.(*nethttp.Transport),
ceOverrides: ceOverrides,
reporter: cfg.Reporter,
crStatusEventClient: cfg.CrStatusEventClient,
Expand Down Expand Up @@ -337,3 +343,23 @@ func MetricTagFromContext(ctx context.Context) *MetricTag {
ResourceGroup: "unknown",
}
}

func roundTripperDecorator(roundTripper nethttp.RoundTripper) nethttp.RoundTripper {
return &ochttp.Transport{
Propagation: &tracecontext.HTTPFormat{},
Base: roundTripper,
FormatSpanName: formatSpanName,
}
}

func formatSpanName(r *nethttp.Request) string {
return "cloudevents.http." + r.URL.Path
}

func tracecontextMiddleware(h nethttp.Handler) nethttp.Handler {
return &ochttp.Handler{
Propagation: &tracecontext.HTTPFormat{},
Handler: h,
FormatSpanName: formatSpanName,
}
}
4 changes: 4 additions & 0 deletions pkg/adapter/v2/cloudevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func TestNewCloudEventsClient_request(t *testing.T) {
}

func TestTLS(t *testing.T) {
t.Parallel()

ctx, _ := SetupFakeContext(t)
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, 20*time.Millisecond, 5)
Expand Down Expand Up @@ -344,7 +345,10 @@ func TestTLS(t *testing.T) {
}

for _, tc := range tt {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

reporter, err := source.NewStatsReporter()
assert.Nil(t, err)

Expand Down

0 comments on commit 6486546

Please sign in to comment.