diff --git a/internal/engine/engine.go b/internal/engine/engine.go index d9fcfde8..432a17cf 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -247,7 +247,6 @@ func (e *engineImpl) DeleteDAG(dag *dag.DAG) error { return os.Remove(dag.Location) } -// ReadAllStatus reads all DAGStatus func (e *engineImpl) ReadAllStatus(DAGsDir string) (statuses []*persistence.DAGStatus, errs []string, err error) { statuses = []*persistence.DAGStatus{} errs = []string{} @@ -273,24 +272,24 @@ func (e *engineImpl) ReadAllStatus(DAGsDir string) (statuses []*persistence.DAGS return statuses, errs, nil } +func (e *engineImpl) getDAG(name string, loadMetadataOnly bool) (*dag.DAG, error) { + ds := e.dataStoreFactory.NewDAGStore() + if loadMetadataOnly { + return ds.GetMetadata(name) + } else { + return ds.GetDetails(name) + } +} + // ReadStatus loads DAG from config file. func (e *engineImpl) ReadStatus(dagLocation string, loadMetadataOnly bool) (*persistence.DAGStatus, error) { - var ( - cl = dag.Loader{} - d *dag.DAG - err error - ) + d, err := e.getDAG(dagLocation, loadMetadataOnly) - if loadMetadataOnly { - d, err = cl.LoadMetadataOnly(dagLocation) - } else { - d, err = cl.LoadWithoutEval(dagLocation) + if err != nil && d != nil { + return e.newDAGStatus(d, defaultStatus(d), err), err } if err != nil { - if d != nil { - return e.newDAGStatus(d, defaultStatus(d), err), err - } d := &dag.DAG{Location: dagLocation} return e.newDAGStatus(d, defaultStatus(d), err), err } diff --git a/internal/persistence/interface.go b/internal/persistence/interface.go index a9b656c5..4cf43014 100644 --- a/internal/persistence/interface.go +++ b/internal/persistence/interface.go @@ -35,7 +35,9 @@ type ( DAGStore interface { Create(name string, tmpl []byte) (string, error) - List() ([]dag.DAG, error) + List() (ret []*dag.DAG, errs []string, err error) + GetMetadata(name string) (*dag.DAG, error) + GetDetails(name string) (*dag.DAG, error) Grep(pattern string) (ret []*GrepResult, errs []string, err error) Load(name string) (*dag.DAG, error) Rename(oldName, newName string) error diff --git a/internal/persistence/local/local.go b/internal/persistence/local/dag_store.go similarity index 66% rename from internal/persistence/local/local.go rename to internal/persistence/local/dag_store.go index 4f3e9887..d4fc7c0f 100644 --- a/internal/persistence/local/local.go +++ b/internal/persistence/local/dag_store.go @@ -22,6 +22,32 @@ func NewDAGStore(dir string) persistence.DAGStore { } } +func (d *dagStoreImpl) GetMetadata(name string) (*dag.DAG, error) { + loc, err := d.fileLocation(name) + if err != nil { + return nil, fmt.Errorf("invalid name: %s", name) + } + cl := dag.Loader{} + dat, err := cl.LoadMetadataOnly(loc) + if err != nil { + return nil, err + } + return dat, nil +} + +func (d *dagStoreImpl) GetDetails(name string) (*dag.DAG, error) { + loc, err := d.fileLocation(name) + if err != nil { + return nil, fmt.Errorf("invalid name: %s", name) + } + cl := dag.Loader{} + dat, err := cl.LoadWithoutEval(loc) + if err != nil { + return nil, err + } + return dat, nil +} + func (d *dagStoreImpl) Create(name string, tmpl []byte) (string, error) { if err := d.ensureDirExist(); err != nil { return "", fmt.Errorf("failed to create DAGs directory %s", d.dir) @@ -42,17 +68,17 @@ func (d *dagStoreImpl) exists(file string) bool { } func (d *dagStoreImpl) fileLocation(name string) (string, error) { + if strings.Contains(name, "/") { + // this is for backward compatibility + return name, nil + } loc := path.Join(d.dir, name) return d.normalizeFilename(loc) } func (d *dagStoreImpl) normalizeFilename(file string) (string, error) { - f, err := filepath.Abs(file) - if err != nil { - return "", err - } - a := strings.TrimSuffix(f, ".yaml") - a = strings.TrimSuffix(f, ".yml") + a := strings.TrimSuffix(file, ".yaml") + a = strings.TrimSuffix(a, ".yml") return fmt.Sprintf("%s.yaml", a), nil } @@ -65,9 +91,39 @@ func (d *dagStoreImpl) ensureDirExist() error { return nil } -func (d *dagStoreImpl) List() ([]dag.DAG, error) { - //TODO implement me - panic("implement me") +func (d *dagStoreImpl) List() (ret []*dag.DAG, errs []string, err error) { + if err = d.ensureDirExist(); err != nil { + errs = append(errs, err.Error()) + return + } + fis, err := os.ReadDir(d.dir) + if err != nil { + errs = append(errs, err.Error()) + return + } + for _, fi := range fis { + if checkExtension(fi.Name()) { + dat, err := d.GetMetadata(fi.Name()) + if err == nil { + ret = append(ret, dat) + } else { + errs = append(errs, fmt.Sprintf("reading %s failed: %s", fi.Name(), err)) + } + } + } + return ret, errs, nil +} + +var extensions = []string{".yaml", ".yml"} + +func checkExtension(file string) bool { + ext := filepath.Ext(file) + for _, e := range extensions { + if e == ext { + return true + } + } + return false } func (d *dagStoreImpl) Grep(pattern string) (ret []*persistence.GrepResult, errs []string, err error) { diff --git a/service/frontend/handlers/dag.go b/service/frontend/handlers/dag.go index 5d04401d..1c749a16 100644 --- a/service/frontend/handlers/dag.go +++ b/service/frontend/handlers/dag.go @@ -142,7 +142,6 @@ func (h *DAGHandler) Delete(params operations.DeleteDagParams) *response.CodedEr func (h *DAGHandler) GetList(_ operations.ListDagsParams) (*models.ListDagsResponse, *response.CodedError) { cfg := config.Get() - // TODO: fix this to use dags store & history store dir := filepath.Join(cfg.DAGs) e := h.engineFactory.Create() dags, errs, err := e.ReadAllStatus(dir) @@ -433,11 +432,6 @@ func (h *DAGHandler) PostAction(params operations.PostDagActionParams) (*models. return &models.PostDagActionResponse{}, nil } -func nameWithExt(name string) string { - s := strings.TrimSuffix(name, ".yaml") - return fmt.Sprintf("%s.yaml", s) -} - func (h *DAGHandler) updateStatus(dag *dag.DAG, reqId, step string, to scheduler.NodeStatus) error { e := h.engineFactory.Create() status, err := e.GetStatusByRequestId(dag, reqId)