Skip to content

Commit

Permalink
minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
yohamta committed Sep 10, 2023
1 parent e72eb35 commit 68210ed
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 82 deletions.
2 changes: 1 addition & 1 deletion cmd/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestRetryCommand(t *testing.T) {
testRunCommand(t, startCmd(), cmdTest{args: []string{"start", `--params="foo"`, dagFile}})

// Find the request ID.
s, err := e.ReadStatus(dagFile, false)
s, err := e.ReadStatus(dagFile)
require.NoError(t, err)
require.Equal(t, s.Status.Status, scheduler.SchedulerStatus_Success)
require.NotNil(t, s.Status)
Expand Down
99 changes: 39 additions & 60 deletions internal/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type Engine interface {
UpdateStatus(dag *dag.DAG, status *model.Status) error
UpdateDAGSpec(d *dag.DAG, spec string) error
DeleteDAG(dag *dag.DAG) error
ReadAllStatus(DAGsDir string) (statuses []*persistence.DAGStatus, errs []string, err error)
ReadStatus(dagLocation string, loadMetadataOnly bool) (*persistence.DAGStatus, error)
ReadStatusAll(DAGsDir string) (statuses []*persistence.DAGStatus, errs []string, err error)
ReadStatus(dagLocation string) (*persistence.DAGStatus, error)
}

type engineImpl struct {
Expand Down Expand Up @@ -154,16 +154,12 @@ func (e *engineImpl) GetStatus(dag *dag.DAG) (*model.Status, error) {
if errors.Is(err, sock.ErrTimeout) {
return nil, err
} else {
return defaultStatus(dag), nil
return model.NewStatusDefault(dag), nil
}
}
return model.StatusFromJson(ret)
}

func defaultStatus(d *dag.DAG) *model.Status {
return model.NewStatus(d, nil, scheduler.SchedulerStatus_None, int(model.PidNotRunning), nil, nil)
}

func (e *engineImpl) GetStatusByRequestId(dag *dag.DAG, requestId string) (*model.Status, error) {
ret, err := e.dataStoreFactory.NewHistoryStore().FindByRequestId(dag.Location, requestId)
if err != nil {
Expand All @@ -177,33 +173,33 @@ func (e *engineImpl) GetStatusByRequestId(dag *dag.DAG, requestId string) (*mode
return ret.Status, err
}

func (e *engineImpl) GetLastStatus(dag *dag.DAG) (*model.Status, error) {
func (e *engineImpl) getCurrentStatus(dag *dag.DAG) (*model.Status, error) {
client := sock.Client{Addr: dag.SockAddr()}
ret, err := client.Request("GET", "/status")
if err == nil {
return model.StatusFromJson(ret)
if err != nil {
return nil, fmt.Errorf("failed to get status: %s", err)
}
return model.StatusFromJson(ret)
}

if err == nil || !errors.Is(err, sock.ErrTimeout) {
status, err := e.dataStoreFactory.NewHistoryStore().ReadStatusToday(dag.Location)
if err != nil {
var readErr error = nil
if !errors.Is(err, persistence.ErrNoStatusDataToday) && !errors.Is(err, persistence.ErrNoStatusData) {
fmt.Printf("read status failed : %s", err)
readErr = err
}
return defaultStatus(dag), readErr
}
// it is wrong status if the status is running
status.CorrectRunningStatus()
return status, nil
func (e *engineImpl) GetLastStatus(dag *dag.DAG) (*model.Status, error) {
currStatus, _ := e.getCurrentStatus(dag)
if currStatus != nil {
return currStatus, nil
}
return nil, err
status, err := e.dataStoreFactory.NewHistoryStore().ReadStatusToday(dag.Location)
if errors.Is(err, persistence.ErrNoStatusDataToday) || errors.Is(err, persistence.ErrNoStatusData) {
return model.NewStatusDefault(dag), nil
}
if err != nil {
return model.NewStatusDefault(dag), err
}
status.CorrectRunningStatus()
return status, nil
}

func (e *engineImpl) GetRecentStatuses(dag *dag.DAG, n int) []*model.StatusFile {
ret := e.dataStoreFactory.NewHistoryStore().ReadStatusHist(dag.Location, n)
return ret
return e.dataStoreFactory.NewHistoryStore().ReadStatusHist(dag.Location, n)
}

func (e *engineImpl) UpdateStatus(dag *dag.DAG, status *model.Status) error {
Expand Down Expand Up @@ -247,7 +243,7 @@ func (e *engineImpl) DeleteDAG(dag *dag.DAG) error {
return os.Remove(dag.Location)
}

func (e *engineImpl) ReadAllStatus(DAGsDir string) (statuses []*persistence.DAGStatus, errs []string, err error) {
func (e *engineImpl) ReadStatusAll(DAGsDir string) (statuses []*persistence.DAGStatus, errs []string, err error) {
statuses = []*persistence.DAGStatus{}
errs = []string{}
if !utils.FileExists(DAGsDir) {
Expand All @@ -260,7 +256,7 @@ func (e *engineImpl) ReadAllStatus(DAGsDir string) (statuses []*persistence.DAGS
utils.LogErr("read DAGs directory", err)
for _, fi := range fis {
if utils.MatchExtension(fi.Name(), dag.EXTENSIONS) {
d, err := e.ReadStatus(filepath.Join(DAGsDir, fi.Name()), true)
d, err := e.readStatus(filepath.Join(DAGsDir, fi.Name()), true)
utils.LogErr("read DAG config", err)
if d != nil {
statuses = append(statuses, d)
Expand All @@ -272,51 +268,34 @@ func (e *engineImpl) ReadAllStatus(DAGsDir string) (statuses []*persistence.DAGS
return statuses, errs, nil
}

func (e *engineImpl) getDAG(name string, loadMetadataOnly bool) (*dag.DAG, error) {
func (e *engineImpl) getDAG(name string, metadataOnly bool) (*dag.DAG, error) {
ds := e.dataStoreFactory.NewDAGStore()
if loadMetadataOnly {
if metadataOnly {
return ds.GetMetadata(name)
} else {
return ds.GetDetails(name)
}
}

// ReadStatus loads DAG from config file.
func (e *engineImpl) ReadStatus(dagLocation string, loadMetadataOnly bool) (*persistence.DAGStatus, error) {
d, err := e.getDAG(dagLocation, loadMetadataOnly)
func (e *engineImpl) ReadStatus(dagLocation string) (*persistence.DAGStatus, error) {
return e.readStatus(dagLocation, false)
}

if err != nil && d != nil {
return e.newDAGStatus(d, defaultStatus(d), err), err
func (e *engineImpl) readStatus(dagLocation string, metadataOnly bool) (*persistence.DAGStatus, error) {
d, err := e.getDAG(dagLocation, metadataOnly)
if d == nil {
d = &dag.DAG{Location: dagLocation}
}

if err != nil {
d := &dag.DAG{Location: dagLocation}
return e.newDAGStatus(d, defaultStatus(d), err), err
return persistence.NewDAGStatus(d, model.NewStatusDefault(d), e.isSuspended(d), err), err
}

if !loadMetadataOnly {
if _, err := scheduler.NewExecutionGraph(d.Steps...); err != nil {
return e.newDAGStatus(d, nil, err), err
}
if !metadataOnly {
_, err = scheduler.NewExecutionGraph(d.Steps...)
}

status, err := e.GetLastStatus(d)

return e.newDAGStatus(d, status, err), err
return persistence.NewDAGStatus(d, status, e.isSuspended(d), err), err
}

func (e *engineImpl) newDAGStatus(d *dag.DAG, s *model.Status, err error) *persistence.DAGStatus {
ret := &persistence.DAGStatus{
File: filepath.Base(d.Location),
Dir: filepath.Dir(d.Location),
DAG: d,
Status: s,
Suspended: e.suspendChecker.IsSuspended(d),
Error: err,
}
if err != nil {
errT := err.Error()
ret.ErrorT = &errT
}
return ret
func (e *engineImpl) isSuspended(d *dag.DAG) bool {
return e.suspendChecker.IsSuspended(d)
}
22 changes: 11 additions & 11 deletions internal/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestGetStatusRunningAndDone(t *testing.T) {
}()
file := testDAG("get_status.yaml")

ds, err := e.ReadStatus(file, false)
ds, err := e.ReadStatus(file)
require.NoError(t, err)

socketServer, _ := sock.NewServer(
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestUpdateStatus(t *testing.T) {
now = time.Now()
)

d, err := e.ReadStatus(file, false)
d, err := e.ReadStatus(file)
require.NoError(t, err)

hs := hf.NewHistoryStore()
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestUpdateStatusError(t *testing.T) {
requestId = "test-update-status-failure"
)

d, err := e.ReadStatus(file, false)
d, err := e.ReadStatus(file)
require.NoError(t, err)

status := testNewStatus(d.DAG, requestId,
Expand All @@ -164,7 +164,7 @@ func TestStart(t *testing.T) {
}()
file := testDAG("start.yaml")

d, err := e.ReadStatus(file, false)
d, err := e.ReadStatus(file)
require.NoError(t, err)

err = e.Start(d.DAG, "")
Expand All @@ -183,7 +183,7 @@ func TestStop(t *testing.T) {

file := testDAG("stop.yaml")

d, err := e.ReadStatus(file, false)
d, err := e.ReadStatus(file)
require.NoError(t, err)

e.StartAsync(d.DAG, "")
Expand All @@ -209,7 +209,7 @@ func TestRestart(t *testing.T) {

file := testDAG("restart.yaml")

d, err := e.ReadStatus(file, false)
d, err := e.ReadStatus(file)
require.NoError(t, err)

err = e.Restart(d.DAG)
Expand All @@ -228,7 +228,7 @@ func TestRetry(t *testing.T) {

file := testDAG("retry.yaml")

d, err := e.ReadStatus(file, false)
d, err := e.ReadStatus(file)
require.NoError(t, err)

err = e.Start(d.DAG, "x y z")
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestRenameDAG(t *testing.T) {
// TODO: fixme to use mock
loc := path.Join(tmpDir, ".dagu", "dags", id+".yaml")

_, err = e.ReadStatus(loc, false)
_, err = e.ReadStatus(loc)
require.NoError(t, err)

// TODO: fixme
Expand All @@ -396,7 +396,7 @@ func TestLoadConfig(t *testing.T) {

file := testDAG("invalid_dag.yaml")

d, err := e.ReadStatus(file, false)
d, err := e.ReadStatus(file)
require.Error(t, err)
require.NotNil(t, d)

Expand All @@ -410,7 +410,7 @@ func TestReadAll(t *testing.T) {
_ = os.RemoveAll(tmpDir)
}()

dags, _, err := e.ReadAllStatus(testdataDir)
dags, _, err := e.ReadStatusAll(testdataDir)
require.NoError(t, err)
require.Greater(t, len(dags), 0)

Expand All @@ -430,7 +430,7 @@ func TestReadDAGStatus(t *testing.T) {

file := testDAG("read_status.yaml")

_, err := e.ReadStatus(file, false)
_, err := e.ReadStatus(file)
require.NoError(t, err)
}

Expand Down
17 changes: 17 additions & 0 deletions internal/persistence/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/dagu-dev/dagu/internal/dag"
"github.com/dagu-dev/dagu/internal/grep"
"github.com/dagu-dev/dagu/internal/persistence/model"
"path/filepath"
"time"
)

Expand Down Expand Up @@ -59,3 +60,19 @@ type (
ErrorT *string
}
)

func NewDAGStatus(d *dag.DAG, s *model.Status, suspended bool, err error) *DAGStatus {
ret := &DAGStatus{
File: filepath.Base(d.Location),
Dir: filepath.Dir(d.Location),
DAG: d,
Status: s,
Suspended: suspended,
Error: err,
}
if err != nil {
errT := err.Error()
ret.ErrorT = &errT
}
return ret
}
21 changes: 15 additions & 6 deletions internal/persistence/model/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,23 @@ func StatusFromJson(s string) (*Status, error) {
return status, err
}

func NewStatus(d *dag.DAG, nodes []*scheduler.Node, status scheduler.SchedulerStatus,
pid int, s, e *time.Time) *Status {
func NewStatusDefault(d *dag.DAG) *Status {
return NewStatus(d, nil, scheduler.SchedulerStatus_None, int(PidNotRunning), nil, nil)
}

func NewStatus(
d *dag.DAG,
nodes []*scheduler.Node,
status scheduler.SchedulerStatus,
pid int,
startTime, endTIme *time.Time,
) *Status {
finish, start := time.Time{}, time.Time{}
if s != nil {
start = *s
if startTime != nil {
start = *startTime
}
if e != nil {
finish = *e
if endTIme != nil {
finish = *endTIme
}
var models []*Node
if len(nodes) != 0 {
Expand Down
8 changes: 4 additions & 4 deletions service/frontend/handlers/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (h *DAGHandler) Delete(params operations.DeleteDagParams) *response.CodedEr

filename := filepath.Join(cfg.DAGs, fmt.Sprintf("%s.yaml", params.DagID))
e := h.engineFactory.Create()
dagStatus, err := e.ReadStatus(filename, false)
dagStatus, err := e.ReadStatus(filename)
if err != nil {
return response.NewNotFoundError(err)
}
Expand All @@ -144,7 +144,7 @@ func (h *DAGHandler) GetList(_ operations.ListDagsParams) (*models.ListDagsRespo

dir := filepath.Join(cfg.DAGs)
e := h.engineFactory.Create()
dags, errs, err := e.ReadAllStatus(dir)
dags, errs, err := e.ReadStatusAll(dir)
if err != nil {
return nil, response.NewInternalError(err)
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func (h *DAGHandler) GetDetail(params operations.GetDagDetailsParams) (*models.G

file := filepath.Join(cfg.DAGs, fmt.Sprintf("%s.yaml", dagID))
e := h.engineFactory.Create()
dagStatus, err := e.ReadStatus(file, false)
dagStatus, err := e.ReadStatus(file)
if dagStatus == nil {
return nil, response.NewNotFoundError(err)
}
Expand Down Expand Up @@ -338,7 +338,7 @@ func (h *DAGHandler) PostAction(params operations.PostDagActionParams) (*models.

file := filepath.Join(cfg.DAGs, fmt.Sprintf("%s.yaml", params.DagID))
e := h.engineFactory.Create()
d, err := e.ReadStatus(file, false)
d, err := e.ReadStatus(file)

if err != nil && params.Body.Action != "save" {
return nil, response.NewBadRequestError(err)
Expand Down

0 comments on commit 68210ed

Please sign in to comment.