diff --git a/engine/pkg/externalresource/broker/broker_test.go b/engine/pkg/externalresource/broker/broker_test.go index aa470dc1f3c..03fa5da9b04 100644 --- a/engine/pkg/externalresource/broker/broker_test.go +++ b/engine/pkg/externalresource/broker/broker_test.go @@ -46,12 +46,16 @@ func TestBrokerOpenNewStorage(t *testing.T) { brk, cli, dir := newBroker(t) defer brk.Close() + resID := "/local/test-1" + _, resName, err := resModel.ParseResourceID(resID) + require.NoError(t, err) + cli.On("QueryResource", mock.Anything, - &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: "/local/test-1"}}, mock.Anything). + &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: resID}}, mock.Anything). Return((*pb.QueryResourceResponse)(nil), status.Error(codes.NotFound, "resource manager error")) - hdl, err := brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", "job-1", "/local/test-1") + hdl, err := brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", "job-1", resID) require.NoError(t, err) - require.Equal(t, "/local/test-1", hdl.ID()) + require.Equal(t, resID, hdl.ID()) cli.AssertExpectations(t) cli.ExpectedCalls = nil @@ -64,7 +68,7 @@ func TestBrokerOpenNewStorage(t *testing.T) { cli.On("CreateResource", mock.Anything, &pb.CreateResourceRequest{ ProjectInfo: &pb.ProjectInfo{TenantId: fakeProjectInfo.TenantID(), ProjectId: fakeProjectInfo.ProjectID()}, - ResourceId: "/local/test-1", + ResourceId: resID, CreatorExecutor: "executor-1", JobId: "job-1", CreatorWorkerId: "worker-1", @@ -75,7 +79,7 @@ func TestBrokerOpenNewStorage(t *testing.T) { cli.AssertExpectations(t) - local.AssertLocalFileExists(t, dir, "worker-1", "test-1", "1.txt") + local.AssertLocalFileExists(t, dir, "worker-1", resName, "1.txt") } func TestBrokerOpenExistingStorage(t *testing.T) { @@ -84,12 +88,15 @@ func TestBrokerOpenExistingStorage(t *testing.T) { brk, cli, dir := newBroker(t) defer brk.Close() + resID := "/local/test-2" + _, resName, err := resModel.ParseResourceID(resID) + require.NoError(t, err) cli.On("QueryResource", mock.Anything, - &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: "/local/test-2"}}, mock.Anything). + &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: resID}}, mock.Anything). Return((*pb.QueryResourceResponse)(nil), status.Error(codes.NotFound, "resource manager error")).Once() cli.On("CreateResource", mock.Anything, &pb.CreateResourceRequest{ ProjectInfo: &pb.ProjectInfo{TenantId: fakeProjectInfo.TenantID(), ProjectId: fakeProjectInfo.ProjectID()}, - ResourceId: "/local/test-2", + ResourceId: resID, CreatorExecutor: "executor-1", JobId: "job-1", CreatorWorkerId: "worker-2", @@ -100,23 +107,23 @@ func TestBrokerOpenExistingStorage(t *testing.T) { fakeProjectInfo, "worker-2", "job-1", - "/local/test-2") + resID) require.NoError(t, err) err = hdl.Persist(context.Background()) require.NoError(t, err) cli.On("QueryResource", mock.Anything, - &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: "/local/test-2"}}, mock.Anything). + &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: resID}}, mock.Anything). Return(&pb.QueryResourceResponse{ CreatorExecutor: "executor-1", JobId: "job-1", CreatorWorkerId: "worker-2", }, nil) - hdl, err = brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", "job-1", "/local/test-2") + hdl, err = brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", "job-1", resID) require.NoError(t, err) - require.Equal(t, "/local/test-2", hdl.ID()) + require.Equal(t, resID, hdl.ID()) cli.AssertExpectations(t) @@ -126,7 +133,7 @@ func TestBrokerOpenExistingStorage(t *testing.T) { err = f.Close(context.Background()) require.NoError(t, err) - local.AssertLocalFileExists(t, dir, "worker-2", "test-2", "1.txt") + local.AssertLocalFileExists(t, dir, "worker-2", resName, "1.txt") } func TestBrokerRemoveResource(t *testing.T) { @@ -134,7 +141,8 @@ func TestBrokerRemoveResource(t *testing.T) { brk, _, dir := newBroker(t) defer brk.Close() - resPath := filepath.Join(dir, "worker-1", local.ResourceNameToFilePathName("resource-1")) + resName := resModel.EncodeResourceName("resource-1") + resPath := filepath.Join(dir, "worker-1", local.ResourceNameToFilePathName(resName)) err := os.MkdirAll(resPath, 0o700) require.NoError(t, err) diff --git a/engine/pkg/externalresource/broker/storage_handle_test.go b/engine/pkg/externalresource/broker/storage_handle_test.go index 0eb4163cb9d..fe993d83fef 100644 --- a/engine/pkg/externalresource/broker/storage_handle_test.go +++ b/engine/pkg/externalresource/broker/storage_handle_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tiflow/engine/pkg/externalresource/internal/local" "github.com/pingcap/tiflow/engine/pkg/externalresource/manager" "github.com/pingcap/tiflow/engine/pkg/externalresource/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/tenant" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -29,7 +30,7 @@ import ( func newResourceIdentForTesting(executor, workerID, resourceName string) internal.ResourceIdent { return internal.ResourceIdent{ - Name: resourceName, + Name: resModel.EncodeResourceName(resourceName), ResourceScope: internal.ResourceScope{ ProjectInfo: tenant.NewProjectInfo("fakeTenant", "fakeProject"), Executor: model.ExecutorID(executor), diff --git a/engine/pkg/externalresource/internal/s3/file_manager.go b/engine/pkg/externalresource/internal/s3/file_manager.go index 20e7a7e317e..e4fb13b1c49 100644 --- a/engine/pkg/externalresource/internal/s3/file_manager.go +++ b/engine/pkg/externalresource/internal/s3/file_manager.go @@ -164,7 +164,7 @@ func (m *FileManager) removeTemporaryFilesForExecutor( ctx context.Context, scope internal.ResourceScope, ) error { // Get all persisted files which is created by current executor. - persistedResSet := make(map[resModel.ResourceName]struct{}) + persistedResSet := make(map[string]struct{}) m.mu.RLock() for workerID, resources := range m.persistedResMap { @@ -183,7 +183,7 @@ func (m *FileManager) removeTemporaryFilesForExecutor( func (m *FileManager) removeAllTemporaryFilesByMeta( ctx context.Context, scope internal.ResourceScope, - persistedResSet map[resModel.ResourceName]struct{}, + persistedResSet map[string]struct{}, ) error { log.Info("Removing temporary resources for executor", zap.Any("scope", scope)) diff --git a/engine/pkg/externalresource/internal/s3/file_path_util.go b/engine/pkg/externalresource/internal/s3/file_path_util.go index 4143ea74852..9bfb229f763 100644 --- a/engine/pkg/externalresource/internal/s3/file_path_util.go +++ b/engine/pkg/externalresource/internal/s3/file_path_util.go @@ -37,24 +37,24 @@ func getPathPredByName(target string) pathPredFunc { } func getPathPredByPersistedResources( - resources persistedResources, prefixCnt int, + resourcePaths map[string]struct{}, prefixCnt int, ) pathPredFunc { return func(path string) bool { - resName := "" + resPath := "" for i := 0; i < prefixCnt; i++ { prefix, newPath, ok := strings.Cut(path, "/") if !ok { return false } - if resName == "" { - resName = prefix + if resPath == "" { + resPath = prefix } else { - resName = fmt.Sprintf("%s/%s", resName, prefix) + resPath = fmt.Sprintf("%s/%s", resPath, prefix) } path = newPath } - _, ok := resources[resName] + _, ok := resourcePaths[resPath] return !ok } } diff --git a/engine/pkg/externalresource/internal/s3/resource_controller_test.go b/engine/pkg/externalresource/internal/s3/resource_controller_test.go index 08602948d58..0fc4d0e147a 100644 --- a/engine/pkg/externalresource/internal/s3/resource_controller_test.go +++ b/engine/pkg/externalresource/internal/s3/resource_controller_test.go @@ -34,9 +34,25 @@ func TestS3ResourceController(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), caseTimeout) defer cancel() + temproraryResNames := make([]resModel.ResourceName, numTemporaryResources) + for i := 0; i < numTemporaryResources; i++ { + resID := fmt.Sprintf("/s3/temporary-resource-%d", i) + _, resName, err := resModel.ParseResourceID(resID) + require.NoError(t, err) + temproraryResNames[i] = resName + } + + persistedResNames := make([]resModel.ResourceName, numPersistedResources) + persistedResMetas := []*resModel.ResourceMeta{} + for i := 0; i < numPersistedResources; i++ { + resID := fmt.Sprintf("/s3/persisted-resource-%d", i) + _, resName, err := resModel.ParseResourceID(resID) + require.NoError(t, err) + persistedResNames[i] = resName + } + fm, factory := NewFileManagerForUT(t.TempDir(), MockExecutorID) workers := []string{"worker-1", "worker-2", "worker-3"} - persistedResources := []*resModel.ResourceMeta{} // generate mock data for _, worker := range workers { scope := internal.ResourceScope{ @@ -44,19 +60,10 @@ func TestS3ResourceController(t *testing.T) { WorkerID: worker, } - for i := 0; i < numTemporaryResources; i++ { - _, err := fm.CreateResource(ctx, internal.ResourceIdent{ - ResourceScope: scope, - Name: fmt.Sprintf("temp-resource-%d", i), - }) - require.NoError(t, err) - } - - for i := 0; i < numPersistedResources; i++ { - name := fmt.Sprintf("persisted-resource-%d", i) + for _, persistedResName := range persistedResNames { ident := internal.ResourceIdent{ ResourceScope: scope, - Name: name, + Name: persistedResName, } _, err := fm.CreateResource(ctx, ident) require.NoError(t, err) @@ -64,12 +71,20 @@ func TestS3ResourceController(t *testing.T) { err = fm.SetPersisted(ctx, ident) require.NoError(t, err) - persistedResources = append(persistedResources, &resModel.ResourceMeta{ - ID: "/s3/" + name, - Executor: ident.Executor, + persistedResMetas = append(persistedResMetas, &resModel.ResourceMeta{ + ID: resModel.BuildResourceID(resModel.ResourceTypeS3, persistedResName), + Executor: MockExecutorID, Worker: worker, }) } + + for _, tempResName := range temproraryResNames { + _, err := fm.CreateResource(ctx, internal.ResourceIdent{ + ResourceScope: scope, + Name: tempResName, + }) + require.NoError(t, err) + } } checkWorker := func(worker string, removed bool) { @@ -77,19 +92,19 @@ func TestS3ResourceController(t *testing.T) { Executor: MockExecutorID, WorkerID: worker, } - for i := 0; i < numPersistedResources; i++ { + for _, persistedResName := range persistedResNames { ident := internal.ResourceIdent{ ResourceScope: scope, - Name: fmt.Sprintf("persisted-resource-%d", i), + Name: persistedResName, } _, err := fm.GetPersistedResource(ctx, ident) require.NoError(t, err) } - for i := 0; i < numTemporaryResources; i++ { + for _, tempResName := range temproraryResNames { _, err := fm.GetPersistedResource(ctx, internal.ResourceIdent{ ResourceScope: scope, - Name: fmt.Sprintf("temp-resource-%d", i), + Name: tempResName, }) if removed { require.ErrorContains(t, err, "ResourceFilesNotFoundError") @@ -106,7 +121,7 @@ func TestS3ResourceController(t *testing.T) { fm1 := newFileManagerForUTFromSharedStorageFactory("leader-controller", factory) controller := &resourceController{fm: fm1} gcExecutor := func() { - err := controller.GCExecutor(ctx, persistedResources, MockExecutorID) + err := controller.GCExecutor(ctx, persistedResMetas, MockExecutorID) require.NoError(t, err) checkWorker(workers[0], true) checkWorker(workers[1], true) @@ -117,7 +132,7 @@ func TestS3ResourceController(t *testing.T) { gcExecutor() // test GCSingleResource - for _, res := range persistedResources { + for _, res := range persistedResMetas { _, resName, err := resModel.ParseResourceID(res.ID) require.NoError(t, err) ident := internal.ResourceIdent{ diff --git a/engine/pkg/externalresource/model/model.go b/engine/pkg/externalresource/model/model.go index fc57debbd47..2cd3177dd58 100644 --- a/engine/pkg/externalresource/model/model.go +++ b/engine/pkg/externalresource/model/model.go @@ -14,7 +14,9 @@ package model import ( + "encoding/hex" "fmt" + "log" "path" "strings" @@ -23,6 +25,7 @@ import ( ormModel "github.com/pingcap/tiflow/engine/pkg/orm/model" "github.com/pingcap/tiflow/engine/pkg/tenant" "github.com/pingcap/tiflow/pkg/errors" + "go.uber.org/zap" ) type ( @@ -165,10 +168,31 @@ func ParseResourceID(rpath ResourceID) (ResourceType, ResourceName, error) { } suffix := path.Join(segments[1:]...) - return resourceType, suffix, nil + return resourceType, EncodeResourceName(suffix), nil } // BuildResourceID returns an ResourceID based on given ResourceType and ResourceName. -func BuildResourceID(rtype ResourceType, name ResourceName) ResourceID { +func BuildResourceID(rtype ResourceType, resName ResourceName) ResourceID { + name, err := DecodeResourceName(resName) + if err != nil { + log.Panic("invalid resource name", zap.Error(err)) + } return path.Join("/"+string(rtype), name) } + +// EncodeResourceName encodes raw resource name to a valid resource name. +func EncodeResourceName(resNameStr string) ResourceName { + prefix := hex.EncodeToString([]byte(resNameStr)) + suffix := strings.ReplaceAll(resNameStr, "/", "-") + return ResourceName(prefix + "-" + suffix) +} + +// DecodeResourceName decodes resource name to raw resource name. +func DecodeResourceName(encodedName ResourceName) (string, error) { + prefix, _, _ := strings.Cut(encodedName, "-") + result, err := hex.DecodeString(prefix) + if err != nil { + return "", err + } + return string(result), nil +} diff --git a/engine/pkg/externalresource/model/model_test.go b/engine/pkg/externalresource/model/model_test.go index 6c9bc7bfeb2..89a77830fdb 100644 --- a/engine/pkg/externalresource/model/model_test.go +++ b/engine/pkg/externalresource/model/model_test.go @@ -23,14 +23,26 @@ func TestParseResource(t *testing.T) { tp, suffix, err := ParseResourceID("/local/my-local-resource/a/b/c") require.NoError(t, err) require.Equal(t, ResourceTypeLocalFile, tp) - require.Equal(t, "my-local-resource/a/b/c", suffix) + rawResName, err := DecodeResourceName(suffix) + require.NoError(t, err) + require.Equal(t, "my-local-resource/a/b/c", rawResName) require.Equal(t, "/local/my-local-resource/a/b/c", BuildResourceID(tp, suffix)) tp, suffix, err = ParseResourceID("/s3/my-local-resource/a/b/c") require.NoError(t, err) require.Equal(t, ResourceTypeS3, tp) - require.Equal(t, "my-local-resource/a/b/c", suffix) + rawResName, err = DecodeResourceName(suffix) + require.NoError(t, err) + require.Equal(t, "my-local-resource/a/b/c", rawResName) require.Equal(t, "/s3/my-local-resource/a/b/c", BuildResourceID(tp, suffix)) } + +func TestResourceName(t *testing.T) { + name := "resource-1" + resName := EncodeResourceName(name) + parsedName, err := DecodeResourceName(resName) + require.NoError(t, err) + require.Equal(t, name, parsedName) +}