Skip to content

Commit

Permalink
This is an automated cherry-pick of #11747
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Nov 16, 2024
1 parent 0c49d2a commit 99abc5a
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 17 deletions.
2 changes: 1 addition & 1 deletion cdc/redo/writer/memory/encoding_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) {
zap.String("namespace", e.changefeed.Namespace),
zap.String("changefeed", e.changefeed.ID),
zap.Error(err))
if err != nil && errors.Cause(err) != context.Canceled {
if err != nil {
e.closed <- err
}
close(e.closed)
Expand Down
26 changes: 22 additions & 4 deletions cdc/redo/writer/memory/mem_log_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -99,10 +100,27 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) {
})
require.NoError(t, err)

require.ErrorIs(t, lw.Close(), context.Canceled)
// duplicate close should return the same error
require.ErrorIs(t, lw.Close(), context.Canceled)

err = lw.WriteEvents(ctx, events...)
require.NoError(t, err)
err = lw.FlushLog(ctx)
require.NoError(t, err)
functions := map[string]func(error){
"WriteEvents": func(expected error) {
err := lw.WriteEvents(ctx, events...)
require.ErrorIs(t, errors.Cause(err), expected)
},
"FlushLog": func(expected error) {
err := lw.FlushLog(ctx)
require.ErrorIs(t, errors.Cause(err), expected)
},
}
firstCall := true
for _, f := range functions {
if firstCall {
firstCall = false
f(context.Canceled)
} else {
f(nil)
}
}
}
2 changes: 1 addition & 1 deletion pkg/redo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func IsBlackholeStorage(scheme string) bool {

// InitExternalStorage init an external storage.
var InitExternalStorage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
s, err := util.GetExternalStorageWithTimeout(ctx, uri.String(), DefaultTimeout)
s, err := util.GetExternalStorageWithDefaultTimeout(ctx, uri.String())
if err != nil {
return nil, errors.WrapError(errors.ErrStorageInitialize, err,
fmt.Sprintf("can't init external storage for %s", uri.String()))
Expand Down
8 changes: 4 additions & 4 deletions pkg/sink/kafka/claimcheck/claim_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ import (
"go.uber.org/zap"
)

const (
defaultTimeout = 5 * time.Minute
)

// ClaimCheck manage send message to the claim-check external storage.
type ClaimCheck struct {
storage storage.ExternalStorage
Expand All @@ -54,7 +50,11 @@ func New(ctx context.Context, storageURI string, changefeedID model.ChangeFeedID
zap.String("storageURI", util.MaskSensitiveDataInURI(storageURI)))

start := time.Now()
<<<<<<< HEAD
externalStorage, err := util.GetExternalStorageWithTimeout(ctx, storageURI, defaultTimeout)
=======
externalStorage, err := util.GetExternalStorageWithDefaultTimeout(ctx, config.ClaimCheckStorageURI)
>>>>>>> ea3567726f (ticdc(redo, sink): return correct error in redo writer & fix default retryer (#11747))
if err != nil {
log.Error("create external storage failed",
zap.String("namespace", changefeedID.Namespace),
Expand Down
16 changes: 9 additions & 7 deletions pkg/util/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,27 @@ import (
"golang.org/x/sync/errgroup"
)

const defaultTimeout = 5 * time.Minute

// GetExternalStorageFromURI creates a new storage.ExternalStorage from a uri.
func GetExternalStorageFromURI(
ctx context.Context, uri string,
) (storage.ExternalStorage, error) {
return GetExternalStorage(ctx, uri, nil, DefaultS3Retryer())
}

// GetExternalStorageWithTimeout creates a new storage.ExternalStorage from a uri
// GetExternalStorageWithDefaultTimeout creates a new storage.ExternalStorage from a uri
// without retry. It is the caller's responsibility to set timeout to the context.
func GetExternalStorageWithTimeout(
ctx context.Context, uri string, timeout time.Duration,
) (storage.ExternalStorage, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
func GetExternalStorageWithDefaultTimeout(ctx context.Context, uri string) (storage.ExternalStorage, error) {
ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
s, err := GetExternalStorage(ctx, uri, nil, nil)
// total retry time is [1<<7, 1<<8] = [128, 256] + 30*6 = [308, 436] seconds
r := NewS3Retryer(7, 1*time.Second, 2*time.Second)
s, err := GetExternalStorage(ctx, uri, nil, r)

return &extStorageWithTimeout{
ExternalStorage: s,
timeout: timeout,
timeout: defaultTimeout,
}, err
}

Expand Down

0 comments on commit 99abc5a

Please sign in to comment.