Skip to content

Commit

Permalink
externalresources(engine): refactor local file manager (#7127)
Browse files Browse the repository at this point in the history
ref #7114
  • Loading branch information
CharlesCheung96 authored Sep 27, 2022
1 parent cd1064a commit ac46242
Show file tree
Hide file tree
Showing 58 changed files with 750 additions and 504 deletions.
9 changes: 5 additions & 4 deletions engine/executor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"github.com/BurntSushi/toml"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/engine/pkg/externalresource/storagecfg"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/label"
"github.com/pingcap/tiflow/pkg/logutil"
Expand Down Expand Up @@ -64,7 +64,7 @@ type Config struct {
KeepAliveIntervalStr string `toml:"keepalive-interval" json:"keepalive-interval"`
RPCTimeoutStr string `toml:"rpc-timeout" json:"rpc-timeout"`

Storage storagecfg.Config `toml:"storage" json:"storage"`
Storage resModel.Config `toml:"storage" json:"storage"`

KeepAliveTTL time.Duration `toml:"-" json:"-"`
KeepAliveInterval time.Duration `toml:"-" json:"-"`
Expand Down Expand Up @@ -169,8 +169,9 @@ func GetDefaultExecutorConfig() *Config {
KeepAliveTTLStr: defaultKeepAliveTTL,
KeepAliveIntervalStr: defaultKeepAliveInterval,
RPCTimeoutStr: defaultRPCTimeout,
Storage: storagecfg.Config{
Local: storagecfg.LocalFileConfig{BaseDir: ""},
Storage: resModel.Config{
Local: resModel.LocalFileConfig{BaseDir: ""},
S3: resModel.S3Config{Bucket: ""},
},
}
}
6 changes: 3 additions & 3 deletions engine/framework/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/pingcap/tiflow/engine/model"
dcontext "github.com/pingcap/tiflow/engine/pkg/context"
"github.com/pingcap/tiflow/engine/pkg/errctx"
resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model"
"github.com/pingcap/tiflow/engine/pkg/p2p"
"github.com/pingcap/tiflow/engine/pkg/promutil"
Expand Down Expand Up @@ -59,7 +59,7 @@ type BaseJobMaster interface {
// CreateWorker requires the framework to dispatch a new worker.
// If the worker needs to access certain file system resources,
// their ID's must be passed by `resources`.
CreateWorker(workerType WorkerType, config WorkerConfig, cost model.RescUnit, resources ...resourcemeta.ResourceID) (frameModel.WorkerID, error)
CreateWorker(workerType WorkerType, config WorkerConfig, cost model.RescUnit, resources ...resModel.ResourceID) (frameModel.WorkerID, error)

// CreateWorkerV2 is the latest version of CreateWorker, but with
// a more flexible way of passing options.
Expand Down Expand Up @@ -311,7 +311,7 @@ func (d *DefaultBaseJobMaster) CreateWorker(
workerType WorkerType,
config WorkerConfig,
cost model.RescUnit,
resources ...resourcemeta.ResourceID,
resources ...resModel.ResourceID,
) (frameModel.WorkerID, error) {
return d.master.CreateWorker(workerType, config, cost, resources...)
}
Expand Down
2 changes: 1 addition & 1 deletion engine/framework/internal/master/worker_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
frameModel "github.com/pingcap/tiflow/engine/framework/model"
"github.com/pingcap/tiflow/engine/model"
"github.com/pingcap/tiflow/engine/pkg/client"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm"
"github.com/pingcap/tiflow/engine/pkg/tenant"
schedModel "github.com/pingcap/tiflow/engine/servermaster/scheduler/model"
Expand Down
2 changes: 1 addition & 1 deletion engine/framework/internal/master/worker_creator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
frameModel "github.com/pingcap/tiflow/engine/framework/model"
"github.com/pingcap/tiflow/engine/model"
"github.com/pingcap/tiflow/engine/pkg/client"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm"
"github.com/pingcap/tiflow/engine/pkg/tenant"
"github.com/pingcap/tiflow/pkg/label"
Expand Down
2 changes: 1 addition & 1 deletion engine/framework/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
dcontext "github.com/pingcap/tiflow/engine/pkg/context"
"github.com/pingcap/tiflow/engine/pkg/deps"
"github.com/pingcap/tiflow/engine/pkg/errctx"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
"github.com/pingcap/tiflow/engine/pkg/meta"
metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model"
pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm"
Expand Down
9 changes: 4 additions & 5 deletions engine/framework/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ import (
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/pingcap/tiflow/engine/framework/metadata"
frameModel "github.com/pingcap/tiflow/engine/framework/model"
"github.com/pingcap/tiflow/engine/framework/statusutil"
resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

const (
Expand Down Expand Up @@ -150,7 +149,7 @@ func TestMasterCreateWorker(t *testing.T) {
masterName,
workerID1,
executorNodeID1,
[]resourcemeta.ResourceID{"resource-1", "resource-2"},
[]resModel.ResourceID{"resource-1", "resource-2"},
// call GenEpoch three times, including create master meta, master init
// refresh meta, create worker.
epoch+3,
Expand Down
6 changes: 3 additions & 3 deletions engine/framework/mock_master_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/pingcap/tiflow/engine/pkg/clock"
dcontext "github.com/pingcap/tiflow/engine/pkg/context"
"github.com/pingcap/tiflow/engine/pkg/deps"
resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
metaMock "github.com/pingcap/tiflow/engine/pkg/meta/mock"
pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm"
"github.com/pingcap/tiflow/engine/pkg/p2p"
Expand Down Expand Up @@ -106,14 +106,14 @@ func MockBaseMasterCreateWorker(
masterID frameModel.MasterID,
workerID frameModel.WorkerID,
executorID model.ExecutorID,
resources []resourcemeta.ResourceID,
resources []resModel.ResourceID,
workerEpoch frameModel.Epoch,
) {
master.uuidGen = uuid.NewMock()
expectedSchedulerReq := &pb.ScheduleTaskRequest{
TaskId: workerID,
Cost: int64(cost),
ResourceRequirements: resourcemeta.ToResourceRequirement(masterID, resources...),
ResourceRequirements: resModel.ToResourceRequirement(masterID, resources...),
}
master.serverMasterClient.(*client.MockServerMasterClient).EXPECT().
ScheduleTask(gomock.Any(), gomock.Eq(expectedSchedulerReq)).
Expand Down
6 changes: 3 additions & 3 deletions engine/framework/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
dcontext "github.com/pingcap/tiflow/engine/pkg/context"
"github.com/pingcap/tiflow/engine/pkg/errctx"
"github.com/pingcap/tiflow/engine/pkg/externalresource/broker"
resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
"github.com/pingcap/tiflow/engine/pkg/meta"
metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model"
pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm"
Expand Down Expand Up @@ -103,7 +103,7 @@ type BaseWorker interface {
SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error

// OpenStorage creates a resource and return the resource handle
OpenStorage(ctx context.Context, resourcePath resourcemeta.ResourceID) (broker.Handle, error)
OpenStorage(ctx context.Context, resourcePath resModel.ResourceID) (broker.Handle, error)

// Exit should be called when worker (in user logic) wants to exit.
// exitReason: ExitReasonFinished/ExitReasonCanceled/ExitReasonFailed
Expand Down Expand Up @@ -493,7 +493,7 @@ func (w *DefaultBaseWorker) SendMessage(
}

// OpenStorage implements BaseWorker.OpenStorage
func (w *DefaultBaseWorker) OpenStorage(ctx context.Context, resourcePath resourcemeta.ResourceID) (broker.Handle, error) {
func (w *DefaultBaseWorker) OpenStorage(ctx context.Context, resourcePath resModel.ResourceID) (broker.Handle, error) {
ctx, cancel := w.errCenter.WithCancelOnFirstError(ctx)
defer cancel()
return w.resourceBroker.OpenStorage(ctx, w.projectInfo, w.id, w.masterID, resourcePath)
Expand Down
2 changes: 1 addition & 1 deletion engine/jobmaster/dm/dm_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import (
"github.com/pingcap/tiflow/engine/pkg/deps"
dmpkg "github.com/pingcap/tiflow/engine/pkg/dm"
"github.com/pingcap/tiflow/engine/pkg/externalresource/broker"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
kvmock "github.com/pingcap/tiflow/engine/pkg/meta/mock"
metaModel "github.com/pingcap/tiflow/engine/pkg/meta/model"
pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm"
Expand Down
6 changes: 3 additions & 3 deletions engine/jobmaster/dm/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
package dm

import (
resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
)

// NewDMResourceID returns a ResourceID in DM's style. Currently only support local resource.
func NewDMResourceID(taskName, sourceName string) resourcemeta.ResourceID {
return "/" + string(resourcemeta.ResourceTypeLocalFile) + "/" + taskName + "/" + sourceName
func NewDMResourceID(taskName, sourceName string) resModel.ResourceID {
return "/" + string(resModel.ResourceTypeLocalFile) + "/" + taskName + "/" + sourceName
}
13 changes: 6 additions & 7 deletions engine/jobmaster/dm/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@ import (
"time"

dmconfig "github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/engine/model"
resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
"go.uber.org/zap"

"github.com/pingcap/tiflow/engine/framework"
"github.com/pingcap/tiflow/engine/framework/logutil"
frameModel "github.com/pingcap/tiflow/engine/framework/model"
"github.com/pingcap/tiflow/engine/jobmaster/dm/config"
"github.com/pingcap/tiflow/engine/jobmaster/dm/metadata"
"github.com/pingcap/tiflow/engine/jobmaster/dm/runtime"
"github.com/pingcap/tiflow/engine/jobmaster/dm/ticker"
"github.com/pingcap/tiflow/engine/model"
dmpkg "github.com/pingcap/tiflow/engine/pkg/dm"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
"go.uber.org/zap"
)

var (
Expand All @@ -47,7 +46,7 @@ type WorkerAgent interface {
workerType framework.WorkerType,
config framework.WorkerConfig,
cost model.RescUnit,
resources ...resourcemeta.ResourceID,
resources ...resModel.ResourceID,
) (frameModel.WorkerID, error)
}

Expand Down Expand Up @@ -249,7 +248,7 @@ func (wm *WorkerManager) checkAndScheduleWorkers(ctx context.Context, job *metad
wm.logger.Info("switch to next unit", zap.String("task_id", taskID), zap.Stringer("next_unit", runningWorker.Unit))
}

var resources []resourcemeta.ResourceID
var resources []resModel.ResourceID
// first worker don't need local resource.
// unfresh sync unit don't need local resource.(if we need to save table checkpoint for loadTableStructureFromDump in future, we can save it before saving global checkpoint.)
// TODO: storage should be created/discarded in jobmaster instead of worker.
Expand Down Expand Up @@ -332,7 +331,7 @@ func (wm *WorkerManager) createWorker(
taskID string,
unit frameModel.WorkerType,
taskCfg *config.TaskCfg,
resources ...resourcemeta.ResourceID,
resources ...resModel.ResourceID,
) error {
wm.logger.Info("start to create worker", zap.String("task_id", taskID), zap.Stringer("unit", unit))
workerID, err := wm.workerAgent.CreateWorker(unit, taskCfg, 1, resources...)
Expand Down
9 changes: 4 additions & 5 deletions engine/jobmaster/dm/worker_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,17 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
dmconfig "github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/engine/model"
resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
"github.com/stretchr/testify/require"

"github.com/pingcap/tiflow/engine/framework"
frameModel "github.com/pingcap/tiflow/engine/framework/model"
"github.com/pingcap/tiflow/engine/jobmaster/dm/config"
"github.com/pingcap/tiflow/engine/jobmaster/dm/metadata"
"github.com/pingcap/tiflow/engine/jobmaster/dm/runtime"
"github.com/pingcap/tiflow/engine/model"
dmpkg "github.com/pingcap/tiflow/engine/pkg/dm"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
kvmock "github.com/pingcap/tiflow/engine/pkg/meta/mock"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func (t *testDMJobmasterSuite) TestUpdateWorkerStatus() {
Expand Down Expand Up @@ -580,7 +579,7 @@ type MockWorkerAgent struct {
mock.Mock
}

func (mockAgent *MockWorkerAgent) CreateWorker(workerType framework.WorkerType, taskCfg interface{}, cost model.RescUnit, resources ...resourcemeta.ResourceID) (frameModel.WorkerID, error) {
func (mockAgent *MockWorkerAgent) CreateWorker(workerType framework.WorkerType, taskCfg interface{}, cost model.RescUnit, resources ...resModel.ResourceID) (frameModel.WorkerID, error) {
mockAgent.Lock()
defer mockAgent.Unlock()
args := mockAgent.Called()
Expand Down
2 changes: 1 addition & 1 deletion engine/pkg/client/broker_service_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/pingcap/tiflow/engine/enginepb"
frameModel "github.com/pingcap/tiflow/engine/framework/model"
"github.com/pingcap/tiflow/engine/pkg/client/internal"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
)

// BrokerServiceClient wraps a pb.BrokerServiceClient
Expand Down
Loading

0 comments on commit ac46242

Please sign in to comment.