Skip to content

Commit

Permalink
encode resName
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Oct 16, 2022
1 parent 2626382 commit 3694c50
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 47 deletions.
34 changes: 21 additions & 13 deletions engine/pkg/externalresource/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@ func TestBrokerOpenNewStorage(t *testing.T) {
brk, cli, dir := newBroker(t)
defer brk.Close()

resID := "/local/test-1"
_, resName, err := resModel.ParseResourceID(resID)
require.NoError(t, err)

cli.On("QueryResource", mock.Anything,
&pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: "/local/test-1"}}, mock.Anything).
&pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: resID}}, mock.Anything).
Return((*pb.QueryResourceResponse)(nil), status.Error(codes.NotFound, "resource manager error"))
hdl, err := brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", "job-1", "/local/test-1")
hdl, err := brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", "job-1", resID)
require.NoError(t, err)
require.Equal(t, "/local/test-1", hdl.ID())
require.Equal(t, resID, hdl.ID())

cli.AssertExpectations(t)
cli.ExpectedCalls = nil
Expand All @@ -64,7 +68,7 @@ func TestBrokerOpenNewStorage(t *testing.T) {

cli.On("CreateResource", mock.Anything, &pb.CreateResourceRequest{
ProjectInfo: &pb.ProjectInfo{TenantId: fakeProjectInfo.TenantID(), ProjectId: fakeProjectInfo.ProjectID()},
ResourceId: "/local/test-1",
ResourceId: resID,
CreatorExecutor: "executor-1",
JobId: "job-1",
CreatorWorkerId: "worker-1",
Expand All @@ -75,7 +79,7 @@ func TestBrokerOpenNewStorage(t *testing.T) {

cli.AssertExpectations(t)

local.AssertLocalFileExists(t, dir, "worker-1", "test-1", "1.txt")
local.AssertLocalFileExists(t, dir, "worker-1", resName, "1.txt")
}

func TestBrokerOpenExistingStorage(t *testing.T) {
Expand All @@ -84,12 +88,15 @@ func TestBrokerOpenExistingStorage(t *testing.T) {
brk, cli, dir := newBroker(t)
defer brk.Close()

resID := "/local/test-2"
_, resName, err := resModel.ParseResourceID(resID)
require.NoError(t, err)
cli.On("QueryResource", mock.Anything,
&pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: "/local/test-2"}}, mock.Anything).
&pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: resID}}, mock.Anything).
Return((*pb.QueryResourceResponse)(nil), status.Error(codes.NotFound, "resource manager error")).Once()
cli.On("CreateResource", mock.Anything, &pb.CreateResourceRequest{
ProjectInfo: &pb.ProjectInfo{TenantId: fakeProjectInfo.TenantID(), ProjectId: fakeProjectInfo.ProjectID()},
ResourceId: "/local/test-2",
ResourceId: resID,
CreatorExecutor: "executor-1",
JobId: "job-1",
CreatorWorkerId: "worker-2",
Expand All @@ -100,23 +107,23 @@ func TestBrokerOpenExistingStorage(t *testing.T) {
fakeProjectInfo,
"worker-2",
"job-1",
"/local/test-2")
resID)
require.NoError(t, err)

err = hdl.Persist(context.Background())
require.NoError(t, err)

cli.On("QueryResource", mock.Anything,
&pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: "/local/test-2"}}, mock.Anything).
&pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: resID}}, mock.Anything).
Return(&pb.QueryResourceResponse{
CreatorExecutor: "executor-1",
JobId: "job-1",
CreatorWorkerId: "worker-2",
}, nil)

hdl, err = brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", "job-1", "/local/test-2")
hdl, err = brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", "job-1", resID)
require.NoError(t, err)
require.Equal(t, "/local/test-2", hdl.ID())
require.Equal(t, resID, hdl.ID())

cli.AssertExpectations(t)

Expand All @@ -126,15 +133,16 @@ func TestBrokerOpenExistingStorage(t *testing.T) {
err = f.Close(context.Background())
require.NoError(t, err)

local.AssertLocalFileExists(t, dir, "worker-2", "test-2", "1.txt")
local.AssertLocalFileExists(t, dir, "worker-2", resName, "1.txt")
}

func TestBrokerRemoveResource(t *testing.T) {
t.Parallel()
brk, _, dir := newBroker(t)
defer brk.Close()

resPath := filepath.Join(dir, "worker-1", local.ResourceNameToFilePathName("resource-1"))
resName := resModel.EncodeResourceName("resource-1")
resPath := filepath.Join(dir, "worker-1", local.ResourceNameToFilePathName(resName))
err := os.MkdirAll(resPath, 0o700)
require.NoError(t, err)

Expand Down
3 changes: 2 additions & 1 deletion engine/pkg/externalresource/broker/storage_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ import (
"github.com/pingcap/tiflow/engine/pkg/externalresource/internal/local"
"github.com/pingcap/tiflow/engine/pkg/externalresource/manager"
"github.com/pingcap/tiflow/engine/pkg/externalresource/model"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
"github.com/pingcap/tiflow/engine/pkg/tenant"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func newResourceIdentForTesting(executor, workerID, resourceName string) internal.ResourceIdent {
return internal.ResourceIdent{
Name: resourceName,
Name: resModel.EncodeResourceName(resourceName),
ResourceScope: internal.ResourceScope{
ProjectInfo: tenant.NewProjectInfo("fakeTenant", "fakeProject"),
Executor: model.ExecutorID(executor),
Expand Down
4 changes: 2 additions & 2 deletions engine/pkg/externalresource/internal/s3/file_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (m *FileManager) removeTemporaryFilesForExecutor(
ctx context.Context, scope internal.ResourceScope,
) error {
// Get all persisted files which is created by current executor.
persistedResSet := make(map[resModel.ResourceName]struct{})
persistedResSet := make(map[string]struct{})

m.mu.RLock()
for workerID, resources := range m.persistedResMap {
Expand All @@ -183,7 +183,7 @@ func (m *FileManager) removeTemporaryFilesForExecutor(
func (m *FileManager) removeAllTemporaryFilesByMeta(
ctx context.Context,
scope internal.ResourceScope,
persistedResSet map[resModel.ResourceName]struct{},
persistedResSet map[string]struct{},
) error {
log.Info("Removing temporary resources for executor", zap.Any("scope", scope))

Expand Down
12 changes: 6 additions & 6 deletions engine/pkg/externalresource/internal/s3/file_path_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,24 @@ func getPathPredByName(target string) pathPredFunc {
}

func getPathPredByPersistedResources(
resources persistedResources, prefixCnt int,
resourcePaths map[string]struct{}, prefixCnt int,
) pathPredFunc {
return func(path string) bool {
resName := ""
resPath := ""
for i := 0; i < prefixCnt; i++ {
prefix, newPath, ok := strings.Cut(path, "/")
if !ok {
return false
}

if resName == "" {
resName = prefix
if resPath == "" {
resPath = prefix
} else {
resName = fmt.Sprintf("%s/%s", resName, prefix)
resPath = fmt.Sprintf("%s/%s", resPath, prefix)
}
path = newPath
}
_, ok := resources[resName]
_, ok := resourcePaths[resPath]
return !ok
}
}
57 changes: 36 additions & 21 deletions engine/pkg/externalresource/internal/s3/resource_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,62 +34,77 @@ func TestS3ResourceController(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), caseTimeout)
defer cancel()

temproraryResNames := make([]resModel.ResourceName, numTemporaryResources)
for i := 0; i < numTemporaryResources; i++ {
resID := fmt.Sprintf("/s3/temporary-resource-%d", i)
_, resName, err := resModel.ParseResourceID(resID)
require.NoError(t, err)
temproraryResNames[i] = resName
}

persistedResNames := make([]resModel.ResourceName, numPersistedResources)
persistedResMetas := []*resModel.ResourceMeta{}
for i := 0; i < numPersistedResources; i++ {
resID := fmt.Sprintf("/s3/persisted-resource-%d", i)
_, resName, err := resModel.ParseResourceID(resID)
require.NoError(t, err)
persistedResNames[i] = resName
}

fm, factory := NewFileManagerForUT(t.TempDir(), MockExecutorID)
workers := []string{"worker-1", "worker-2", "worker-3"}
persistedResources := []*resModel.ResourceMeta{}
// generate mock data
for _, worker := range workers {
scope := internal.ResourceScope{
Executor: MockExecutorID,
WorkerID: worker,
}

for i := 0; i < numTemporaryResources; i++ {
_, err := fm.CreateResource(ctx, internal.ResourceIdent{
ResourceScope: scope,
Name: fmt.Sprintf("temp-resource-%d", i),
})
require.NoError(t, err)
}

for i := 0; i < numPersistedResources; i++ {
name := fmt.Sprintf("persisted-resource-%d", i)
for _, persistedResName := range persistedResNames {
ident := internal.ResourceIdent{
ResourceScope: scope,
Name: name,
Name: persistedResName,
}
_, err := fm.CreateResource(ctx, ident)
require.NoError(t, err)

err = fm.SetPersisted(ctx, ident)
require.NoError(t, err)

persistedResources = append(persistedResources, &resModel.ResourceMeta{
ID: "/s3/" + name,
Executor: ident.Executor,
persistedResMetas = append(persistedResMetas, &resModel.ResourceMeta{
ID: resModel.BuildResourceID(resModel.ResourceTypeS3, persistedResName),
Executor: MockExecutorID,
Worker: worker,
})
}

for _, tempResName := range temproraryResNames {
_, err := fm.CreateResource(ctx, internal.ResourceIdent{
ResourceScope: scope,
Name: tempResName,
})
require.NoError(t, err)
}
}

checkWorker := func(worker string, removed bool) {
scope := internal.ResourceScope{
Executor: MockExecutorID,
WorkerID: worker,
}
for i := 0; i < numPersistedResources; i++ {
for _, persistedResName := range persistedResNames {
ident := internal.ResourceIdent{
ResourceScope: scope,
Name: fmt.Sprintf("persisted-resource-%d", i),
Name: persistedResName,
}
_, err := fm.GetPersistedResource(ctx, ident)
require.NoError(t, err)
}

for i := 0; i < numTemporaryResources; i++ {
for _, tempResName := range temproraryResNames {
_, err := fm.GetPersistedResource(ctx, internal.ResourceIdent{
ResourceScope: scope,
Name: fmt.Sprintf("temp-resource-%d", i),
Name: tempResName,
})
if removed {
require.ErrorContains(t, err, "ResourceFilesNotFoundError")
Expand All @@ -106,7 +121,7 @@ func TestS3ResourceController(t *testing.T) {
fm1 := newFileManagerForUTFromSharedStorageFactory("leader-controller", factory)
controller := &resourceController{fm: fm1}
gcExecutor := func() {
err := controller.GCExecutor(ctx, persistedResources, MockExecutorID)
err := controller.GCExecutor(ctx, persistedResMetas, MockExecutorID)
require.NoError(t, err)
checkWorker(workers[0], true)
checkWorker(workers[1], true)
Expand All @@ -117,7 +132,7 @@ func TestS3ResourceController(t *testing.T) {
gcExecutor()

// test GCSingleResource
for _, res := range persistedResources {
for _, res := range persistedResMetas {
_, resName, err := resModel.ParseResourceID(res.ID)
require.NoError(t, err)
ident := internal.ResourceIdent{
Expand Down
28 changes: 26 additions & 2 deletions engine/pkg/externalresource/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package model

import (
"encoding/hex"
"fmt"
"log"
"path"
"strings"

Expand All @@ -23,6 +25,7 @@ import (
ormModel "github.com/pingcap/tiflow/engine/pkg/orm/model"
"github.com/pingcap/tiflow/engine/pkg/tenant"
"github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

type (
Expand Down Expand Up @@ -165,10 +168,31 @@ func ParseResourceID(rpath ResourceID) (ResourceType, ResourceName, error) {
}

suffix := path.Join(segments[1:]...)
return resourceType, suffix, nil
return resourceType, EncodeResourceName(suffix), nil
}

// BuildResourceID returns an ResourceID based on given ResourceType and ResourceName.
func BuildResourceID(rtype ResourceType, name ResourceName) ResourceID {
func BuildResourceID(rtype ResourceType, resName ResourceName) ResourceID {
name, err := DecodeResourceName(resName)
if err != nil {
log.Panic("invalid resource name", zap.Error(err))
}
return path.Join("/"+string(rtype), name)
}

// EncodeResourceName encodes raw resource name to a valid resource name.
func EncodeResourceName(resNameStr string) ResourceName {
prefix := hex.EncodeToString([]byte(resNameStr))
suffix := strings.ReplaceAll(resNameStr, "/", "-")
return ResourceName(prefix + "-" + suffix)
}

// DecodeResourceName decodes resource name to raw resource name.
func DecodeResourceName(encodedName ResourceName) (string, error) {
prefix, _, _ := strings.Cut(encodedName, "-")
result, err := hex.DecodeString(prefix)
if err != nil {
return "", err
}
return string(result), nil
}
16 changes: 14 additions & 2 deletions engine/pkg/externalresource/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,26 @@ func TestParseResource(t *testing.T) {
tp, suffix, err := ParseResourceID("/local/my-local-resource/a/b/c")
require.NoError(t, err)
require.Equal(t, ResourceTypeLocalFile, tp)
require.Equal(t, "my-local-resource/a/b/c", suffix)
rawResName, err := DecodeResourceName(suffix)
require.NoError(t, err)
require.Equal(t, "my-local-resource/a/b/c", rawResName)

require.Equal(t, "/local/my-local-resource/a/b/c", BuildResourceID(tp, suffix))

tp, suffix, err = ParseResourceID("/s3/my-local-resource/a/b/c")
require.NoError(t, err)
require.Equal(t, ResourceTypeS3, tp)
require.Equal(t, "my-local-resource/a/b/c", suffix)
rawResName, err = DecodeResourceName(suffix)
require.NoError(t, err)
require.Equal(t, "my-local-resource/a/b/c", rawResName)

require.Equal(t, "/s3/my-local-resource/a/b/c", BuildResourceID(tp, suffix))
}

func TestResourceName(t *testing.T) {
name := "resource-1"
resName := EncodeResourceName(name)
parsedName, err := DecodeResourceName(resName)
require.NoError(t, err)
require.Equal(t, name, parsedName)
}

0 comments on commit 3694c50

Please sign in to comment.