Skip to content

Commit

Permalink
Update CE clients.
Browse files Browse the repository at this point in the history
  • Loading branch information
Harwayne committed Jun 23, 2020
1 parent 85c85b1 commit de158b1
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 10 deletions.
9 changes: 7 additions & 2 deletions cmd/sendevent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
"os"

cloudevents "github.com/cloudevents/sdk-go/v2"

"go.opencensus.io/plugin/ochttp"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/tracing/propagation/tracecontextb3"
)

var (
Expand Down Expand Up @@ -64,7 +65,11 @@ func main() {
source = fmt.Sprintf("http://%s", utils.GetClusterDomainName())
}

t, err := cloudevents.NewHTTP(cloudevents.WithTarget(target))
t, err := cloudevents.NewHTTP(
cloudevents.WithTarget(target),
cloudevents.WithRoundTripper(&ochttp.Transport{
Propagation: tracecontextb3.TraceContextEgress,
}))
if err != nil {
log.Printf("failed to create transport, %v", err)
os.Exit(1)
Expand Down
6 changes: 6 additions & 0 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"fmt"
"net/url"

"go.opencensus.io/plugin/ochttp"
"knative.dev/pkg/tracing/propagation/tracecontextb3"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
Expand All @@ -36,6 +39,9 @@ func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides
if len(target) > 0 {
pOpts = append(pOpts, cloudevents.WithTarget(target))
}
pOpts = append(pOpts, cloudevents.WithRoundTripper(&ochttp.Transport{
Propagation: tracecontextb3.TraceContextEgress,
}))

p, err := cloudevents.NewHTTP(pOpts...)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/channel/message_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"errors"
"fmt"
"go.opencensus.io/plugin/ochttp"
"knative.dev/pkg/tracing/propagation/tracecontextb3"
nethttp "net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -220,6 +222,9 @@ func TestMessageReceiver_ServerStart_trace_propagation(t *testing.T) {
p, err := cloudevents.NewHTTP(
http.WithTarget(server.URL),
http.WithMethod(method),
cloudevents.WithRoundTripper(&ochttp.Transport{
Propagation: tracecontextb3.TraceContextEgress,
}))
)
require.NoError(t, err)
p.RequestTemplate.Host = host
Expand Down
5 changes: 5 additions & 0 deletions test/performance/infra/sender/http_load_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package sender
import (
"context"
"fmt"
"go.opencensus.io/plugin/ochttp"
"knative.dev/pkg/tracing/propagation/tracecontextb3"
"math/rand"
"net"
"net/http"
Expand Down Expand Up @@ -188,6 +190,9 @@ func vegetaAttackerTransport() *http.Transport {
func newCloudEventsClient(sinkUrl string) (cloudevents.Client, error) {
t, err := cloudevents.NewHTTP(
cloudevents.WithTarget(sinkUrl),
cloudevents.WithRoundTripper(&ochttp.Transport{
Propagation: tracecontextb3.TraceContextEgress,
}))
)
if err != nil {
return nil, fmt.Errorf("failed to create transport: %v", err)
Expand Down
11 changes: 8 additions & 3 deletions test/test_images/heartbeats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (
"strconv"
"time"

duckv1 "knative.dev/pkg/apis/duck/v1"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/kelseyhightower/envconfig"
"go.opencensus.io/plugin/ochttp"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/tracing/propagation/tracecontextb3"
)

type Heartbeat struct {
Expand Down Expand Up @@ -89,7 +90,11 @@ func main() {
ceOverrides = &overrides
}

p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink))
p, err := cloudevents.NewHTTP(
cloudevents.WithTarget(sink),
cloudevents.WithRoundTripper(&ochttp.Transport{
Propagation: tracecontextb3.TraceContextEgress,
}))
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}
Expand Down
8 changes: 7 additions & 1 deletion test/test_images/sequencestepper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"log"

cloudevents "github.com/cloudevents/sdk-go/v2"
"go.opencensus.io/plugin/ochttp"
"knative.dev/pkg/tracing/propagation/tracecontextb3"
)

var (
Expand Down Expand Up @@ -61,7 +63,11 @@ func main() {
// parse the command line flags
flag.Parse()

t, err := cloudevents.NewHTTP(cloudevents.WithPort(8080))
t, err := cloudevents.NewHTTP(
cloudevents.WithPort(8080),
cloudevents.WithRoundTripper(&ochttp.Transport{
Propagation: tracecontextb3.TraceContextEgress,
}))
if err != nil {
log.Fatalf("failed to create transport, %v", err)
}
Expand Down
9 changes: 7 additions & 2 deletions test/test_images/transformevents/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import (
"log"

cloudevents "github.com/cloudevents/sdk-go/v2"
"go.opencensus.io/plugin/ochttp"
"go.uber.org/zap"

"knative.dev/eventing/pkg/tracing"
"knative.dev/pkg/tracing/propagation/tracecontextb3"
)

var (
Expand Down Expand Up @@ -72,7 +73,11 @@ func main() {
log.Fatalf("Unable to setup trace publishing: %v", err)
}

t, err := cloudevents.NewHTTP(cloudevents.WithPort(8080))
t, err := cloudevents.NewHTTP(
cloudevents.WithPort(8080),
cloudevents.WithRoundTripper(&ochttp.Transport{
Propagation: tracecontextb3.TraceContextEgress,
}))
if err != nil {
log.Fatalf("failed to create transport, %v", err)
}
Expand Down
8 changes: 6 additions & 2 deletions test/upgrade/prober/wathola/client/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/wavesoftware/go-ensure"
"go.opencensus.io/plugin/ochttp"
"knative.dev/eventing/test/upgrade/prober/wathola/config"
"knative.dev/pkg/tracing/propagation/tracecontextb3"
)

var log = config.Log
Expand All @@ -38,9 +40,11 @@ func Receive(
receiveEvent ReceiveEvent,
middlewares ...cloudeventshttp.Middleware,
) {
portOpt := cloudevents.WithPort(port)
opts := make([]cloudeventshttp.Option, 0)
opts = append(opts, portOpt)
opts = append(opts, cloudevents.WithPort(port))
opts = append(opts, cloudevents.WithRoundTripper(&ochttp.Transport{
Propagation: tracecontextb3.TraceContextEgress,
}))
if config.Instance.Readiness.Enabled {
readyOpt := cloudevents.WithMiddleware(readinessMiddleware)
opts = append(opts, readyOpt)
Expand Down

0 comments on commit de158b1

Please sign in to comment.