Skip to content

Commit

Permalink
pass transformers to executeRequest (knative#5512)
Browse files Browse the repository at this point in the history
  • Loading branch information
skonto committed Jun 16, 2021
1 parent 91c5291 commit 11be12f
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions pkg/channel/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ func (d *MessageDispatcherImpl) DispatchMessageWithRetries(ctx context.Context,
// Try to send to destination
messagesToFinish = append(messagesToFinish, message)

ctx, responseMessage, responseAdditionalHeaders, dispatchExecutionInfo, err = d.executeRequest(ctx, destination, message, additionalHeaders, retriesConfig)
ctx, responseMessage, responseAdditionalHeaders, dispatchExecutionInfo, err = d.executeRequest(ctx, destination, message, additionalHeaders, retriesConfig, transformers...)
if err != nil {
// If DeadLetter is configured, then send original message with knative error extensions
if deadLetter != nil {
transformers := d.dispatchExecutionInfoTransformers(dispatchExecutionInfo)
_, deadLetterResponse, _, dispatchExecutionInfo, deadLetterErr := d.executeRequest(ctx, deadLetter, message, additionalHeaders, retriesConfig, transformers...)
dispatchTransformers := d.dispatchExecutionInfoTransformers(dispatchExecutionInfo)
_, deadLetterResponse, _, dispatchExecutionInfo, deadLetterErr := d.executeRequest(ctx, deadLetter, message, additionalHeaders, retriesConfig, append(transformers, dispatchTransformers)...)
if deadLetterErr != nil {
return dispatchExecutionInfo, fmt.Errorf("unable to complete request to either %s (%v) or %s (%v)", destination, err, deadLetter, deadLetterErr)
}
Expand Down Expand Up @@ -158,8 +158,8 @@ func (d *MessageDispatcherImpl) DispatchMessageWithRetries(ctx context.Context,
if err != nil {
// If DeadLetter is configured, then send original message with knative error extensions
if deadLetter != nil {
transformers := d.dispatchExecutionInfoTransformers(dispatchExecutionInfo)
_, deadLetterResponse, _, dispatchExecutionInfo, deadLetterErr := d.executeRequest(ctx, deadLetter, message, responseAdditionalHeaders, retriesConfig, transformers...)
dispatchTransformers := d.dispatchExecutionInfoTransformers(dispatchExecutionInfo)
_, deadLetterResponse, _, dispatchExecutionInfo, deadLetterErr := d.executeRequest(ctx, deadLetter, message, responseAdditionalHeaders, retriesConfig, append(transformers, dispatchTransformers)...)
if deadLetterErr != nil {
return dispatchExecutionInfo, fmt.Errorf("failed to forward reply to %s (%v) and failed to send it to the dead letter sink %s (%v)", reply, err, deadLetter, deadLetterErr)
}
Expand Down

0 comments on commit 11be12f

Please sign in to comment.