diff --git a/engine/executor/config.go b/engine/executor/config.go index a6ada7f64b0..a86c4715c0f 100644 --- a/engine/executor/config.go +++ b/engine/executor/config.go @@ -24,7 +24,7 @@ import ( "github.com/BurntSushi/toml" "github.com/pingcap/log" - "github.com/pingcap/tiflow/engine/pkg/externalresource/storagecfg" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/label" "github.com/pingcap/tiflow/pkg/logutil" @@ -64,7 +64,7 @@ type Config struct { KeepAliveIntervalStr string `toml:"keepalive-interval" json:"keepalive-interval"` RPCTimeoutStr string `toml:"rpc-timeout" json:"rpc-timeout"` - Storage storagecfg.Config `toml:"storage" json:"storage"` + Storage resModel.Config `toml:"storage" json:"storage"` KeepAliveTTL time.Duration `toml:"-" json:"-"` KeepAliveInterval time.Duration `toml:"-" json:"-"` @@ -169,8 +169,9 @@ func GetDefaultExecutorConfig() *Config { KeepAliveTTLStr: defaultKeepAliveTTL, KeepAliveIntervalStr: defaultKeepAliveInterval, RPCTimeoutStr: defaultRPCTimeout, - Storage: storagecfg.Config{ - Local: storagecfg.LocalFileConfig{BaseDir: ""}, + Storage: resModel.Config{ + Local: resModel.LocalFileConfig{BaseDir: ""}, + S3: resModel.S3Config{Bucket: ""}, }, } } diff --git a/engine/framework/base_jobmaster.go b/engine/framework/base_jobmaster.go index 39a90fbed80..7166a1ff021 100644 --- a/engine/framework/base_jobmaster.go +++ b/engine/framework/base_jobmaster.go @@ -29,7 +29,7 @@ import ( "github.com/pingcap/tiflow/engine/model" dcontext "github.com/pingcap/tiflow/engine/pkg/context" "github.com/pingcap/tiflow/engine/pkg/errctx" - resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model" "github.com/pingcap/tiflow/engine/pkg/p2p" "github.com/pingcap/tiflow/engine/pkg/promutil" @@ -59,7 +59,7 @@ type BaseJobMaster interface { // CreateWorker requires the framework to dispatch a new worker. // If the worker needs to access certain file system resources, // their ID's must be passed by `resources`. - CreateWorker(workerType WorkerType, config WorkerConfig, cost model.RescUnit, resources ...resourcemeta.ResourceID) (frameModel.WorkerID, error) + CreateWorker(workerType WorkerType, config WorkerConfig, cost model.RescUnit, resources ...resModel.ResourceID) (frameModel.WorkerID, error) // CreateWorkerV2 is the latest version of CreateWorker, but with // a more flexible way of passing options. @@ -311,7 +311,7 @@ func (d *DefaultBaseJobMaster) CreateWorker( workerType WorkerType, config WorkerConfig, cost model.RescUnit, - resources ...resourcemeta.ResourceID, + resources ...resModel.ResourceID, ) (frameModel.WorkerID, error) { return d.master.CreateWorker(workerType, config, cost, resources...) } diff --git a/engine/framework/internal/master/worker_creator.go b/engine/framework/internal/master/worker_creator.go index 94ad56057f1..645db94afc7 100644 --- a/engine/framework/internal/master/worker_creator.go +++ b/engine/framework/internal/master/worker_creator.go @@ -22,7 +22,7 @@ import ( frameModel "github.com/pingcap/tiflow/engine/framework/model" "github.com/pingcap/tiflow/engine/model" "github.com/pingcap/tiflow/engine/pkg/client" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" "github.com/pingcap/tiflow/engine/pkg/tenant" schedModel "github.com/pingcap/tiflow/engine/servermaster/scheduler/model" diff --git a/engine/framework/internal/master/worker_creator_test.go b/engine/framework/internal/master/worker_creator_test.go index f05d049964b..d01caf5dae1 100644 --- a/engine/framework/internal/master/worker_creator_test.go +++ b/engine/framework/internal/master/worker_creator_test.go @@ -23,7 +23,7 @@ import ( frameModel "github.com/pingcap/tiflow/engine/framework/model" "github.com/pingcap/tiflow/engine/model" "github.com/pingcap/tiflow/engine/pkg/client" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" "github.com/pingcap/tiflow/engine/pkg/tenant" "github.com/pingcap/tiflow/pkg/label" diff --git a/engine/framework/master.go b/engine/framework/master.go index d6a3bc256e5..c4ac1be79e6 100644 --- a/engine/framework/master.go +++ b/engine/framework/master.go @@ -41,7 +41,7 @@ import ( dcontext "github.com/pingcap/tiflow/engine/pkg/context" "github.com/pingcap/tiflow/engine/pkg/deps" "github.com/pingcap/tiflow/engine/pkg/errctx" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/meta" metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" diff --git a/engine/framework/master_test.go b/engine/framework/master_test.go index cac550b8d67..1b9f40afaf0 100644 --- a/engine/framework/master_test.go +++ b/engine/framework/master_test.go @@ -19,15 +19,14 @@ import ( "testing" "time" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "github.com/pingcap/tiflow/engine/framework/metadata" frameModel "github.com/pingcap/tiflow/engine/framework/model" "github.com/pingcap/tiflow/engine/framework/statusutil" - resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/uuid" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) const ( @@ -150,7 +149,7 @@ func TestMasterCreateWorker(t *testing.T) { masterName, workerID1, executorNodeID1, - []resourcemeta.ResourceID{"resource-1", "resource-2"}, + []resModel.ResourceID{"resource-1", "resource-2"}, // call GenEpoch three times, including create master meta, master init // refresh meta, create worker. epoch+3, diff --git a/engine/framework/mock_master_util.go b/engine/framework/mock_master_util.go index 414cea99f31..7e4afa2786a 100644 --- a/engine/framework/mock_master_util.go +++ b/engine/framework/mock_master_util.go @@ -33,7 +33,7 @@ import ( "github.com/pingcap/tiflow/engine/pkg/clock" dcontext "github.com/pingcap/tiflow/engine/pkg/context" "github.com/pingcap/tiflow/engine/pkg/deps" - resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" metaMock "github.com/pingcap/tiflow/engine/pkg/meta/mock" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" "github.com/pingcap/tiflow/engine/pkg/p2p" @@ -106,14 +106,14 @@ func MockBaseMasterCreateWorker( masterID frameModel.MasterID, workerID frameModel.WorkerID, executorID model.ExecutorID, - resources []resourcemeta.ResourceID, + resources []resModel.ResourceID, workerEpoch frameModel.Epoch, ) { master.uuidGen = uuid.NewMock() expectedSchedulerReq := &pb.ScheduleTaskRequest{ TaskId: workerID, Cost: int64(cost), - ResourceRequirements: resourcemeta.ToResourceRequirement(masterID, resources...), + ResourceRequirements: resModel.ToResourceRequirement(masterID, resources...), } master.serverMasterClient.(*client.MockServerMasterClient).EXPECT(). ScheduleTask(gomock.Any(), gomock.Eq(expectedSchedulerReq)). diff --git a/engine/framework/worker.go b/engine/framework/worker.go index ceafa832db3..8719350f580 100644 --- a/engine/framework/worker.go +++ b/engine/framework/worker.go @@ -36,7 +36,7 @@ import ( dcontext "github.com/pingcap/tiflow/engine/pkg/context" "github.com/pingcap/tiflow/engine/pkg/errctx" "github.com/pingcap/tiflow/engine/pkg/externalresource/broker" - resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/meta" metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" @@ -103,7 +103,7 @@ type BaseWorker interface { SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error // OpenStorage creates a resource and return the resource handle - OpenStorage(ctx context.Context, resourcePath resourcemeta.ResourceID) (broker.Handle, error) + OpenStorage(ctx context.Context, resourcePath resModel.ResourceID) (broker.Handle, error) // Exit should be called when worker (in user logic) wants to exit. // exitReason: ExitReasonFinished/ExitReasonCanceled/ExitReasonFailed @@ -493,7 +493,7 @@ func (w *DefaultBaseWorker) SendMessage( } // OpenStorage implements BaseWorker.OpenStorage -func (w *DefaultBaseWorker) OpenStorage(ctx context.Context, resourcePath resourcemeta.ResourceID) (broker.Handle, error) { +func (w *DefaultBaseWorker) OpenStorage(ctx context.Context, resourcePath resModel.ResourceID) (broker.Handle, error) { ctx, cancel := w.errCenter.WithCancelOnFirstError(ctx) defer cancel() return w.resourceBroker.OpenStorage(ctx, w.projectInfo, w.id, w.masterID, resourcePath) diff --git a/engine/jobmaster/dm/dm_jobmaster_test.go b/engine/jobmaster/dm/dm_jobmaster_test.go index 903f423548f..33d30a3008d 100644 --- a/engine/jobmaster/dm/dm_jobmaster_test.go +++ b/engine/jobmaster/dm/dm_jobmaster_test.go @@ -52,7 +52,7 @@ import ( "github.com/pingcap/tiflow/engine/pkg/deps" dmpkg "github.com/pingcap/tiflow/engine/pkg/dm" "github.com/pingcap/tiflow/engine/pkg/externalresource/broker" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" kvmock "github.com/pingcap/tiflow/engine/pkg/meta/mock" metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" diff --git a/engine/jobmaster/dm/utils.go b/engine/jobmaster/dm/utils.go index 7d8b15619f0..b4de66a4664 100644 --- a/engine/jobmaster/dm/utils.go +++ b/engine/jobmaster/dm/utils.go @@ -14,10 +14,10 @@ package dm import ( - resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" ) // NewDMResourceID returns a ResourceID in DM's style. Currently only support local resource. -func NewDMResourceID(taskName, sourceName string) resourcemeta.ResourceID { - return "/" + string(resourcemeta.ResourceTypeLocalFile) + "/" + taskName + "/" + sourceName +func NewDMResourceID(taskName, sourceName string) resModel.ResourceID { + return "/" + string(resModel.ResourceTypeLocalFile) + "/" + taskName + "/" + sourceName } diff --git a/engine/jobmaster/dm/worker_manager.go b/engine/jobmaster/dm/worker_manager.go index c1b00208bf8..f1c8a9acc21 100644 --- a/engine/jobmaster/dm/worker_manager.go +++ b/engine/jobmaster/dm/worker_manager.go @@ -19,10 +19,6 @@ import ( "time" dmconfig "github.com/pingcap/tiflow/dm/config" - "github.com/pingcap/tiflow/engine/model" - resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" - "go.uber.org/zap" - "github.com/pingcap/tiflow/engine/framework" "github.com/pingcap/tiflow/engine/framework/logutil" frameModel "github.com/pingcap/tiflow/engine/framework/model" @@ -30,7 +26,10 @@ import ( "github.com/pingcap/tiflow/engine/jobmaster/dm/metadata" "github.com/pingcap/tiflow/engine/jobmaster/dm/runtime" "github.com/pingcap/tiflow/engine/jobmaster/dm/ticker" + "github.com/pingcap/tiflow/engine/model" dmpkg "github.com/pingcap/tiflow/engine/pkg/dm" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" + "go.uber.org/zap" ) var ( @@ -47,7 +46,7 @@ type WorkerAgent interface { workerType framework.WorkerType, config framework.WorkerConfig, cost model.RescUnit, - resources ...resourcemeta.ResourceID, + resources ...resModel.ResourceID, ) (frameModel.WorkerID, error) } @@ -249,7 +248,7 @@ func (wm *WorkerManager) checkAndScheduleWorkers(ctx context.Context, job *metad wm.logger.Info("switch to next unit", zap.String("task_id", taskID), zap.Stringer("next_unit", runningWorker.Unit)) } - var resources []resourcemeta.ResourceID + var resources []resModel.ResourceID // first worker don't need local resource. // unfresh sync unit don't need local resource.(if we need to save table checkpoint for loadTableStructureFromDump in future, we can save it before saving global checkpoint.) // TODO: storage should be created/discarded in jobmaster instead of worker. @@ -332,7 +331,7 @@ func (wm *WorkerManager) createWorker( taskID string, unit frameModel.WorkerType, taskCfg *config.TaskCfg, - resources ...resourcemeta.ResourceID, + resources ...resModel.ResourceID, ) error { wm.logger.Info("start to create worker", zap.String("task_id", taskID), zap.Stringer("unit", unit)) workerID, err := wm.workerAgent.CreateWorker(unit, taskCfg, 1, resources...) diff --git a/engine/jobmaster/dm/worker_manager_test.go b/engine/jobmaster/dm/worker_manager_test.go index a4e74fd3c37..ab8f598b9ed 100644 --- a/engine/jobmaster/dm/worker_manager_test.go +++ b/engine/jobmaster/dm/worker_manager_test.go @@ -21,18 +21,17 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" dmconfig "github.com/pingcap/tiflow/dm/config" - "github.com/pingcap/tiflow/engine/model" - resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" - "github.com/stretchr/testify/require" - "github.com/pingcap/tiflow/engine/framework" frameModel "github.com/pingcap/tiflow/engine/framework/model" "github.com/pingcap/tiflow/engine/jobmaster/dm/config" "github.com/pingcap/tiflow/engine/jobmaster/dm/metadata" "github.com/pingcap/tiflow/engine/jobmaster/dm/runtime" + "github.com/pingcap/tiflow/engine/model" dmpkg "github.com/pingcap/tiflow/engine/pkg/dm" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" kvmock "github.com/pingcap/tiflow/engine/pkg/meta/mock" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) func (t *testDMJobmasterSuite) TestUpdateWorkerStatus() { @@ -580,7 +579,7 @@ type MockWorkerAgent struct { mock.Mock } -func (mockAgent *MockWorkerAgent) CreateWorker(workerType framework.WorkerType, taskCfg interface{}, cost model.RescUnit, resources ...resourcemeta.ResourceID) (frameModel.WorkerID, error) { +func (mockAgent *MockWorkerAgent) CreateWorker(workerType framework.WorkerType, taskCfg interface{}, cost model.RescUnit, resources ...resModel.ResourceID) (frameModel.WorkerID, error) { mockAgent.Lock() defer mockAgent.Unlock() args := mockAgent.Called() diff --git a/engine/pkg/client/broker_service_client.go b/engine/pkg/client/broker_service_client.go index f5e7e04fe78..8c5a894a0f4 100644 --- a/engine/pkg/client/broker_service_client.go +++ b/engine/pkg/client/broker_service_client.go @@ -19,7 +19,7 @@ import ( "github.com/pingcap/tiflow/engine/enginepb" frameModel "github.com/pingcap/tiflow/engine/framework/model" "github.com/pingcap/tiflow/engine/pkg/client/internal" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" ) // BrokerServiceClient wraps a pb.BrokerServiceClient diff --git a/engine/pkg/externalresource/broker/broker.go b/engine/pkg/externalresource/broker/broker.go index b27a31905c6..e13a1cfa91b 100644 --- a/engine/pkg/externalresource/broker/broker.go +++ b/engine/pkg/externalresource/broker/broker.go @@ -21,8 +21,9 @@ import ( "github.com/pingcap/log" pb "github.com/pingcap/tiflow/engine/enginepb" "github.com/pingcap/tiflow/engine/pkg/client" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" - "github.com/pingcap/tiflow/engine/pkg/externalresource/storagecfg" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal/local" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/rpcerror" "github.com/pingcap/tiflow/engine/pkg/tenant" derrors "github.com/pingcap/tiflow/pkg/errors" @@ -31,35 +32,43 @@ import ( "google.golang.org/grpc/status" ) -// ResourceManagerClient is a type alias for a client connecting to -// the resource manager (which is part of the Servermaster). -type ResourceManagerClient = client.ResourceManagerClient +// DefaultBroker must implement Broker. +var _ Broker = (*DefaultBroker)(nil) // DefaultBroker implements the Broker interface type DefaultBroker struct { - config *storagecfg.Config + config *resModel.Config executorID resModel.ExecutorID - client ResourceManagerClient + client client.ResourceManagerClient - fileManager FileManager + fileManagers map[resModel.ResourceType]internal.FileManager } // NewBroker creates a new Impl instance func NewBroker( - config *storagecfg.Config, + config *resModel.Config, executorID resModel.ExecutorID, - client ResourceManagerClient, + client client.ResourceManagerClient, ) *DefaultBroker { log.Info("Create new resource broker", zap.String("executor-id", string(executorID)), zap.Any("config", config)) - fm := NewLocalFileManager(config.Local) + fileManagers := make(map[resModel.ResourceType]internal.FileManager) + fileManagers[resModel.ResourceTypeLocalFile] = local.NewLocalFileManager(executorID, config.Local) + if config.S3Enabled() { + log.Info("S3 is enabled") + // TODO:add s3 file manager + // fileManagers[resModel.ResourceTypeS3] = s3.NewFileManagerWithConfig(executorID, config.S3) + } else { + log.Info("S3 config is not complete, will not use s3 as external storage") + } + return &DefaultBroker{ - config: config, - executorID: executorID, - client: client, - fileManager: fm, + config: config, + executorID: executorID, + client: client, + fileManagers: fileManagers, } } @@ -69,45 +78,91 @@ func (b *DefaultBroker) OpenStorage( projectInfo tenant.ProjectInfo, workerID resModel.WorkerID, jobID resModel.JobID, - resourcePath resModel.ResourceID, + resID resModel.ResourceID, ) (Handle, error) { - tp, _, err := resModel.ParseResourcePath(resourcePath) + // Note the semantics of ParseResourcePath: + // If resourceID is `/local/my-resource`, then tp == resModel.ResourceTypeLocalFile + // and resName == "my-resource". + tp, resName, err := resModel.PasreResourceID(resID) if err != nil { return nil, err } - switch tp { - case resModel.ResourceTypeLocalFile: - return b.newHandleForLocalFile(ctx, projectInfo, jobID, workerID, resourcePath) - case resModel.ResourceTypeS3: - log.Panic("resource type s3 is not supported for now") - default: - log.Panic("unsupported resource type", zap.String("resource-path", resourcePath)) + fm, ok := b.fileManagers[tp] + if !ok { + log.Panic("unexpected resource type", zap.String("type", string(tp))) + } + + record, exists, err := b.checkForExistingResource(ctx, + resModel.ResourceKey{JobID: jobID, ID: resID}) + if err != nil { + return nil, err + } + + var desc internal.ResourceDescriptor + if !exists { + desc, err = b.createResource(ctx, fm, projectInfo, workerID, resName) + } else { + desc, err = b.getPersistResource(ctx, fm, record, resName) + } + if err != nil { + return nil, err + } + + log.Info(fmt.Sprintf("Using %s storage with path", string(tp)), + zap.String("path", desc.URI())) + return newResourceHandle(jobID, b.executorID, fm, desc, exists, b.client) +} + +func (b *DefaultBroker) createResource( + ctx context.Context, fm internal.FileManager, + projectInfo tenant.ProjectInfo, workerID resModel.WorkerID, + resName resModel.ResourceName, +) (internal.ResourceDescriptor, error) { + ident := internal.ResourceIdent{ + Name: resName, + ResourceScope: internal.ResourceScope{ + ProjectInfo: projectInfo, + Executor: b.executorID, /* executor id where resource is created */ + WorkerID: workerID, /* creator id*/ + }, + } + desc, err := fm.CreateResource(ctx, ident) + if err != nil { + //nolint:errcheck + _ = fm.RemoveResource(ctx, ident) + return nil, err } - return nil, errors.New("unreachable") + return desc, nil } // OnWorkerClosed implements Broker.OnWorkerClosed func (b *DefaultBroker) OnWorkerClosed(ctx context.Context, workerID resModel.WorkerID, jobID resModel.JobID) { - err := b.fileManager.RemoveTemporaryFiles(workerID) - if err != nil { - // TODO when we have a cloud-based error collection service, we need - // to report this. - // However, since an error here is unlikely to indicate a correctness - // problem, we do not take further actions. - log.Warn("Failed to remove temporary files for worker", - zap.String("worker-id", workerID), - zap.String("job-id", jobID), - zap.Error(err)) + scope := internal.ResourceScope{ + Executor: b.executorID, + WorkerID: workerID, + } + for _, fm := range b.fileManagers { + err := fm.RemoveTemporaryFiles(ctx, scope) + if err != nil { + // TODO when we have a cloud-based error collection service, we need + // to report this. + // However, since an error here is unlikely to indicate a correctness + // problem, we do not take further actions. + log.Warn("Failed to remove temporary files for worker", + zap.String("worker-id", workerID), + zap.String("job-id", jobID), + zap.Error(err)) + } } } // RemoveResource implements pb.BrokerServiceServer. func (b *DefaultBroker) RemoveResource( - _ context.Context, + ctx context.Context, request *pb.RemoveLocalResourceRequest, ) (*pb.RemoveLocalResourceResponse, error) { - tp, resName, err := resModel.ParseResourcePath(request.GetResourceId()) + tp, resName, err := resModel.PasreResourceID(request.GetResourceId()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -117,12 +172,19 @@ func (b *DefaultBroker) RemoveResource( fmt.Sprintf("unexpected resource type %s", tp)) } + fm := b.fileManagers[tp] if request.GetCreatorId() == "" { - return nil, status.Error(codes.InvalidArgument, - fmt.Sprintf("empty creatorID")) + return nil, status.Error(codes.InvalidArgument, "empty creatorID") } - err = b.fileManager.RemoveResource(request.GetCreatorId(), resName) + ident := internal.ResourceIdent{ + Name: resName, + ResourceScope: internal.ResourceScope{ + Executor: b.executorID, + WorkerID: request.GetCreatorId(), + }, + } + err = fm.RemoveResource(ctx, ident) if err != nil { if derrors.ErrResourceDoesNotExist.Equal(err) { return nil, status.Error(codes.NotFound, err.Error()) @@ -133,55 +195,6 @@ func (b *DefaultBroker) RemoveResource( return &pb.RemoveLocalResourceResponse{}, nil } -func (b *DefaultBroker) newHandleForLocalFile( - ctx context.Context, - projectInfo tenant.ProjectInfo, - jobID resModel.JobID, - workerID resModel.WorkerID, - resourceID resModel.ResourceID, -) (hdl Handle, retErr error) { - // Note the semantics of ParseResourcePath: - // If resourceID is `/local/my-resource`, then tp == resModel.ResourceTypeLocalFile - // and resName == "my-resource". - tp, resName, err := resModel.ParseResourcePath(resourceID) - if err != nil { - return nil, err - } - if tp != resModel.ResourceTypeLocalFile { - log.Panic("unexpected resource type", zap.String("type", string(tp))) - } - - record, exists, err := b.checkForExistingResource(ctx, resModel.ResourceKey{JobID: jobID, ID: resourceID}) - if err != nil { - return nil, err - } - - var desc *LocalFileResourceDescriptor - - if !exists { - desc, err = b.fileManager.CreateResource(workerID, resName) - if err != nil { - return nil, err - } - defer func() { - if retErr != nil { - //nolint:errcheck - _ = b.fileManager.RemoveResource(workerID, resName) - } - }() - } else { - desc, err = b.fileManager.GetPersistedResource(record.Worker, resName) - if err != nil { - return nil, err - } - } - - filePath := desc.AbsolutePath() - log.Info("Using local storage with path", zap.String("path", filePath)) - - return newLocalResourceHandle(projectInfo, resourceID, jobID, b.executorID, b.fileManager, desc, b.client) -} - func (b *DefaultBroker) checkForExistingResource( ctx context.Context, resourceKey resModel.ResourceKey, @@ -217,3 +230,24 @@ func (b *DefaultBroker) checkForExistingResource( return nil, false, errors.Trace(err) } } + +func (b *DefaultBroker) getPersistResource( + ctx context.Context, fm internal.FileManager, + record *resModel.ResourceMeta, + resName resModel.ResourceName, +) (internal.ResourceDescriptor, error) { + ident := internal.ResourceIdent{ + Name: resName, + ResourceScope: internal.ResourceScope{ + ProjectInfo: tenant.NewProjectInfo("", record.ProjectID), + Executor: record.Executor, /* executor id where the resource is persisted */ + WorkerID: record.Worker, /* creator id*/ + }, + } + return fm.GetPersistedResource(ctx, ident) +} + +// PreCheckConfig does a preflight check on the executor's storage configurations. +func PreCheckConfig(config resModel.Config) error { + return local.PreCheckConfig(config) +} diff --git a/engine/pkg/externalresource/broker/broker_test.go b/engine/pkg/externalresource/broker/broker_test.go index 3432993ad9c..6c3d5101ccc 100644 --- a/engine/pkg/externalresource/broker/broker_test.go +++ b/engine/pkg/externalresource/broker/broker_test.go @@ -20,8 +20,9 @@ import ( "testing" pb "github.com/pingcap/tiflow/engine/enginepb" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal/local" "github.com/pingcap/tiflow/engine/pkg/externalresource/manager" - "github.com/pingcap/tiflow/engine/pkg/externalresource/storagecfg" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/tenant" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -29,13 +30,10 @@ import ( "google.golang.org/grpc/status" ) -// DefaultBroker must implement Broker. -var _ Broker = (*DefaultBroker)(nil) - func newBroker(t *testing.T) (*DefaultBroker, *manager.MockClient, string) { tmpDir := t.TempDir() cli := manager.NewMockClient() - broker := NewBroker(&storagecfg.Config{Local: storagecfg.LocalFileConfig{BaseDir: tmpDir}}, + broker := NewBroker(&resModel.Config{Local: resModel.LocalFileConfig{BaseDir: tmpDir}}, "executor-1", cli) return broker, cli, tmpDir @@ -74,7 +72,7 @@ func TestBrokerOpenNewStorage(t *testing.T) { cli.AssertExpectations(t) - AssertLocalFileExists(t, dir, "worker-1", "test-1", "1.txt") + local.AssertLocalFileExists(t, dir, "worker-1", "test-1", "1.txt") } func TestBrokerOpenExistingStorage(t *testing.T) { @@ -123,13 +121,13 @@ func TestBrokerOpenExistingStorage(t *testing.T) { err = f.Close(context.Background()) require.NoError(t, err) - AssertLocalFileExists(t, dir, "worker-2", "test-2", "1.txt") + local.AssertLocalFileExists(t, dir, "worker-2", "test-2", "1.txt") } func TestBrokerRemoveResource(t *testing.T) { brk, _, dir := newBroker(t) - resPath := filepath.Join(dir, "worker-1", resourceNameToFilePathName("resource-1")) + resPath := filepath.Join(dir, "worker-1", local.ResourceNameToFilePathName("resource-1")) err := os.MkdirAll(resPath, 0o700) require.NoError(t, err) diff --git a/engine/pkg/externalresource/broker/interfaces.go b/engine/pkg/externalresource/broker/interfaces.go index 8b9cb37f502..0939d5601e9 100644 --- a/engine/pkg/externalresource/broker/interfaces.go +++ b/engine/pkg/externalresource/broker/interfaces.go @@ -17,8 +17,7 @@ import ( "context" pb "github.com/pingcap/tiflow/engine/enginepb" - frameModel "github.com/pingcap/tiflow/engine/framework/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/tenant" ) @@ -36,7 +35,7 @@ type Broker interface { resourcePath resModel.ResourceID, ) (Handle, error) - // OnWorkerClosed in called when a worker is closing. + // OnWorkerClosed is called when a worker is closing. // The implementation should do necessary garbage collection // for the worker, especially local temporary files. OnWorkerClosed( @@ -45,29 +44,3 @@ type Broker interface { jobID resModel.JobID, ) } - -// FileManager abstracts the operations on local resources that -// a Broker needs to perform. -type FileManager interface { - CreateResource( - creator frameModel.WorkerID, - resName resModel.ResourceName, - ) (*LocalFileResourceDescriptor, error) - - GetPersistedResource( - creator frameModel.WorkerID, - resName resModel.ResourceName, - ) (*LocalFileResourceDescriptor, error) - - RemoveTemporaryFiles(creator frameModel.WorkerID) error - - RemoveResource( - creator frameModel.WorkerID, - resName resModel.ResourceName, - ) error - - SetPersisted( - creator frameModel.WorkerID, - resName resModel.ResourceName, - ) -} diff --git a/engine/pkg/externalresource/broker/local_broker.go b/engine/pkg/externalresource/broker/local_broker.go index 189df59cf15..1ba60e176d6 100644 --- a/engine/pkg/externalresource/broker/local_broker.go +++ b/engine/pkg/externalresource/broker/local_broker.go @@ -21,18 +21,19 @@ import ( "testing" "github.com/pingcap/log" + pb "github.com/pingcap/tiflow/engine/enginepb" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal/local" + "github.com/pingcap/tiflow/engine/pkg/externalresource/manager" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" + "github.com/pingcap/tiflow/engine/pkg/tenant" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - - pb "github.com/pingcap/tiflow/engine/enginepb" - "github.com/pingcap/tiflow/engine/pkg/externalresource/manager" - resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" - "github.com/pingcap/tiflow/engine/pkg/externalresource/storagecfg" - "github.com/pingcap/tiflow/engine/pkg/tenant" ) +var _ Broker = (*LocalBroker)(nil) + // LocalBroker is a broker unit-testing other components // that depend on a Broker. type LocalBroker struct { @@ -42,16 +43,16 @@ type LocalBroker struct { client *manager.MockClient mu sync.Mutex - persistedList []resourcemeta.ResourceID + persistedList []resModel.ResourceID } // NewBrokerForTesting creates a LocalBroker instance for testing only -func NewBrokerForTesting(executorID resourcemeta.ExecutorID) *LocalBroker { +func NewBrokerForTesting(executorID resModel.ExecutorID) *LocalBroker { dir, err := os.MkdirTemp("/tmp", "*-localfiles") if err != nil { log.Panic("failed to make tempdir") } - cfg := &storagecfg.Config{Local: storagecfg.LocalFileConfig{BaseDir: dir}} + cfg := &resModel.Config{Local: resModel.LocalFileConfig{BaseDir: dir}} client := manager.NewMockClient() return &LocalBroker{ DefaultBroker: NewBroker(cfg, executorID, client), @@ -63,9 +64,9 @@ func NewBrokerForTesting(executorID resourcemeta.ExecutorID) *LocalBroker { func (b *LocalBroker) OpenStorage( ctx context.Context, projectInfo tenant.ProjectInfo, - workerID resourcemeta.WorkerID, - jobID resourcemeta.JobID, - resourcePath resourcemeta.ResourceID, + workerID resModel.WorkerID, + jobID resModel.JobID, + resourcePath resModel.ResourceID, ) (Handle, error) { b.clientMu.Lock() defer b.clientMu.Unlock() @@ -87,14 +88,14 @@ func (b *LocalBroker) OpenStorage( } // AssertPersisted checks resource is in persisted list -func (b *LocalBroker) AssertPersisted(t *testing.T, id resourcemeta.ResourceID) { +func (b *LocalBroker) AssertPersisted(t *testing.T, id resModel.ResourceID) { b.mu.Lock() defer b.mu.Unlock() require.Contains(t, b.persistedList, id) } -func (b *LocalBroker) appendPersistRecord(id resourcemeta.ResourceID) { +func (b *LocalBroker) appendPersistRecord(id resModel.ResourceID) { b.mu.Lock() defer b.mu.Unlock() @@ -104,12 +105,12 @@ func (b *LocalBroker) appendPersistRecord(id resourcemeta.ResourceID) { // AssertFileExists checks lock file exists func (b *LocalBroker) AssertFileExists( t *testing.T, - workerID resourcemeta.WorkerID, - resourceID resourcemeta.ResourceID, + workerID resModel.WorkerID, + resourceID resModel.ResourceID, fileName string, ) { suffix := strings.TrimPrefix(resourceID, "/local/") - AssertLocalFileExists(t, b.config.Local.BaseDir, workerID, suffix, fileName) + local.AssertLocalFileExists(t, b.config.Local.BaseDir, workerID, suffix, fileName) } type brExternalStorageHandleForTesting struct { diff --git a/engine/pkg/externalresource/broker/local_resource_desc.go b/engine/pkg/externalresource/broker/local_resource_desc.go deleted file mode 100644 index 509bb36fcd8..00000000000 --- a/engine/pkg/externalresource/broker/local_resource_desc.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package broker - -import ( - "path/filepath" - - frameModel "github.com/pingcap/tiflow/engine/framework/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" -) - -// LocalFileResourceDescriptor contains necessary data -// to access a local file resource. -type LocalFileResourceDescriptor struct { - BasePath string - Creator frameModel.WorkerID - ResourceName resModel.ResourceName -} - -// AbsolutePath returns the absolute path of the given resource -// in the local file system. -func (d *LocalFileResourceDescriptor) AbsolutePath() string { - encodedName := resourceNameToFilePathName(d.ResourceName) - return filepath.Join(d.BasePath, d.Creator, encodedName) -} diff --git a/engine/pkg/externalresource/broker/storage_handle.go b/engine/pkg/externalresource/broker/storage_handle.go index 4aecf68bac8..70a3815a19d 100644 --- a/engine/pkg/externalresource/broker/storage_handle.go +++ b/engine/pkg/externalresource/broker/storage_handle.go @@ -17,13 +17,15 @@ import ( "context" "github.com/pingcap/errors" + "github.com/pingcap/log" brStorage "github.com/pingcap/tidb/br/pkg/storage" pb "github.com/pingcap/tiflow/engine/enginepb" "github.com/pingcap/tiflow/engine/pkg/client" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" - "github.com/pingcap/tiflow/engine/pkg/tenant" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" derrors "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/atomic" + "go.uber.org/zap" ) // Handle defines an interface for interact with framework @@ -34,20 +36,17 @@ type Handle interface { Discard(ctx context.Context) error } -// LocalResourceHandle contains a brStorage.ExternalStorage. +// ResourceHandle contains a brStorage.ExternalStorage. // It helps Dataflow Engine reuse the external storage facilities // implemented in Br. -type LocalResourceHandle struct { - projectInfo tenant.ProjectInfo - id resModel.ResourceID - jobID resModel.JobID - executorID resModel.ExecutorID - desc *LocalFileResourceDescriptor +type ResourceHandle struct { + executorID resModel.ExecutorID + jobID resModel.JobID + client client.ResourceManagerClient - inner brStorage.ExternalStorage - client client.ResourceManagerClient - - fileManager FileManager + fileManager internal.FileManager + desc internal.ResourceDescriptor + inner brStorage.ExternalStorage // isPersisted should be set to true if the // resource has been registered with the servermaster. @@ -55,57 +54,79 @@ type LocalResourceHandle struct { isInvalid atomic.Bool } -func newLocalResourceHandle( - projectInfo tenant.ProjectInfo, - resourceID resModel.ResourceID, +func newResourceHandle( jobID resModel.JobID, executorID resModel.ExecutorID, - fm FileManager, - desc *LocalFileResourceDescriptor, - client ResourceManagerClient, -) (*LocalResourceHandle, error) { - ls, err := newBrStorageForLocalFile(desc.AbsolutePath()) + fm internal.FileManager, + desc internal.ResourceDescriptor, + isPersisted bool, + client client.ResourceManagerClient, +) (*ResourceHandle, error) { + // Use context.Background here since we only support local storage for now. + inner, err := desc.ExternalStorage(context.Background()) if err != nil { return nil, err } - return &LocalResourceHandle{ - projectInfo: projectInfo, - id: resourceID, - jobID: jobID, - executorID: executorID, - - inner: ls, - client: client, - desc: desc, + h := &ResourceHandle{ + executorID: executorID, + jobID: jobID, + client: client, + desc: desc, fileManager: fm, - }, nil + inner: inner, + } + + if isPersisted { + h.isPersisted.Store(true) + } + return h, nil } // ID implements Handle.ID -func (h *LocalResourceHandle) ID() resModel.ResourceID { - return h.id +func (h *ResourceHandle) ID() resModel.ResourceID { + return h.desc.ID() } // BrExternalStorage implements Handle.BrExternalStorage -func (h *LocalResourceHandle) BrExternalStorage() brStorage.ExternalStorage { +func (h *ResourceHandle) BrExternalStorage() brStorage.ExternalStorage { return h.inner } // Persist implements Handle.Persist -func (h *LocalResourceHandle) Persist(ctx context.Context) error { +func (h *ResourceHandle) Persist(ctx context.Context) error { if h.isInvalid.Load() { // Trying to persist invalid resource. return derrors.ErrInvalidResourceHandle.FastGenByArgs() } + creatorExecutor := h.desc.ResourceIdent().Executor + if h.isPersisted.Load() { + log.Warn("Trying to persist a resource that has already been persisted", + zap.Any("resourceDesc", h.desc), + zap.Any("creatorExecutor", string(creatorExecutor)), + zap.String("currentExecutor", string(h.executorID))) + return nil + } + + if creatorExecutor != h.executorID { + // in this case, the resource should have been persisted by the creator. + log.Panic("Trying to persist a resource that is not created by current executor", + zap.Any("resourceDesc", h.desc), + zap.Any("creator", string(creatorExecutor)), + zap.String("currentExecutor", string(h.executorID))) + } + err := h.client.CreateResource(ctx, &pb.CreateResourceRequest{ - ProjectInfo: &pb.ProjectInfo{TenantId: h.projectInfo.TenantID(), ProjectId: h.projectInfo.ProjectID()}, - ResourceId: h.id, + ProjectInfo: &pb.ProjectInfo{ + TenantId: h.desc.ResourceIdent().TenantID(), + ProjectId: h.desc.ResourceIdent().ProjectID(), + }, + ResourceId: h.desc.ID(), CreatorExecutor: string(h.executorID), JobId: h.jobID, - CreatorWorkerId: h.desc.Creator, + CreatorWorkerId: h.desc.ResourceIdent().WorkerID, }) if err != nil { // The RPC could have succeeded on server's side. @@ -115,20 +136,24 @@ func (h *LocalResourceHandle) Persist(ctx context.Context) error { // TODO proper retrying. return errors.Trace(err) } - // We only support local file resources, so fileManager is never nil. - h.fileManager.SetPersisted(h.desc.Creator, h.desc.ResourceName) + err = h.fileManager.SetPersisted(ctx, h.desc.ResourceIdent()) + if err != nil { + return errors.Trace(err) + } h.isPersisted.Store(true) return nil } // Discard implements Handle.Discard -func (h *LocalResourceHandle) Discard(ctx context.Context) error { +// Note that the current design does not allow multiple workers to hold +// persistent resources simultaneously. +func (h *ResourceHandle) Discard(ctx context.Context) error { if h.isInvalid.Load() { // Trying to discard invalid resource. return derrors.ErrInvalidResourceHandle.FastGenByArgs() } - err := h.fileManager.RemoveResource(h.desc.Creator, h.desc.ResourceName) + err := h.fileManager.RemoveResource(ctx, h.desc.ResourceIdent()) if err != nil { return err } @@ -137,7 +162,7 @@ func (h *LocalResourceHandle) Discard(ctx context.Context) error { err := h.client.RemoveResource(ctx, &pb.RemoveResourceRequest{ ResourceKey: &pb.ResourceKey{ JobId: h.jobID, - ResourceId: h.id, + ResourceId: h.desc.ID(), }, }) if err != nil { diff --git a/engine/pkg/externalresource/broker/storage_handle_test.go b/engine/pkg/externalresource/broker/storage_handle_test.go index c39af0dbe0e..52b623effc4 100644 --- a/engine/pkg/externalresource/broker/storage_handle_test.go +++ b/engine/pkg/externalresource/broker/storage_handle_test.go @@ -18,34 +18,52 @@ import ( "testing" pb "github.com/pingcap/tiflow/engine/enginepb" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal/local" "github.com/pingcap/tiflow/engine/pkg/externalresource/manager" - "github.com/pingcap/tiflow/engine/pkg/externalresource/storagecfg" + "github.com/pingcap/tiflow/engine/pkg/externalresource/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/tenant" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) +func newResourceIdentForTesting(executor, workerID, resourceName string) internal.ResourceIdent { + return internal.ResourceIdent{ + Name: resourceName, + ResourceScope: internal.ResourceScope{ + ProjectInfo: tenant.NewProjectInfo("fakeTenant", "fakeProject"), + Executor: resModel.ExecutorID(executor), + WorkerID: workerID, + }, + } +} + func TestStorageHandlePersistAndDiscard(t *testing.T) { - fakeProjectInfo := tenant.NewProjectInfo("fakeTenant", "fakeProject") dir := t.TempDir() - fm := NewLocalFileManager(storagecfg.LocalFileConfig{BaseDir: dir}) + + executor := model.ExecutorID("executor-1") + ident := newResourceIdentForTesting(string(executor), "worker-1", "test-resource") + fm := local.NewLocalFileManager(executor, resModel.LocalFileConfig{BaseDir: dir}) cli := manager.NewMockClient() - desc, err := fm.CreateResource("worker-1", "test-resource") + ctx := context.Background() + desc, err := fm.CreateResource(ctx, ident) require.NoError(t, err) - handle, err := newLocalResourceHandle( - fakeProjectInfo, - "/local/test-resource", + handle, err := newResourceHandle( "job-1", - "executor-1", - fm, desc, cli) + executor, + fm, desc, false, cli) require.NoError(t, err) cli.On("CreateResource", mock.Anything, &pb.CreateResourceRequest{ - ProjectInfo: &pb.ProjectInfo{TenantId: fakeProjectInfo.TenantID(), ProjectId: fakeProjectInfo.ProjectID()}, + ProjectInfo: &pb.ProjectInfo{ + TenantId: desc.ResourceIdent().TenantID(), + ProjectId: desc.ResourceIdent().ProjectID(), + }, ResourceId: "/local/test-resource", - CreatorExecutor: "executor-1", + CreatorExecutor: string(executor), JobId: "job-1", CreatorWorkerId: "worker-1", }).Return(nil).Once() @@ -54,7 +72,10 @@ func TestStorageHandlePersistAndDiscard(t *testing.T) { cli.AssertExpectations(t) cli.ExpectedCalls = nil - desc, err = fm.GetPersistedResource("worker-1", "test-resource") + err = handle.Persist(context.Background()) + require.NoError(t, err) + + desc, err = fm.GetPersistedResource(ctx, ident) require.NoError(t, err) require.NotNil(t, desc) @@ -69,7 +90,7 @@ func TestStorageHandlePersistAndDiscard(t *testing.T) { cli.AssertExpectations(t) cli.ExpectedCalls = nil - _, err = fm.GetPersistedResource("worker-1", "test-resource") + _, err = fm.GetPersistedResource(ctx, ident) require.Error(t, err) require.Regexp(t, ".*ErrResourceDoesNotExist.*", err) @@ -85,20 +106,18 @@ func TestStorageHandlePersistAndDiscard(t *testing.T) { } func TestStorageHandleDiscardTemporaryResource(t *testing.T) { - fakeProjectInfo := tenant.NewProjectInfo("fakeTenant", "fakeProject") dir := t.TempDir() - fm := NewLocalFileManager(storagecfg.LocalFileConfig{BaseDir: dir}) + fm := local.NewLocalFileManager("", resModel.LocalFileConfig{BaseDir: dir}) cli := manager.NewMockClient() - desc, err := fm.CreateResource("worker-1", "test-resource") + ctx := context.Background() + desc, err := fm.CreateResource(ctx, newResourceIdentForTesting("", "worker-1", "test-resource")) require.NoError(t, err) - handle, err := newLocalResourceHandle( - fakeProjectInfo, - "/local/test-resource", + handle, err := newResourceHandle( "job-1", "executor-1", - fm, desc, cli) + fm, desc, false, cli) require.NoError(t, err) err = handle.Discard(context.Background()) @@ -106,7 +125,7 @@ func TestStorageHandleDiscardTemporaryResource(t *testing.T) { cli.AssertNotCalled(t, "RemoveResource") cli.ExpectedCalls = nil - _, err = fm.GetPersistedResource("worker-1", "test-resource") + _, err = fm.GetPersistedResource(ctx, newResourceIdentForTesting("", "worker-1", "test-resource")) require.Error(t, err) require.Regexp(t, ".*ErrResourceDoesNotExist.*", err) diff --git a/engine/pkg/externalresource/integration_test/cluster_stub.go b/engine/pkg/externalresource/integration_test/cluster_stub.go index 220e6620dcc..45a86a439e0 100644 --- a/engine/pkg/externalresource/integration_test/cluster_stub.go +++ b/engine/pkg/externalresource/integration_test/cluster_stub.go @@ -21,7 +21,7 @@ import ( "github.com/pingcap/tiflow/engine/pkg/client" "github.com/pingcap/tiflow/engine/pkg/externalresource/broker" "github.com/pingcap/tiflow/engine/pkg/externalresource/manager" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/rpcerror" ) diff --git a/engine/pkg/externalresource/integration_test/gc_test.go b/engine/pkg/externalresource/integration_test/gc_test.go index 024867daf18..4b60221927b 100644 --- a/engine/pkg/externalresource/integration_test/gc_test.go +++ b/engine/pkg/externalresource/integration_test/gc_test.go @@ -24,8 +24,8 @@ import ( frameModel "github.com/pingcap/tiflow/engine/framework/model" "github.com/pingcap/tiflow/engine/model" - "github.com/pingcap/tiflow/engine/pkg/externalresource/broker" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal/local" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" "github.com/pingcap/tiflow/engine/pkg/tenant" ) @@ -60,7 +60,7 @@ func TestLocalFileTriggeredByJobRemoval(t *testing.T) { resMeta, err := cluster.meta.GetResourceByID(ctx, pkgOrm.ResourceKey{JobID: "job-1", ID: "/local/resource-1"}) require.NoError(t, err) require.Equal(t, model.ExecutorID("executor-1"), resMeta.Executor) - broker.AssertLocalFileExists(t, baseDir, "worker-1", "resource-1", "1.txt") + local.AssertLocalFileExists(t, baseDir, "worker-1", "resource-1", "1.txt") // Triggers GC by removing the job cluster.jobInfo.RemoveJob("job-1") @@ -69,7 +69,7 @@ func TestLocalFileTriggeredByJobRemoval(t *testing.T) { log.Warn("GetResourceByID", zap.Error(err)) return err != nil && pkgOrm.IsNotFoundError(err) }, 1*time.Second, 5*time.Millisecond) - broker.AssertNoLocalFileExists(t, baseDir, "worker-1", "resource-1", "1.txt") + local.AssertNoLocalFileExists(t, baseDir, "worker-1", "resource-1", "1.txt") cluster.Stop() } @@ -138,7 +138,7 @@ func TestCleanUpStaleResourcesOnStartUp(t *testing.T) { _, err := cluster.meta.GetResourceByID(ctx, pkgOrm.ResourceKey{JobID: "job-1", ID: "/local/resource-2"}) return err != nil && pkgOrm.IsNotFoundError(err) }, 1*time.Second, 5*time.Millisecond) - broker.AssertNoLocalFileExists(t, baseDir, "worker-1", "resource-1", "1.txt") + local.AssertNoLocalFileExists(t, baseDir, "worker-1", "resource-1", "1.txt") cluster.Stop() } diff --git a/engine/pkg/externalresource/integration_test/mock_cluster.go b/engine/pkg/externalresource/integration_test/mock_cluster.go index 35513706101..dbf77c9ff87 100644 --- a/engine/pkg/externalresource/integration_test/mock_cluster.go +++ b/engine/pkg/externalresource/integration_test/mock_cluster.go @@ -24,9 +24,7 @@ import ( "github.com/pingcap/tiflow/engine/pkg/client" "github.com/pingcap/tiflow/engine/pkg/externalresource/broker" "github.com/pingcap/tiflow/engine/pkg/externalresource/manager" - resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" - "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcetypes" - "github.com/pingcap/tiflow/engine/pkg/externalresource/storagecfg" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" "github.com/pingcap/tiflow/engine/pkg/rpcutil" "github.com/stretchr/testify/require" @@ -72,10 +70,7 @@ func newMockGCCluster() *mockCluster { &rate.Limiter{}, nil)) executorGroup := client.NewMockExecutorGroup() - resourceTp := resourcetypes.NewLocalFileResourceType(executorGroup) - gcRunner := manager.NewGCRunner(meta, map[resourcemeta.ResourceType]manager.GCHandlerFunc{ - "local": resourceTp.GCHandler(), - }) + gcRunner := manager.NewGCRunner(meta, executorGroup) gcCoordinator := manager.NewGCCoordinator(executorInfo, jobInfo, meta, gcRunner) return &mockCluster{ @@ -121,7 +116,7 @@ func (c *mockCluster) Stop() { } func (c *mockCluster) AddBroker(id model.ExecutorID, baseDir string) { - config := &storagecfg.Config{Local: storagecfg.LocalFileConfig{BaseDir: baseDir}} + config := &resModel.Config{Local: resModel.LocalFileConfig{BaseDir: baseDir}} cli := &resourceClientStub{service: c.service} brk := broker.NewBroker(config, id, cli) diff --git a/engine/pkg/externalresource/internal/errors.go b/engine/pkg/externalresource/internal/errors.go index 65b8402360c..579a59113f1 100644 --- a/engine/pkg/externalresource/internal/errors.go +++ b/engine/pkg/externalresource/internal/errors.go @@ -15,7 +15,7 @@ package internal import ( "github.com/pingcap/tiflow/engine/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/rpcerror" ) @@ -64,3 +64,15 @@ type InvalidArgumentError struct { // ErrInvalidArgument indicates that a resource-related request has an invalid argument. var ErrInvalidArgument = rpcerror.Normalize[InvalidArgumentError]() + +// ResourceFilesNotFoundError provides details of ErrResourceFilesNotFound +type ResourceFilesNotFoundError struct { + rpcerror.Error[rpcerror.NotRetryable, rpcerror.NotFound] + + Ident ResourceIdent + Details string +} + +// ErrResourceFilesNotFound indicates that the required resource is not found +// in the underlying storage (s3, for example). +var ErrResourceFilesNotFound = rpcerror.Normalize[ResourceFilesNotFoundError]() diff --git a/engine/pkg/externalresource/internal/file_manager.go b/engine/pkg/externalresource/internal/file_manager.go new file mode 100644 index 00000000000..f6ae32537b6 --- /dev/null +++ b/engine/pkg/externalresource/internal/file_manager.go @@ -0,0 +1,72 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + + frameModel "github.com/pingcap/tiflow/engine/framework/model" + "github.com/pingcap/tiflow/engine/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" + "github.com/pingcap/tiflow/engine/pkg/tenant" +) + +// ResourceScope represents the environment in which the resource +// has been or is to be created. +type ResourceScope struct { + tenant.ProjectInfo + + // Executor denotes the executor on which the resource is created. + Executor model.ExecutorID + + // WorkerID denotes the worker that is creating or has created the resource. + // Note that it is NOT necessarily that consumes or depends on the resource. + WorkerID frameModel.WorkerID +} + +// ResourceIdent provides information for the file manager to +// uniquely determine where and how the resource is stored. +type ResourceIdent struct { + ResourceScope + + // Name is the custom part of the resourceID. + // For example, the resource name of `/local/resource-1` is `resource-1`. + Name resModel.ResourceName +} + +// Scope returns the Scope of the ResourceIdent. +func (i ResourceIdent) Scope() ResourceScope { + return i.ResourceScope +} + +// FileManager abstracts the operations on the underlying storage. +type FileManager interface { + // CreateResource creates a new resource. + CreateResource(ctx context.Context, ident ResourceIdent) (ResourceDescriptor, error) + + // GetPersistedResource returns the descriptor of an already persisted resource. + GetPersistedResource(ctx context.Context, ident ResourceIdent) (ResourceDescriptor, error) + + // RemoveTemporaryFiles cleans up all un-persisted resource files under the scope. + RemoveTemporaryFiles(ctx context.Context, scope ResourceScope) error + + // RemoveResource removes a resource's files. + RemoveResource(ctx context.Context, ident ResourceIdent) error + + // SetPersisted sets a resource as persisted. + SetPersisted(ctx context.Context, ident ResourceIdent) error + + // Close shuts down the FileManager. + Close() +} diff --git a/engine/pkg/externalresource/broker/file_manager.go b/engine/pkg/externalresource/internal/local/file_manager.go similarity index 76% rename from engine/pkg/externalresource/broker/file_manager.go rename to engine/pkg/externalresource/internal/local/file_manager.go index 1758c5345a9..57c49918fa5 100644 --- a/engine/pkg/externalresource/broker/file_manager.go +++ b/engine/pkg/externalresource/internal/local/file_manager.go @@ -11,9 +11,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package broker +package local import ( + "context" "os" "path/filepath" "sync" @@ -21,17 +22,21 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" frameModel "github.com/pingcap/tiflow/engine/framework/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" - "github.com/pingcap/tiflow/engine/pkg/externalresource/storagecfg" + "github.com/pingcap/tiflow/engine/model" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" derrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/fsutil" "go.uber.org/zap" ) -// LocalFileManager manages the local files resources stored in +var _ internal.FileManager = &FileManager{} + +// FileManager manages the local files resources stored in // the local file system. -type LocalFileManager struct { - config storagecfg.LocalFileConfig +type FileManager struct { + executorID model.ExecutorID + config resModel.LocalFileConfig mu sync.Mutex persistedResourcesByCreator map[frameModel.WorkerID]map[resModel.ResourceName]struct{} @@ -40,8 +45,9 @@ type LocalFileManager struct { // NewLocalFileManager returns a new NewLocalFileManager. // Note that the lifetime of the returned object should span the whole // lifetime of the executor. -func NewLocalFileManager(config storagecfg.LocalFileConfig) *LocalFileManager { - return &LocalFileManager{ +func NewLocalFileManager(executorID model.ExecutorID, config resModel.LocalFileConfig) *FileManager { + return &FileManager{ + executorID: executorID, config: config, persistedResourcesByCreator: make(map[frameModel.WorkerID]map[resModel.ResourceName]struct{}), } @@ -51,20 +57,19 @@ func NewLocalFileManager(config storagecfg.LocalFileConfig) *LocalFileManager { // and returns a LocalFileResourceDescriptor. // The resource is NOT marked as persisted by this method. // Only use it when we are sure it is a NEW resource. -func (m *LocalFileManager) CreateResource( - creator frameModel.WorkerID, - resName resModel.ResourceName, -) (*LocalFileResourceDescriptor, error) { - res := &LocalFileResourceDescriptor{ - BasePath: m.config.BaseDir, - Creator: creator, - ResourceName: resName, +func (m *FileManager) CreateResource( + ctx context.Context, ident internal.ResourceIdent, +) (internal.ResourceDescriptor, error) { + m.validateIdent(ident) + res := &FileResourceDescriptor{ + BasePath: m.config.BaseDir, + Ident: ident, } if err := os.MkdirAll(res.AbsolutePath(), 0o700); err != nil { return nil, derrors.ErrCreateLocalFileDirectoryFailed.Wrap(err) } log.Info("Created directory for local file resource", - zap.String("resource-name", resName), + zap.String("resource-name", res.ResourceIdent().Name), zap.String("path", res.AbsolutePath())) // TODO check for quota when we implement quota. return res, nil @@ -72,15 +77,17 @@ func (m *LocalFileManager) CreateResource( // GetPersistedResource checks the given resource exists in the local // file system and returns a LocalFileResourceDescriptor. -func (m *LocalFileManager) GetPersistedResource( - creator frameModel.WorkerID, - resName resModel.ResourceName, -) (*LocalFileResourceDescriptor, error) { - res := &LocalFileResourceDescriptor{ - BasePath: m.config.BaseDir, - Creator: creator, - ResourceName: resName, +func (m *FileManager) GetPersistedResource( + ctx context.Context, ident internal.ResourceIdent, +) (internal.ResourceDescriptor, error) { + // For local file, an executor can only access resources that are created by itself. + m.validateIdent(ident) + res := &FileResourceDescriptor{ + BasePath: m.config.BaseDir, + Ident: ident, } + resName := res.ResourceIdent().Name + creator := res.ResourceIdent().WorkerID if _, err := os.Stat(res.AbsolutePath()); err != nil { if os.IsNotExist(err) { return nil, derrors.ErrResourceDoesNotExist.GenWithStackByArgs(resName) @@ -104,7 +111,10 @@ func (m *LocalFileManager) GetPersistedResource( // RemoveTemporaryFiles cleans up all temporary files (i.e., unpersisted file resources), // created by `creator`. -func (m *LocalFileManager) RemoveTemporaryFiles(creator frameModel.WorkerID) error { +func (m *FileManager) RemoveTemporaryFiles( + ctx context.Context, scope internal.ResourceScope, +) error { + creator := scope.WorkerID log.Info("Start cleaning temporary files", zap.String("worker-id", creator)) @@ -138,7 +148,7 @@ func (m *LocalFileManager) RemoveTemporaryFiles(creator frameModel.WorkerID) err fullPath := filepath.Join( m.config.BaseDir, creator, - resourceNameToFilePathName(resName)) + ResourceNameToFilePathName(resName)) if err := os.RemoveAll(fullPath); err != nil { return derrors.ErrCleaningLocalTempFiles.Wrap(err) } @@ -156,7 +166,12 @@ func (m *LocalFileManager) RemoveTemporaryFiles(creator frameModel.WorkerID) err // RemoveResource removes a single resource from the local file system. // NOTE the caller should handle ErrResourceDoesNotExist appropriately. -func (m *LocalFileManager) RemoveResource(creator frameModel.WorkerID, resName resModel.ResourceName) error { +func (m *FileManager) RemoveResource( + ctx context.Context, ident internal.ResourceIdent, +) error { + m.validateIdent(ident) + resName := ident.Name + creator := ident.WorkerID if creator == "" { log.Panic("Empty creator ID is unexpected", zap.String("resource-name", resName)) @@ -203,10 +218,13 @@ func (m *LocalFileManager) RemoveResource(creator frameModel.WorkerID, resName r // NOTE it is only marked as persisted in memory, because // we assume that if the executor process crashes, the // file resources are lost. -func (m *LocalFileManager) SetPersisted( - creator frameModel.WorkerID, - resName resModel.ResourceName, -) { +func (m *FileManager) SetPersisted( + ctx context.Context, ident internal.ResourceIdent, +) error { + m.validateIdent(ident) + resName := ident.Name + creator := ident.WorkerID + m.mu.Lock() defer m.mu.Unlock() @@ -217,12 +235,12 @@ func (m *LocalFileManager) SetPersisted( } persistedResourceSet[resName] = struct{}{} - return + return nil } // isPersisted returns whether a resource has been persisted. // DO NOT hold the mu when calling this method. -func (m *LocalFileManager) isPersisted( +func (m *FileManager) isPersisted( creator frameModel.WorkerID, resName resModel.ResourceName, ) bool { @@ -261,7 +279,7 @@ func iterOverResourceDirectories(path string, fn func(relPath string) error) err } // PreCheckConfig does a preflight check on the executor's storage configurations. -func PreCheckConfig(config storagecfg.Config) error { +func PreCheckConfig(config resModel.Config) error { baseDir := config.Local.BaseDir if _, err := os.Stat(baseDir); os.IsNotExist(err) { @@ -285,3 +303,18 @@ func PreCheckConfig(config storagecfg.Config) error { // TODO implement a minimum disk space threshold. return nil } + +func (m *FileManager) validateIdent(ident internal.ResourceIdent) { + // Defensive verification to ensure that local resources are not accessible across nodes. + if ident.Executor != m.executorID { + log.Panic("inconsistent executor ID of local file", + zap.Any("fileScope", ident), + zap.Any("creator", ident.Executor), + zap.String("currentExecutor", string(m.executorID))) + } +} + +// Close shuts down the FileManager. +func (m *FileManager) Close() { + log.Info("Closing local file manager") +} diff --git a/engine/pkg/externalresource/broker/file_manager_test.go b/engine/pkg/externalresource/internal/local/file_manager_test.go similarity index 50% rename from engine/pkg/externalresource/broker/file_manager_test.go rename to engine/pkg/externalresource/internal/local/file_manager_test.go index afb003590b1..6479b761d76 100644 --- a/engine/pkg/externalresource/broker/file_manager_test.go +++ b/engine/pkg/externalresource/internal/local/file_manager_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package broker +package local import ( "context" @@ -22,14 +22,27 @@ import ( "github.com/stretchr/testify/require" - "github.com/pingcap/tiflow/engine/pkg/externalresource/storagecfg" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" + "github.com/pingcap/tiflow/engine/pkg/tenant" ) +func newResourceIdentForTesting(executor, workerID, resourceName string) internal.ResourceIdent { + return internal.ResourceIdent{ + Name: resourceName, + ResourceScope: internal.ResourceScope{ + ProjectInfo: tenant.NewProjectInfo("fakeTenant", "fakeProject"), + Executor: resModel.ExecutorID(executor), + WorkerID: workerID, + }, + } +} + func TestFileManagerBasics(t *testing.T) { t.Parallel() dir := t.TempDir() - fm := NewLocalFileManager(storagecfg.LocalFileConfig{BaseDir: dir}) + fm := NewLocalFileManager("", resModel.LocalFileConfig{BaseDir: dir}) // In this test, we create resource-1 and resource-2, and only // resource-1 will be marked as persisted. @@ -38,13 +51,22 @@ func TestFileManagerBasics(t *testing.T) { // temporary files, while resource-1 can be cleaned up as a persisted // resource. + ctx := context.Background() // Creates resource-1 - res1, err := fm.CreateResource("worker-1", "resource-1") + res, err := fm.CreateResource(ctx, + newResourceIdentForTesting("", "worker-1", "resource-1")) require.NoError(t, err) - require.Equal(t, &LocalFileResourceDescriptor{ - BasePath: dir, - Creator: "worker-1", - ResourceName: "resource-1", + res1, ok := res.(*FileResourceDescriptor) + require.True(t, ok) + require.Equal(t, &FileResourceDescriptor{ + BasePath: dir, + Ident: internal.ResourceIdent{ + Name: "resource-1", + ResourceScope: internal.ResourceScope{ + ProjectInfo: tenant.NewProjectInfo("fakeTenant", "fakeProject"), + WorkerID: "worker-1", + }, + }, }, res1) storage, err := newBrStorageForLocalFile(res1.AbsolutePath()) @@ -55,15 +77,22 @@ func TestFileManagerBasics(t *testing.T) { require.NoError(t, err) require.FileExists(t, res1.AbsolutePath()+"/1.txt") - fm.SetPersisted("worker-1", "resource-1") + fm.SetPersisted(ctx, newResourceIdentForTesting("", "worker-1", "resource-1")) // Creates resource-2 - res2, err := fm.CreateResource("worker-1", "resource-2") + res, err = fm.CreateResource(ctx, newResourceIdentForTesting("", "worker-1", "resource-2")) require.NoError(t, err) - require.Equal(t, &LocalFileResourceDescriptor{ - BasePath: dir, - Creator: "worker-1", - ResourceName: "resource-2", + res2, ok := res.(*FileResourceDescriptor) + require.True(t, ok) + require.Equal(t, &FileResourceDescriptor{ + BasePath: dir, + Ident: internal.ResourceIdent{ + Name: "resource-2", + ResourceScope: internal.ResourceScope{ + ProjectInfo: tenant.NewProjectInfo("fakeTenant", "fakeProject"), + WorkerID: "worker-1", + }, + }, }, res2) storage, err = newBrStorageForLocalFile(res2.AbsolutePath()) @@ -75,19 +104,19 @@ func TestFileManagerBasics(t *testing.T) { require.FileExists(t, res2.AbsolutePath()+"/1.txt") // Clean up temporary files - err = fm.RemoveTemporaryFiles("worker-1") + err = fm.RemoveTemporaryFiles(ctx, internal.ResourceScope{WorkerID: "worker-1"}) require.NoError(t, err) require.NoDirExists(t, res2.AbsolutePath()) require.DirExists(t, res1.AbsolutePath()) // Clean up persisted resource - err = fm.RemoveResource("worker-1", "resource-1") + err = fm.RemoveResource(ctx, newResourceIdentForTesting("", "worker-1", "resource-1")) require.NoError(t, err) require.NoDirExists(t, res1.AbsolutePath()) // Test repeated removals - err = fm.RemoveResource("worker-1", "resource-1") + err = fm.RemoveResource(ctx, newResourceIdentForTesting("", "worker-1", "resource-1")) require.Error(t, err) require.Regexp(t, ".*ErrResourceDoesNotExist.*", err) } @@ -98,58 +127,64 @@ func TestFileManagerManyWorkers(t *testing.T) { const numWorkers = 10 dir := t.TempDir() - fm := NewLocalFileManager(storagecfg.LocalFileConfig{BaseDir: dir}) + fm := NewLocalFileManager("", resModel.LocalFileConfig{BaseDir: dir}) + ctx := context.Background() for i := 0; i < numWorkers; i++ { // For each worker, first create a persisted resource - res, err := fm.CreateResource( + res, err := fm.CreateResource(ctx, newResourceIdentForTesting("", fmt.Sprintf("worker-%d", i), - fmt.Sprintf("resource-%d-1", i)) + fmt.Sprintf("resource-%d-1", i))) require.NoError(t, err) + res1, ok := res.(*FileResourceDescriptor) + require.True(t, ok) - storage, err := newBrStorageForLocalFile(res.AbsolutePath()) + storage, err := newBrStorageForLocalFile(res1.AbsolutePath()) require.NoError(t, err) fwriter, err := storage.Create(context.Background(), "1.txt") require.NoError(t, err) err = fwriter.Close(context.Background()) require.NoError(t, err) - require.FileExists(t, res.AbsolutePath()+"/1.txt") + require.FileExists(t, res1.AbsolutePath()+"/1.txt") - fm.SetPersisted(fmt.Sprintf("worker-%d", i), - fmt.Sprintf("resource-%d-1", i)) + fm.SetPersisted(ctx, newResourceIdentForTesting("", + fmt.Sprintf("worker-%d", i), + fmt.Sprintf("resource-%d-1", i))) // Then create a temporary resource - res, err = fm.CreateResource( + res, err = fm.CreateResource(ctx, newResourceIdentForTesting("", fmt.Sprintf("worker-%d", i), - fmt.Sprintf("resource-%d-2", i)) + fmt.Sprintf("resource-%d-2", i))) require.NoError(t, err) + res2, ok := res.(*FileResourceDescriptor) + require.True(t, ok) - storage, err = newBrStorageForLocalFile(res.AbsolutePath()) + storage, err = newBrStorageForLocalFile(res2.AbsolutePath()) require.NoError(t, err) fwriter, err = storage.Create(context.Background(), "1.txt") require.NoError(t, err) err = fwriter.Close(context.Background()) require.NoError(t, err) - require.FileExists(t, res.AbsolutePath()+"/1.txt") + require.FileExists(t, res2.AbsolutePath()+"/1.txt") } // Garbage collects about half the workers' temporary files. for i := 0; i < numWorkers/2; i++ { workerID := fmt.Sprintf("worker-%d", i) - err := fm.RemoveTemporaryFiles(workerID) + err := fm.RemoveTemporaryFiles(ctx, internal.ResourceScope{WorkerID: workerID}) require.NoError(t, err) } for i := 0; i < numWorkers; i++ { workerID := fmt.Sprintf("worker-%d", i) resourceID1 := fmt.Sprintf("resource-%d-1", i) - require.DirExists(t, filepath.Join(dir, workerID, resourceNameToFilePathName(resourceID1))) + require.DirExists(t, filepath.Join(dir, workerID, ResourceNameToFilePathName(resourceID1))) resourceID2 := fmt.Sprintf("resource-%d-2", i) if i < numWorkers/2 { - require.NoDirExists(t, filepath.Join(dir, workerID, resourceNameToFilePathName(resourceID2))) + require.NoDirExists(t, filepath.Join(dir, workerID, ResourceNameToFilePathName(resourceID2))) } else { - require.DirExists(t, filepath.Join(dir, workerID, resourceNameToFilePathName(resourceID2))) + require.DirExists(t, filepath.Join(dir, workerID, ResourceNameToFilePathName(resourceID2))) } } } @@ -158,10 +193,11 @@ func TestCleanUpTemporaryFilesNotFound(t *testing.T) { t.Parallel() dir := t.TempDir() - fm := NewLocalFileManager(storagecfg.LocalFileConfig{BaseDir: dir}) + fm := NewLocalFileManager("", resModel.LocalFileConfig{BaseDir: dir}) // Note that worker-1 does not have any resource. - err := fm.RemoveTemporaryFiles("worker-1") + err := fm.RemoveTemporaryFiles(context.Background(), + internal.ResourceScope{WorkerID: "worker-1"}) // We expect NoError because it is normal for a worker // to never create any resource. require.NoError(t, err) @@ -171,26 +207,28 @@ func TestCreateAndGetResource(t *testing.T) { t.Parallel() dir := t.TempDir() - fm := NewLocalFileManager(storagecfg.LocalFileConfig{BaseDir: dir}) - _, err := fm.GetPersistedResource("worker-1", "resource-1") + fm := NewLocalFileManager("", resModel.LocalFileConfig{BaseDir: dir}) + ctx := context.Background() + ident := newResourceIdentForTesting("", "worker-1", "resource-1") + _, err := fm.GetPersistedResource(ctx, ident) require.Error(t, err) require.Regexp(t, ".*ErrResourceDoesNotExist.*", err) - _, err = fm.CreateResource("worker-1", "resource-1") + _, err = fm.CreateResource(ctx, ident) require.NoError(t, err) - _, err = fm.GetPersistedResource("worker-1", "resource-1") + _, err = fm.GetPersistedResource(ctx, ident) require.Error(t, err) require.Regexp(t, ".*ErrResourceDoesNotExist.*", err) - fm.SetPersisted("worker-1", "resource-1") - _, err = fm.GetPersistedResource("worker-1", "resource-1") + fm.SetPersisted(ctx, ident) + _, err = fm.GetPersistedResource(ctx, ident) require.NoError(t, err) - err = fm.RemoveResource("worker-1", "resource-1") + err = fm.RemoveResource(ctx, ident) require.NoError(t, err) - _, err = fm.GetPersistedResource("worker-1", "resource-1") + _, err = fm.GetPersistedResource(ctx, ident) require.Error(t, err) require.Regexp(t, ".*ErrResourceDoesNotExist.*", err) } @@ -199,25 +237,26 @@ func TestResourceNamesWithSlash(t *testing.T) { t.Parallel() dir := t.TempDir() - fm := NewLocalFileManager(storagecfg.LocalFileConfig{BaseDir: dir}) + fm := NewLocalFileManager("", resModel.LocalFileConfig{BaseDir: dir}) - _, err := fm.CreateResource("worker-1", "a") + ctx := context.Background() + _, err := fm.CreateResource(ctx, newResourceIdentForTesting("", "worker-1", "a")) require.NoError(t, err) - _, err = fm.CreateResource("worker-1", "a/b") + _, err = fm.CreateResource(ctx, newResourceIdentForTesting("", "worker-1", "a/b")) require.NoError(t, err) - _, err = fm.CreateResource("worker-1", "a/b/c") + _, err = fm.CreateResource(ctx, newResourceIdentForTesting("", "worker-1", "a/b/c")) require.NoError(t, err) - fm.SetPersisted("worker-1", "a/b/c") - _, err = fm.GetPersistedResource("worker-1", "a/b/c") + fm.SetPersisted(ctx, newResourceIdentForTesting("", "worker-1", "a/b/c")) + _, err = fm.GetPersistedResource(ctx, newResourceIdentForTesting("", "worker-1", "a/b/c")) require.NoError(t, err) - err = fm.RemoveTemporaryFiles("worker-1") + err = fm.RemoveTemporaryFiles(ctx, internal.ResourceScope{WorkerID: "worker-1"}) require.NoError(t, err) - _, err = fm.GetPersistedResource("worker-1", "a/b/c") + _, err = fm.GetPersistedResource(ctx, newResourceIdentForTesting("", "worker-1", "a/b/c")) require.NoError(t, err) } @@ -226,18 +265,18 @@ func TestPreCheckConfig(t *testing.T) { // Happy path dir := t.TempDir() - err := PreCheckConfig(storagecfg.Config{Local: storagecfg.LocalFileConfig{BaseDir: dir}}) + err := PreCheckConfig(resModel.Config{Local: resModel.LocalFileConfig{BaseDir: dir}}) require.NoError(t, err) // Directory does not exist but can be created. baseDir := filepath.Join(dir, "not-exist") - err = PreCheckConfig(storagecfg.Config{Local: storagecfg.LocalFileConfig{BaseDir: baseDir}}) + err = PreCheckConfig(resModel.Config{Local: resModel.LocalFileConfig{BaseDir: baseDir}}) require.NoError(t, err) // Directory exists but not writable baseDir = filepath.Join(dir, "not-writable") require.NoError(t, os.MkdirAll(baseDir, 0o400)) - err = PreCheckConfig(storagecfg.Config{Local: storagecfg.LocalFileConfig{BaseDir: baseDir}}) + err = PreCheckConfig(resModel.Config{Local: resModel.LocalFileConfig{BaseDir: baseDir}}) require.Error(t, err) require.Regexp(t, ".*ErrLocalFileDirNotWritable.*", err) } diff --git a/engine/pkg/externalresource/broker/local_file_utils.go b/engine/pkg/externalresource/internal/local/file_utils.go similarity index 81% rename from engine/pkg/externalresource/broker/local_file_utils.go rename to engine/pkg/externalresource/internal/local/file_utils.go index d56c9493808..d91c4da2509 100644 --- a/engine/pkg/externalresource/broker/local_file_utils.go +++ b/engine/pkg/externalresource/internal/local/file_utils.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package broker +package local import ( "context" @@ -24,7 +24,7 @@ import ( "github.com/pingcap/errors" brStorage "github.com/pingcap/tidb/br/pkg/storage" frameModel "github.com/pingcap/tiflow/engine/framework/model" - "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" derrors "github.com/pingcap/tiflow/pkg/errors" ) @@ -40,25 +40,26 @@ func newBrStorageForLocalFile(filePath string) (brStorage.ExternalStorage, error return ls, nil } -func resourceNameToFilePathName(resName model.ResourceName) string { +// ResourceNameToFilePathName converts a resource name to a file path name. +func ResourceNameToFilePathName(resName resModel.ResourceName) string { return hex.EncodeToString([]byte(resName)) } -func filePathNameToResourceName(filePath string) (model.ResourceName, error) { +func filePathNameToResourceName(filePath string) (resModel.ResourceName, error) { result, err := hex.DecodeString(filePath) if err != nil { return "", errors.Trace(err) } - return model.ResourceName(result), nil + return resModel.ResourceName(result), nil } func localPathWithEncoding(baseDir string, creator frameModel.WorkerID, - resName model.ResourceName, + resName resModel.ResourceName, suffixes ...string, ) string { joinSegments := []string{ - baseDir, creator, resourceNameToFilePathName(resName), + baseDir, creator, ResourceNameToFilePathName(resName), } joinSegments = append(joinSegments, suffixes...) return filepath.Join(joinSegments...) @@ -69,7 +70,7 @@ func AssertLocalFileExists( t *testing.T, baseDir string, creator frameModel.WorkerID, - resName model.ResourceName, + resName resModel.ResourceName, suffixes ...string, ) { require.FileExistsf(t, localPathWithEncoding(baseDir, creator, resName, suffixes...), @@ -82,7 +83,7 @@ func AssertNoLocalFileExists( t *testing.T, baseDir string, creator frameModel.WorkerID, - resName model.ResourceName, + resName resModel.ResourceName, suffixes ...string, ) { require.NoFileExists(t, localPathWithEncoding(baseDir, creator, resName, suffixes...), diff --git a/engine/pkg/externalresource/resourcetypes/main_test.go b/engine/pkg/externalresource/internal/local/main_test.go similarity index 96% rename from engine/pkg/externalresource/resourcetypes/main_test.go rename to engine/pkg/externalresource/internal/local/main_test.go index b5b276726bd..159a9b000d0 100644 --- a/engine/pkg/externalresource/resourcetypes/main_test.go +++ b/engine/pkg/externalresource/internal/local/main_test.go @@ -10,7 +10,7 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. -package resourcetypes +package local import ( "testing" diff --git a/engine/pkg/externalresource/resourcetypes/local_file.go b/engine/pkg/externalresource/internal/local/resource_controller.go similarity index 63% rename from engine/pkg/externalresource/resourcetypes/local_file.go rename to engine/pkg/externalresource/internal/local/resource_controller.go index 095236f18d3..4543a9bd2fd 100644 --- a/engine/pkg/externalresource/resourcetypes/local_file.go +++ b/engine/pkg/externalresource/internal/local/resource_controller.go @@ -11,34 +11,33 @@ // See the License for the specific language governing permissions and // limitations under the License. -package resourcetypes +package local import ( "context" perrors "github.com/pingcap/errors" "github.com/pingcap/tiflow/engine/pkg/client" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" ) -// LocalFileResourceController defines operations specific to -// the local file type. -type LocalFileResourceController struct { +// FileResourceController defines operations specific to the local file type. +type FileResourceController struct { // clientManager is used to communicate with executors. clientGroup client.ExecutorGroup } -// NewLocalFileResourceType creates a new LocalFileResourceController. -func NewLocalFileResourceType(clientGroup client.ExecutorGroup) *LocalFileResourceController { - return &LocalFileResourceController{clientGroup: clientGroup} +// NewFileResourceController creates a new LocalFileResourceController. +func NewFileResourceController(clientGroup client.ExecutorGroup) *FileResourceController { + return &FileResourceController{clientGroup: clientGroup} } // GCHandler returns a closure to the invoker to perform GC. -func (r *LocalFileResourceController) GCHandler() func(context.Context, *resModel.ResourceMeta) error { +func (r *FileResourceController) GCHandler() func(context.Context, *resModel.ResourceMeta) error { return r.removeFilesOnExecutor } -func (r *LocalFileResourceController) removeFilesOnExecutor(ctx context.Context, resource *resModel.ResourceMeta) error { +func (r *FileResourceController) removeFilesOnExecutor(ctx context.Context, resource *resModel.ResourceMeta) error { cli, err := r.clientGroup.GetExecutorClientB(ctx, resource.Executor) if err != nil { return perrors.Annotate(err, "removeFilesOnExecutor") diff --git a/engine/pkg/externalresource/resourcetypes/local_file_test.go b/engine/pkg/externalresource/internal/local/resource_controller_test.go similarity index 93% rename from engine/pkg/externalresource/resourcetypes/local_file_test.go rename to engine/pkg/externalresource/internal/local/resource_controller_test.go index ab1bc3ee24f..51340150e36 100644 --- a/engine/pkg/externalresource/resourcetypes/local_file_test.go +++ b/engine/pkg/externalresource/internal/local/resource_controller_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package resourcetypes +package local import ( "context" @@ -19,7 +19,7 @@ import ( "github.com/golang/mock/gomock" "github.com/pingcap/tiflow/engine/pkg/client" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/stretchr/testify/require" ) @@ -28,7 +28,7 @@ func TestRemoveFileOnExecutor(t *testing.T) { mockCli := client.NewMockExecutorClient(gomock.NewController(t)) clientGroup.AddClient("executor-1", mockCli) - resourceTp := NewLocalFileResourceType(clientGroup) + resourceTp := NewFileResourceController(clientGroup) gcHandler := resourceTp.GCHandler() resMeta := &resModel.ResourceMeta{ diff --git a/engine/pkg/externalresource/internal/local/resource_desc.go b/engine/pkg/externalresource/internal/local/resource_desc.go new file mode 100644 index 00000000000..1f3e5b348dd --- /dev/null +++ b/engine/pkg/externalresource/internal/local/resource_desc.go @@ -0,0 +1,70 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package local + +import ( + "context" + "path/filepath" + + "github.com/pingcap/errors" + brStorage "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" +) + +var _ internal.ResourceDescriptor = (*FileResourceDescriptor)(nil) + +// FileResourceDescriptor contains necessary data +// to access a local file resource. +type FileResourceDescriptor struct { + BasePath string + Ident internal.ResourceIdent + + storage brStorage.ExternalStorage +} + +// AbsolutePath returns the absolute path of the given resource +// in the local file system. +func (d *FileResourceDescriptor) AbsolutePath() string { + encodedName := ResourceNameToFilePathName(d.Ident.Name) + return filepath.Join(d.BasePath, d.Ident.WorkerID, encodedName) +} + +// ExternalStorage creates the storage object if one has not been created yet, and returns the +// created storage object. +func (d *FileResourceDescriptor) ExternalStorage(ctx context.Context) (brStorage.ExternalStorage, error) { + if d.storage == nil { + storage, err := newBrStorageForLocalFile(d.AbsolutePath()) + if err != nil { + return nil, errors.Annotate(err, "creating ExternalStorage for local file") + } + d.storage = storage + } + return d.storage, nil +} + +// URI returns the URI of the local file resource. +func (d *FileResourceDescriptor) URI() string { + return d.AbsolutePath() +} + +// ID returns the resource ID of the local file resource. +func (d *FileResourceDescriptor) ID() resModel.ResourceID { + return resModel.BuildResourceID(resModel.ResourceTypeLocalFile, d.Ident.Name) +} + +// ResourceIdent returns the resource identity of the local file resource. +func (d *FileResourceDescriptor) ResourceIdent() internal.ResourceIdent { + return d.Ident +} diff --git a/engine/pkg/externalresource/resourcetypes/interfaces.go b/engine/pkg/externalresource/internal/resource_controller.go similarity index 81% rename from engine/pkg/externalresource/resourcetypes/interfaces.go rename to engine/pkg/externalresource/internal/resource_controller.go index 60c45772dd7..01ca68493f7 100644 --- a/engine/pkg/externalresource/resourcetypes/interfaces.go +++ b/engine/pkg/externalresource/internal/resource_controller.go @@ -11,16 +11,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -package resourcetypes +package internal import ( "context" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" ) -// ResourceTypeController is an interface providing relevant operations related +// ResourceController is an interface providing relevant operations related // to one resource type. -type ResourceTypeController interface { +type ResourceController interface { GCHandler() func(context.Context, *resModel.ResourceMeta) error } diff --git a/engine/pkg/externalresource/internal/resource_desc.go b/engine/pkg/externalresource/internal/resource_desc.go new file mode 100644 index 00000000000..c7ee73fafe1 --- /dev/null +++ b/engine/pkg/externalresource/internal/resource_desc.go @@ -0,0 +1,30 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + + brStorage "github.com/pingcap/tidb/br/pkg/storage" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" +) + +// ResourceDescriptor is an object used internally by the broker +// to manage resources. +type ResourceDescriptor interface { + URI() string + ID() resModel.ResourceID + ResourceIdent() ResourceIdent + ExternalStorage(ctx context.Context) (brStorage.ExternalStorage, error) +} diff --git a/engine/pkg/externalresource/manager/client.go b/engine/pkg/externalresource/manager/client.go deleted file mode 100644 index be69bff2ddc..00000000000 --- a/engine/pkg/externalresource/manager/client.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package manager - -import ( - "context" - "time" - - pb "github.com/pingcap/tiflow/engine/enginepb" - "github.com/pingcap/tiflow/engine/pkg/rpcutil" - "github.com/pingcap/tiflow/pkg/errors" - "google.golang.org/grpc" -) - -const dialTimeout = 5 * time.Second - -var dialImpl = func(ctx context.Context, addr string) (pb.ResourceManagerClient, rpcutil.CloseableConnIface, error) { - ctx, cancel := context.WithTimeout(ctx, dialTimeout) - defer cancel() - conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithBlock()) - if err != nil { - return nil, nil, errors.WrapError(errors.ErrGrpcBuildConn, err) - } - return pb.NewResourceManagerClient(conn), conn, nil -} - -// NewResourceClient creates a new resource manager rpc client -func NewResourceClient(ctx context.Context, join []string, -) (*rpcutil.FailoverRPCClients[pb.ResourceManagerClient], error) { - clients, err := rpcutil.NewFailoverRPCClients(ctx, join, dialImpl) - if err != nil { - return nil, err - } - return clients, nil -} diff --git a/engine/pkg/externalresource/manager/gc_coordinator.go b/engine/pkg/externalresource/manager/gc_coordinator.go index 1bcfbfd62fe..6984950e82b 100644 --- a/engine/pkg/externalresource/manager/gc_coordinator.go +++ b/engine/pkg/externalresource/manager/gc_coordinator.go @@ -26,7 +26,7 @@ import ( frameModel "github.com/pingcap/tiflow/engine/framework/model" "github.com/pingcap/tiflow/engine/model" engineModel "github.com/pingcap/tiflow/engine/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/notifier" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" ) diff --git a/engine/pkg/externalresource/manager/gc_coordinator_test.go b/engine/pkg/externalresource/manager/gc_coordinator_test.go index 8ceb06f24b9..e34e3e6f07a 100644 --- a/engine/pkg/externalresource/manager/gc_coordinator_test.go +++ b/engine/pkg/externalresource/manager/gc_coordinator_test.go @@ -22,7 +22,7 @@ import ( "github.com/stretchr/testify/require" frameModel "github.com/pingcap/tiflow/engine/framework/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" ) diff --git a/engine/pkg/externalresource/manager/gc_runner.go b/engine/pkg/externalresource/manager/gc_runner.go index c823562ccf5..4e0a7df50a2 100644 --- a/engine/pkg/externalresource/manager/gc_runner.go +++ b/engine/pkg/externalresource/manager/gc_runner.go @@ -23,8 +23,10 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/pingcap/tiflow/engine/pkg/client" "github.com/pingcap/tiflow/engine/pkg/clock" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal/local" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" "github.com/pingcap/tiflow/pkg/retry" ) @@ -49,15 +51,14 @@ type DefaultGCRunner struct { } // NewGCRunner returns a new GCRunner. -func NewGCRunner( - client pkgOrm.ResourceClient, - gcHandlers map[resModel.ResourceType]GCHandlerFunc, -) *DefaultGCRunner { +func NewGCRunner(resClient pkgOrm.ResourceClient, executorClients client.ExecutorGroup) *DefaultGCRunner { return &DefaultGCRunner{ - client: client, - gcHandlers: gcHandlers, - notifyCh: make(chan struct{}, 1), - clock: clock.New(), + client: resClient, + gcHandlers: map[resModel.ResourceType]GCHandlerFunc{ + "local": local.NewFileResourceController(executorClients).GCHandler(), + }, + notifyCh: make(chan struct{}, 1), + clock: clock.New(), } } @@ -126,7 +127,7 @@ func (r *DefaultGCRunner) gcOnce( log.Panic("unexpected gc_pending = false") } - tp, _, err := resModel.ParseResourcePath(res.ID) + tp, _, err := resModel.PasreResourceID(res.ID) if err != nil { return err } diff --git a/engine/pkg/externalresource/manager/gc_runner_test.go b/engine/pkg/externalresource/manager/gc_runner_test.go index 4416b968717..91d21589755 100644 --- a/engine/pkg/externalresource/manager/gc_runner_test.go +++ b/engine/pkg/externalresource/manager/gc_runner_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/require" "github.com/pingcap/tiflow/engine/pkg/clock" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" ) @@ -59,7 +59,8 @@ func newGCRunnerTestHelperWithMeta(meta pkgOrm.ResourceClient) *gcRunnerTestHelp } return nil } - runner := NewGCRunner(meta, map[resModel.ResourceType]GCHandlerFunc{"local": mockHandler}) + runner := NewGCRunner(meta, nil) + runner.gcHandlers = map[resModel.ResourceType]GCHandlerFunc{"local": mockHandler} clk := clock.NewMock() runner.clock = clk ctx, cancel := context.WithCancel(context.Background()) diff --git a/engine/pkg/externalresource/manager/interfaces.go b/engine/pkg/externalresource/manager/interfaces.go index f3d3a58439b..db7e4136072 100644 --- a/engine/pkg/externalresource/manager/interfaces.go +++ b/engine/pkg/externalresource/manager/interfaces.go @@ -18,7 +18,7 @@ import ( frameModel "github.com/pingcap/tiflow/engine/framework/model" "github.com/pingcap/tiflow/engine/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/notifier" ) diff --git a/engine/pkg/externalresource/manager/service.go b/engine/pkg/externalresource/manager/service.go index 3f8e0e8d8b1..05cb6839f53 100644 --- a/engine/pkg/externalresource/manager/service.go +++ b/engine/pkg/externalresource/manager/service.go @@ -25,7 +25,7 @@ import ( pb "github.com/pingcap/tiflow/engine/enginepb" "github.com/pingcap/tiflow/engine/model" "github.com/pingcap/tiflow/engine/pkg/externalresource/internal" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" "github.com/pingcap/tiflow/engine/pkg/rpcutil" "github.com/pingcap/tiflow/engine/pkg/tenant" @@ -197,7 +197,7 @@ func (s *Service) GetPlacementConstraint( zap.String("job-id", resourceKey.JobID), zap.String("resource-id", resourceKey.ID)) - rType, _, err := resModel.ParseResourcePath(resourceKey.ID) + rType, _, err := resModel.PasreResourceID(resourceKey.ID) if err != nil { return "", false, err } diff --git a/engine/pkg/externalresource/manager/service_test.go b/engine/pkg/externalresource/manager/service_test.go index 0637cf57a0c..066234f9943 100644 --- a/engine/pkg/externalresource/manager/service_test.go +++ b/engine/pkg/externalresource/manager/service_test.go @@ -19,7 +19,7 @@ import ( "testing" pb "github.com/pingcap/tiflow/engine/enginepb" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" "github.com/pingcap/tiflow/engine/pkg/rpcerror" "github.com/pingcap/tiflow/engine/pkg/rpcutil" diff --git a/engine/pkg/externalresource/manager/test_utils.go b/engine/pkg/externalresource/manager/test_utils.go index d1a3807d3a4..57ff169e3c0 100644 --- a/engine/pkg/externalresource/manager/test_utils.go +++ b/engine/pkg/externalresource/manager/test_utils.go @@ -21,7 +21,7 @@ import ( frameModel "github.com/pingcap/tiflow/engine/framework/model" "github.com/pingcap/tiflow/engine/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/notifier" "github.com/stretchr/testify/require" ) diff --git a/engine/pkg/externalresource/storagecfg/config.go b/engine/pkg/externalresource/model/config.go similarity index 62% rename from engine/pkg/externalresource/storagecfg/config.go rename to engine/pkg/externalresource/model/config.go index 2adb3be3e6a..1a2dd755f9f 100644 --- a/engine/pkg/externalresource/storagecfg/config.go +++ b/engine/pkg/externalresource/model/config.go @@ -11,14 +11,31 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storagecfg +package model + +import ( + brStorage "github.com/pingcap/tidb/br/pkg/storage" +) // Config defines configurations for an external storage resource type Config struct { Local LocalFileConfig `json:"local" toml:"local"` + S3 S3Config `json:"s3" toml:"s3"` +} + +// S3Enabled returns true if the S3 storage is enabled +func (c *Config) S3Enabled() bool { + return c.S3.Bucket != "" && c.S3.Endpoint != "" && + c.S3.AccessKey != "" && c.S3.SecretAccessKey != "" } // LocalFileConfig defines configurations for a local file based resource type LocalFileConfig struct { BaseDir string `json:"base-dir" toml:"base-dir"` } + +// S3Config defines configurations for s3 based resources +type S3Config struct { + brStorage.S3BackendOptions + Bucket string `json:"bucket" toml:"bucket"` +} diff --git a/engine/pkg/externalresource/resourcemeta/model/main_test.go b/engine/pkg/externalresource/model/main_test.go similarity index 100% rename from engine/pkg/externalresource/resourcemeta/model/main_test.go rename to engine/pkg/externalresource/model/main_test.go diff --git a/engine/pkg/externalresource/resourcemeta/model/model.go b/engine/pkg/externalresource/model/model.go similarity index 93% rename from engine/pkg/externalresource/resourcemeta/model/model.go rename to engine/pkg/externalresource/model/model.go index 37fed82db96..d741867b10a 100644 --- a/engine/pkg/externalresource/resourcemeta/model/model.go +++ b/engine/pkg/externalresource/model/model.go @@ -133,8 +133,8 @@ const ( ResourceTypeS3 = ResourceType("s3") ) -// ParseResourcePath returns the ResourceType and the path suffix. -func ParseResourcePath(rpath ResourceID) (ResourceType, ResourceName, error) { +// PasreResourceID returns the ResourceType and the path suffix. +func PasreResourceID(rpath ResourceID) (ResourceType, ResourceName, error) { if !strings.HasPrefix(rpath, "/") { return "", "", errors.ErrIllegalResourcePath.GenWithStackByArgs(rpath) } @@ -157,3 +157,8 @@ func ParseResourcePath(rpath ResourceID) (ResourceType, ResourceName, error) { suffix := path.Join(segments[1:]...) return resourceType, suffix, nil } + +// BuildResourceID returns an ResourceID based on given ResourceType and ResourceName. +func BuildResourceID(rtype ResourceType, name ResourceName) ResourceID { + return path.Join("/"+string(rtype), name) +} diff --git a/engine/pkg/externalresource/resourcemeta/model/model_test.go b/engine/pkg/externalresource/model/model_test.go similarity index 61% rename from engine/pkg/externalresource/resourcemeta/model/model_test.go rename to engine/pkg/externalresource/model/model_test.go index a547539787b..f2457f2ecf3 100644 --- a/engine/pkg/externalresource/resourcemeta/model/model_test.go +++ b/engine/pkg/externalresource/model/model_test.go @@ -19,9 +19,18 @@ import ( "github.com/stretchr/testify/require" ) -func TestParseResourcePath(t *testing.T) { - tp, suffix, err := ParseResourcePath("/local/my-local-resource/a/b/c") +func TestParseResource(t *testing.T) { + tp, suffix, err := PasreResourceID("/local/my-local-resource/a/b/c") require.NoError(t, err) require.Equal(t, ResourceTypeLocalFile, tp) require.Equal(t, "my-local-resource/a/b/c", suffix) + + require.Equal(t, "/local/my-local-resource/a/b/c", BuildResourceID(tp, suffix)) + + tp, suffix, err = PasreResourceID("/s3/my-local-resource/a/b/c") + require.NoError(t, err) + require.Equal(t, ResourceTypeS3, tp) + require.Equal(t, "my-local-resource/a/b/c", suffix) + + require.Equal(t, "/s3/my-local-resource/a/b/c", BuildResourceID(tp, suffix)) } diff --git a/engine/pkg/orm/client.go b/engine/pkg/orm/client.go index ed3d011616c..00bafd38488 100644 --- a/engine/pkg/orm/client.go +++ b/engine/pkg/orm/client.go @@ -24,7 +24,7 @@ import ( frameModel "github.com/pingcap/tiflow/engine/framework/model" engineModel "github.com/pingcap/tiflow/engine/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model" "github.com/pingcap/tiflow/engine/pkg/orm/model" execModel "github.com/pingcap/tiflow/engine/servermaster/executormeta/model" diff --git a/engine/pkg/orm/client_test.go b/engine/pkg/orm/client_test.go index 9a69394aaf4..e4f9e64c5d5 100644 --- a/engine/pkg/orm/client_test.go +++ b/engine/pkg/orm/client_test.go @@ -33,7 +33,7 @@ import ( frameModel "github.com/pingcap/tiflow/engine/framework/model" engineModel "github.com/pingcap/tiflow/engine/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" metaMock "github.com/pingcap/tiflow/engine/pkg/meta/mock" metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model" "github.com/pingcap/tiflow/engine/pkg/orm/model" diff --git a/engine/pkg/orm/mock/client_mock.go b/engine/pkg/orm/mock/client_mock.go index 0864aec29c9..42ff82f9113 100644 --- a/engine/pkg/orm/mock/client_mock.go +++ b/engine/pkg/orm/mock/client_mock.go @@ -11,7 +11,7 @@ import ( gomock "github.com/golang/mock/gomock" model "github.com/pingcap/tiflow/engine/framework/model" model0 "github.com/pingcap/tiflow/engine/model" - model1 "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + model1 "github.com/pingcap/tiflow/engine/pkg/externalresource/model" orm "github.com/pingcap/tiflow/engine/pkg/orm" model2 "github.com/pingcap/tiflow/engine/pkg/orm/model" ) diff --git a/engine/pkg/orm/mock_test.go b/engine/pkg/orm/mock_test.go index cd43d838aac..3ef1e761606 100644 --- a/engine/pkg/orm/mock_test.go +++ b/engine/pkg/orm/mock_test.go @@ -20,7 +20,7 @@ import ( "time" frameModel "github.com/pingcap/tiflow/engine/framework/model" - resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/orm/model" "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/require" @@ -589,7 +589,7 @@ func TestResourceMock(t *testing.T) { { fn: "UpsertResource", inputs: []interface{}{ - &resourcemeta.ResourceMeta{ + &resModel.ResourceMeta{ Model: model.Model{ CreatedAt: createdAt, UpdatedAt: updatedAt, @@ -606,7 +606,7 @@ func TestResourceMock(t *testing.T) { { fn: "UpsertResource", inputs: []interface{}{ - &resourcemeta.ResourceMeta{ + &resModel.ResourceMeta{ Model: model.Model{ CreatedAt: createdAt, UpdatedAt: updatedAt, @@ -652,7 +652,7 @@ func TestResourceMock(t *testing.T) { ID: "r333", }, }, - output: &resourcemeta.ResourceMeta{ + output: &resModel.ResourceMeta{ Model: model.Model{ SeqID: 1, CreatedAt: createdAt, @@ -681,7 +681,7 @@ func TestResourceMock(t *testing.T) { inputs: []interface{}{ "j111", }, - output: []*resourcemeta.ResourceMeta{ + output: []*resModel.ResourceMeta{ { Model: model.Model{ SeqID: 1, @@ -702,14 +702,14 @@ func TestResourceMock(t *testing.T) { inputs: []interface{}{ "j112", }, - output: []*resourcemeta.ResourceMeta{}, + output: []*resModel.ResourceMeta{}, }, { fn: "QueryResourcesByExecutorID", inputs: []interface{}{ "e444", }, - output: []*resourcemeta.ResourceMeta{ + output: []*resModel.ResourceMeta{ { Model: model.Model{ SeqID: 1, @@ -730,7 +730,7 @@ func TestResourceMock(t *testing.T) { inputs: []interface{}{ "e445", }, - output: []*resourcemeta.ResourceMeta{}, + output: []*resModel.ResourceMeta{}, }, } diff --git a/engine/servermaster/jobmanager_test.go b/engine/servermaster/jobmanager_test.go index 0ac4d182a2b..3752c1f0093 100644 --- a/engine/servermaster/jobmanager_test.go +++ b/engine/servermaster/jobmanager_test.go @@ -36,7 +36,7 @@ import ( "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/engine/pkg/ctxmu" resManager "github.com/pingcap/tiflow/engine/pkg/externalresource/manager" - resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" jobMock "github.com/pingcap/tiflow/engine/pkg/httputil/mock" "github.com/pingcap/tiflow/engine/pkg/notifier" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" @@ -120,7 +120,7 @@ func (m *mockBaseMasterCreateWorkerFailed) CreateWorker( workerType framework.WorkerType, config framework.WorkerConfig, cost model.RescUnit, - resources ...resourcemeta.ResourceID, + resources ...resModel.ResourceID, ) (frameModel.WorkerID, error) { return "", errors.ErrMasterConcurrencyExceeded.FastGenByArgs() } diff --git a/engine/servermaster/scheduler/errors.go b/engine/servermaster/scheduler/errors.go index 35e8ca0c210..7d508ada58c 100644 --- a/engine/servermaster/scheduler/errors.go +++ b/engine/servermaster/scheduler/errors.go @@ -15,7 +15,7 @@ package scheduler import ( "github.com/pingcap/tiflow/engine/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/rpcerror" "github.com/pingcap/tiflow/pkg/label" ) diff --git a/engine/servermaster/scheduler/model/request.go b/engine/servermaster/scheduler/model/request.go index 41c8c91778d..4e51705d2b5 100644 --- a/engine/servermaster/scheduler/model/request.go +++ b/engine/servermaster/scheduler/model/request.go @@ -17,7 +17,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/engine/enginepb" "github.com/pingcap/tiflow/engine/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/rpcerror" "github.com/pingcap/tiflow/pkg/label" ) diff --git a/engine/servermaster/scheduler/resource_filter.go b/engine/servermaster/scheduler/resource_filter.go index 7d13e540636..ed77b9e5403 100644 --- a/engine/servermaster/scheduler/resource_filter.go +++ b/engine/servermaster/scheduler/resource_filter.go @@ -18,7 +18,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/engine/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" schedModel "github.com/pingcap/tiflow/engine/servermaster/scheduler/model" "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" diff --git a/engine/servermaster/scheduler/resource_filter_test.go b/engine/servermaster/scheduler/resource_filter_test.go index 15b80196cd6..970945259c1 100644 --- a/engine/servermaster/scheduler/resource_filter_test.go +++ b/engine/servermaster/scheduler/resource_filter_test.go @@ -18,7 +18,7 @@ import ( "testing" "github.com/pingcap/tiflow/engine/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" schedModel "github.com/pingcap/tiflow/engine/servermaster/scheduler/model" "github.com/stretchr/testify/require" ) diff --git a/engine/servermaster/scheduler/scheduler_test.go b/engine/servermaster/scheduler/scheduler_test.go index b4eab45eb77..cda395f7f07 100644 --- a/engine/servermaster/scheduler/scheduler_test.go +++ b/engine/servermaster/scheduler/scheduler_test.go @@ -18,7 +18,7 @@ import ( "testing" "github.com/pingcap/tiflow/engine/model" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" schedModel "github.com/pingcap/tiflow/engine/servermaster/scheduler/model" "github.com/stretchr/testify/require" ) diff --git a/engine/servermaster/server.go b/engine/servermaster/server.go index 3f7d4fef918..8ad531590d4 100644 --- a/engine/servermaster/server.go +++ b/engine/servermaster/server.go @@ -49,8 +49,6 @@ import ( "github.com/pingcap/tiflow/engine/pkg/deps" "github.com/pingcap/tiflow/engine/pkg/election" externRescManager "github.com/pingcap/tiflow/engine/pkg/externalresource/manager" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" - "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcetypes" "github.com/pingcap/tiflow/engine/pkg/meta" metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model" "github.com/pingcap/tiflow/engine/pkg/openapi" @@ -887,9 +885,7 @@ func (s *Server) runLeaderService(ctx context.Context) (err error) { log.Info("job manager exited") }() - s.gcRunner = externRescManager.NewGCRunner(s.frameMetaClient, map[resModel.ResourceType]externRescManager.GCHandlerFunc{ - "local": resourcetypes.NewLocalFileResourceType(executorClients).GCHandler(), - }) + s.gcRunner = externRescManager.NewGCRunner(s.frameMetaClient, executorClients) s.gcCoordinator = externRescManager.NewGCCoordinator(s.executorManager, s.jobManager, s.frameMetaClient, s.gcRunner) // TODO refactor this method to make it more readable and maintainable.