Skip to content

Commit

Permalink
enable task delivery method to reach all workers (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
xh3b4sd authored Oct 21, 2023
1 parent 90f4ad6 commit c144f8e
Show file tree
Hide file tree
Showing 38 changed files with 3,083 additions and 295 deletions.
30 changes: 23 additions & 7 deletions engine/create.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package engine

import (
"time"

"github.com/xh3b4sd/rescue/task"
"github.com/xh3b4sd/rescue/ticker"
"github.com/xh3b4sd/tracer"
Expand Down Expand Up @@ -61,16 +59,26 @@ func (e *Engine) create(tas *task.Task) error {
}()
}

var met string
if tas.Core != nil {
met = tas.Core.Get().Method()
}

if met == "" {
met = task.MthdAny
}

var tid int64
{
tid = time.Now().UTC().UnixNano()
tid = e.tim.Create().UnixNano()
}

{
if tas.Core == nil {
tas.Core = &task.Core{}
}

{
tas.Core.Set().Method(met)
tas.Core.Set().Object(tid)
}

Expand Down Expand Up @@ -98,14 +106,22 @@ func (e *Engine) verCre(tas *task.Task) (*ticker.Ticker, error) {
if tas == nil {
return nil, tracer.Maskf(taskEmptyError, "Task must not be empty")
}
if tas.Core != nil {
return nil, tracer.Maskf(taskCoreError, "Task.Core must be empty")
}
if tas.Meta == nil || tas.Meta.Emp() {
return nil, tracer.Maskf(taskMetaEmptyError, "Task.Meta must not be empty")
}
}

if tas.Core != nil {
for k, v := range *tas.Core {
if k != task.Method {
return nil, tracer.Maskf(taskCoreError, "Task.Core can only contain one of the reserved labels [%s]", task.Method)
}
if v != task.MthdAll && v != task.MthdAny {
return nil, tracer.Maskf(labelValueError, "Task.Core must only contain one of the reserved values [all any]")
}
}
}

{
if tas.Cron != nil && tas.Root != nil {
return nil, tracer.Maskf(taskCronError, "Task.Cron and Task.Root must not be configured together")
Expand Down
127 changes: 126 additions & 1 deletion engine/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,131 @@ import (
"github.com/xh3b4sd/rescue/task"
)

func Test_Engine_Create_Core_Error(t *testing.T) {
testCases := []struct {
tas *task.Task
}{
// Case 000
{
tas: &task.Task{
Core: &task.Core{
task.Method: "foo",
},
Meta: &task.Meta{
"foo": "bar",
},
},
},
// Case 001
{
tas: &task.Task{
Core: &task.Core{
task.Object: "bar",
},
Meta: &task.Meta{
"foo": "bar",
},
},
},
// Case 002
{
tas: &task.Task{
Core: &task.Core{
task.Worker: "baz",
},
Meta: &task.Meta{
"foo": "bar",
},
},
},
// Case 003
{
tas: &task.Task{
Core: &task.Core{
task.Method: task.MthdAll,
task.Worker: "baz",
},
Meta: &task.Meta{
"foo": "bar",
},
},
},
// Case 004
{
tas: &task.Task{
Core: &task.Core{
task.Method: task.MthdAny,
task.Object: "bar",
},
Meta: &task.Meta{
"foo": "bar",
},
},
},
}

for i, tc := range testCases {
t.Run(fmt.Sprintf("%03d", i), func(t *testing.T) {
var e *Engine
{
e = New(Config{
Redigo: redigo.Fake(),
})
}

err := e.Create(tc.tas)
if err == nil {
t.Fatal("expected", "error", "got", nil)
}
})
}
}

func Test_Engine_Create_Core_No_Error(t *testing.T) {
testCases := []struct {
tas *task.Task
}{
// Case 000
{
tas: &task.Task{
Core: &task.Core{
task.Method: task.MthdAll,
},
Meta: &task.Meta{
"foo": "bar",
},
},
},
// Case 001
{
tas: &task.Task{
Core: &task.Core{
task.Method: task.MthdAny,
},
Meta: &task.Meta{
"foo": "bar",
},
},
},
}

for i, tc := range testCases {
t.Run(fmt.Sprintf("%03d", i), func(t *testing.T) {
var e *Engine
{
e = New(Config{
Redigo: redigo.Fake(),
})
}

err := e.Create(tc.tas)
if err != nil {
t.Fatal("expected", nil, "got", err)
}
})
}
}

func Test_Engine_Create_Meta_No_Error(t *testing.T) {
testCases := []struct {
tas *task.Task
Expand Down Expand Up @@ -279,7 +404,7 @@ func Test_Engine_Create_Gate_Error(t *testing.T) {
},
},
// Case 012 ensures that the reserved value "waiting" in Task.Gate cannot be
// used together with task.Cron.
// used together with Task.Cron.
{
tas: &task.Task{
Cron: &task.Cron{
Expand Down
Loading

0 comments on commit c144f8e

Please sign in to comment.