Skip to content

Commit

Permalink
fix engine interfaces to be simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
yohamta committed Sep 10, 2023
1 parent a17b3bf commit e9d9d72
Show file tree
Hide file tree
Showing 15 changed files with 65 additions and 72 deletions.
2 changes: 1 addition & 1 deletion cmd/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func setupTest(t *testing.T) (string, engine.Engine, persistence.DataStoreFactor
DataDir: path.Join(tmpDir, ".dagu", "data"),
})

e := engine.NewFactory(ds).Create()
e := engine.NewFactory(ds, nil).Create()

return tmpDir, e, ds
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/dry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func dryCmd() *cobra.Command {
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
df := client.NewDataStoreFactory(config.Get())
e := engine.NewFactory(df).Create()
e := engine.NewFactory(df, config.Get()).Create()
execDAG(cmd.Context(), e, cmd, args, true)
},
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func restartCmd() *cobra.Command {
checkError(err)

df := client.NewDataStoreFactory(config.Get())
e := engine.NewFactory(df).Create()
e := engine.NewFactory(df, config.Get()).Create()

// Check the current status and stop the DAG if it is running.
stopDAGIfRunning(e, loadedDAG)
Expand Down
2 changes: 1 addition & 1 deletion cmd/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestRestartCommand(t *testing.T) {
require.NoError(t, err)

df := client.NewDataStoreFactory(config.Get())
e = engine.NewFactory(df).Create()
e = engine.NewFactory(df, nil).Create()

sts := e.GetRecentStatuses(d, 2)
require.Len(t, sts, 2)
Expand Down
2 changes: 1 addition & 1 deletion cmd/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func retryCmd() *cobra.Command {

// TODO: use engine.Engine instead of client.DataStoreFactory
df := client.NewDataStoreFactory(config.Get())
e := engine.NewFactory(df).Create()
e := engine.NewFactory(df, nil).Create()

hs := df.NewHistoryStore()

Expand Down
2 changes: 1 addition & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func startCmd() *cobra.Command {
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
ds := client.NewDataStoreFactory(config.Get())
e := engine.NewFactory(ds).Create()
e := engine.NewFactory(ds, config.Get()).Create()
execDAG(cmd.Context(), e, cmd, args, false)
},
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func createStatusCommand() *cobra.Command {
checkError(err)

df := client.NewDataStoreFactory(config.Get())
e := engine.NewFactory(df).Create()
e := engine.NewFactory(df, config.Get()).Create()

status, err := e.GetStatus(loadedDAG)
checkError(err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func stopCmd() *cobra.Command {
log.Printf("Stopping...")

df := client.NewDataStoreFactory(config.Get())
e := engine.NewFactory(df).Create()
e := engine.NewFactory(df, config.Get()).Create()
checkError(e.Stop(loadedDAG))
},
}
Expand Down
2 changes: 1 addition & 1 deletion internal/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func setupTest(t *testing.T) (string, engine.Engine, persistence.DataStoreFactor
DataDir: path.Join(tmpDir, ".dagu", "data"),
})

e := engine.NewFactory(ds).Create()
e := engine.NewFactory(ds, nil).Create()

return tmpDir, e, ds
}
Expand Down
6 changes: 0 additions & 6 deletions internal/dag/dag_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dag

import (
"fmt"
"os"
"path"
"testing"
Expand All @@ -14,15 +13,10 @@ import (
var (
testdataDir = path.Join(utils.MustGetwd(), "testdata")
testHomeDir = path.Join(utils.MustGetwd(), "testdata/home")
testEnv = []string{}
)

func TestMain(m *testing.M) {
changeHomeDir(testHomeDir)
testEnv = []string{
fmt.Sprintf("LOG_DIR=%s", path.Join(testHomeDir, "/logs")),
fmt.Sprintf("PATH=%s", os.Getenv("PATH")),
}
code := m.Run()
os.Exit(code)
}
Expand Down
57 changes: 18 additions & 39 deletions internal/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package engine
import (
"errors"
"fmt"
"github.com/dagu-dev/dagu/internal/config"
"github.com/dagu-dev/dagu/internal/dag"
"github.com/dagu-dev/dagu/internal/persistence"
"github.com/dagu-dev/dagu/internal/persistence/model"
"github.com/dagu-dev/dagu/internal/scheduler"
"github.com/dagu-dev/dagu/internal/sock"
"github.com/dagu-dev/dagu/internal/storage"
"github.com/dagu-dev/dagu/internal/suspend"
"github.com/dagu-dev/dagu/internal/utils"
"os"
Expand All @@ -20,21 +18,14 @@ import (
)

type Engine interface {
// CreateDAG creates a new DAG and returns the ID of the DAG.
CreateDAG(name string) (string, error)
// Grep greps DAGs by the pattern.
Grep(pattern string) ([]*persistence.GrepResult, []string, error)
// Rename renames DAG.
Rename(oldDAGPath, newDAGPath string) error
Stop(dag *dag.DAG) error
// TODO: fix params
StartAsync(dag *dag.DAG, binPath string, workDir string, params string)
// TODO: fix params
Start(dag *dag.DAG, binPath string, workDir string, params string) error
// TODO: fix params
Restart(dag *dag.DAG, bin string, workDir string) error
// TODO: fix params
Retry(dag *dag.DAG, binPath string, workDir string, reqId string) error
StartAsync(dag *dag.DAG, params string)
Start(dag *dag.DAG, params string) error
Restart(dag *dag.DAG) error
Retry(dag *dag.DAG, reqId string) error
GetStatus(dag *dag.DAG) (*model.Status, error)
GetStatusByRequestId(dag *dag.DAG, requestId string) (*model.Status, error)
GetLastStatus(dag *dag.DAG) (*model.Status, error)
Expand All @@ -48,22 +39,12 @@ type Engine interface {

type engineImpl struct {
dataStoreFactory persistence.DataStoreFactory
// TODO: fix this to inject
suspendChecker *suspend.SuspendChecker
}

func New(ds persistence.DataStoreFactory) Engine {
return &engineImpl{
dataStoreFactory: ds,
// TODO: fix this to inject
suspendChecker: suspend.NewSuspendChecker(
storage.NewStorage(config.Get().SuspendFlagsDir),
),
}
suspendChecker *suspend.SuspendChecker
executable string
workDir string
}

// TODO: this should not be here.
// DAGStatus is the struct to contain DAGStatus spec and status.
type DAGStatus struct {
File string
Dir string
Expand Down Expand Up @@ -115,25 +96,23 @@ func (e *engineImpl) Stop(dag *dag.DAG) error {
return err
}

// TODO: fix params
func (e *engineImpl) StartAsync(dag *dag.DAG, binPath string, workDir string, params string) {
func (e *engineImpl) StartAsync(dag *dag.DAG, params string) {
go func() {
err := e.Start(dag, binPath, workDir, params)
err := e.Start(dag, params)
utils.LogErr("starting a DAG", err)
}()
}

// TODO: fix params
func (e *engineImpl) Start(dag *dag.DAG, binPath string, workDir string, params string) error {
func (e *engineImpl) Start(dag *dag.DAG, params string) error {
args := []string{"start"}
if params != "" {
args = append(args, "-p")
args = append(args, fmt.Sprintf(`"%s"`, utils.EscapeArg(params, false)))
}
args = append(args, dag.Location)
cmd := exec.Command(binPath, args...)
cmd := exec.Command(e.executable, args...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
cmd.Dir = workDir
cmd.Dir = e.workDir
cmd.Env = os.Environ()
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
Expand All @@ -146,11 +125,11 @@ func (e *engineImpl) Start(dag *dag.DAG, binPath string, workDir string, params
}

// TODO: fix params
func (e *engineImpl) Restart(dag *dag.DAG, bin string, workDir string) error {
func (e *engineImpl) Restart(dag *dag.DAG) error {
args := []string{"restart", dag.Location}
cmd := exec.Command(bin, args...)
cmd := exec.Command(e.executable, args...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
cmd.Dir = workDir
cmd.Dir = e.workDir
cmd.Env = os.Environ()
err := cmd.Start()
if err != nil {
Expand All @@ -160,14 +139,14 @@ func (e *engineImpl) Restart(dag *dag.DAG, bin string, workDir string) error {
}

// TODO: fix params
func (e *engineImpl) Retry(dag *dag.DAG, binPath string, workDir string, reqId string) (err error) {
func (e *engineImpl) Retry(dag *dag.DAG, reqId string) (err error) {
go func() {
args := []string{"retry"}
args = append(args, fmt.Sprintf("--req=%s", reqId))
args = append(args, dag.Location)
cmd := exec.Command(binPath, args...)
cmd := exec.Command(e.executable, args...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
cmd.Dir = workDir
cmd.Dir = e.workDir
cmd.Env = os.Environ()
defer func() {
_ = cmd.Wait()
Expand Down
19 changes: 9 additions & 10 deletions internal/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ func setupTest(t *testing.T) (string, engine.Engine, persistence.DataStoreFactor
DAGs: path.Join(tmpDir, ".dagu", "dags"),
})

e := engine.NewFactory(ds).Create()
e := engine.NewFactory(ds, &config.Config{
Command: path.Join(utils.MustGetwd(), "../../bin/dagu"),
}).Create()

return tmpDir, e, ds
}
Expand Down Expand Up @@ -165,7 +167,7 @@ func TestStart(t *testing.T) {
d, err := e.ReadStatus(file, false)
require.NoError(t, err)

err = e.Start(d.DAG, path.Join(utils.MustGetwd(), "../../bin/dagu"), "", "")
err = e.Start(d.DAG, "")
require.Error(t, err)

status, err := e.GetLastStatus(d.DAG)
Expand All @@ -184,7 +186,7 @@ func TestStop(t *testing.T) {
d, err := e.ReadStatus(file, false)
require.NoError(t, err)

e.StartAsync(d.DAG, path.Join(utils.MustGetwd(), "../../bin/dagu"), "", "")
e.StartAsync(d.DAG, "")

require.Eventually(t, func() bool {
st, _ := e.GetStatus(d.DAG)
Expand All @@ -210,7 +212,7 @@ func TestRestart(t *testing.T) {
d, err := e.ReadStatus(file, false)
require.NoError(t, err)

err = e.Restart(d.DAG, path.Join(utils.MustGetwd(), "../../bin/dagu"), "")
err = e.Restart(d.DAG)
require.NoError(t, err)

status, err := e.GetLastStatus(d.DAG)
Expand All @@ -229,7 +231,7 @@ func TestRetry(t *testing.T) {
d, err := e.ReadStatus(file, false)
require.NoError(t, err)

err = e.Start(d.DAG, path.Join(utils.MustGetwd(), "../../bin/dagu"), "", "x y z")
err = e.Start(d.DAG, "x y z")
require.NoError(t, err)

status, err := e.GetLastStatus(d.DAG)
Expand All @@ -239,7 +241,7 @@ func TestRetry(t *testing.T) {
requestId := status.RequestId
params := status.Params

err = e.Retry(d.DAG, path.Join(utils.MustGetwd(), "../../bin/dagu"), "", requestId)
err = e.Retry(d.DAG, requestId)
require.NoError(t, err)
status, err = e.GetLastStatus(d.DAG)
require.NoError(t, err)
Expand All @@ -262,10 +264,7 @@ func TestUpdate(t *testing.T) {
}()

loc := path.Join(tmpDir, "test.yaml")
d := &dag.DAG{
Name: "test",
Location: loc,
}
d := &dag.DAG{Name: "test", Location: loc}

// invalid DAG
invalidDAG := `name: test DAG`
Expand Down
29 changes: 25 additions & 4 deletions internal/engine/factory.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,42 @@
package engine

import "github.com/dagu-dev/dagu/internal/persistence"
import (
"github.com/dagu-dev/dagu/internal/config"
"github.com/dagu-dev/dagu/internal/persistence"
"github.com/dagu-dev/dagu/internal/storage"
"github.com/dagu-dev/dagu/internal/suspend"
)

type Factory interface {
Create() Engine
}

type factoryImpl struct {
dataStoreFactory persistence.DataStoreFactory
executable string
workDir string
suspendChecker *suspend.SuspendChecker
}

func NewFactory(ds persistence.DataStoreFactory) Factory {
return &factoryImpl{
func NewFactory(ds persistence.DataStoreFactory, cfg *config.Config) Factory {
impl := &factoryImpl{
dataStoreFactory: ds,
}
if cfg == nil {
cfg = config.Get()
}
impl.executable = cfg.Command
impl.suspendChecker = suspend.NewSuspendChecker(
storage.NewStorage(cfg.SuspendFlagsDir),
)
return impl
}

func (f *factoryImpl) Create() Engine {
return New(f.dataStoreFactory)
return &engineImpl{
dataStoreFactory: f.dataStoreFactory,
executable: f.executable,
workDir: f.workDir,
suspendChecker: f.suspendChecker,
}
}
4 changes: 2 additions & 2 deletions service/core/scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (j *Job) Start() error {
}
// should not be here
}
return e.Start(j.DAG, j.Command, j.WorkDir, "")
return e.Start(j.DAG, "")
}

func (j *Job) Stop() error {
Expand All @@ -68,7 +68,7 @@ func (j *Job) Stop() error {

func (j *Job) Restart() error {
e := j.EngineFactory.Create()
return e.Restart(j.DAG, j.Command, j.WorkDir)
return e.Restart(j.DAG)
}

func (j *Job) String() string {
Expand Down
4 changes: 2 additions & 2 deletions service/frontend/handlers/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (h *DAGHandler) PostAction(params operations.PostDagActionParams) (*models.
return nil, response.NewBadRequestError(errInvalidArgs)
}
e := h.engineFactory.Create()
e.StartAsync(d.DAG, cfg.Command, cfg.WorkDir, params.Body.Params)
e.StartAsync(d.DAG, params.Body.Params)

case "suspend":
sc := suspend.NewSuspendChecker(storage.NewStorage(config.Get().SuspendFlagsDir))
Expand All @@ -370,7 +370,7 @@ func (h *DAGHandler) PostAction(params operations.PostDagActionParams) (*models.
return nil, response.NewBadRequestError(fmt.Errorf("request-id is required: %w", errInvalidArgs))
}
e := h.engineFactory.Create()
err = e.Retry(d.DAG, cfg.Command, cfg.WorkDir, params.Body.RequestID)
err = e.Retry(d.DAG, params.Body.RequestID)
if err != nil {
return nil, response.NewInternalError(fmt.Errorf("error trying to retry the DAG: %w", err))
}
Expand Down

0 comments on commit e9d9d72

Please sign in to comment.