From 68210edfdd5567aa86480c766ab8458b424e3bcb Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Sun, 10 Sep 2023 17:56:59 +0900 Subject: [PATCH] minor refactoring --- cmd/retry_test.go | 2 +- internal/engine/engine.go | 99 +++++++++++----------------- internal/engine/engine_test.go | 22 +++---- internal/persistence/interface.go | 17 +++++ internal/persistence/model/status.go | 21 ++++-- service/frontend/handlers/dag.go | 8 +-- 6 files changed, 87 insertions(+), 82 deletions(-) diff --git a/cmd/retry_test.go b/cmd/retry_test.go index f689334b..a298cc95 100644 --- a/cmd/retry_test.go +++ b/cmd/retry_test.go @@ -20,7 +20,7 @@ func TestRetryCommand(t *testing.T) { testRunCommand(t, startCmd(), cmdTest{args: []string{"start", `--params="foo"`, dagFile}}) // Find the request ID. - s, err := e.ReadStatus(dagFile, false) + s, err := e.ReadStatus(dagFile) require.NoError(t, err) require.Equal(t, s.Status.Status, scheduler.SchedulerStatus_Success) require.NotNil(t, s.Status) diff --git a/internal/engine/engine.go b/internal/engine/engine.go index 432a17cf..f69521be 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -33,8 +33,8 @@ type Engine interface { UpdateStatus(dag *dag.DAG, status *model.Status) error UpdateDAGSpec(d *dag.DAG, spec string) error DeleteDAG(dag *dag.DAG) error - ReadAllStatus(DAGsDir string) (statuses []*persistence.DAGStatus, errs []string, err error) - ReadStatus(dagLocation string, loadMetadataOnly bool) (*persistence.DAGStatus, error) + ReadStatusAll(DAGsDir string) (statuses []*persistence.DAGStatus, errs []string, err error) + ReadStatus(dagLocation string) (*persistence.DAGStatus, error) } type engineImpl struct { @@ -154,16 +154,12 @@ func (e *engineImpl) GetStatus(dag *dag.DAG) (*model.Status, error) { if errors.Is(err, sock.ErrTimeout) { return nil, err } else { - return defaultStatus(dag), nil + return model.NewStatusDefault(dag), nil } } return model.StatusFromJson(ret) } -func defaultStatus(d *dag.DAG) *model.Status { - return model.NewStatus(d, nil, scheduler.SchedulerStatus_None, int(model.PidNotRunning), nil, nil) -} - func (e *engineImpl) GetStatusByRequestId(dag *dag.DAG, requestId string) (*model.Status, error) { ret, err := e.dataStoreFactory.NewHistoryStore().FindByRequestId(dag.Location, requestId) if err != nil { @@ -177,33 +173,33 @@ func (e *engineImpl) GetStatusByRequestId(dag *dag.DAG, requestId string) (*mode return ret.Status, err } -func (e *engineImpl) GetLastStatus(dag *dag.DAG) (*model.Status, error) { +func (e *engineImpl) getCurrentStatus(dag *dag.DAG) (*model.Status, error) { client := sock.Client{Addr: dag.SockAddr()} ret, err := client.Request("GET", "/status") - if err == nil { - return model.StatusFromJson(ret) + if err != nil { + return nil, fmt.Errorf("failed to get status: %s", err) } + return model.StatusFromJson(ret) +} - if err == nil || !errors.Is(err, sock.ErrTimeout) { - status, err := e.dataStoreFactory.NewHistoryStore().ReadStatusToday(dag.Location) - if err != nil { - var readErr error = nil - if !errors.Is(err, persistence.ErrNoStatusDataToday) && !errors.Is(err, persistence.ErrNoStatusData) { - fmt.Printf("read status failed : %s", err) - readErr = err - } - return defaultStatus(dag), readErr - } - // it is wrong status if the status is running - status.CorrectRunningStatus() - return status, nil +func (e *engineImpl) GetLastStatus(dag *dag.DAG) (*model.Status, error) { + currStatus, _ := e.getCurrentStatus(dag) + if currStatus != nil { + return currStatus, nil } - return nil, err + status, err := e.dataStoreFactory.NewHistoryStore().ReadStatusToday(dag.Location) + if errors.Is(err, persistence.ErrNoStatusDataToday) || errors.Is(err, persistence.ErrNoStatusData) { + return model.NewStatusDefault(dag), nil + } + if err != nil { + return model.NewStatusDefault(dag), err + } + status.CorrectRunningStatus() + return status, nil } func (e *engineImpl) GetRecentStatuses(dag *dag.DAG, n int) []*model.StatusFile { - ret := e.dataStoreFactory.NewHistoryStore().ReadStatusHist(dag.Location, n) - return ret + return e.dataStoreFactory.NewHistoryStore().ReadStatusHist(dag.Location, n) } func (e *engineImpl) UpdateStatus(dag *dag.DAG, status *model.Status) error { @@ -247,7 +243,7 @@ func (e *engineImpl) DeleteDAG(dag *dag.DAG) error { return os.Remove(dag.Location) } -func (e *engineImpl) ReadAllStatus(DAGsDir string) (statuses []*persistence.DAGStatus, errs []string, err error) { +func (e *engineImpl) ReadStatusAll(DAGsDir string) (statuses []*persistence.DAGStatus, errs []string, err error) { statuses = []*persistence.DAGStatus{} errs = []string{} if !utils.FileExists(DAGsDir) { @@ -260,7 +256,7 @@ func (e *engineImpl) ReadAllStatus(DAGsDir string) (statuses []*persistence.DAGS utils.LogErr("read DAGs directory", err) for _, fi := range fis { if utils.MatchExtension(fi.Name(), dag.EXTENSIONS) { - d, err := e.ReadStatus(filepath.Join(DAGsDir, fi.Name()), true) + d, err := e.readStatus(filepath.Join(DAGsDir, fi.Name()), true) utils.LogErr("read DAG config", err) if d != nil { statuses = append(statuses, d) @@ -272,51 +268,34 @@ func (e *engineImpl) ReadAllStatus(DAGsDir string) (statuses []*persistence.DAGS return statuses, errs, nil } -func (e *engineImpl) getDAG(name string, loadMetadataOnly bool) (*dag.DAG, error) { +func (e *engineImpl) getDAG(name string, metadataOnly bool) (*dag.DAG, error) { ds := e.dataStoreFactory.NewDAGStore() - if loadMetadataOnly { + if metadataOnly { return ds.GetMetadata(name) } else { return ds.GetDetails(name) } } -// ReadStatus loads DAG from config file. -func (e *engineImpl) ReadStatus(dagLocation string, loadMetadataOnly bool) (*persistence.DAGStatus, error) { - d, err := e.getDAG(dagLocation, loadMetadataOnly) +func (e *engineImpl) ReadStatus(dagLocation string) (*persistence.DAGStatus, error) { + return e.readStatus(dagLocation, false) +} - if err != nil && d != nil { - return e.newDAGStatus(d, defaultStatus(d), err), err +func (e *engineImpl) readStatus(dagLocation string, metadataOnly bool) (*persistence.DAGStatus, error) { + d, err := e.getDAG(dagLocation, metadataOnly) + if d == nil { + d = &dag.DAG{Location: dagLocation} } - if err != nil { - d := &dag.DAG{Location: dagLocation} - return e.newDAGStatus(d, defaultStatus(d), err), err + return persistence.NewDAGStatus(d, model.NewStatusDefault(d), e.isSuspended(d), err), err } - - if !loadMetadataOnly { - if _, err := scheduler.NewExecutionGraph(d.Steps...); err != nil { - return e.newDAGStatus(d, nil, err), err - } + if !metadataOnly { + _, err = scheduler.NewExecutionGraph(d.Steps...) } - status, err := e.GetLastStatus(d) - - return e.newDAGStatus(d, status, err), err + return persistence.NewDAGStatus(d, status, e.isSuspended(d), err), err } -func (e *engineImpl) newDAGStatus(d *dag.DAG, s *model.Status, err error) *persistence.DAGStatus { - ret := &persistence.DAGStatus{ - File: filepath.Base(d.Location), - Dir: filepath.Dir(d.Location), - DAG: d, - Status: s, - Suspended: e.suspendChecker.IsSuspended(d), - Error: err, - } - if err != nil { - errT := err.Error() - ret.ErrorT = &errT - } - return ret +func (e *engineImpl) isSuspended(d *dag.DAG) bool { + return e.suspendChecker.IsSuspended(d) } diff --git a/internal/engine/engine_test.go b/internal/engine/engine_test.go index eead4d45..8d50ea62 100644 --- a/internal/engine/engine_test.go +++ b/internal/engine/engine_test.go @@ -52,7 +52,7 @@ func TestGetStatusRunningAndDone(t *testing.T) { }() file := testDAG("get_status.yaml") - ds, err := e.ReadStatus(file, false) + ds, err := e.ReadStatus(file) require.NoError(t, err) socketServer, _ := sock.NewServer( @@ -97,7 +97,7 @@ func TestUpdateStatus(t *testing.T) { now = time.Now() ) - d, err := e.ReadStatus(file, false) + d, err := e.ReadStatus(file) require.NoError(t, err) hs := hf.NewHistoryStore() @@ -142,7 +142,7 @@ func TestUpdateStatusError(t *testing.T) { requestId = "test-update-status-failure" ) - d, err := e.ReadStatus(file, false) + d, err := e.ReadStatus(file) require.NoError(t, err) status := testNewStatus(d.DAG, requestId, @@ -164,7 +164,7 @@ func TestStart(t *testing.T) { }() file := testDAG("start.yaml") - d, err := e.ReadStatus(file, false) + d, err := e.ReadStatus(file) require.NoError(t, err) err = e.Start(d.DAG, "") @@ -183,7 +183,7 @@ func TestStop(t *testing.T) { file := testDAG("stop.yaml") - d, err := e.ReadStatus(file, false) + d, err := e.ReadStatus(file) require.NoError(t, err) e.StartAsync(d.DAG, "") @@ -209,7 +209,7 @@ func TestRestart(t *testing.T) { file := testDAG("restart.yaml") - d, err := e.ReadStatus(file, false) + d, err := e.ReadStatus(file) require.NoError(t, err) err = e.Restart(d.DAG) @@ -228,7 +228,7 @@ func TestRetry(t *testing.T) { file := testDAG("retry.yaml") - d, err := e.ReadStatus(file, false) + d, err := e.ReadStatus(file) require.NoError(t, err) err = e.Start(d.DAG, "x y z") @@ -377,7 +377,7 @@ func TestRenameDAG(t *testing.T) { // TODO: fixme to use mock loc := path.Join(tmpDir, ".dagu", "dags", id+".yaml") - _, err = e.ReadStatus(loc, false) + _, err = e.ReadStatus(loc) require.NoError(t, err) // TODO: fixme @@ -396,7 +396,7 @@ func TestLoadConfig(t *testing.T) { file := testDAG("invalid_dag.yaml") - d, err := e.ReadStatus(file, false) + d, err := e.ReadStatus(file) require.Error(t, err) require.NotNil(t, d) @@ -410,7 +410,7 @@ func TestReadAll(t *testing.T) { _ = os.RemoveAll(tmpDir) }() - dags, _, err := e.ReadAllStatus(testdataDir) + dags, _, err := e.ReadStatusAll(testdataDir) require.NoError(t, err) require.Greater(t, len(dags), 0) @@ -430,7 +430,7 @@ func TestReadDAGStatus(t *testing.T) { file := testDAG("read_status.yaml") - _, err := e.ReadStatus(file, false) + _, err := e.ReadStatus(file) require.NoError(t, err) } diff --git a/internal/persistence/interface.go b/internal/persistence/interface.go index 4cf43014..86b5fafe 100644 --- a/internal/persistence/interface.go +++ b/internal/persistence/interface.go @@ -5,6 +5,7 @@ import ( "github.com/dagu-dev/dagu/internal/dag" "github.com/dagu-dev/dagu/internal/grep" "github.com/dagu-dev/dagu/internal/persistence/model" + "path/filepath" "time" ) @@ -59,3 +60,19 @@ type ( ErrorT *string } ) + +func NewDAGStatus(d *dag.DAG, s *model.Status, suspended bool, err error) *DAGStatus { + ret := &DAGStatus{ + File: filepath.Base(d.Location), + Dir: filepath.Dir(d.Location), + DAG: d, + Status: s, + Suspended: suspended, + Error: err, + } + if err != nil { + errT := err.Error() + ret.ErrorT = &errT + } + return ret +} diff --git a/internal/persistence/model/status.go b/internal/persistence/model/status.go index 6021a1e8..1c616f2a 100644 --- a/internal/persistence/model/status.go +++ b/internal/persistence/model/status.go @@ -61,14 +61,23 @@ func StatusFromJson(s string) (*Status, error) { return status, err } -func NewStatus(d *dag.DAG, nodes []*scheduler.Node, status scheduler.SchedulerStatus, - pid int, s, e *time.Time) *Status { +func NewStatusDefault(d *dag.DAG) *Status { + return NewStatus(d, nil, scheduler.SchedulerStatus_None, int(PidNotRunning), nil, nil) +} + +func NewStatus( + d *dag.DAG, + nodes []*scheduler.Node, + status scheduler.SchedulerStatus, + pid int, + startTime, endTIme *time.Time, +) *Status { finish, start := time.Time{}, time.Time{} - if s != nil { - start = *s + if startTime != nil { + start = *startTime } - if e != nil { - finish = *e + if endTIme != nil { + finish = *endTIme } var models []*Node if len(nodes) != 0 { diff --git a/service/frontend/handlers/dag.go b/service/frontend/handlers/dag.go index 1c749a16..39d53cc2 100644 --- a/service/frontend/handlers/dag.go +++ b/service/frontend/handlers/dag.go @@ -128,7 +128,7 @@ func (h *DAGHandler) Delete(params operations.DeleteDagParams) *response.CodedEr filename := filepath.Join(cfg.DAGs, fmt.Sprintf("%s.yaml", params.DagID)) e := h.engineFactory.Create() - dagStatus, err := e.ReadStatus(filename, false) + dagStatus, err := e.ReadStatus(filename) if err != nil { return response.NewNotFoundError(err) } @@ -144,7 +144,7 @@ func (h *DAGHandler) GetList(_ operations.ListDagsParams) (*models.ListDagsRespo dir := filepath.Join(cfg.DAGs) e := h.engineFactory.Create() - dags, errs, err := e.ReadAllStatus(dir) + dags, errs, err := e.ReadStatusAll(dir) if err != nil { return nil, response.NewInternalError(err) } @@ -179,7 +179,7 @@ func (h *DAGHandler) GetDetail(params operations.GetDagDetailsParams) (*models.G file := filepath.Join(cfg.DAGs, fmt.Sprintf("%s.yaml", dagID)) e := h.engineFactory.Create() - dagStatus, err := e.ReadStatus(file, false) + dagStatus, err := e.ReadStatus(file) if dagStatus == nil { return nil, response.NewNotFoundError(err) } @@ -338,7 +338,7 @@ func (h *DAGHandler) PostAction(params operations.PostDagActionParams) (*models. file := filepath.Join(cfg.DAGs, fmt.Sprintf("%s.yaml", params.DagID)) e := h.engineFactory.Create() - d, err := e.ReadStatus(file, false) + d, err := e.ReadStatus(file) if err != nil && params.Body.Action != "save" { return nil, response.NewBadRequestError(err)