Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support continue command #34

Merged
merged 9 commits into from
Sep 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ jobs:
mongodb-password: pwd

- name: Test
run: go test -coverprofile coverage.txt -covermode=atomic -v ./...
run: make g-test

- name: Upload Coverage report to CodeCov
uses: codecov/codecov-action@v2
with:
file: ./coverage.txt
file: ./coverage.out
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ test:
# include integration tests
.PHONY: g-test
g-test:
go test -race -coverprofile=coverage.out ./... -tags=integration
go test -tags=integration -race -coverprofile=coverage.out ./...

# usage
# you must run `make install` to install necessary tools
# make mock dir=path/to/mock
# make mock
.PHONY: mock
mock:
for file in `find . -type d \( -path ./.git -o -path ./.github \) -prune -o -name '*.go' -print | xargs grep --files-with-matches -e '//go:generate mockgen'`; do \
Expand Down
9 changes: 5 additions & 4 deletions keeper/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Keeper struct {

wg sync.WaitGroup
firstInitWg sync.WaitGroup
initCompleted bool
initCompleted atomic.Value
closeCh chan struct{}
}

Expand All @@ -63,6 +63,7 @@ func NewKeeper(opt *KeeperOption) *Keeper {
closeCh: make(chan struct{}),
}
k.leaderFlag.Store(false)
k.initCompleted.Store(false)
return k
}

Expand Down Expand Up @@ -103,7 +104,7 @@ func (k *Keeper) Init() error {
go k.goHeartBeat()

k.firstInitWg.Wait()
k.initCompleted = true
k.initCompleted.Store(true)
return nil
}

Expand Down Expand Up @@ -325,7 +326,7 @@ func (k *Keeper) elect() {
}
}

if !k.initCompleted {
if !k.initCompleted.Load().(bool) {
k.firstInitWg.Done()
}
}
Expand Down Expand Up @@ -423,7 +424,7 @@ func (k *Keeper) goHeartBeat() {
continue
}
}
if !k.initCompleted {
if !k.initCompleted.Load().(bool) {
k.firstInitWg.Done()
}
}
Expand Down
5 changes: 3 additions & 2 deletions keeper/mongo/mongo_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var mongoConn = "mongodb://root:[email protected]:27017/fastflow?authSource=admin"
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestKeeper_Concurrency(t *testing.T) {
}
}()

curCnt := 40
curCnt := 20
initCompleted := sync.WaitGroup{}
initCompleted.Add(curCnt)
closeCh := make(chan struct{})
Expand Down Expand Up @@ -121,7 +122,7 @@ func initWorker(t *testing.T, key string) *Keeper {
ConnStr: mongoConn,
})
err := w.Init()
assert.NoError(t, err)
require.NoError(t, err)
return w
}

Expand Down
1 change: 0 additions & 1 deletion keeper/mongo/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func (m *MongoMutex) spinLock(ctx context.Context, opt *mod.LockOption) error {
return nil
}

//
func (m *MongoMutex) Unlock(ctx context.Context) error {
if m.lockDetail == nil {
return fmt.Errorf("the mutex is not locked")
Expand Down
37 changes: 27 additions & 10 deletions pkg/entity/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,12 @@ type DagInstanceHookFunc func(dagIns *DagInstance)

// DagInstanceLifecycleHook
type DagInstanceLifecycleHook struct {
BeforeRun DagInstanceHookFunc
BeforeSuccess DagInstanceHookFunc
BeforeFail DagInstanceHookFunc
BeforeBlock DagInstanceHookFunc
BeforeRetry DagInstanceHookFunc
BeforeRun DagInstanceHookFunc
BeforeSuccess DagInstanceHookFunc
BeforeFail DagInstanceHookFunc
BeforeBlock DagInstanceHookFunc
BeforeRetry DagInstanceHookFunc
BeforeContinue DagInstanceHookFunc
}

// VarsGetter
Expand Down Expand Up @@ -243,15 +244,30 @@ func (dagIns *DagInstance) Block(reason string) {
dagIns.Status = DagInstanceStatusBlocked
}

// Retry a task, it is just set a command, command will execute by Parser
// Retry tasks, it is just set a command, command will execute by Parser
func (dagIns *DagInstance) Retry(taskInsIds []string) error {
return dagIns.genCmd(taskInsIds, CommandNameRetry)
}

// Continue tasks, it is just set a command, command will execute by Parser
func (dagIns *DagInstance) Continue(taskInsIds []string) error {
return dagIns.genCmd(taskInsIds, CommandNameContinue)
}

func (dagIns *DagInstance) genCmd(taskInsIds []string, cmdName CommandName) error {
if dagIns.Cmd != nil {
return fmt.Errorf("dag instance have a incomplete command")
}

dagIns.executeHook(HookDagInstance.BeforeRetry)
switch cmdName {
case CommandNameRetry:
dagIns.executeHook(HookDagInstance.BeforeRetry)
case CommandNameContinue:
dagIns.executeHook(HookDagInstance.BeforeContinue)
}

dagIns.Cmd = &Command{
Name: CommandNameRetry,
Name: cmdName,
TargetTaskInsIDs: taskInsIds,
}
return nil
Expand Down Expand Up @@ -290,8 +306,9 @@ type Command struct {
type CommandName string

const (
CommandNameRetry = "retry"
CommandNameCancel = "cancel"
CommandNameRetry = "retry"
CommandNameCancel = "cancel"
CommandNameContinue = "continue"
)

// DagInstanceStatus
Expand Down
21 changes: 21 additions & 0 deletions pkg/entity/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,23 @@ func TestDagInstance_Block(t *testing.T) {
})
}

func TestDagInstance_Continue(t *testing.T) {
dagIns := &DagInstance{
Status: DagInstanceStatusBlocked,
}
testHook(t, dagIns, "continue", DagInstanceStatusBlocked, func() {
err := dagIns.Continue([]string{"testId"})
assert.NoError(t, err)
})

incompleteDagIns := &DagInstance{
Status: DagInstanceStatusBlocked,
Cmd: &Command{},
}
err := incompleteDagIns.Continue([]string{"testId"})
assert.Equal(t, fmt.Errorf("dag instance have a incomplete command"), err)
}

func testHook(t *testing.T, dagIns *DagInstance, wantRet string, wantStatus DagInstanceStatus, call func()) {
ret := ""
HookDagInstance = DagInstanceLifecycleHook{
Expand All @@ -128,6 +145,10 @@ func testHook(t *testing.T, dagIns *DagInstance, wantRet string, wantStatus DagI
assert.NotNil(t, dagIns)
ret = string(DagInstanceStatusBlocked)
},
BeforeContinue: func(dagIns *DagInstance) {
assert.NotNil(t, dagIns)
ret = "continue"
},
BeforeRetry: func(dagIns *DagInstance) {
assert.NotNil(t, dagIns)
ret = "retry"
Expand Down
7 changes: 4 additions & 3 deletions pkg/entity/run/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ type LoopDoOption struct {
}

// LoopDo help you to complete loop action,for example
// LoopDo(ctx, func(){
// log.Println("check status")
// })
//
// LoopDo(ctx, func(){
// log.Println("check status")
// })
func LoopDo(ctx ExecuteContext, do func() error, ops ...LoopDoOptionOp) error {
opt := &LoopDoOption{
interval: time.Second,
Expand Down
1 change: 1 addition & 0 deletions pkg/entity/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func NewDefExecuteContext(
}

// ExecuteContext is a context using by action
//
//go:generate mockery --name=ExecuteContext --output=. --inpackage --filename=run_mock.go
type ExecuteContext interface {
Context() context.Context
Expand Down
3 changes: 2 additions & 1 deletion pkg/entity/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (t *TaskInstance) Run(params interface{}, act run.Action) (err error) {

}()

if t.Status == TaskInstanceStatusInit {
if t.Status == TaskInstanceStatusInit || t.Status == TaskInstanceStatusContinue {
beforeAct, ok := act.(run.BeforeAction)
if ok {
if err := beforeAct.RunBefore(t.Context, params); err != nil {
Expand Down Expand Up @@ -335,5 +335,6 @@ const (
TaskInstanceStatusRetrying TaskInstanceStatus = "retrying"
TaskInstanceStatusSuccess TaskInstanceStatus = "success"
TaskInstanceStatusBlocked TaskInstanceStatus = "blocked"
TaskInstanceStatusContinue TaskInstanceStatus = "continue"
TaskInstanceStatusSkipped TaskInstanceStatus = "skipped"
)
33 changes: 33 additions & 0 deletions pkg/entity/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,39 @@ func TestTaskInstance_Run(t *testing.T) {
wantRunAfterCalled: true,
wantRunBeforeCalled: true,
},
{
caseDesc: "continue",
giveTask: &TaskInstance{
BaseInfo: BaseInfo{
ID: "continue-task",
},
Status: TaskInstanceStatusContinue,
},
giveParams: "",
wantSaveTasks: []TaskInstance{
{
BaseInfo: BaseInfo{
ID: "continue-task",
},
Status: TaskInstanceStatusRunning,
},
{
BaseInfo: BaseInfo{
ID: "continue-task",
},
Status: TaskInstanceStatusEnding,
},
{
BaseInfo: BaseInfo{
ID: "continue-task",
},
Status: TaskInstanceStatusSuccess,
},
},
wantRunCalled: true,
wantRunAfterCalled: true,
wantRunBeforeCalled: true,
},
{
caseDesc: "panic",
giveTask: &TaskInstance{
Expand Down
4 changes: 3 additions & 1 deletion pkg/exporter/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ func (c *LeaderCollector) Collect(ch chan<- prometheus.Metric) {

// HttpHandler used to handle metrics request
// you can use it like that
// http.Handle("/metrics", exporter.HttpHandler)
//
// http.Handle("/metrics", exporter.HttpHandler)
//
// because it depend on Keeper, so you should call this function after keeper start
func HttpHandler() http.Handler {
execCollector := &ExecutorCollector{}
Expand Down
74 changes: 56 additions & 18 deletions pkg/mod/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"github.com/shiningrush/fastflow/pkg/entity"
)

var _ Commander = (*DefCommander)(nil)

// DefCommander used to execute command
type DefCommander struct {
}
Expand All @@ -34,24 +36,11 @@

// RetryDagIns
func (c *DefCommander) RetryDagIns(dagInsId string, ops ...CommandOptSetter) error {
taskIns, err := GetStore().ListTaskInstance(&ListTaskInstanceInput{
DagInsID: dagInsId,
Status: []entity.TaskInstanceStatus{entity.TaskInstanceStatusFailed, entity.TaskInstanceStatusCanceled},
})
if err != nil {
return err
}

if len(taskIns) == 0 {
return fmt.Errorf("no failed and canceled task instance")
}

var taskIds []string
for _, t := range taskIns {
taskIds = append(taskIds, t.ID)
}

return c.RetryTask(taskIds, ops...)
return c.autoLoopDagTasks(
dagInsId,
[]entity.TaskInstanceStatus{entity.TaskInstanceStatusFailed, entity.TaskInstanceStatusCanceled},
c.RetryTask,
ops...)
}

// RetryTask
Expand Down Expand Up @@ -80,6 +69,55 @@
}, opt)
}

// ContinueDagIns using to continue a blocked dag instance
func (c *DefCommander) ContinueDagIns(dagInsId string, ops ...CommandOptSetter) error {
return c.autoLoopDagTasks(
dagInsId,
[]entity.TaskInstanceStatus{entity.TaskInstanceStatusBlocked},
c.ContinueTask,
ops...)
}

// ContinueTask using to continue many blocked task instances
func (c *DefCommander) ContinueTask(taskInsIds []string, ops ...CommandOptSetter) error {
opt := initOption(ops)
return executeCommand(taskInsIds, func(dagIns *entity.DagInstance, isWorkerAlive bool) error {
if !isWorkerAlive {
aliveNodes, err := GetKeeper().AliveNodes()
if err != nil {
return err
}
dagIns.Worker = aliveNodes[rand.Intn(len(aliveNodes))]

Check warning on line 90 in pkg/mod/commander.go

View check run for this annotation

Codecov / codecov/patch

pkg/mod/commander.go#L86-L90

Added lines #L86 - L90 were not covered by tests
}
return dagIns.Continue(taskInsIds)
}, opt)
}

func (c *DefCommander) autoLoopDagTasks(
dagInsId string,
status []entity.TaskInstanceStatus,
cmdOp func(taskInsIds []string, ops ...CommandOptSetter) error,
ops ...CommandOptSetter) error {
taskIns, err := GetStore().ListTaskInstance(&ListTaskInstanceInput{
DagInsID: dagInsId,
Status: status,
})
if err != nil {
return err
}

if len(taskIns) == 0 {
return fmt.Errorf("no %+v task instance", status)
}

var taskIds []string
for _, t := range taskIns {
taskIds = append(taskIds, t.ID)
}

return cmdOp(taskIds, ops...)
}

func initOption(opSetter []CommandOptSetter) (opt CommandOption) {
opt.syncTimeout = 5 * time.Second
opt.syncInterval = 500 * time.Millisecond
Expand Down
Loading
Loading