Skip to content

Commit

Permalink
implement DAG Store Get() method
Browse files Browse the repository at this point in the history
  • Loading branch information
yohamta committed Sep 10, 2023
1 parent a1735fb commit e72eb35
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 29 deletions.
25 changes: 12 additions & 13 deletions internal/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ func (e *engineImpl) DeleteDAG(dag *dag.DAG) error {
return os.Remove(dag.Location)
}

// ReadAllStatus reads all DAGStatus
func (e *engineImpl) ReadAllStatus(DAGsDir string) (statuses []*persistence.DAGStatus, errs []string, err error) {
statuses = []*persistence.DAGStatus{}
errs = []string{}
Expand All @@ -273,24 +272,24 @@ func (e *engineImpl) ReadAllStatus(DAGsDir string) (statuses []*persistence.DAGS
return statuses, errs, nil
}

func (e *engineImpl) getDAG(name string, loadMetadataOnly bool) (*dag.DAG, error) {
ds := e.dataStoreFactory.NewDAGStore()
if loadMetadataOnly {
return ds.GetMetadata(name)
} else {
return ds.GetDetails(name)
}
}

// ReadStatus loads DAG from config file.
func (e *engineImpl) ReadStatus(dagLocation string, loadMetadataOnly bool) (*persistence.DAGStatus, error) {
var (
cl = dag.Loader{}
d *dag.DAG
err error
)
d, err := e.getDAG(dagLocation, loadMetadataOnly)

if loadMetadataOnly {
d, err = cl.LoadMetadataOnly(dagLocation)
} else {
d, err = cl.LoadWithoutEval(dagLocation)
if err != nil && d != nil {
return e.newDAGStatus(d, defaultStatus(d), err), err
}

if err != nil {
if d != nil {
return e.newDAGStatus(d, defaultStatus(d), err), err
}
d := &dag.DAG{Location: dagLocation}
return e.newDAGStatus(d, defaultStatus(d), err), err
}
Expand Down
4 changes: 3 additions & 1 deletion internal/persistence/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ type (

DAGStore interface {
Create(name string, tmpl []byte) (string, error)
List() ([]dag.DAG, error)
List() (ret []*dag.DAG, errs []string, err error)
GetMetadata(name string) (*dag.DAG, error)
GetDetails(name string) (*dag.DAG, error)
Grep(pattern string) (ret []*GrepResult, errs []string, err error)
Load(name string) (*dag.DAG, error)
Rename(oldName, newName string) error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,32 @@ func NewDAGStore(dir string) persistence.DAGStore {
}
}

func (d *dagStoreImpl) GetMetadata(name string) (*dag.DAG, error) {
loc, err := d.fileLocation(name)
if err != nil {
return nil, fmt.Errorf("invalid name: %s", name)
}
cl := dag.Loader{}
dat, err := cl.LoadMetadataOnly(loc)
if err != nil {
return nil, err
}
return dat, nil
}

func (d *dagStoreImpl) GetDetails(name string) (*dag.DAG, error) {
loc, err := d.fileLocation(name)
if err != nil {
return nil, fmt.Errorf("invalid name: %s", name)
}
cl := dag.Loader{}
dat, err := cl.LoadWithoutEval(loc)
if err != nil {
return nil, err
}
return dat, nil
}

func (d *dagStoreImpl) Create(name string, tmpl []byte) (string, error) {
if err := d.ensureDirExist(); err != nil {
return "", fmt.Errorf("failed to create DAGs directory %s", d.dir)
Expand All @@ -42,17 +68,17 @@ func (d *dagStoreImpl) exists(file string) bool {
}

func (d *dagStoreImpl) fileLocation(name string) (string, error) {
if strings.Contains(name, "/") {
// this is for backward compatibility
return name, nil
}
loc := path.Join(d.dir, name)
return d.normalizeFilename(loc)
}

func (d *dagStoreImpl) normalizeFilename(file string) (string, error) {
f, err := filepath.Abs(file)
if err != nil {
return "", err
}
a := strings.TrimSuffix(f, ".yaml")
a = strings.TrimSuffix(f, ".yml")
a := strings.TrimSuffix(file, ".yaml")
a = strings.TrimSuffix(a, ".yml")
return fmt.Sprintf("%s.yaml", a), nil
}

Expand All @@ -65,9 +91,39 @@ func (d *dagStoreImpl) ensureDirExist() error {
return nil
}

func (d *dagStoreImpl) List() ([]dag.DAG, error) {
//TODO implement me
panic("implement me")
func (d *dagStoreImpl) List() (ret []*dag.DAG, errs []string, err error) {
if err = d.ensureDirExist(); err != nil {
errs = append(errs, err.Error())
return
}
fis, err := os.ReadDir(d.dir)
if err != nil {
errs = append(errs, err.Error())
return
}
for _, fi := range fis {
if checkExtension(fi.Name()) {
dat, err := d.GetMetadata(fi.Name())
if err == nil {
ret = append(ret, dat)
} else {
errs = append(errs, fmt.Sprintf("reading %s failed: %s", fi.Name(), err))
}
}
}
return ret, errs, nil
}

var extensions = []string{".yaml", ".yml"}

func checkExtension(file string) bool {
ext := filepath.Ext(file)
for _, e := range extensions {
if e == ext {
return true
}
}
return false
}

func (d *dagStoreImpl) Grep(pattern string) (ret []*persistence.GrepResult, errs []string, err error) {
Expand Down
6 changes: 0 additions & 6 deletions service/frontend/handlers/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ func (h *DAGHandler) Delete(params operations.DeleteDagParams) *response.CodedEr
func (h *DAGHandler) GetList(_ operations.ListDagsParams) (*models.ListDagsResponse, *response.CodedError) {
cfg := config.Get()

// TODO: fix this to use dags store & history store
dir := filepath.Join(cfg.DAGs)
e := h.engineFactory.Create()
dags, errs, err := e.ReadAllStatus(dir)
Expand Down Expand Up @@ -433,11 +432,6 @@ func (h *DAGHandler) PostAction(params operations.PostDagActionParams) (*models.
return &models.PostDagActionResponse{}, nil
}

func nameWithExt(name string) string {
s := strings.TrimSuffix(name, ".yaml")
return fmt.Sprintf("%s.yaml", s)
}

func (h *DAGHandler) updateStatus(dag *dag.DAG, reqId, step string, to scheduler.NodeStatus) error {
e := h.engineFactory.Create()
status, err := e.GetStatusByRequestId(dag, reqId)
Expand Down

0 comments on commit e72eb35

Please sign in to comment.