diff --git a/conformance/exists/host_redis_test.go b/conformance/exists/host_redis_test.go new file mode 100644 index 0000000..3d9f952 --- /dev/null +++ b/conformance/exists/host_redis_test.go @@ -0,0 +1,176 @@ +//go:build redis + +package lister + +import ( + "testing" + + "github.com/xh3b4sd/logger" + "github.com/xh3b4sd/redigo" + "github.com/xh3b4sd/rescue" + "github.com/xh3b4sd/rescue/engine" + "github.com/xh3b4sd/rescue/task" +) + +func Test_Engine_Exists_Host(t *testing.T) { + var err error + + var red redigo.Interface + { + red = redigo.Default() + } + + { + err = red.Purge() + if err != nil { + t.Fatal(err) + } + } + + var eon rescue.Interface + { + eon = engine.New(engine.Config{ + Logger: logger.Fake(), + Redigo: red, + }) + } + + { + exi, err := eon.Exists(&task.Task{Host: &task.Host{task.Method: task.MthdAll}}) + if err != nil { + t.Fatal(err) + } + + if exi { + t.Fatal("expected", false, "got", true) + } + } + + { + exi, err := eon.Exists(&task.Task{Host: &task.Host{task.Method: task.MthdUni}}) + if err != nil { + t.Fatal(err) + } + + if exi { + t.Fatal("expected", false, "got", true) + } + } + + { + exi, err := eon.Exists(&task.Task{Host: &task.Host{task.Method: task.MthdUni, task.Worker: "etw"}}) + if err != nil { + t.Fatal(err) + } + + if exi { + t.Fatal("expected", false, "got", true) + } + } + + { + exi, err := eon.Exists(&task.Task{Host: &task.Host{task.Method: task.MthdAny}}) + if err != nil { + t.Fatal(err) + } + + if exi { + t.Fatal("expected", false, "got", true) + } + } + + { + tas := &task.Task{ + Host: &task.Host{ + task.Method: task.MthdAll, + }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, + } + + err = eon.Create(tas) + if err != nil { + t.Fatal(err) + } + } + + { + tas := &task.Task{ + Host: &task.Host{ + task.Method: task.MthdUni, + task.Worker: "eon", + }, + Meta: &task.Meta{ + "test.api.io/key": "bar", + }, + } + + err = eon.Create(tas) + if err != nil { + t.Fatal(err) + } + } + + { + tas := &task.Task{ + Host: &task.Host{ + task.Method: task.MthdUni, + task.Worker: "etw", + }, + Meta: &task.Meta{ + "test.api.io/key": "baz", + }, + } + + err = eon.Create(tas) + if err != nil { + t.Fatal(err) + } + } + + { + exi, err := eon.Exists(&task.Task{Host: &task.Host{task.Method: task.MthdAll}}) + if err != nil { + t.Fatal(err) + } + + if !exi { + t.Fatal("expected", true, "got", false) + } + } + + { + exi, err := eon.Exists(&task.Task{Host: &task.Host{task.Method: task.MthdUni}}) + if err != nil { + t.Fatal(err) + } + + if !exi { + t.Fatal("expected", true, "got", false) + } + } + + { + exi, err := eon.Exists(&task.Task{Host: &task.Host{task.Method: task.MthdUni, task.Worker: "etw"}}) + if err != nil { + t.Fatal(err) + } + + if !exi { + t.Fatal("expected", true, "got", false) + } + } + + // There is no "any" task created, so it must not exist. + { + exi, err := eon.Exists(&task.Task{Host: &task.Host{task.Method: task.MthdAny}}) + if err != nil { + t.Fatal(err) + } + + if exi { + t.Fatal("expected", false, "got", true) + } + } +} diff --git a/conformance/lister/host_redis_test.go b/conformance/lister/host_redis_test.go new file mode 100644 index 0000000..02cdece --- /dev/null +++ b/conformance/lister/host_redis_test.go @@ -0,0 +1,248 @@ +//go:build redis + +package lister + +import ( + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/xh3b4sd/logger" + "github.com/xh3b4sd/redigo" + "github.com/xh3b4sd/rescue" + "github.com/xh3b4sd/rescue/engine" + "github.com/xh3b4sd/rescue/task" +) + +func Test_Engine_Lister_Host(t *testing.T) { + var err error + + var red redigo.Interface + { + red = redigo.Default() + } + + { + err = red.Purge() + if err != nil { + t.Fatal(err) + } + } + + var eon rescue.Interface + { + eon = engine.New(engine.Config{ + Logger: logger.Fake(), + Redigo: red, + }) + } + + var lis []*task.Task + { + lis, err = eon.Lister(engine.All()) + if err != nil { + t.Fatal(err) + } + } + + { + if len(lis) != 0 { + t.Fatal("expected", 0, "got", len(lis)) + } + } + + { + tas := &task.Task{ + Host: &task.Host{ + task.Method: task.MthdAll, + }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, + } + + err = eon.Create(tas) + if err != nil { + t.Fatal(err) + } + } + + { + tas := &task.Task{ + Host: &task.Host{ + task.Method: task.MthdUni, + task.Worker: "eon", + }, + Meta: &task.Meta{ + "test.api.io/key": "bar", + }, + } + + err = eon.Create(tas) + if err != nil { + t.Fatal(err) + } + } + + { + tas := &task.Task{ + Host: &task.Host{ + task.Method: task.MthdUni, + task.Worker: "etw", + }, + Meta: &task.Meta{ + "test.api.io/key": "baz", + }, + } + + err = eon.Create(tas) + if err != nil { + t.Fatal(err) + } + } + + { + lis, err = eon.Lister(engine.All()) + if err != nil { + t.Fatal(err) + } + } + + { + if len(lis) != 3 { + t.Fatal("expected", 3, "got", len(lis)) + } + } + + { + var tas *task.Task + { + tas = lis[0] + } + + var exp *task.Task + { + exp = &task.Task{ + Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, + } + } + + { + if !reflect.DeepEqual(tas, exp) { + t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) + } + } + } + + { + var tas *task.Task + { + tas = lis[1] + } + + var exp *task.Task + { + exp = &task.Task{ + Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdUni, + task.Worker: "eon", + }, + Meta: &task.Meta{ + "test.api.io/key": "bar", + }, + } + } + + { + if !reflect.DeepEqual(tas, exp) { + t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) + } + } + } + + { + var tas *task.Task + { + tas = lis[2] + } + + var exp *task.Task + { + exp = &task.Task{ + Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdUni, + task.Worker: "etw", + }, + Meta: &task.Meta{ + "test.api.io/key": "baz", + }, + } + } + + { + if !reflect.DeepEqual(tas, exp) { + t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) + } + } + } + + { + lis, err = eon.Lister(&task.Task{Host: &task.Host{task.Method: task.MthdAll}}) + if err != nil { + t.Fatal(err) + } + } + + { + if len(lis) != 1 { + t.Fatal("expected", 1, "got", len(lis)) + } + } + + { + lis, err = eon.Lister(&task.Task{Host: &task.Host{task.Method: task.MthdUni}}) + if err != nil { + t.Fatal(err) + } + } + + { + if len(lis) != 2 { + t.Fatal("expected", 2, "got", len(lis)) + } + } + + { + lis, err = eon.Lister(&task.Task{Host: &task.Host{task.Method: task.MthdUni, task.Worker: "etw"}}) + if err != nil { + t.Fatal(err) + } + } + + { + if len(lis) != 1 { + t.Fatal("expected", 1, "got", len(lis)) + } + } + + { + lis, err = eon.Lister(&task.Task{Host: &task.Host{task.Method: task.MthdAny}}) + if err != nil { + t.Fatal(err) + } + } + + { + if len(lis) != 0 { + t.Fatal("expected", 0, "got", len(lis)) + } + } +} diff --git a/conformance/method/uni_redis_test.go b/conformance/method/uni_redis_test.go new file mode 100644 index 0000000..be88706 --- /dev/null +++ b/conformance/method/uni_redis_test.go @@ -0,0 +1,485 @@ +//go:build redis + +package method + +import ( + "reflect" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/xh3b4sd/logger" + "github.com/xh3b4sd/redigo" + "github.com/xh3b4sd/rescue" + "github.com/xh3b4sd/rescue/engine" + "github.com/xh3b4sd/rescue/task" + "github.com/xh3b4sd/rescue/timer" +) + +func Test_Engine_Method_Uni_Cleanup(t *testing.T) { + var err error + + var red redigo.Interface + { + red = redigo.Default() + } + + { + err = red.Purge() + if err != nil { + t.Fatal(err) + } + } + + var tim *timer.Timer + { + tim = timer.New() + } + + // The engine is configured with a particular time. This point in time will be + // set inside the worker process as the pointer for when it started processing + // tasks. + { + tim.Setter(func() time.Time { + return musTim("2023-10-20T00:00:00Z") + }) + } + + var etw rescue.Interface + { + etw = engine.New(engine.Config{ + Logger: logger.Fake(), + Redigo: red, + Timer: tim, + Worker: "etw", + }) + } + + // Worker two creates a task for worker one, which never shows up. + { + tas := &task.Task{ + Host: &task.Host{ + task.Method: task.MthdUni, + task.Worker: "eon", + }, + Meta: &task.Meta{ + "test.api.io/key": "bar", + }, + } + + err = etw.Create(tas) + if err != nil { + t.Fatal(err) + } + } + + // Engine two can never receive any tasks. + { + _, err = etw.Search() + if !engine.IsTaskNotFound(err) { + t.Fatal("expected", "taskNotFoundError", "got", err) + } + } + + var lis []*task.Task + { + lis, err = etw.Lister(engine.All()) + if err != nil { + t.Fatal(err) + } + } + + // There should be one task in the queue, since one got just created. + { + if len(lis) != 1 { + t.Fatal("expected", 1, "got", len(lis)) + } + } + + // Time advances 2 minutes. The task should still exist. + { + tim.Setter(func() time.Time { + return musTim("2023-10-20T00:02:00Z") + }) + } + + // Engine two can never receive any tasks. + { + _, err = etw.Search() + if !engine.IsTaskNotFound(err) { + t.Fatal("expected", "taskNotFoundError", "got", err) + } + } + + // Engine two can never receive any tasks, but calling Engine.Expire purges + // any lingering task, regardless which engine executes it. + { + err = etw.Expire() + if err != nil { + t.Fatal(err) + } + } + + { + lis, err = etw.Lister(engine.All()) + if err != nil { + t.Fatal(err) + } + } + + { + if len(lis) != 1 { + t.Fatal("expected", 1, "got", len(lis)) + } + } + + // Time advances 7 days. The task should be gone now. + { + tim.Setter(func() time.Time { + return musTim("2023-10-27T00:02:00Z") + }) + } + + // Engine two can never receive any tasks, but calling Engine.Expire purges + // any lingering task, regardless which engine executes it. + { + err = etw.Expire() + if err != nil { + t.Fatal(err) + } + } + + { + lis, err = etw.Lister(engine.All()) + if err != nil { + t.Fatal(err) + } + } + + { + if len(lis) != 0 { + t.Fatal("expected", 0, "got", len(lis)) + } + } +} + +func Test_Engine_Method_Uni_Lifecycle(t *testing.T) { + var err error + + var red redigo.Interface + { + red = redigo.Default() + } + + { + err = red.Purge() + if err != nil { + t.Fatal(err) + } + } + + var tim *timer.Timer + { + tim = timer.New() + } + + // The engine is configured with a particular time. This point in time will be + // set inside the worker process as the pointer for when it started processing + // tasks. + { + tim.Setter(func() time.Time { + return musTim("2023-10-20T00:00:00Z") + }) + } + + var eon rescue.Interface + { + eon = engine.New(engine.Config{ + Logger: logger.Fake(), + Redigo: red, + Timer: tim, + Worker: "eon", + }) + } + + var etw rescue.Interface + { + etw = engine.New(engine.Config{ + Logger: logger.Fake(), + Redigo: red, + Timer: tim, + Worker: "etw", + }) + } + + // Time advances by 1 minute. So the first task "foo" got created at minute + // one. + { + tim.Setter(func() time.Time { + return musTim("2023-10-20T00:01:00Z") + }) + } + + { + tas := &task.Task{ + Host: &task.Host{ + task.Method: task.MthdAny, + }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, + } + + err = eon.Create(tas) + if err != nil { + t.Fatal(err) + } + } + + // Time advances by 1 more minute. So the second task "bar" got created at + // minute two. + { + tim.Setter(func() time.Time { + return musTim("2023-10-20T00:02:00Z") + }) + } + + { + tas := &task.Task{ + Host: &task.Host{ + task.Method: task.MthdUni, + task.Worker: "eon", + }, + Meta: &task.Meta{ + "test.api.io/key": "bar", + }, + } + + err = eon.Create(tas) + if err != nil { + t.Fatal(err) + } + } + + // Worker one looks for new tasks and finds task two first, because it is + // directly addressed at worker "eon". + var tas *task.Task + { + tas, err = eon.Search() + if err != nil { + t.Fatal(err) + } + } + + { + var exp *task.Task + { + exp = &task.Task{ + Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdUni, + task.Worker: "eon", + }, + Meta: &task.Meta{ + "test.api.io/key": "bar", + }, + } + } + + { + if !reflect.DeepEqual(tas, exp) { + t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) + } + } + } + + // For the test here we pretend task two gets stuck or fails for whatever + // reason, it will expire within the underlying queue. After expiry we want to + // see task two being picked up by worker "eon" again. + + // Worker two looks for a task now and finds task one. + { + tas, err = etw.Search() + if err != nil { + t.Fatal(err) + } + } + + { + var exp *task.Task + { + exp = &task.Task{ + Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAny, + }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, + } + } + + { + if !reflect.DeepEqual(tas, exp) { + t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) + } + } + } + + // Time advances some 10 seconds. So task one gets completed without issues. + { + tim.Setter(func() time.Time { + return musTim("2023-10-20T00:02:10Z") + }) + } + + { + err = etw.Delete(tas) + if err != nil { + t.Fatal(err) + } + } + + // Worker "etw" cannot find another task anymore, because from its point of + // view, the queue is empty now. + { + _, err = etw.Search() + if !engine.IsTaskNotFound(err) { + t.Fatal("expected", "taskNotFoundError", "got", err) + } + } + + // Now the failing task two can still not be received by worker one, because + // of the "failing" task' expiry. + { + tas, err = eon.Search() + if !engine.IsTaskNotFound(err) { + t.Fatal("expected", "taskNotFoundError", "got", err) + } + } + + // Time advances by another 25 seconds. So task two expired within the + // underlying queue and should be receivable again, particularly for worker + // "eon". + { + tim.Setter(func() time.Time { + return musTim("2023-10-20T00:02:35Z") + }) + } + + // Without running Engine.Expire no task can be expired within the underlying + // system. So even if the task's expiry is due, the system does not recognize + // it yet. That means searching for a task will not yield any result. + { + _, err = eon.Search() + if !engine.IsTaskNotFound(err) { + t.Fatal("expected", "taskNotFoundError", "got", err) + } + } + + // Any worker may run Engine.Expire in order to expire tasks within the + // underlying queue. Here worker "etw" is executing the expiration routine for + // the task addressed directly at worker "eon" to be expired. + { + err = etw.Expire() + if err != nil { + t.Fatal(err) + } + } + + // After task two got properly expired within the system, it can now be + // received by worker one. + { + tas, err = eon.Search() + if err != nil { + t.Fatal(err) + } + } + + // Engine two can never receive any more tasks from here. + { + _, err = etw.Search() + if !engine.IsTaskNotFound(err) { + t.Fatal("expected", "taskNotFoundError", "got", err) + } + } + + { + var exp *task.Task + { + exp = &task.Task{ + Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdUni, + task.Worker: "eon", + }, + Meta: &task.Meta{ + "test.api.io/key": "bar", + }, + } + } + + { + if !reflect.DeepEqual(tas, exp) { + t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) + } + } + } + + // Time advances by some 5 more seconds. So this time around the worker "eon" + // completed task two, which was "failing" earlier. + { + tim.Setter(func() time.Time { + return musTim("2023-10-20T00:02:40Z") + }) + } + + { + err = eon.Delete(tas) + if err != nil { + t.Fatal(err) + } + } + + // Since worker one completed its directly assigned task there should not be + // any more tasks to be received. + { + _, err = eon.Search() + if !engine.IsTaskNotFound(err) { + t.Fatal("expected", "taskNotFoundError", "got", err) + } + } + + // Engine two can never receive any more tasks from here. + { + _, err = etw.Search() + if !engine.IsTaskNotFound(err) { + t.Fatal("expected", "taskNotFoundError", "got", err) + } + } + + var lis []*task.Task + { + lis, err = eon.Lister(engine.All()) + if err != nil { + t.Fatal(err) + } + } + + // There should be no task left in the queue, because all tasks got resolved + // by the workers within the network. + { + if len(lis) != 0 { + t.Fatal("expected", 0, "got", len(lis)) + } + } +} + +func musTim(str string) time.Time { + tim, err := time.Parse("2006-01-02T15:04:05.999999Z", str) + if err != nil { + panic(err) + } + + return tim +} diff --git a/engine/create.go b/engine/create.go index 9365f91..6c05370 100644 --- a/engine/create.go +++ b/engine/create.go @@ -59,15 +59,6 @@ func (e *Engine) create(tas *task.Task) error { }() } - var met string - if tas.Core != nil { - met = tas.Core.Get().Method() - } - - if met == "" { - met = task.MthdAny - } - var tid int64 { tid = e.tim.Create().UnixNano() @@ -78,7 +69,6 @@ func (e *Engine) create(tas *task.Task) error { } { - tas.Core.Set().Method(met) tas.Core.Set().Object(tid) } @@ -87,6 +77,14 @@ func (e *Engine) create(tas *task.Task) error { tas.Cron.Set().TickP1(tic.TickP1()) } + if tas.Host == nil { + tas.Host = &task.Host{} + } + + if tas.Host.Get(task.Method) == "" { + tas.Host.Set(task.Method, task.MthdAny) + } + { k := e.Keyfmt() v := task.ToString(tas) @@ -106,18 +104,42 @@ func (e *Engine) verCre(tas *task.Task) (*ticker.Ticker, error) { if tas == nil { return nil, tracer.Maskf(taskEmptyError, "Task must not be empty") } + if tas.Core != nil { + return nil, tracer.Maskf(taskCoreError, "Task.Core must be empty") + } if tas.Meta == nil || tas.Meta.Emp() { return nil, tracer.Maskf(taskMetaEmptyError, "Task.Meta must not be empty") } } - if tas.Core != nil { - for k, v := range *tas.Core { - if k != task.Method { - return nil, tracer.Maskf(taskCoreError, "Task.Core can only contain one of the reserved labels [%s]", task.Method) - } - if v != task.MthdAll && v != task.MthdAny { - return nil, tracer.Maskf(labelValueError, "Task.Core must only contain one of the reserved values [all any]") + if tas.Host != nil { + if !tas.Host.Has(Met()) { + return nil, tracer.Maskf(taskHostError, "Task.Host must contain reserved key [%s]", task.Method) + } + + var met string + { + met = tas.Host.Get(task.Method) + } + + if met == task.MthdAll && tas.Host.Len() != 1 { + return nil, tracer.Maskf(taskHostError, `Task.Host must not contain any more labels if delivery method "all" is configured`) + } + + if met == task.MthdAny && tas.Host.Len() != 1 { + return nil, tracer.Maskf(taskHostError, `Task.Host must not contain any more labels if delivery method "any" is configured`) + } + + if met == task.MthdUni && !tas.Host.Exi(task.Worker) { + return nil, tracer.Maskf(taskHostError, `Task.Host must only contain reserved keys [%s %s] if delivery method "uni" is configured`, task.Method, task.Worker) + } + if met == task.MthdUni && tas.Host.Len() != 2 { + return nil, tracer.Maskf(taskHostError, `Task.Host must only contain reserved keys [%s %s] if delivery method "uni" is configured`, task.Method, task.Worker) + } + + for k, v := range *tas.Host { + if k == task.Method && v != task.MthdAll && v != task.MthdAny && v != task.MthdUni { + return nil, tracer.Maskf(labelValueError, "Task.Host must only contain one of the reserved values [%s %s %s]", task.MthdAll, task.MthdAny, task.MthdUni) } } } diff --git a/engine/create_test.go b/engine/create_test.go index 8e9ce79..f713b2c 100644 --- a/engine/create_test.go +++ b/engine/create_test.go @@ -69,6 +69,40 @@ func Test_Engine_Create_Core_Error(t *testing.T) { }, }, }, + // Case 005 + { + tas: &task.Task{ + Core: &task.Core{ + task.Method: task.MthdAll, + }, + Meta: &task.Meta{ + "foo": "bar", + }, + }, + }, + // Case 006 + { + tas: &task.Task{ + Core: &task.Core{ + task.Method: task.MthdAny, + }, + Meta: &task.Meta{ + "foo": "bar", + }, + }, + }, + // Case 007 + { + tas: &task.Task{ + Core: &task.Core{ + task.Method: task.MthdUni, + task.Worker: "bar", + }, + Meta: &task.Meta{ + "foo": "bar", + }, + }, + }, } for i, tc := range testCases { @@ -88,14 +122,128 @@ func Test_Engine_Create_Core_Error(t *testing.T) { } } -func Test_Engine_Create_Core_No_Error(t *testing.T) { +func Test_Engine_Create_Host_Error(t *testing.T) { testCases := []struct { tas *task.Task }{ // Case 000 + { + tas: &task.Task{ + Host: &task.Host{ + task.Method: "foo", + }, + Meta: &task.Meta{ + "foo": "bar", + }, + }, + }, + // Case 001 + { + tas: &task.Task{ + Host: &task.Host{ + task.Object: "bar", + }, + Meta: &task.Meta{ + "foo": "bar", + }, + }, + }, + // Case 002 + { + tas: &task.Task{ + Host: &task.Host{ + task.Worker: "baz", + }, + Meta: &task.Meta{ + "foo": "bar", + }, + }, + }, + // Case 003 + { + tas: &task.Task{ + Host: &task.Host{ + task.Method: task.MthdAll, + task.Worker: "baz", + }, + Meta: &task.Meta{ + "foo": "bar", + }, + }, + }, + // Case 004 + { + tas: &task.Task{ + Host: &task.Host{ + task.Method: task.MthdAny, + task.Object: "bar", + }, + Meta: &task.Meta{ + "foo": "bar", + }, + }, + }, + // Case 005 { tas: &task.Task{ Core: &task.Core{ + task.Method: task.MthdAny, + }, + Meta: &task.Meta{ + "foo": "bar", + }, + }, + }, + // Case 006 + { + tas: &task.Task{ + Host: &task.Host{ + task.Method: task.MthdUni, + }, + Meta: &task.Meta{ + "foo": "bar", + }, + }, + }, + // Case 007 + { + tas: &task.Task{ + Host: &task.Host{ + task.Method: task.MthdUni, + task.Object: "foo", + }, + Meta: &task.Meta{ + "foo": "bar", + }, + }, + }, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("%03d", i), func(t *testing.T) { + var e *Engine + { + e = New(Config{ + Redigo: redigo.Fake(), + }) + } + + err := e.Create(tc.tas) + if err == nil { + t.Fatal("expected", "error", "got", nil) + } + }) + } +} + +func Test_Engine_Create_Host_No_Error(t *testing.T) { + testCases := []struct { + tas *task.Task + }{ + // Case 000 + { + tas: &task.Task{ + Host: &task.Host{ task.Method: task.MthdAll, }, Meta: &task.Meta{ @@ -106,7 +254,7 @@ func Test_Engine_Create_Core_No_Error(t *testing.T) { // Case 001 { tas: &task.Task{ - Core: &task.Core{ + Host: &task.Host{ task.Method: task.MthdAny, }, Meta: &task.Meta{ @@ -114,6 +262,18 @@ func Test_Engine_Create_Core_No_Error(t *testing.T) { }, }, }, + // Case 002 + { + tas: &task.Task{ + Host: &task.Host{ + task.Method: task.MthdUni, + task.Worker: "foo", + }, + Meta: &task.Meta{ + "foo": "bar", + }, + }, + }, } for i, tc := range testCases { diff --git a/engine/delete.go b/engine/delete.go index e3d67cb..42a0a97 100644 --- a/engine/delete.go +++ b/engine/delete.go @@ -67,7 +67,7 @@ func (e *Engine) delete(tas *task.Task) error { // Allow the local deletion of any broadcasted task that is not a task // template. if loc != nil { - all := tas.Core.Get().Method() == task.MthdAll + all := tas.Host.Get(task.Method) == task.MthdAll byp := tas.Core.Exi().Bypass() crn := tas.Cron == nil gat := tas.Gate == nil @@ -231,6 +231,8 @@ func (e *Engine) delete(tas *task.Task) error { var t *task.Task { t = &task.Task{ + Core: &task.Core{}, + Host: x.Host, Meta: x.Meta, Root: &task.Root{ task.Object: x.Core.Map().Object(), @@ -239,27 +241,21 @@ func (e *Engine) delete(tas *task.Task) error { } } - var met string - if x.Core != nil { - met = x.Core.Get().Method() - } - - if met == "" { - met = task.MthdAny - } - var tid int64 { tid = e.tim.Delete().UnixNano() } { - t.Core = &task.Core{} + t.Core.Set().Object(tid) } - { - t.Core.Set().Method(met) - t.Core.Set().Object(tid) + if t.Host == nil { + t.Host = &task.Host{} + } + + if t.Host.Get(task.Method) == "" { + t.Host.Set(task.Method, task.MthdAny) } { diff --git a/engine/engine.go b/engine/engine.go index 6923b43..d209ece 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -25,6 +25,7 @@ const ( type Config struct { Balancer balancer.Interface + Cleanup time.Duration Expiry time.Duration Logger logger.Interface Metric *metric.Collection @@ -37,6 +38,7 @@ type Config struct { type Engine struct { bal balancer.Interface + cln time.Duration ctx context.Context exp time.Duration // loc is the local lookup table for tasks that have been chosen to be @@ -55,6 +57,7 @@ type Engine struct { red redigo.Interface sep string tim *timer.Timer + // wrk is the identifier of this worker process. wrk string } @@ -62,6 +65,9 @@ func New(config Config) *Engine { if config.Balancer == nil { config.Balancer = balancer.Default() } + if config.Cleanup == 0 { + config.Cleanup = Week + } if config.Expiry == 0 { config.Expiry = Expiry } @@ -89,6 +95,7 @@ func New(config Config) *Engine { e := &Engine{ bal: config.Balancer, + cln: config.Cleanup, ctx: context.Background(), exp: config.Expiry, loc: map[string]*local{}, diff --git a/engine/engine_create_redis_test.go b/engine/engine_create_redis_test.go index ba0700f..191ccb9 100644 --- a/engine/engine_create_redis_test.go +++ b/engine/engine_create_redis_test.go @@ -124,6 +124,9 @@ func Test_Engine_Create(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAny, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -134,9 +137,6 @@ func Test_Engine_Create(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -150,6 +150,9 @@ func Test_Engine_Create(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAny, + }, Meta: &task.Meta{ "test.api.io/key": "zap", }, @@ -163,9 +166,6 @@ func Test_Engine_Create(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -179,6 +179,9 @@ func Test_Engine_Create(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAny, + }, Meta: &task.Meta{ "test.api.io/key": "bar", }, @@ -192,9 +195,6 @@ func Test_Engine_Create(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -211,6 +211,9 @@ func Test_Engine_Create(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAny, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -221,9 +224,6 @@ func Test_Engine_Create(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -254,6 +254,9 @@ func Test_Engine_Create(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAny, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -264,9 +267,6 @@ func Test_Engine_Create(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -280,6 +280,9 @@ func Test_Engine_Create(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAny, + }, Meta: &task.Meta{ "test.api.io/key": "bar", }, @@ -293,9 +296,6 @@ func Test_Engine_Create(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -318,6 +318,9 @@ func Test_Engine_Create(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAny, + }, Meta: &task.Meta{ "test.api.io/key": "bar", }, @@ -331,9 +334,6 @@ func Test_Engine_Create(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -360,6 +360,9 @@ func Test_Engine_Create(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAny, + }, Meta: &task.Meta{ "test.api.io/key": "bar", }, @@ -373,9 +376,6 @@ func Test_Engine_Create(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -657,7 +657,7 @@ func Test_Engine_Create_Method_All(t *testing.T) { // of creation. { tas := &task.Task{ - Core: &task.Core{ + Host: &task.Host{ task.Method: task.MthdAll, }, Meta: &task.Meta{ @@ -685,6 +685,9 @@ func Test_Engine_Create_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "sec", }, @@ -698,9 +701,6 @@ func Test_Engine_Create_Method_All(t *testing.T) { if tas.Core.Exi().Worker() { t.Fatal("expected", false, "got", true) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -717,6 +717,9 @@ func Test_Engine_Create_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "sec", }, @@ -730,9 +733,6 @@ func Test_Engine_Create_Method_All(t *testing.T) { if tas.Core.Exi().Worker() { t.Fatal("expected", false, "got", true) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -749,6 +749,9 @@ func Test_Engine_Create_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "sec", }, @@ -762,9 +765,6 @@ func Test_Engine_Create_Method_All(t *testing.T) { if tas.Core.Exi().Worker() { t.Fatal("expected", false, "got", true) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -783,6 +783,9 @@ func Test_Engine_Create_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAny, + }, Meta: &task.Meta{ "test.api.io/key": "fir", }, @@ -793,9 +796,6 @@ func Test_Engine_Create_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } diff --git a/engine/engine_delete_redis_test.go b/engine/engine_delete_redis_test.go index cc77879..5103e44 100644 --- a/engine/engine_delete_redis_test.go +++ b/engine/engine_delete_redis_test.go @@ -270,12 +270,12 @@ func Test_Engine_Delete_Cron_Method_All(t *testing.T) { var tas *task.Task { tas = &task.Task{ - Core: &task.Core{ - task.Method: task.MthdAll, - }, Cron: &task.Cron{ task.Aevery: "6 hours", }, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "bar", }, @@ -490,7 +490,7 @@ func Test_Engine_Delete_Gate_Method_All(t *testing.T) { var tas *task.Task { tas = &task.Task{ - Core: &task.Core{ + Host: &task.Host{ task.Method: task.MthdAll, }, Meta: &task.Meta{ @@ -621,7 +621,7 @@ func Test_Engine_Delete_Method_All_Purge(t *testing.T) { { tas := &task.Task{ - Core: &task.Core{ + Host: &task.Host{ task.Method: task.MthdAll, }, Meta: &task.Meta{ @@ -656,6 +656,9 @@ func Test_Engine_Delete_Method_All_Purge(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -666,9 +669,6 @@ func Test_Engine_Delete_Method_All_Purge(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -692,6 +692,9 @@ func Test_Engine_Delete_Method_All_Purge(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -702,9 +705,6 @@ func Test_Engine_Delete_Method_All_Purge(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -756,6 +756,9 @@ func Test_Engine_Delete_Method_All_Purge(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -766,9 +769,6 @@ func Test_Engine_Delete_Method_All_Purge(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -779,11 +779,12 @@ func Test_Engine_Delete_Method_All_Purge(t *testing.T) { }) } - // Searching for tasks now should purge the broadcasted task. + // Calling Engine.Expire purges any lingering task, regardless which engine + // executes it. { - tas, err = etw.Search() - if !IsTaskNotFound(err) { - t.Fatal("expected", taskNotFoundError, "got", err) + err = etw.Expire() + if err != nil { + t.Fatal(err) } } diff --git a/engine/engine_expire_redis_test.go b/engine/engine_expire_redis_test.go index b100494..8ef9985 100644 --- a/engine/engine_expire_redis_test.go +++ b/engine/engine_expire_redis_test.go @@ -208,7 +208,7 @@ func Test_Engine_Expire_Method_All(t *testing.T) { { tas := &task.Task{ - Core: &task.Core{ + Host: &task.Host{ task.Method: task.MthdAll, }, Meta: &task.Meta{ @@ -235,6 +235,9 @@ func Test_Engine_Expire_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -245,9 +248,6 @@ func Test_Engine_Expire_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -263,6 +263,9 @@ func Test_Engine_Expire_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -273,9 +276,6 @@ func Test_Engine_Expire_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -314,6 +314,9 @@ func Test_Engine_Expire_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -324,9 +327,6 @@ func Test_Engine_Expire_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -342,6 +342,9 @@ func Test_Engine_Expire_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -352,9 +355,6 @@ func Test_Engine_Expire_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -409,6 +409,9 @@ func Test_Engine_Expire_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -419,9 +422,6 @@ func Test_Engine_Expire_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -474,6 +474,9 @@ func Test_Engine_Expire_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -484,9 +487,6 @@ func Test_Engine_Expire_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -537,6 +537,9 @@ func Test_Engine_Expire_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -547,9 +550,6 @@ func Test_Engine_Expire_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } } diff --git a/engine/engine_lifecycle_redis_test.go b/engine/engine_lifecycle_redis_test.go index 795b8a6..6fa8bdf 100644 --- a/engine/engine_lifecycle_redis_test.go +++ b/engine/engine_lifecycle_redis_test.go @@ -82,12 +82,12 @@ func Test_Engine_Lifecycle_Cron_3Days(t *testing.T) { var tas *task.Task { tas = &task.Task{ - Core: &task.Core{ - task.Method: task.MthdAll, - }, Cron: &task.Cron{ task.Aevery: "3 days", }, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -155,6 +155,9 @@ func Test_Engine_Lifecycle_Cron_3Days(t *testing.T) { task.TickM1: "2022-12-30T00:00:00Z", task.TickP1: "2023-01-02T00:00:00Z", }, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -165,9 +168,6 @@ func Test_Engine_Lifecycle_Cron_3Days(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -219,6 +219,9 @@ func Test_Engine_Lifecycle_Cron_3Days(t *testing.T) { task.TickM1: "2022-12-30T00:00:00Z", task.TickP1: "2023-01-02T00:00:00Z", }, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -229,9 +232,6 @@ func Test_Engine_Lifecycle_Cron_3Days(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -277,6 +277,9 @@ func Test_Engine_Lifecycle_Cron_3Days(t *testing.T) { task.TickM1: "2023-01-02T00:00:00Z", task.TickP1: "2023-01-05T00:00:00Z", }, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -287,9 +290,6 @@ func Test_Engine_Lifecycle_Cron_3Days(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -305,6 +305,9 @@ func Test_Engine_Lifecycle_Cron_3Days(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -318,9 +321,6 @@ func Test_Engine_Lifecycle_Cron_3Days(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } } @@ -1182,6 +1182,9 @@ func Test_Engine_Lifecycle_Cron_Resolve(t *testing.T) { Gate: &task.Gate{ "test.api.io/k-1": task.Trigger, }, + Host: &task.Host{ + task.Method: task.MthdAny, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -1196,9 +1199,6 @@ func Test_Engine_Lifecycle_Cron_Resolve(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -1221,6 +1221,9 @@ func Test_Engine_Lifecycle_Cron_Resolve(t *testing.T) { Meta: &task.Meta{ "test.api.io/key": "foo", }, + Host: &task.Host{ + task.Method: task.MthdAny, + }, Root: &task.Root{ task.Object: lis[0].Core.Map().Object(), }, @@ -1235,9 +1238,6 @@ func Test_Engine_Lifecycle_Cron_Resolve(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -1293,6 +1293,9 @@ func Test_Engine_Lifecycle_Cron_Resolve(t *testing.T) { Gate: &task.Gate{ "test.api.io/k-1": task.Trigger, }, + Host: &task.Host{ + task.Method: task.MthdAny, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -1307,9 +1310,6 @@ func Test_Engine_Lifecycle_Cron_Resolve(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } } @@ -1362,7 +1362,7 @@ func Test_Engine_Lifecycle_Method_All_Failure(t *testing.T) { { tas := &task.Task{ - Core: &task.Core{ + Host: &task.Host{ task.Method: task.MthdAll, }, Meta: &task.Meta{ @@ -1386,7 +1386,7 @@ func Test_Engine_Lifecycle_Method_All_Failure(t *testing.T) { { tas := &task.Task{ - Core: &task.Core{ + Host: &task.Host{ task.Method: task.MthdAll, }, Meta: &task.Meta{ @@ -1414,6 +1414,9 @@ func Test_Engine_Lifecycle_Method_All_Failure(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -1424,9 +1427,6 @@ func Test_Engine_Lifecycle_Method_All_Failure(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -1446,6 +1446,9 @@ func Test_Engine_Lifecycle_Method_All_Failure(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "bar", }, @@ -1456,9 +1459,6 @@ func Test_Engine_Lifecycle_Method_All_Failure(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -1470,7 +1470,6 @@ func Test_Engine_Lifecycle_Method_All_Failure(t *testing.T) { } { - fmt.Printf("delete %#v\n", 11111111) err = eon.Delete(tas) if err != nil { t.Fatal(err) @@ -1533,6 +1532,9 @@ func Test_Engine_Lifecycle_Method_All_Failure(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "foo", }, @@ -1543,9 +1545,6 @@ func Test_Engine_Lifecycle_Method_All_Failure(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } diff --git a/engine/engine_lister_redis_test.go b/engine/engine_lister_redis_test.go index 482e885..1551ae7 100644 --- a/engine/engine_lister_redis_test.go +++ b/engine/engine_lister_redis_test.go @@ -353,12 +353,12 @@ func Test_Engine_Lister_Gate(t *testing.T) { { tas := &task.Task{ - Meta: &task.Meta{ - "test.api.io/key": "foo", - }, Gate: &task.Gate{ "test.api.io/k-0": task.Trigger, }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, Sync: &task.Sync{ "test.api.io/zer": "0", }, @@ -372,12 +372,12 @@ func Test_Engine_Lister_Gate(t *testing.T) { { tas := &task.Task{ - Meta: &task.Meta{ - "test.api.io/key": "foo", - }, Gate: &task.Gate{ "test.api.io/k-1": task.Trigger, }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, } err = eon.Create(tas) @@ -388,13 +388,13 @@ func Test_Engine_Lister_Gate(t *testing.T) { { tas := &task.Task{ - Meta: &task.Meta{ - "test.api.io/key": "bar", - }, Gate: &task.Gate{ "test.api.io/k-0": task.Waiting, "test.api.io/k-1": task.Waiting, }, + Meta: &task.Meta{ + "test.api.io/key": "bar", + }, Sync: &task.Sync{ "test.api.io/zer": "n/a", "test.api.io/one": "n/a", @@ -717,13 +717,16 @@ func Test_Engine_Lister_Gate(t *testing.T) { { exp = &task.Task{ Core: tas.Core, - Meta: &task.Meta{ - "test.api.io/key": "bar", - }, Gate: &task.Gate{ "test.api.io/k-0": task.Waiting, "test.api.io/k-1": task.Waiting, }, + Host: &task.Host{ + task.Method: task.MthdAny, + }, + Meta: &task.Meta{ + "test.api.io/key": "bar", + }, Sync: &task.Sync{ "test.api.io/zer": "0", "test.api.io/one": "n/a", @@ -735,9 +738,6 @@ func Test_Engine_Lister_Gate(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -752,6 +752,9 @@ func Test_Engine_Lister_Gate(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAny, + }, Meta: &task.Meta{ "test.api.io/key": "bar", }, @@ -769,9 +772,6 @@ func Test_Engine_Lister_Gate(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } } @@ -815,12 +815,12 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { { tas := &task.Task{ - Meta: &task.Meta{ - "test.api.io/key": "foo", - }, Gate: &task.Gate{ "test.api.io/k-0": task.Trigger, }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, Sync: &task.Sync{ "test.api.io/zer": "0", }, @@ -834,12 +834,12 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { { tas := &task.Task{ - Meta: &task.Meta{ - "test.api.io/key": "foo", - }, Gate: &task.Gate{ "test.api.io/k-1": task.Trigger, }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, } err = eon.Create(tas) @@ -850,16 +850,16 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { { tas := &task.Task{ - Core: &task.Core{ + Gate: &task.Gate{ + "test.api.io/k-0": task.Waiting, + "test.api.io/k-1": task.Waiting, + }, + Host: &task.Host{ task.Method: task.MthdAll, }, Meta: &task.Meta{ "test.api.io/key": "bar", }, - Gate: &task.Gate{ - "test.api.io/k-0": task.Waiting, - "test.api.io/k-1": task.Waiting, - }, Sync: &task.Sync{ "test.api.io/zer": "n/a", "test.api.io/one": "n/a", @@ -895,12 +895,15 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, - Meta: &task.Meta{ - "test.api.io/key": "foo", - }, Gate: &task.Gate{ "test.api.io/k-0": task.Trigger, }, + Host: &task.Host{ + task.Method: task.MthdAny, + }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, Sync: &task.Sync{ "test.api.io/zer": "0", }, @@ -911,9 +914,6 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -927,12 +927,15 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, - Meta: &task.Meta{ - "test.api.io/key": "foo", - }, Gate: &task.Gate{ "test.api.io/k-1": task.Trigger, }, + Host: &task.Host{ + task.Method: task.MthdAny, + }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, } } @@ -940,9 +943,6 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -956,13 +956,16 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, - Meta: &task.Meta{ - "test.api.io/key": "bar", - }, Gate: &task.Gate{ "test.api.io/k-0": task.Waiting, "test.api.io/k-1": task.Waiting, }, + Host: &task.Host{ + task.Method: task.MthdAll, + }, + Meta: &task.Meta{ + "test.api.io/key": "bar", + }, Sync: &task.Sync{ "test.api.io/zer": "n/a", "test.api.io/one": "n/a", @@ -974,9 +977,6 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -993,12 +993,15 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, - Meta: &task.Meta{ - "test.api.io/key": "foo", - }, Gate: &task.Gate{ "test.api.io/k-0": task.Trigger, }, + Host: &task.Host{ + task.Method: task.MthdAny, + }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, Sync: &task.Sync{ "test.api.io/zer": "0", }, @@ -1009,9 +1012,6 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -1045,12 +1045,15 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, - Meta: &task.Meta{ - "test.api.io/key": "foo", - }, Gate: &task.Gate{ "test.api.io/k-1": task.Trigger, }, + Host: &task.Host{ + task.Method: task.MthdAny, + }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, } } @@ -1058,9 +1061,6 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -1076,13 +1076,16 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, - Meta: &task.Meta{ - "test.api.io/key": "bar", - }, Gate: &task.Gate{ "test.api.io/k-0": task.Deleted, "test.api.io/k-1": task.Waiting, }, + Host: &task.Host{ + task.Method: task.MthdAll, + }, + Meta: &task.Meta{ + "test.api.io/key": "bar", + }, Sync: &task.Sync{ "test.api.io/zer": "0", "test.api.io/one": "n/a", @@ -1094,9 +1097,6 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -1112,12 +1112,15 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, - Meta: &task.Meta{ - "test.api.io/key": "foo", - }, Gate: &task.Gate{ "test.api.io/k-1": task.Trigger, }, + Host: &task.Host{ + task.Method: task.MthdAny, + }, + Meta: &task.Meta{ + "test.api.io/key": "foo", + }, } } @@ -1125,9 +1128,6 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAny { - t.Fatal("expected", task.MthdAny, "got", tas.Core.Get().Method()) - } } } @@ -1152,6 +1152,9 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "bar", }, @@ -1169,9 +1172,6 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -1201,13 +1201,16 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, - Meta: &task.Meta{ - "test.api.io/key": "bar", - }, Gate: &task.Gate{ "test.api.io/k-0": task.Waiting, "test.api.io/k-1": task.Waiting, }, + Host: &task.Host{ + task.Method: task.MthdAll, + }, + Meta: &task.Meta{ + "test.api.io/key": "bar", + }, Sync: &task.Sync{ "test.api.io/zer": "0", "test.api.io/one": "n/a", @@ -1219,9 +1222,6 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } @@ -1238,6 +1238,9 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { { exp = &task.Task{ Core: tas.Core, + Host: &task.Host{ + task.Method: task.MthdAll, + }, Meta: &task.Meta{ "test.api.io/key": "bar", }, @@ -1255,9 +1258,6 @@ func Test_Engine_Lister_Gate_Method_All(t *testing.T) { if !reflect.DeepEqual(tas, exp) { t.Fatalf("\n\n%s\n", cmp.Diff(exp, tas)) } - if tas.Core.Get().Method() != task.MthdAll { - t.Fatal("expected", task.MthdAll, "got", tas.Core.Get().Method()) - } } } } diff --git a/engine/error.go b/engine/error.go index 3ab7bf9..ba2b8c6 100644 --- a/engine/error.go +++ b/engine/error.go @@ -54,6 +54,14 @@ func IsTaskGate(err error) bool { return errors.Is(err, taskGateError) } +var taskHostError = &tracer.Error{ + Kind: "taskHostError", +} + +func IsTaskHost(err error) bool { + return errors.Is(err, taskHostError) +} + var taskMetaEmptyError = &tracer.Error{ Kind: "taskMetaEmptyError", } diff --git a/engine/exists.go b/engine/exists.go index 0733989..41fbd8b 100644 --- a/engine/exists.go +++ b/engine/exists.go @@ -39,14 +39,8 @@ func (e *Engine) exists(tas *task.Task) (bool, error) { } { - cor := tas.Core.Emp() - crn := tas.Cron.Emp() - gat := tas.Gate.Emp() - met := tas.Meta.Emp() - roo := tas.Root.Emp() - - if cor && crn && gat && met && roo { - return false, tracer.Maskf(taskMetaEmptyError, "at least one of [Task.Core Task.Cron Task.Gate Task.Meta Task.Root] must be given") + if tas.Emp() { + return false, tracer.Maskf(taskMetaEmptyError, "at least one of [Task.Core Task.Cron Task.Gate Task.Host Task.Meta Task.Root] must be configured") } } @@ -90,14 +84,8 @@ func (e *Engine) exists(tas *task.Task) (bool, error) { e.met.Task.Inactive.Set(float64(len(lis))) } - for _, t := range lis { - cor := tas.Core.Emp() || (t.Core != nil && t.Core.Has(*tas.Core)) - crn := tas.Cron.Emp() || (t.Cron != nil && t.Cron.Has(*tas.Cron)) - gat := tas.Gate.Emp() || (t.Gate != nil && t.Gate.Has(*tas.Gate)) - met := tas.Meta.Emp() || (t.Meta != nil && t.Meta.Has(*tas.Meta)) - roo := tas.Root.Emp() || (t.Root != nil && t.Root.Has(*tas.Root)) - - if cor && crn && gat && met && roo { + for _, x := range lis { + if x.Has(tas) { return true, nil } } diff --git a/engine/expire.go b/engine/expire.go index 3970e83..c3b076a 100644 --- a/engine/expire.go +++ b/engine/expire.go @@ -73,11 +73,42 @@ func (e *Engine) expire() error { cur[l.Core.Get().Worker()]++ } - for _, t := range lis { + for _, x := range lis { + // Derive this task's creation timestamp from its object ID. + var tim time.Time + { + tim = created(x.Core.Get().Object()) + } + + // Any lingering task is removed from the internal state once it is older + // the configured retention period, which defaults to 1 week. This is just a + // random guess on what is sensible, and since we want to do some house + // keeping in order to prevent unnecessary state bloat, we just get rid of + // it eventually. The assumption here right now is that tasks to be + // processed by all workers within the network are either already processed, + // or not relevant anymore beyond 1 week of creation. + if e.tim.Search().Sub(tim) > e.cln { + // Remove the irrelevant task from memory, if any. + { + delete(e.loc, x.Core.Map().Object()) + } + + // Remove the irrelevant task from the underlying queue. + { + k := e.Keyfmt() + s := float64(x.Core.Get().Object()) + + err = e.red.Sorted().Delete().Score(k, s) + if err != nil { + return tracer.Mask(err) + } + } + } + // We are looking for tasks which have an owner. So if there is no // owner assigned we ignore the task and move on to find another // one. - if t.Core.Get().Worker() == "" { + if x.Core.Get().Worker() == "" { continue } @@ -85,9 +116,9 @@ func (e *Engine) expire() error { var now time.Time var wrk string { - exp = t.Core.Get().Expiry() + exp = x.Core.Get().Expiry() now = e.tim.Expire() - wrk = t.Core.Get().Worker() + wrk = x.Core.Get().Worker() } // We are looking for tasks which are expired already. So if the task we @@ -100,15 +131,15 @@ func (e *Engine) expire() error { } { - t.Core.Prg().Expiry() - t.Core.Prg().Worker() - t.Core.Set().Cycles(t.Core.Get().Cycles() + 1) + x.Core.Prg().Expiry() + x.Core.Prg().Worker() + x.Core.Set().Cycles(x.Core.Get().Cycles() + 1) } { k := e.Keyfmt() - v := task.ToString(t) - s := float64(t.Core.Get().Object()) + v := task.ToString(x) + s := float64(x.Core.Get().Object()) _, err := e.red.Sorted().Update().Score(k, v, s) if err != nil { @@ -135,12 +166,12 @@ func (e *Engine) expire() error { dev = e.bal.Dev(cur, des) } - for _, t := range lis { + for _, x := range lis { // We are looking for tasks which have an owner that is supposed to // revoke their ownership. So if there is no revocation indicated // for the current owner we ignore the task and move on to find // another one. - if dev[t.Core.Get().Worker()] == 0 { + if dev[x.Core.Get().Worker()] == 0 { continue } @@ -148,9 +179,9 @@ func (e *Engine) expire() error { var now time.Time var wrk string { - exp = t.Core.Get().Expiry() + exp = x.Core.Get().Expiry() now = e.tim.Expire() - wrk = t.Core.Get().Worker() + wrk = x.Core.Get().Worker() } // We are looking for tasks which are expired already. So if the task we @@ -163,15 +194,15 @@ func (e *Engine) expire() error { } { - t.Core.Prg().Expiry() - t.Core.Prg().Worker() - t.Core.Set().Cycles(t.Core.Get().Cycles() + 1) + x.Core.Prg().Expiry() + x.Core.Prg().Worker() + x.Core.Set().Cycles(x.Core.Get().Cycles() + 1) } { k := e.Keyfmt() - v := task.ToString(t) - s := float64(t.Core.Get().Object()) + v := task.ToString(x) + s := float64(x.Core.Get().Object()) _, err := e.red.Sorted().Update().Score(k, v, s) if err != nil { diff --git a/engine/labels.go b/engine/labels.go index 3ec110f..8c2516c 100644 --- a/engine/labels.go +++ b/engine/labels.go @@ -16,6 +16,12 @@ func Del() map[string]string { } } +func Met() map[string]string { + return map[string]string{ + task.Method: "*", + } +} + func Obj() map[string]string { return map[string]string{ task.Object: "*", diff --git a/engine/lister.go b/engine/lister.go index 95d98a5..1d76063 100644 --- a/engine/lister.go +++ b/engine/lister.go @@ -46,13 +46,8 @@ func (e *Engine) lister(tas *task.Task) ([]*task.Task, error) { } { - crn := (tas.Cron == nil || tas.Cron.Emp()) - gat := (tas.Gate == nil || tas.Gate.Emp()) - met := (tas.Meta == nil || tas.Meta.Emp()) - roo := (tas.Root == nil || tas.Root.Emp()) - - if crn && gat && met && roo { - return nil, tracer.Maskf(taskMetaEmptyError, "at least one of [Task.Cron Task.Gate Task.Meta Task.Root] must be given") + if tas.Emp() { + return nil, tracer.Maskf(taskMetaEmptyError, "at least one of [Task.Cron Task.Gate Task.Host Task.Meta Task.Root] must be configured") } } @@ -97,14 +92,9 @@ func (e *Engine) lister(tas *task.Task) ([]*task.Task, error) { } var fil []*task.Task - for _, t := range lis { - crn := tas.Cron.Emp() || (t.Cron != nil && t.Cron.Has(*tas.Cron)) - gat := tas.Gate.Emp() || (t.Gate != nil && t.Gate.Has(*tas.Gate)) - met := tas.Meta.Emp() || (t.Meta != nil && t.Meta.Has(*tas.Meta)) - roo := tas.Root.Emp() || (t.Root != nil && t.Root.Has(*tas.Root)) - - if crn && gat && met && roo { - fil = append(fil, t) + for _, x := range lis { + if x.Has(tas) { + fil = append(fil, x) } } diff --git a/engine/search.go b/engine/search.go index 9b85b47..6db0490 100644 --- a/engine/search.go +++ b/engine/search.go @@ -98,41 +98,10 @@ func (e *Engine) search() (*task.Task, error) { } // Skip any task that does not define the task delivery method "all". - if x.Core.Get().Method() != task.MthdAll { + if x.Host.Get(task.Method) != task.MthdAll { continue } - // Derive this task's creation timestamp from its object ID. - var tim time.Time - { - tim = created(x.Core.Get().Object()) - } - - // Any task with delivery method "all" is removed from the internal state - // once it is older than 1 week. This is just a random guess on what is - // sensible, and since we want to do some house keeping in order to prevent - // unnecessary state bloat, we just get rid of it eventually. The assumption - // here right now is that tasks to be processed by all workers within the - // network are either already processed, or not relevant anymore beyond 1 - // week of creation. - if e.tim.Search().Sub(tim) > Week { - // Remove the purged task from memory, if any. - { - delete(e.loc, x.Core.Map().Object()) - } - - // Remove the purged task from the underlying queue. - { - k := e.Keyfmt() - s := float64(x.Core.Get().Object()) - - err = e.red.Sorted().Delete().Score(k, s) - if err != nil { - return nil, tracer.Mask(err) - } - } - } - var loc *local { loc = e.loc[x.Core.Map().Object()] @@ -143,6 +112,12 @@ func (e *Engine) search() (*task.Task, error) { continue } + // Derive this task's creation timestamp from its object ID. + var tim time.Time + { + tim = created(x.Core.Get().Object()) + } + // Skip any task that got created before this worker started to participate // within the network. Engine.pnt is the earliest point in time at which the // worker process came online, or the latest point in time of having @@ -188,7 +163,7 @@ func (e *Engine) search() (*task.Task, error) { for i, x := range lis { // Remove all broadcasted tasks for further processing. Any task defining // delivery method "all" must have been addressed already above. - if x.Core.Get().Method() == task.MthdAll { + if x.Host.Get(task.Method) == task.MthdAll { rem = append(rem, i) continue } @@ -307,19 +282,43 @@ func (e *Engine) search() (*task.Task, error) { } var tas *task.Task - for _, t := range lis { - // We are looking for tasks which do not yet have an owner. So if - // there is an owner assigned we ignore the task and move on to find - // another one. - { - if t.Core.Get().Worker() != "" { + + if tas == nil { + for _, x := range lis { + // We are looking for tasks which do not yet have an owner. So if there is + // an owner assigned we ignore the task and move on to find another one. + if x.Core.Get().Worker() != "" { continue } + + // The current task is not assigned to any worker. If this task's delivery + // method is now set to "uni" and its target worker address is this current + // worker, then we simply take it and assign it to the this current worker. + // Note that we want to give tasks priority that are specifically addressed + // to a particular worker. Tasks that can be processed by anyone are of + // secondary importance in our system. + if x.Host.Get(task.Method) == task.MthdUni && x.Host.Get(task.Worker) == e.wrk { + tas = x + break + } } + } - { - tas = t - break + if tas == nil { + for _, x := range lis { + // We are looking for tasks which do not yet have an owner. So if there is + // an owner assigned we ignore the task and move on to find another one. + if x.Core.Get().Worker() != "" { + continue + } + + // The current task is not assigned to any worker. If this task's delivery + // method is now set to "any", then we simply take it and assign it to this + // current worker. + if x.Host.Get(task.Method) == task.MthdAny { + tas = x + break + } } } diff --git a/engine/ticker.go b/engine/ticker.go index e8ef5e7..8a26b1c 100644 --- a/engine/ticker.go +++ b/engine/ticker.go @@ -208,6 +208,8 @@ func (e *Engine) ticker() error { var t *task.Task { t = &task.Task{ + Core: &task.Core{}, + Host: x.Host, Gate: x.Gate, Meta: x.Meta, Root: &task.Root{ @@ -217,27 +219,21 @@ func (e *Engine) ticker() error { } } - var met string - if x.Core != nil { - met = x.Core.Get().Method() - } - - if met == "" { - met = task.MthdAny - } - var tid int64 { tid = e.tim.Ticker().UnixNano() } { - t.Core = &task.Core{} + t.Core.Set().Object(tid) } - { - t.Core.Set().Method(met) - t.Core.Set().Object(tid) + if t.Host == nil { + t.Host = &task.Host{} + } + + if t.Host.Get(task.Method) == "" { + t.Host.Set(task.Method, task.MthdAny) } { @@ -274,7 +270,7 @@ func (e *Engine) ticker() error { // means there is no completion or acknowledgement for scheduled tasks if // they are delivered to all workers. We just fire at-least-once, on // schedule, and leave the rest to the workers. - if x.Core.Get().Method() == task.MthdAll { + if x.Host.Get(task.Method) == task.MthdAll { x.Cron.Set().TickM1(tic.TickM1()) } diff --git a/task/core_exists.go b/task/core_exists.go index 2002484..d554d0e 100644 --- a/task/core_exists.go +++ b/task/core_exists.go @@ -22,10 +22,6 @@ func (e *exicor) Expiry() bool { return e.labl[Expiry] != "" } -func (e *exicor) Method() bool { - return e.labl[Method] != "" -} - func (e *exicor) Object() bool { return e.labl[Object] != "" } diff --git a/task/core_getter.go b/task/core_getter.go index 73811a7..4a204ea 100644 --- a/task/core_getter.go +++ b/task/core_getter.go @@ -52,10 +52,6 @@ func (g *getcor) Expiry() time.Time { return exp } -func (g *getcor) Method() string { - return g.labl[Method] -} - func (g *getcor) Object() int64 { obj, err := strconv.ParseInt(g.labl[Object], 10, 64) if err != nil { diff --git a/task/core_mapper.go b/task/core_mapper.go index a3ef664..3ae78b6 100644 --- a/task/core_mapper.go +++ b/task/core_mapper.go @@ -22,10 +22,6 @@ func (m *mapcor) Expiry() string { return m.labl[Expiry] } -func (m *mapcor) Method() string { - return m.labl[Method] -} - func (m *mapcor) Object() string { return m.labl[Object] } diff --git a/task/core_purger.go b/task/core_purger.go index a0a3353..65d7ac0 100644 --- a/task/core_purger.go +++ b/task/core_purger.go @@ -22,10 +22,6 @@ func (p *prgcor) Expiry() { delete(p.labl, Expiry) } -func (p *prgcor) Method() { - delete(p.labl, Method) -} - func (p *prgcor) Object() { delete(p.labl, Object) } diff --git a/task/core_setter.go b/task/core_setter.go index 4364d3d..813a5ed 100644 --- a/task/core_setter.go +++ b/task/core_setter.go @@ -29,10 +29,6 @@ func (s *setcor) Expiry(x time.Time) { s.labl[Expiry] = x.Format(ticker.Layout) } -func (s *setcor) Method(x string) { - s.labl[Method] = x -} - func (s *setcor) Object(x int64) { s.labl[Object] = strconv.FormatInt(x, 10) } diff --git a/task/host.go b/task/host.go new file mode 100644 index 0000000..91bfff6 --- /dev/null +++ b/task/host.go @@ -0,0 +1,73 @@ +package task + +import "github.com/xh3b4sd/rescue/matcher" + +type Host map[string]string + +func (h *Host) All(key ...string) *Host { + hos := Host(matcher.All(*h, key...)) + return &hos +} + +func (h *Host) Any(key ...string) *Host { + hos := Host(matcher.Any(*h, key...)) + return &hos +} + +func (h *Host) Emp() bool { + return h.Len() == 0 +} + +func (h *Host) Eql(x *Host) bool { + return h != nil && x != nil && h.Len() == x.Len() && h.Has(*x) +} + +func (h *Host) Exi(key string) bool { + if h == nil { + return false + } + + hos := *h + return key != "" && hos[key] != "" +} + +func (h *Host) Get(key string) string { + if h == nil { + return "" + } + + hos := *h + return hos[key] +} + +func (h *Host) Has(lab map[string]string) bool { + return matcher.Has(*h, lab) +} + +func (h *Host) Key() []string { + if h == nil { + return nil + } + + var key []string + + for k := range *h { + key = append(key, k) + } + + return key +} + +func (h *Host) Len() int { + if h == nil { + return 0 + } + + hos := *h + return len(hos) +} + +func (h *Host) Set(key string, val string) { + hos := *h + hos[key] = val +} diff --git a/task/host_test.go b/task/host_test.go new file mode 100644 index 0000000..ebb9f55 --- /dev/null +++ b/task/host_test.go @@ -0,0 +1,384 @@ +package task + +import ( + "fmt" + "reflect" + "slices" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func Test_Task_Host_Emp(t *testing.T) { + testCases := []struct { + tas *Task + emp bool + }{ + // Case 000 + { + tas: &Task{}, + emp: true, + }, + // Case 001 + { + tas: &Task{ + Core: &Core{"foo": "bar"}, + }, + emp: true, + }, + // Case 002 + { + tas: &Task{ + Host: &Host{}, + }, + emp: true, + }, + // Case 003 + { + tas: &Task{ + Host: &Host{"foo": "bar"}, + }, + emp: false, + }, + // Case 004 + { + tas: &Task{ + Root: &Root{}, + }, + emp: true, + }, + // Case 005 + { + tas: &Task{ + Root: &Root{"foo": "bar"}, + }, + emp: true, + }, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("%03d", i), func(t *testing.T) { + emp := tc.tas.Host.Emp() + + if emp != tc.emp { + t.Fatalf("\n\n%s\n", cmp.Diff(tc.emp, emp)) + } + }) + } +} + +func Test_Task_Host_Eql(t *testing.T) { + testCases := []struct { + tas *Task + hos *Host + eql bool + }{ + // Case 000 + { + tas: &Task{}, + hos: nil, + eql: false, + }, + // Case 001 + { + tas: &Task{}, + hos: &Host{}, + eql: false, + }, + // Case 002 + { + tas: &Task{ + Host: &Host{"foo": "bar"}, + }, + hos: &Host{}, + eql: false, + }, + // Case 003 + { + tas: &Task{ + Core: &Core{"foo": "bar"}, + }, + hos: &Host{"foo": "bar"}, + eql: false, + }, + // Case 004 + { + tas: &Task{ + Host: &Host{"foo": "bar"}, + }, + hos: &Host{"foo": "bar"}, + eql: true, + }, + // Case 005 + { + tas: &Task{ + Host: &Host{"foo": "bar", "baz": "zap"}, + }, + hos: &Host{"foo": "bar"}, + eql: false, + }, + // Case 006 + { + tas: &Task{ + Host: &Host{"foo": "bar", "baz": "zap"}, + }, + hos: &Host{"baz": "zap"}, + eql: false, + }, + // Case 007 + { + tas: &Task{ + Host: &Host{"foo": "bar", "baz": "zap"}, + }, + hos: &Host{"foo": "bar", "baz": "zap"}, + eql: true, + }, + // Case 008 + { + tas: &Task{ + Host: &Host{"foo": "", "baz": "zap"}, + }, + hos: &Host{"foo": "", "baz": "zap"}, + eql: true, + }, + // Case 009 + { + tas: &Task{ + Host: &Host{"foo": "", "baz": "zap"}, + }, + hos: &Host{"foo": "bar", "baz": ""}, + eql: false, + }, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("%03d", i), func(t *testing.T) { + eql := tc.tas.Host.Eql(tc.hos) + + if eql != tc.eql { + t.Fatalf("\n\n%s\n", cmp.Diff(tc.eql, eql)) + } + }) + } +} + +func Test_Task_Host_Exi(t *testing.T) { + testCases := []struct { + tas *Task + exi bool + }{ + // Case 000 + { + tas: &Task{}, + exi: false, + }, + // Case 001 + { + tas: &Task{ + Core: &Core{"foo": "bar"}, + }, + exi: false, + }, + // Case 002 + { + tas: &Task{ + Host: &Host{}, + }, + exi: false, + }, + // Case 003 + { + tas: &Task{ + Host: &Host{"foo": "bar"}, + }, + exi: true, + }, + // Case 004 + { + tas: &Task{ + Root: &Root{}, + }, + exi: false, + }, + // Case 005 + { + tas: &Task{ + Root: &Root{"foo": "bar"}, + }, + exi: false, + }, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("%03d", i), func(t *testing.T) { + exi := tc.tas.Host.Exi("foo") + + if exi != tc.exi { + t.Fatalf("\n\n%s\n", cmp.Diff(tc.exi, exi)) + } + }) + } +} + +func Test_Task_Host_Get(t *testing.T) { + testCases := []struct { + tas *Task + get string + }{ + // Case 000 + { + tas: &Task{}, + get: "", + }, + // Case 001 + { + tas: &Task{ + Core: &Core{"foo": "bar"}, + }, + get: "", + }, + // Case 002 + { + tas: &Task{ + Host: &Host{}, + }, + get: "", + }, + // Case 003 + { + tas: &Task{ + Host: &Host{"foo": "bar"}, + }, + get: "bar", + }, + // Case 004 + { + tas: &Task{ + Root: &Root{}, + }, + get: "", + }, + // Case 005 + { + tas: &Task{ + Root: &Root{"foo": "bar"}, + }, + get: "", + }, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("%03d", i), func(t *testing.T) { + get := tc.tas.Host.Get("foo") + + if get != tc.get { + t.Fatalf("\n\n%s\n", cmp.Diff(tc.get, get)) + } + }) + } +} + +func Test_Task_Host_Key(t *testing.T) { + testCases := []struct { + tas *Task + key []string + }{ + // Case 000 + { + tas: &Task{}, + key: nil, + }, + // Case 001 + { + tas: &Task{ + Meta: &Meta{ + "foo": "bar", + }, + }, + key: nil, + }, + // Case 002 + { + tas: &Task{ + Host: &Host{ + "foo": "bar", + }, + }, + key: []string{ + "foo", + }, + }, + // Case 003 + { + tas: &Task{ + Host: &Host{ + "foo": "bar", + "baz": "foo", + "key": "baz", + }, + }, + key: []string{ + "foo", + "baz", + "key", + }, + }, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("%03d", i), func(t *testing.T) { + key := tc.tas.Host.Key() + + slices.Sort(key) + slices.Sort(tc.key) + + if !reflect.DeepEqual(key, tc.key) { + t.Fatalf("\n\n%s\n", cmp.Diff(tc.key, key)) + } + }) + } +} + +func Test_Task_Host_Set(t *testing.T) { + testCases := []struct { + tas *Task + key string + val string + set *Task + }{ + // Case 000 + { + tas: &Task{ + Host: &Host{"foo": "bar"}, + }, + key: "foo", + val: "zap", + set: &Task{ + Host: &Host{"foo": "zap"}, + }, + }, + // Case 001 + { + tas: &Task{ + Host: &Host{"foo": "bar", "one": "two"}, + }, + key: "one", + val: "thr", + set: &Task{ + Host: &Host{"foo": "bar", "one": "thr"}, + }, + }, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("%03d", i), func(t *testing.T) { + tc.tas.Host.Set(tc.key, tc.val) + + set := tc.set + + if !reflect.DeepEqual(set, tc.tas) { + t.Fatalf("\n\n%s\n", cmp.Diff(tc.tas, set)) + } + }) + } +} diff --git a/task/labels.go b/task/labels.go index 6957a98..b894995 100644 --- a/task/labels.go +++ b/task/labels.go @@ -24,17 +24,6 @@ const ( // fact and stop executing on the expired task. Expiry = "task.rescue.io/expiry" - // Method is the addressing strategy to deliver a task within the network of - // worker nodes. Every single task may be processed with varying guarantees of - // delivery, but always at-least-once. - // - // all delivered to all workers within the network (timeline based) - // any delivered to any worker within the network (default method) - // mny delivered to many specific workers within the network (not implemented) - // uni delivered to a single specific worker within the network (not implemented) - // - Method = "task.rescue.io/method" - // Object is the identifier of the task within the queue. Object = "task.rescue.io/object" @@ -62,6 +51,19 @@ const ( TickP1 = "time.rescue.io/tick+1" ) +const ( + // Method is the addressing strategy to deliver a task within the network of + // worker nodes. Every single task may be processed with varying guarantees of + // delivery, but always at-least-once. + // + // all delivered to all workers within the network (timeline based) + // any delivered to any worker within the network (default method) + // mny delivered to many specific workers within the network (not implemented) + // uni delivered to a single specific worker within the network (worker based) + // + Method = "addr.rescue.io/method" +) + const ( // MthdAll is the addressing method to deliver a task to every worker within // the network at the time of that particular task creation. Consider workers @@ -79,6 +81,15 @@ const ( // long as it is being processed. This is the default method and does not have // to be specified. MthdAny = "any" + + // MthdUni is the addressing method to deliver a task to a specific worker + // within the network. Using "uni" requires the accompanied usage of the core + // label key "task.rescue.io/worker" for specifying a particular identifier. + // Tasks routed via "uni" may expire like "any" task. In fact the only + // difference between "any" and "uni" is that the "uni" task has a sticky task + // ownership requirement, while "any" task may be picked up by some arbitrary + // worker after expiry. + MthdUni = "uni" ) const ( diff --git a/task/task.go b/task/task.go index 9ec4b43..3966b52 100644 --- a/task/task.go +++ b/task/task.go @@ -75,6 +75,18 @@ type Task struct { // to begin. Gate *Gate `json:"gate,omitempty"` + // Host contains addressable task delivery information for targeting any + // addressable worker within the network. The default delivery method is + // "any". Tasks may be processed by "all" workers within the network without + // acknowledgement of completion. Any particular worker may be addressed like + // shown below. Tasks not being addressed within a configured retention period + // are being deleted. + // + // addr.rescue.io/method uni + // task.rescue.io/worker 90dc68ba-4820-42ac-a924-2450388c15a6 + // + Host *Host `json:"host,omitempty"` + // Meta contains task specific information defined by the user. Any worker // should be able to identify whether they are able to execute on a task // successfully, given the task metadata. Upon task creation, certain metadata @@ -117,3 +129,22 @@ type Task struct { // Sync *Sync `json:"sync,omitempty"` } + +// Emp expresses whether this task t contains any definition at all. +func (t *Task) Emp() bool { + return t != nil && t.Core.Emp() && t.Cron.Emp() && t.Host.Emp() && t.Gate.Emp() && t.Meta.Emp() && t.Root.Emp() +} + +// Has expresses whether this task t contains all the definitions of the given +// task x. Here, x is a subset of t. If t has all of x's definitions, then Has +// returns true. +func (t *Task) Has(x *Task) bool { + cor := x.Core.Emp() || (t.Core != nil && t.Core.Has(*x.Core)) + crn := x.Cron.Emp() || (t.Cron != nil && t.Cron.Has(*x.Cron)) + hos := x.Host.Emp() || (t.Host != nil && t.Host.Has(*x.Host)) + gat := x.Gate.Emp() || (t.Gate != nil && t.Gate.Has(*x.Gate)) + met := x.Meta.Emp() || (t.Meta != nil && t.Meta.Has(*x.Meta)) + roo := x.Root.Emp() || (t.Root != nil && t.Root.Has(*x.Root)) + + return cor && crn && hos && gat && met && roo +}