From 2a88d67994e14f3d905b6af22948f007cae2c089 Mon Sep 17 00:00:00 2001 From: Jeevan Rangaramaiah Date: Tue, 13 Jun 2023 14:26:45 +0200 Subject: [PATCH 1/9] remove legacy http client and message sender --- cmd/heartbeats/main.go | 5 +- pkg/kncloudevents/http_client.go | 83 ------- pkg/kncloudevents/http_client_new.go | 1 - pkg/kncloudevents/http_client_test.go | 72 ------ pkg/kncloudevents/message_sender.go | 82 ------- pkg/kncloudevents/message_sender_test.go | 300 ----------------------- test/test_images/wathola-fetcher/main.go | 5 +- 7 files changed, 8 insertions(+), 540 deletions(-) delete mode 100644 pkg/kncloudevents/http_client.go mode change 100644 => 100755 pkg/kncloudevents/http_client_new.go delete mode 100644 pkg/kncloudevents/http_client_test.go delete mode 100644 pkg/kncloudevents/message_sender.go delete mode 100644 pkg/kncloudevents/message_sender_test.go diff --git a/cmd/heartbeats/main.go b/cmd/heartbeats/main.go index d681541b184..a3f27c974ff 100644 --- a/cmd/heartbeats/main.go +++ b/cmd/heartbeats/main.go @@ -222,7 +222,10 @@ func main() { // maybeQuitIstioProxy shuts down Istio's proxy when available. func maybeQuitIstioProxy() { - _, err := http.DefaultClient.Get("http://localhost:15020/quitquitquit") + req, _ := http.NewRequest(http.MethodPost, "http://localhost:15020/quitquitquit", nil) + + _, err := http.DefaultClient.Do(req) + if err != nil && !errors.Is(err, syscall.ECONNREFUSED) { log.Println("[Ignore this warning if Istio proxy is not used on this pod]", err) } diff --git a/pkg/kncloudevents/http_client.go b/pkg/kncloudevents/http_client.go deleted file mode 100644 index c36d0afc074..00000000000 --- a/pkg/kncloudevents/http_client.go +++ /dev/null @@ -1,83 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kncloudevents - -import ( - nethttp "net/http" - "sync" - - "go.opencensus.io/plugin/ochttp" - "knative.dev/pkg/tracing/propagation/tracecontextb3" -) - -type legacyHolder struct { - clientMutex sync.Mutex - connectionArgs *ConnectionArgs - client **nethttp.Client -} - -var legacyClientHolder = legacyHolder{} - -// The used HTTP client is a singleton, so the same http client is reused across all the application. -// If connection args is modified, client is cleaned and a new one is created. -func getClient() *nethttp.Client { - legacyClientHolder.clientMutex.Lock() - defer legacyClientHolder.clientMutex.Unlock() - - if legacyClientHolder.client == nil { - // Add connection options to the default transport. - var base = nethttp.DefaultTransport.(*nethttp.Transport).Clone() - legacyClientHolder.connectionArgs.configureTransport(base) - c := &nethttp.Client{ - // Add output tracing. - Transport: &ochttp.Transport{ - Base: base, - Propagation: tracecontextb3.TraceContextEgress, - }, - } - legacyClientHolder.client = &c - } - - return *legacyClientHolder.client -} - -// ConfigureConnectionArgs configures the new connection args. -// The existing client won't be affected, but a new one will be created. -// Use sparingly, because it might lead to creating a lot of clients, none of them sharing their connection pool! -func configureConnectionArgsOldClient(ca *ConnectionArgs) { - legacyClientHolder.clientMutex.Lock() - defer legacyClientHolder.clientMutex.Unlock() - - // Check if same config - if legacyClientHolder.connectionArgs != nil && - ca != nil && - ca.MaxIdleConns == legacyClientHolder.connectionArgs.MaxIdleConns && - ca.MaxIdleConnsPerHost == legacyClientHolder.connectionArgs.MaxIdleConnsPerHost { - return - } - - if legacyClientHolder.client != nil { - // Let's try to clean up a bit the existing client - // Note: this won't remove it nor close it - (*legacyClientHolder.client).CloseIdleConnections() - - // Setting client to nil - legacyClientHolder.client = nil - } - - legacyClientHolder.connectionArgs = ca -} diff --git a/pkg/kncloudevents/http_client_new.go b/pkg/kncloudevents/http_client_new.go old mode 100644 new mode 100755 index 0c9f1be1a8b..56c938f64a9 --- a/pkg/kncloudevents/http_client_new.go +++ b/pkg/kncloudevents/http_client_new.go @@ -123,7 +123,6 @@ func DeleteAddressableHandler(addressable duckv1.Addressable) { // ConfigureConnectionArgs configures the new connection args. // Use sparingly, because it might lead to creating a lot of clients, none of them sharing their connection pool! func ConfigureConnectionArgs(ca *ConnectionArgs) { - configureConnectionArgsOldClient(ca) //also configure the connection args of the old client clients.mu.Lock() defer clients.mu.Unlock() diff --git a/pkg/kncloudevents/http_client_test.go b/pkg/kncloudevents/http_client_test.go deleted file mode 100644 index 33f8a613758..00000000000 --- a/pkg/kncloudevents/http_client_test.go +++ /dev/null @@ -1,72 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kncloudevents - -import ( - nethttp "net/http" - "testing" - - "github.com/stretchr/testify/require" - "go.opencensus.io/plugin/ochttp" -) - -func TestConfigureConnectionArgsOldClient(t *testing.T) { - // Set connection args - configureConnectionArgsOldClient(&ConnectionArgs{ - MaxIdleConnsPerHost: 1000, - MaxIdleConns: 1000, - }) - client1 := getClient() - - require.Same(t, getClient(), client1) - require.Equal(t, 1000, castToTransport(client1).MaxIdleConns) - require.Equal(t, 1000, castToTransport(client1).MaxIdleConnsPerHost) - - // Set other connection args - configureConnectionArgsOldClient(&ConnectionArgs{ - MaxIdleConnsPerHost: 2000, - MaxIdleConns: 2000, - }) - client2 := getClient() - - require.Same(t, getClient(), client2) - require.Equal(t, 2000, castToTransport(client2).MaxIdleConns) - require.Equal(t, 2000, castToTransport(client2).MaxIdleConnsPerHost) - - // Try to set the same value and client should not be cleaned up - configureConnectionArgsOldClient(&ConnectionArgs{ - MaxIdleConnsPerHost: 2000, - MaxIdleConns: 2000, - }) - require.Same(t, getClient(), client2) - - // Set back to nil - configureConnectionArgsOldClient(nil) - client3 := getClient() - - require.Same(t, getClient(), client3) - require.Equal(t, nethttp.DefaultTransport.(*nethttp.Transport).MaxIdleConns, castToTransport(client3).MaxIdleConns) - require.Equal(t, nethttp.DefaultTransport.(*nethttp.Transport).MaxIdleConnsPerHost, castToTransport(client3).MaxIdleConnsPerHost) - - require.NotSame(t, client1, client2) - require.NotSame(t, client1, client3) - require.NotSame(t, client2, client3) -} - -func castToTransport(client *nethttp.Client) *nethttp.Transport { - return client.Transport.(*ochttp.Transport).Base.(*nethttp.Transport) -} diff --git a/pkg/kncloudevents/message_sender.go b/pkg/kncloudevents/message_sender.go deleted file mode 100644 index b026bbf439e..00000000000 --- a/pkg/kncloudevents/message_sender.go +++ /dev/null @@ -1,82 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kncloudevents - -import ( - "context" - nethttp "net/http" - - "github.com/hashicorp/go-retryablehttp" -) - -// HTTPMessageSender is a wrapper for an http client that can send cloudevents.Request with retries. -// Deprecated: Use kncloudevents.CloudEventsRequest instead. -type HTTPMessageSender struct { - Client *nethttp.Client - Target string -} - -func NewHTTPMessageSenderWithTarget(target string) (*HTTPMessageSender, error) { - return &HTTPMessageSender{Client: getClient(), Target: target}, nil -} - -func (s *HTTPMessageSender) NewCloudEventRequest(ctx context.Context) (*nethttp.Request, error) { - return nethttp.NewRequestWithContext(ctx, "POST", s.Target, nil) -} - -func (s *HTTPMessageSender) NewCloudEventRequestWithTarget(ctx context.Context, target string) (*nethttp.Request, error) { - return nethttp.NewRequestWithContext(ctx, "POST", target, nil) -} - -func (s *HTTPMessageSender) Send(req *nethttp.Request) (*nethttp.Response, error) { - return s.Client.Do(req) -} - -func (s *HTTPMessageSender) SendWithRetries(req *nethttp.Request, config *RetryConfig) (*nethttp.Response, error) { - if config == nil { - return s.Send(req) - } - - client := s.Client - if config.RequestTimeout != 0 { - client = &nethttp.Client{ - Transport: client.Transport, - CheckRedirect: client.CheckRedirect, - Jar: client.Jar, - Timeout: config.RequestTimeout, - } - } - - retryableClient := retryablehttp.Client{ - HTTPClient: client, - RetryWaitMin: defaultRetryWaitMin, - RetryWaitMax: defaultRetryWaitMax, - RetryMax: config.RetryMax, - CheckRetry: retryablehttp.CheckRetry(config.CheckRetry), - Backoff: generateBackoffFn(config), - ErrorHandler: func(resp *nethttp.Response, err error, numTries int) (*nethttp.Response, error) { - return resp, err - }, - } - - retryableReq, err := retryablehttp.FromRequest(req) - if err != nil { - return nil, err - } - - return retryableClient.Do(retryableReq) -} diff --git a/pkg/kncloudevents/message_sender_test.go b/pkg/kncloudevents/message_sender_test.go deleted file mode 100644 index 543833cdb00..00000000000 --- a/pkg/kncloudevents/message_sender_test.go +++ /dev/null @@ -1,300 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kncloudevents - -import ( - "context" - "net" - "net/http" - "net/http/httptest" - "net/url" - "sync/atomic" - "testing" - "time" - - "github.com/cloudevents/sdk-go/v2/binding/buffering" - bindingtest "github.com/cloudevents/sdk-go/v2/binding/test" - cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" - cetest "github.com/cloudevents/sdk-go/v2/test" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "k8s.io/utils/pointer" - - v1 "knative.dev/eventing/pkg/apis/duck/v1" -) - -func TestHTTPMessageSenderSendWithRetries(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - config *RetryConfig - wantStatus int - wantDispatch int - }{{ - name: "5 max retry", - config: &RetryConfig{ - RetryMax: 5, - CheckRetry: func(ctx context.Context, resp *http.Response, err error) (bool, error) { - return true, nil - }, - Backoff: func(attemptNum int, resp *http.Response) time.Duration { - return time.Millisecond - }, - }, - wantStatus: http.StatusServiceUnavailable, - wantDispatch: 6, - }, { - name: "1 max retry", - config: &RetryConfig{ - RetryMax: 1, - CheckRetry: func(ctx context.Context, resp *http.Response, err error) (bool, error) { - return true, nil - }, - Backoff: func(attemptNum int, resp *http.Response) time.Duration { - return time.Millisecond - }, - }, - wantStatus: http.StatusServiceUnavailable, - wantDispatch: 2, - }, { - name: "with no retryConfig", - wantStatus: http.StatusServiceUnavailable, - wantDispatch: 1, - }} - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - var n int32 - server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - atomic.AddInt32(&n, 1) - writer.WriteHeader(tt.wantStatus) - })) - - sender := &HTTPMessageSender{ - Client: http.DefaultClient, - } - - request, err := http.NewRequest("POST", server.URL, nil) - assert.Nil(t, err) - got, err := sender.SendWithRetries(request, tt.config) - if err != nil { - t.Fatalf("SendWithRetries() error = %v, wantErr nil", err) - } - if got.StatusCode != http.StatusServiceUnavailable { - t.Fatalf("SendWithRetries() got = %v, want %v", got.StatusCode, http.StatusServiceUnavailable) - } - if count := int(atomic.LoadInt32(&n)); count != tt.wantDispatch { - t.Fatalf("expected %d retries got %d", tt.config.RetryMax, count) - } - }) - } -} - -func TestHTTPMessageSenderSendWithRetriesWithBufferedMessage(t *testing.T) { - t.Parallel() - - const wantToSkip = 9 - config := &RetryConfig{ - RetryMax: wantToSkip, - CheckRetry: func(ctx context.Context, resp *http.Response, err error) (bool, error) { - return true, nil - }, - Backoff: func(attemptNum int, resp *http.Response) time.Duration { - return time.Millisecond * 50 * time.Duration(attemptNum) - }, - } - - var n uint32 - server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - thisReqN := atomic.AddUint32(&n, 1) - if thisReqN <= wantToSkip { - writer.WriteHeader(http.StatusServiceUnavailable) - } else { - writer.WriteHeader(http.StatusAccepted) - } - })) - - sender := &HTTPMessageSender{ - Client: http.DefaultClient, - } - - request, err := http.NewRequest("POST", server.URL, nil) - assert.Nil(t, err) - - // Create a message similar to the one we send with channels - mockMessage := bindingtest.MustCreateMockBinaryMessage(cetest.FullEvent()) - bufferedMessage, err := buffering.BufferMessage(context.TODO(), mockMessage) - assert.Nil(t, err) - - err = cehttp.WriteRequest(context.TODO(), bufferedMessage, request) - assert.Nil(t, err) - - got, err := sender.SendWithRetries(request, config) - if err != nil { - t.Fatalf("SendWithRetries() error = %v, wantErr nil", err) - } - if got.StatusCode != http.StatusAccepted { - t.Fatalf("SendWithRetries() got = %v, want %v", got.StatusCode, http.StatusAccepted) - } - if count := atomic.LoadUint32(&n); count != wantToSkip+1 { - t.Fatalf("expected %d count got %d", wantToSkip+1, count) - } -} - -func TestHTTPMessageSenderSendWithRetriesWithSingleRequestTimeout(t *testing.T) { - t.Parallel() - - timeout := time.Second * 3 - - var n int32 - server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - newVal := atomic.AddInt32(&n, 1) - if newVal >= 5 { - writer.WriteHeader(http.StatusOK) - } else { - // Let's add a bit more time - time.Sleep(timeout + (200 * time.Millisecond)) - writer.WriteHeader(http.StatusAccepted) - } - })) - defer server.Close() - - sender := &HTTPMessageSender{ - Client: getClient(), - } - config := &RetryConfig{ - RetryMax: 5, - CheckRetry: SelectiveRetry, - Backoff: func(attemptNum int, resp *http.Response) time.Duration { - return time.Millisecond - }, - RequestTimeout: timeout, - } - - request, err := http.NewRequest("POST", server.URL, nil) - require.NoError(t, err) - - got, err := sender.SendWithRetries(request, config) - - require.Equal(t, 5, int(atomic.LoadInt32(&n))) - require.NoError(t, err) - require.Equal(t, http.StatusOK, got.StatusCode) -} - -func TestRetriesOnNetworkErrors(t *testing.T) { - - n := int32(10) - linear := v1.BackoffPolicyLinear - target := "127.0.0.1:63468" - - calls := make(chan struct{}) - defer close(calls) - - nCalls := int32(0) - - cont := make(chan struct{}) - defer close(cont) - - go func() { - for range calls { - - nCalls++ - // Simulate that the target service is back up. - // - // First n/2-1 calls we get connection refused since there is no server running. - // Now we start a server that responds with a retryable error, so we expect that - // the client continues to retry for a different reason. - // - // The last time we return 200, so we don't expect a new retry. - if n/2 == nCalls { - - l, err := net.Listen("tcp", target) - assert.Nil(t, err) - - s := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - if n-1 != nCalls { - writer.WriteHeader(http.StatusServiceUnavailable) - return - } - })) - defer s.Close() //nolint // defers in this range loop won't run unless the channel gets closed - - assert.Nil(t, s.Listener.Close()) - - s.Listener = l - - s.Start() - } - cont <- struct{}{} - } - }() - - r, err := RetryConfigFromDeliverySpec(v1.DeliverySpec{ - Retry: pointer.Int32(n), - BackoffPolicy: &linear, - BackoffDelay: pointer.String("PT0.1S"), - }) - assert.Nil(t, err) - - checkRetry := r.CheckRetry - - r.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { - calls <- struct{}{} - <-cont - - return checkRetry(ctx, resp, err) - } - - req, err := http.NewRequest("POST", "http://"+target, nil) - assert.Nil(t, err) - - sender, err := NewHTTPMessageSenderWithTarget("") - assert.Nil(t, err) - - _, err = sender.SendWithRetries(req, &r) - assert.Nil(t, err) - - // nCalls keeps track of how many times a call to check retry occurs. - // Since the number of request are n + 1 and the last one is successful the expected number of calls are n. - assert.Equal(t, n, nCalls, "expected %d got %d", n, nCalls) -} - -func TestHTTPMessageSender_NewCloudEventRequestWithTarget(t *testing.T) { - s := &HTTPMessageSender{ - Client: getClient(), - Target: "localhost", - } - - expectedUrl, err := url.Parse("example.com") - require.NoError(t, err) - req, err := s.NewCloudEventRequestWithTarget(context.TODO(), "example.com") - require.NoError(t, err) - require.Equal(t, req.URL, expectedUrl) -} - -func TestHTTPMessageSender_NewCloudEventRequest(t *testing.T) { - s := &HTTPMessageSender{ - Client: getClient(), - Target: "localhost", - } - - expectedUrl, err := url.Parse("localhost") - require.NoError(t, err) - req, err := s.NewCloudEventRequest(context.TODO()) - require.NoError(t, err) - require.Equal(t, req.URL, expectedUrl) -} diff --git a/test/test_images/wathola-fetcher/main.go b/test/test_images/wathola-fetcher/main.go index b32bf48c341..8348e4668f3 100644 --- a/test/test_images/wathola-fetcher/main.go +++ b/test/test_images/wathola-fetcher/main.go @@ -31,7 +31,10 @@ func main() { } func maybeQuitIstioProxy() { - _, err := http.DefaultClient.Get("http://localhost:15020/quitquitquit") + req, _ := http.NewRequest(http.MethodPost, "http://localhost:15020/quitquitquit", nil) + + _, err := http.DefaultClient.Do(req) + if err != nil && !errors.Is(err, syscall.ECONNREFUSED) { log.Println("[Ignore this warning if Istio proxy is not used on this pod]", err) } From a2b7256e1fdb1d9656515da017fc8b9243c82341 Mon Sep 17 00:00:00 2001 From: Jeevan Date: Thu, 13 Jul 2023 14:44:07 +0530 Subject: [PATCH 2/9] Removed legacy http client and message sender along with usage. Removed message sender usage from message dispatcher test Removed message sender usage from message dispatcher test --- .../message_dispatcher_benchmark_test.go | 7 ------- .../message_dispatcher_test.go | 20 ------------------- 2 files changed, 27 deletions(-) diff --git a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go b/pkg/inmemorychannel/message_dispatcher_benchmark_test.go index 371bdcf3e8d..275a61668d0 100644 --- a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go +++ b/pkg/inmemorychannel/message_dispatcher_benchmark_test.go @@ -76,12 +76,6 @@ func BenchmarkDispatcher_dispatch_ok_through_2_channels(b *testing.B) { }, } - // Let's mock this stuff! - httpSender, err := kncloudevents.NewHTTPMessageSenderWithTarget(channelA.URL.String()) - if err != nil { - b.Fatal(err) - } - multiChannelFanoutHandler, err := multichannelfanout.NewMessageHandlerWithConfig(context.TODO(), logger, channel.NewMessageDispatcher(logger), config, reporter) if err != nil { b.Fatal(err) @@ -96,7 +90,6 @@ func BenchmarkDispatcher_dispatch_ok_through_2_channels(b *testing.B) { dispatcher := NewMessageDispatcher(dispatcherArgs) requestHandler := kncloudevents.CreateHandler(dispatcher.handler) - httpSender.Client = mockedHTTPClient(clientMock(channelA.URL.Host, transformations.URL.Host, channelB.URL.Host, receiver.URL.Host, requestHandler)) // Start the bench b.ResetTimer() diff --git a/pkg/inmemorychannel/message_dispatcher_test.go b/pkg/inmemorychannel/message_dispatcher_test.go index a1256c4d5e1..5146fdb2ad9 100644 --- a/pkg/inmemorychannel/message_dispatcher_test.go +++ b/pkg/inmemorychannel/message_dispatcher_test.go @@ -258,29 +258,9 @@ func TestDispatcher_dispatch(t *testing.T) { }() dispatcher.WaitReady() - // Ok now everything should be ready to send the event - httpsender, err := kncloudevents.NewHTTPMessageSenderWithTarget(channelAProxy.URL) - if err != nil { - t.Fatal(err) - } - - req, err := httpsender.NewCloudEventRequest(context.Background()) - if err != nil { - t.Fatal(err) - } - event := test.FullEvent() _ = protocolhttp.WriteRequest(context.Background(), binding.ToMessage(&event), req) - res, err := httpsender.Send(req) - if err != nil { - t.Fatal(err) - } - - if res.StatusCode != http.StatusAccepted { - t.Fatal("Expected 202, Have", res.StatusCode) - } - transformationsFailureWg.Wait() deadLetterWg.Wait() err = <-transformationsCh From a20347c4e38854fe625eaf31257dc53394537e61 Mon Sep 17 00:00:00 2001 From: Jeevan Date: Sat, 15 Jul 2023 10:57:17 +0530 Subject: [PATCH 3/9] resolving on review commits. --- .../message_dispatcher_benchmark_test.go | 6 ++++++ pkg/inmemorychannel/message_dispatcher_test.go | 15 +++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go b/pkg/inmemorychannel/message_dispatcher_benchmark_test.go index 275a61668d0..e173c6536b8 100644 --- a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go +++ b/pkg/inmemorychannel/message_dispatcher_benchmark_test.go @@ -76,6 +76,12 @@ func BenchmarkDispatcher_dispatch_ok_through_2_channels(b *testing.B) { }, } + // Let's mock this stuff! + request, err := kncloudevents.NewCloudEventRequest(context, channelA.URL.String()) + if err != nil { + b.Fatal(err) + } + multiChannelFanoutHandler, err := multichannelfanout.NewMessageHandlerWithConfig(context.TODO(), logger, channel.NewMessageDispatcher(logger), config, reporter) if err != nil { b.Fatal(err) diff --git a/pkg/inmemorychannel/message_dispatcher_test.go b/pkg/inmemorychannel/message_dispatcher_test.go index 5146fdb2ad9..c0cf3a113e9 100644 --- a/pkg/inmemorychannel/message_dispatcher_test.go +++ b/pkg/inmemorychannel/message_dispatcher_test.go @@ -258,9 +258,24 @@ func TestDispatcher_dispatch(t *testing.T) { }() dispatcher.WaitReady() + // Ok now everything should be ready to send the event + request, err := kncloudevents.NewCloudEventRequest(context, channelAProxy.URL) + if err != nil { + t.Fatal(err) + } + event := test.FullEvent() _ = protocolhttp.WriteRequest(context.Background(), binding.ToMessage(&event), req) + res, err := request.Send() + if err != nil { + t.Fatal(err) + } + + if res.StatusCode != http.StatusAccepted { + t.Fatal("Expected 202, Have", res.StatusCode) + } + transformationsFailureWg.Wait() deadLetterWg.Wait() err = <-transformationsCh From 341c8822c337730f5bcdb20301d21dc2b533676d Mon Sep 17 00:00:00 2001 From: Jeevan Date: Tue, 18 Jul 2023 18:19:01 +0530 Subject: [PATCH 4/9] resolving on review commits and fixing test failures --- pkg/inmemorychannel/message_dispatcher_benchmark_test.go | 5 ----- pkg/inmemorychannel/message_dispatcher_test.go | 2 +- pkg/kncloudevents/http_client_new_test.go | 4 ++++ 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go b/pkg/inmemorychannel/message_dispatcher_benchmark_test.go index e173c6536b8..866cdca44cc 100644 --- a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go +++ b/pkg/inmemorychannel/message_dispatcher_benchmark_test.go @@ -77,11 +77,6 @@ func BenchmarkDispatcher_dispatch_ok_through_2_channels(b *testing.B) { } // Let's mock this stuff! - request, err := kncloudevents.NewCloudEventRequest(context, channelA.URL.String()) - if err != nil { - b.Fatal(err) - } - multiChannelFanoutHandler, err := multichannelfanout.NewMessageHandlerWithConfig(context.TODO(), logger, channel.NewMessageDispatcher(logger), config, reporter) if err != nil { b.Fatal(err) diff --git a/pkg/inmemorychannel/message_dispatcher_test.go b/pkg/inmemorychannel/message_dispatcher_test.go index c0cf3a113e9..9e19cec02ab 100644 --- a/pkg/inmemorychannel/message_dispatcher_test.go +++ b/pkg/inmemorychannel/message_dispatcher_test.go @@ -265,7 +265,7 @@ func TestDispatcher_dispatch(t *testing.T) { } event := test.FullEvent() - _ = protocolhttp.WriteRequest(context.Background(), binding.ToMessage(&event), req) + _ = protocolhttp.WriteRequest(context.Background(), binding.ToMessage(&event), request) res, err := request.Send() if err != nil { diff --git a/pkg/kncloudevents/http_client_new_test.go b/pkg/kncloudevents/http_client_new_test.go index 5d717bbf5f9..9bc450fc168 100644 --- a/pkg/kncloudevents/http_client_new_test.go +++ b/pkg/kncloudevents/http_client_new_test.go @@ -145,3 +145,7 @@ func Test_ConfigureConnectionArgs(t *testing.T) { require.NotSame(t, client1, client3) require.NotSame(t, client2, client3) } + +func castToTransport(client *nethttp.Client) *nethttp.Transport { + return client.Transport.(*ochttp.Transport).Base.(*nethttp.Transport) +} \ No newline at end of file From e2f54d215c8b5499c5b7e2be1b1c969213e552b1 Mon Sep 17 00:00:00 2001 From: Jeevan Date: Tue, 18 Jul 2023 18:19:01 +0530 Subject: [PATCH 5/9] resolving on review commits and fixing test failures --- pkg/inmemorychannel/message_dispatcher_benchmark_test.go | 5 ----- pkg/inmemorychannel/message_dispatcher_test.go | 2 +- pkg/kncloudevents/http_client_new_test.go | 5 +++++ 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go b/pkg/inmemorychannel/message_dispatcher_benchmark_test.go index e173c6536b8..866cdca44cc 100644 --- a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go +++ b/pkg/inmemorychannel/message_dispatcher_benchmark_test.go @@ -77,11 +77,6 @@ func BenchmarkDispatcher_dispatch_ok_through_2_channels(b *testing.B) { } // Let's mock this stuff! - request, err := kncloudevents.NewCloudEventRequest(context, channelA.URL.String()) - if err != nil { - b.Fatal(err) - } - multiChannelFanoutHandler, err := multichannelfanout.NewMessageHandlerWithConfig(context.TODO(), logger, channel.NewMessageDispatcher(logger), config, reporter) if err != nil { b.Fatal(err) diff --git a/pkg/inmemorychannel/message_dispatcher_test.go b/pkg/inmemorychannel/message_dispatcher_test.go index c0cf3a113e9..9e19cec02ab 100644 --- a/pkg/inmemorychannel/message_dispatcher_test.go +++ b/pkg/inmemorychannel/message_dispatcher_test.go @@ -265,7 +265,7 @@ func TestDispatcher_dispatch(t *testing.T) { } event := test.FullEvent() - _ = protocolhttp.WriteRequest(context.Background(), binding.ToMessage(&event), req) + _ = protocolhttp.WriteRequest(context.Background(), binding.ToMessage(&event), request) res, err := request.Send() if err != nil { diff --git a/pkg/kncloudevents/http_client_new_test.go b/pkg/kncloudevents/http_client_new_test.go index 5d717bbf5f9..9410d2346bd 100644 --- a/pkg/kncloudevents/http_client_new_test.go +++ b/pkg/kncloudevents/http_client_new_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" + "go.opencensus.io/plugin/ochttp" ) var ( @@ -145,3 +146,7 @@ func Test_ConfigureConnectionArgs(t *testing.T) { require.NotSame(t, client1, client3) require.NotSame(t, client2, client3) } + +func castToTransport(client *nethttp.Client) *nethttp.Transport { + return client.Transport.(*ochttp.Transport).Base.(*nethttp.Transport) +} \ No newline at end of file From 6544a804366ece241566370b45596cbae8451c49 Mon Sep 17 00:00:00 2001 From: Jeevan Date: Tue, 18 Jul 2023 18:42:04 +0530 Subject: [PATCH 6/9] Update pkg/inmemorychannel/message_dispatcher_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Christoph Stäbler --- pkg/inmemorychannel/message_dispatcher_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/inmemorychannel/message_dispatcher_test.go b/pkg/inmemorychannel/message_dispatcher_test.go index 9e19cec02ab..626aef35cee 100644 --- a/pkg/inmemorychannel/message_dispatcher_test.go +++ b/pkg/inmemorychannel/message_dispatcher_test.go @@ -259,7 +259,7 @@ func TestDispatcher_dispatch(t *testing.T) { dispatcher.WaitReady() // Ok now everything should be ready to send the event - request, err := kncloudevents.NewCloudEventRequest(context, channelAProxy.URL) + request, err := kncloudevents.NewCloudEventRequest(context, channelAProxy.URL) if err != nil { t.Fatal(err) } From e967f383c43c1efa13d0afa06e21e47cb13ad21d Mon Sep 17 00:00:00 2001 From: Jeevan Date: Tue, 18 Jul 2023 19:33:52 +0530 Subject: [PATCH 7/9] resolving on review commits and fixing test failures --- pkg/inmemorychannel/message_dispatcher_benchmark_test.go | 1 + pkg/inmemorychannel/message_dispatcher_test.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go b/pkg/inmemorychannel/message_dispatcher_benchmark_test.go index 866cdca44cc..fa1e63b4d13 100644 --- a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go +++ b/pkg/inmemorychannel/message_dispatcher_benchmark_test.go @@ -91,6 +91,7 @@ func BenchmarkDispatcher_dispatch_ok_through_2_channels(b *testing.B) { dispatcher := NewMessageDispatcher(dispatcherArgs) requestHandler := kncloudevents.CreateHandler(dispatcher.handler) + mockedHTTPClient(clientMock(channelA.URL.Host, transformations.URL.Host, channelB.URL.Host, receiver.URL.Host, requestHandler)) // Start the bench b.ResetTimer() diff --git a/pkg/inmemorychannel/message_dispatcher_test.go b/pkg/inmemorychannel/message_dispatcher_test.go index 626aef35cee..54cbba4f028 100644 --- a/pkg/inmemorychannel/message_dispatcher_test.go +++ b/pkg/inmemorychannel/message_dispatcher_test.go @@ -259,13 +259,13 @@ func TestDispatcher_dispatch(t *testing.T) { dispatcher.WaitReady() // Ok now everything should be ready to send the event - request, err := kncloudevents.NewCloudEventRequest(context, channelAProxy.URL) + request, err := kncloudevents.NewCloudEventRequest(context.TODO(), *mustParseUrlToAddressable(t, channelAProxy.URL)) if err != nil { t.Fatal(err) } event := test.FullEvent() - _ = protocolhttp.WriteRequest(context.Background(), binding.ToMessage(&event), request) + _ = protocolhttp.WriteRequest(context.Background(), binding.ToMessage(&event), request.Request) res, err := request.Send() if err != nil { From 54b3be8a02f09b3aaf17aa841a124940f63fa55f Mon Sep 17 00:00:00 2001 From: Jeevan Date: Sat, 22 Jul 2023 07:27:58 +0530 Subject: [PATCH 8/9] remove the dispatcher benchmark test --- .../message_dispatcher_benchmark_test.go | 155 ------------------ 1 file changed, 155 deletions(-) delete mode 100644 pkg/inmemorychannel/message_dispatcher_benchmark_test.go diff --git a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go b/pkg/inmemorychannel/message_dispatcher_benchmark_test.go deleted file mode 100644 index fa1e63b4d13..00000000000 --- a/pkg/inmemorychannel/message_dispatcher_benchmark_test.go +++ /dev/null @@ -1,155 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package inmemorychannel - -import ( - "context" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/cloudevents/sdk-go/v2/binding" - "github.com/cloudevents/sdk-go/v2/binding/transformer" - protocolhttp "github.com/cloudevents/sdk-go/v2/protocol/http" - "github.com/cloudevents/sdk-go/v2/test" - "go.uber.org/zap" - - "knative.dev/eventing/pkg/channel" - "knative.dev/eventing/pkg/channel/fanout" - "knative.dev/eventing/pkg/channel/multichannelfanout" - "knative.dev/eventing/pkg/kncloudevents" -) - -// This test emulates a real dispatcher usage -// send -> channela -> sub aaaa -> transformationsServer -> channelb -> sub bbbb -> receiver -func BenchmarkDispatcher_dispatch_ok_through_2_channels(b *testing.B) { - logger := zap.NewNop() - reporter := channel.NewStatsReporter("testcontainer", "testpod") - - channelA := mustParseUrlToAddressable(b, "http://channela.svc/") - transformations := mustParseUrlToAddressable(b, "http://transformations.svc/") - channelB := mustParseUrlToAddressable(b, "http://channelb.svc/") - receiver := mustParseUrlToAddressable(b, "http://receiver.svc/") - - // The message flow is: - // send -> channela -> sub aaaa -> transformationsServer -> channelb -> sub bbbb -> receiver - config := multichannelfanout.Config{ - ChannelConfigs: []multichannelfanout.ChannelConfig{ - { - Namespace: "default", - Name: "channela", - HostName: "channela.svc", - FanoutConfig: fanout.Config{ - AsyncHandler: false, - Subscriptions: []fanout.Subscription{{ - Subscriber: *transformations, - Reply: channelB, - }}, - }, - }, - { - Namespace: "default", - Name: "channelb", - HostName: "channelb.svc", - FanoutConfig: fanout.Config{ - AsyncHandler: false, - Subscriptions: []fanout.Subscription{{ - Subscriber: *receiver, - }}, - }, - }, - }, - } - - // Let's mock this stuff! - multiChannelFanoutHandler, err := multichannelfanout.NewMessageHandlerWithConfig(context.TODO(), logger, channel.NewMessageDispatcher(logger), config, reporter) - if err != nil { - b.Fatal(err) - } - dispatcherArgs := &InMemoryMessageDispatcherArgs{ - Port: 8080, - ReadTimeout: 1 * time.Minute, - WriteTimeout: 1 * time.Minute, - Handler: multiChannelFanoutHandler, - Logger: logger, - } - - dispatcher := NewMessageDispatcher(dispatcherArgs) - requestHandler := kncloudevents.CreateHandler(dispatcher.handler) - mockedHTTPClient(clientMock(channelA.URL.Host, transformations.URL.Host, channelB.URL.Host, receiver.URL.Host, requestHandler)) - - // Start the bench - b.ResetTimer() - for i := 0; i < b.N; i++ { - req, _ := kncloudevents.NewCloudEventRequest(context.Background(), *channelA) - - event := test.FullEvent() - _ = protocolhttp.WriteRequest(context.Background(), binding.ToMessage(&event), req.Request) - - _, _ = req.Send() - } -} - -func clientMock(channelAHost string, transformationsHost string, channelBHost string, receiverHost string, channelHandler http.Handler) roundTripFunc { - return func(req *http.Request) *http.Response { - response := httptest.ResponseRecorder{} - if req.URL.Host == channelAHost || req.URL.Host == channelBHost { - channelHandler.ServeHTTP(&response, req) - return response.Result() - } - if req.URL.Host == transformationsHost { - message := protocolhttp.NewMessageFromHttpRequest(req) - defer message.Finish(nil) - - _ = protocolhttp.WriteResponseWriter( - context.Background(), - message, - 200, - &response, - transformer.AddExtension("transformed", "true"), - ) - return response.Result() - } - if req.URL.Host == receiverHost { - transformed := req.Header.Get("ce-transformed") - - if transformed != "true" { - response.WriteHeader(500) - } else { - response.WriteHeader(200) - } - - return response.Result() - } - - response.WriteHeader(404) - return response.Result() - } -} - -type roundTripFunc func(req *http.Request) *http.Response - -func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { - return f(req), nil -} - -func mockedHTTPClient(fn roundTripFunc) *http.Client { - return &http.Client{ - Transport: fn, - } -} From 54abcccca1b68de65e8b5d9fe40eaadc3579ba91 Mon Sep 17 00:00:00 2001 From: Jeevan Date: Tue, 25 Jul 2023 09:13:26 +0530 Subject: [PATCH 9/9] import fix --- pkg/kncloudevents/http_client_new_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/kncloudevents/http_client_new_test.go b/pkg/kncloudevents/http_client_new_test.go index 9410d2346bd..f9d9cd87380 100644 --- a/pkg/kncloudevents/http_client_new_test.go +++ b/pkg/kncloudevents/http_client_new_test.go @@ -21,9 +21,9 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opencensus.io/plugin/ochttp" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" - "go.opencensus.io/plugin/ochttp" ) var ( @@ -149,4 +149,4 @@ func Test_ConfigureConnectionArgs(t *testing.T) { func castToTransport(client *nethttp.Client) *nethttp.Transport { return client.Transport.(*ochttp.Transport).Base.(*nethttp.Transport) -} \ No newline at end of file +}