Skip to content

Commit

Permalink
generate resource id by storage config
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Oct 21, 2022
1 parent 4fa939d commit 12038ea
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 23 deletions.
2 changes: 1 addition & 1 deletion engine/executor/dm/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (w *dmWorker) CloseImpl(ctx context.Context) {

// setupStorage opens and configs external storage
func (w *dmWorker) setupStorage(ctx context.Context) error {
rid := dm.NewDMResourceID(w.cfg.Name, w.cfg.SourceID)
rid := dm.NewDMResourceID(w.cfg.Name, w.cfg.SourceID, w.IsS3StorageEnabled())
opts := []broker.OpenStorageOption{}
if w.workerType == frameModel.WorkerDMDump {
// always use an empty storage for dumpling task
Expand Down
8 changes: 8 additions & 0 deletions engine/framework/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ type BaseJobMaster interface {
// IsBaseJobMaster is an empty function used to prevent accidental implementation
// of this interface.
IsBaseJobMaster()

// IsS3StorageEnabled returns whether the s3 storage is enabled
IsS3StorageEnabled() bool
}

// BaseJobMasterExt extends BaseJobMaster with some extra methods.
Expand Down Expand Up @@ -355,6 +358,11 @@ func (d *DefaultBaseJobMaster) CurrentEpoch() frameModel.Epoch {
func (d *DefaultBaseJobMaster) IsBaseJobMaster() {
}

// IsS3StorageEnabled implements BaseJobMaster.IsS3StorageEnabled
func (d *DefaultBaseJobMaster) IsS3StorageEnabled() bool {
return d.worker.IsS3StorageEnabled()
}

// SendMessage delegates the SendMessage or inner worker
func (d *DefaultBaseJobMaster) SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error {
ctx, cancel := d.errCenter.WithCancelOnFirstError(ctx)
Expand Down
8 changes: 8 additions & 0 deletions engine/framework/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ type BaseWorker interface {
ctx context.Context, resourcePath resModel.ResourceID, opts ...broker.OpenStorageOption,
) (broker.Handle, error)

// IsS3StorageEnabled returns whether the s3 storage is enabled
IsS3StorageEnabled() bool

// Exit should be called when worker (in user logic) wants to exit.
// exitReason: ExitReasonFinished/ExitReasonCanceled/ExitReasonFailed
Exit(ctx context.Context, exitReason ExitReason, err error, extBytes []byte) error
Expand Down Expand Up @@ -515,6 +518,11 @@ func (w *DefaultBaseWorker) OpenStorage(
return w.resourceBroker.OpenStorage(ctx, w.projectInfo, w.id, w.masterID, resourcePath, opts...)
}

// IsS3StorageEnabled implements BaseWorker.IsS3StorageEnabled
func (w *DefaultBaseWorker) IsS3StorageEnabled() bool {
return w.resourceBroker.IsS3StorageEnabled()
}

// Exit implements BaseWorker.Exit
func (w *DefaultBaseWorker) Exit(ctx context.Context, exitReason ExitReason, err error, extBytes []byte) (errRet error) {
// Set the errCenter to prevent user from forgetting to return directly after calling 'Exit'
Expand Down
4 changes: 2 additions & 2 deletions engine/jobmaster/dm/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestQueryStatusAPI(t *testing.T) {
)
messageAgent := &dmpkg.MockMessageAgent{}
jm.messageAgent = messageAgent
jm.workerManager = NewWorkerManager(mockBaseJobmaster.ID(), nil, jm.metadata.JobStore(), nil, nil, nil, jm.Logger())
jm.workerManager = NewWorkerManager(mockBaseJobmaster.ID(), nil, jm.metadata.JobStore(), nil, nil, nil, jm.Logger(), false)
jm.taskManager = NewTaskManager(nil, nil, nil, jm.Logger())
jm.workerManager.UpdateWorkerStatus(runtime.NewWorkerStatus("task2", frameModel.WorkerDMLoad, "worker2", runtime.WorkerFinished, 3))
jm.workerManager.UpdateWorkerStatus(runtime.NewWorkerStatus("task3", frameModel.WorkerDMDump, "worker3", runtime.WorkerOnline, 4))
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestUpdateJobCfg(t *testing.T) {
}
)
jm.taskManager = NewTaskManager(nil, jobStore, messageAgent, jm.Logger())
jm.workerManager = NewWorkerManager(mockBaseJobmaster.ID(), nil, jobStore, jm, messageAgent, mockCheckpointAgent, jm.Logger())
jm.workerManager = NewWorkerManager(mockBaseJobmaster.ID(), nil, jobStore, jm, messageAgent, mockCheckpointAgent, jm.Logger(), false)
funcBackup := master.CheckAndAdjustSourceConfigFunc
master.CheckAndAdjustSourceConfigFunc = func(ctx context.Context, cfg *dmconfig.SourceConfig) error { return nil }
defer func() {
Expand Down
3 changes: 2 additions & 1 deletion engine/jobmaster/dm/dm_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ func (jm *JobMaster) initComponents() error {
jm.messageAgent = dmpkg.NewMessageAgent(jm.ID(), jm, jm.messageHandlerManager, jm.Logger())
jm.checkpointAgent = checkpoint.NewCheckpointAgent(jm.ID(), jm.Logger())
jm.taskManager = NewTaskManager(taskStatus, jm.metadata.JobStore(), jm.messageAgent, jm.Logger())
jm.workerManager = NewWorkerManager(jm.ID(), workerStatus, jm.metadata.JobStore(), jm, jm.messageAgent, jm.checkpointAgent, jm.Logger())
jm.workerManager = NewWorkerManager(jm.ID(), workerStatus, jm.metadata.JobStore(),
jm, jm.messageAgent, jm.checkpointAgent, jm.Logger(), jm.IsS3StorageEnabled())
return err
}

Expand Down
2 changes: 1 addition & 1 deletion engine/jobmaster/dm/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (t *testDMOpenAPISuite) SetupSuite() {
}
)
jm.taskManager = NewTaskManager(nil, jm.metadata.JobStore(), jm.messageAgent, jm.Logger())
jm.workerManager = NewWorkerManager(mockBaseJobmaster.ID(), nil, jm.metadata.JobStore(), nil, jm.messageAgent, nil, jm.Logger())
jm.workerManager = NewWorkerManager(mockBaseJobmaster.ID(), nil, jm.metadata.JobStore(), nil, jm.messageAgent, nil, jm.Logger(), false)

engine := gin.New()
apiGroup := engine.Group(baseURL)
Expand Down
8 changes: 6 additions & 2 deletions engine/jobmaster/dm/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
)

// NewDMResourceID returns a ResourceID in DM's style. Currently only support s3 resource.
func NewDMResourceID(taskName, sourceName string) resModel.ResourceID {
return "/" + string(resModel.ResourceTypeS3) + "/" + taskName + "-" + sourceName
func NewDMResourceID(taskName, sourceName string, isS3Enabled bool) resModel.ResourceID {
resType := resModel.ResourceTypeLocalFile
if isS3Enabled {
resType = resModel.ResourceTypeS3
}
return "/" + string(resType) + "/" + taskName + "-" + sourceName
}
31 changes: 22 additions & 9 deletions engine/jobmaster/dm/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,33 @@ type WorkerManager struct {
checkpointAgent CheckpointAgent
logger *zap.Logger

isS3StorageEnabled bool

// workerStatusMap record the runtime worker status
// taskID -> WorkerStatus
workerStatusMap sync.Map
}

// NewWorkerManager creates a new WorkerManager instance
func NewWorkerManager(jobID string, initWorkerStatus []runtime.WorkerStatus, jobStore *metadata.JobStore, workerAgent WorkerAgent, messageAgent dmpkg.MessageAgent, checkpointAgent CheckpointAgent, pLogger *zap.Logger) *WorkerManager {
func NewWorkerManager(
jobID string,
initWorkerStatus []runtime.WorkerStatus,
jobStore *metadata.JobStore,
workerAgent WorkerAgent,
messageAgent dmpkg.MessageAgent,
checkpointAgent CheckpointAgent,
pLogger *zap.Logger,
isS3StorageEnabled bool,
) *WorkerManager {
workerManager := &WorkerManager{
DefaultTicker: ticker.NewDefaultTicker(WorkerNormalInterval, WorkerErrorInterval),
jobID: jobID,
jobStore: jobStore,
workerAgent: workerAgent,
messageAgent: messageAgent,
checkpointAgent: checkpointAgent,
logger: pLogger.With(zap.String("component", "worker_manager")),
DefaultTicker: ticker.NewDefaultTicker(WorkerNormalInterval, WorkerErrorInterval),
jobID: jobID,
jobStore: jobStore,
workerAgent: workerAgent,
messageAgent: messageAgent,
checkpointAgent: checkpointAgent,
logger: pLogger.With(zap.String("component", "worker_manager")),
isS3StorageEnabled: isS3StorageEnabled,
}
workerManager.DefaultTicker.Ticker = workerManager

Expand Down Expand Up @@ -254,7 +266,8 @@ func (wm *WorkerManager) checkAndScheduleWorkers(ctx context.Context, job *metad
// unfresh sync unit don't need local resource.(if we need to save table checkpoint for loadTableStructureFromDump in future, we can save it before saving global checkpoint.)
// TODO: storage should be created/discarded in jobmaster instead of worker.
if workerIdxInSeq(persistentTask.Cfg.TaskMode, nextUnit) != 0 && !(nextUnit == frameModel.WorkerDMSync && !isFresh) {
resources = append(resources, NewDMResourceID(wm.jobID, persistentTask.Cfg.Upstreams[0].SourceID))
resId := NewDMResourceID(wm.jobID, persistentTask.Cfg.Upstreams[0].SourceID, wm.isS3StorageEnabled)
resources = append(resources, resId)
}

// FIXME: remove this after fix https://github.com/pingcap/tiflow/issues/7304
Expand Down
14 changes: 7 additions & 7 deletions engine/jobmaster/dm/worker_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (t *testDMJobmasterSuite) TestUpdateWorkerStatus() {
job := metadata.NewJob(jobCfg)
jobStore := metadata.NewJobStore(kvmock.NewMetaMock(), log.L())
require.NoError(t.T(), jobStore.Put(context.Background(), job))
workerManager := NewWorkerManager("job_id", nil, jobStore, nil, nil, nil, log.L())
workerManager := NewWorkerManager("job_id", nil, jobStore, nil, nil, nil, log.L(), false)

require.Len(t.T(), workerManager.WorkerStatus(), 0)

Expand Down Expand Up @@ -100,7 +100,7 @@ func (t *testDMJobmasterSuite) TestUpdateWorkerStatus() {
workerStatus1.Stage = runtime.WorkerOnline
workerStatus2.Stage = runtime.WorkerOnline
workerStatusList := []runtime.WorkerStatus{workerStatus1, workerStatus2}
workerManager = NewWorkerManager("job_id", workerStatusList, jobStore, nil, nil, nil, log.L())
workerManager = NewWorkerManager("job_id", workerStatusList, jobStore, nil, nil, nil, log.L(), false)
workerStatusMap = workerManager.WorkerStatus()
require.Len(t.T(), workerStatusMap, 2)
require.Contains(t.T(), workerStatusMap, source1)
Expand Down Expand Up @@ -138,7 +138,7 @@ func (t *testDMJobmasterSuite) TestClearWorkerStatus() {
workerStatus1 := runtime.InitWorkerStatus(source1, frameModel.WorkerDMDump, "worker-id-1")
workerStatus2 := runtime.InitWorkerStatus(source2, frameModel.WorkerDMDump, "worker-id-2")

workerManager := NewWorkerManager("job_id", []runtime.WorkerStatus{workerStatus1, workerStatus2}, nil, nil, messageAgent, nil, log.L())
workerManager := NewWorkerManager("job_id", []runtime.WorkerStatus{workerStatus1, workerStatus2}, nil, nil, messageAgent, nil, log.L(), false)
require.Len(t.T(), workerManager.WorkerStatus(), 2)

workerManager.removeOfflineWorkers()
Expand Down Expand Up @@ -222,7 +222,7 @@ func (t *testDMJobmasterSuite) TestClearWorkerStatus() {

func (t *testDMJobmasterSuite) TestCreateWorker() {
mockAgent := &MockWorkerAgent{}
workerManager := NewWorkerManager("job_id", nil, nil, mockAgent, nil, nil, log.L())
workerManager := NewWorkerManager("job_id", nil, nil, mockAgent, nil, nil, log.L(), false)

jobCfg := &config.JobCfg{}
require.NoError(t.T(), jobCfg.DecodeFile(jobTemplatePath))
Expand Down Expand Up @@ -261,7 +261,7 @@ func (t *testDMJobmasterSuite) TestGetUnit() {
mockAgent := &MockCheckpointAgent{}
task := &metadata.Task{Cfg: &config.TaskCfg{}}
task.Cfg.TaskMode = dmconfig.ModeFull
workerManager := NewWorkerManager("job_id", nil, nil, nil, nil, mockAgent, log.L())
workerManager := NewWorkerManager("job_id", nil, nil, nil, nil, mockAgent, log.L(), false)

workerStatus := runtime.NewWorkerStatus("source", frameModel.WorkerDMDump, "worker-id-1", runtime.WorkerOnline, 0)
require.Equal(t.T(), getNextUnit(task, workerStatus), frameModel.WorkerDMDump)
Expand Down Expand Up @@ -340,7 +340,7 @@ func (t *testDMJobmasterSuite) TestCheckAndScheduleWorkers() {
job := metadata.NewJob(jobCfg)
checkpointAgent := &MockCheckpointAgent{}
workerAgent := &MockWorkerAgent{}
workerManager := NewWorkerManager("job_id", nil, nil, workerAgent, nil, checkpointAgent, log.L())
workerManager := NewWorkerManager("job_id", nil, nil, workerAgent, nil, checkpointAgent, log.L(), false)

// new tasks
worker1 := "worker1"
Expand Down Expand Up @@ -431,7 +431,7 @@ func (t *testDMJobmasterSuite) TestWorkerManager() {
checkpointAgent := &MockCheckpointAgent{}
workerAgent := &MockWorkerAgent{}
messageAgent := &dmpkg.MockMessageAgent{}
workerManager := NewWorkerManager("job_id", nil, jobStore, workerAgent, messageAgent, checkpointAgent, log.L())
workerManager := NewWorkerManager("job_id", nil, jobStore, workerAgent, messageAgent, checkpointAgent, log.L(), false)
source1 := jobCfg.Upstreams[0].SourceID
source2 := jobCfg.Upstreams[1].SourceID

Expand Down
5 changes: 5 additions & 0 deletions engine/pkg/externalresource/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,11 @@ func (b *DefaultBroker) Close() {
}
}

func (b *DefaultBroker) IsS3StorageEnabled() bool {
_, ok := b.fileManagers[resModel.ResourceTypeS3]
return ok
}

// PreCheckConfig checks the configuration of external storage.
func PreCheckConfig(config resModel.Config) error {
if config.LocalEnabled() {
Expand Down
2 changes: 2 additions & 0 deletions engine/pkg/externalresource/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ func TestBrokerOpenExistingStorageWithOption(t *testing.T) {
fakeProjectInfo := tenant.NewProjectInfo("fakeTenant", "fakeProject")
brk, cli, _ := newBroker(t)
defer brk.Close()
require.False(t, brk.IsS3StorageEnabled())
mockS3FileManager, _ := s3.NewFileManagerForUT(t.TempDir(), brk.executorID)
brk.fileManagers[resModel.ResourceTypeS3] = mockS3FileManager
require.True(t, brk.IsS3StorageEnabled())

openStorageWithClean := func(resID resModel.ResourceID) {
// resource metadata exists
Expand Down
2 changes: 2 additions & 0 deletions engine/pkg/externalresource/broker/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Broker interface {
jobID resModel.JobID,
)

IsS3StorageEnabled() bool

Close()
}

Expand Down

0 comments on commit 12038ea

Please sign in to comment.