Skip to content

Commit

Permalink
feat: add function foreach array
Browse files Browse the repository at this point in the history
Signed-off-by: xdlbdy <[email protected]>
  • Loading branch information
xdlbdy committed Jan 16, 2023
1 parent d3617ed commit 2fa3241
Show file tree
Hide file tree
Showing 18 changed files with 69 additions and 154 deletions.
1 change: 0 additions & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/linkall-labs/vanus/internal/controller/trigger"
"github.com/linkall-labs/vanus/internal/primitive/interceptor/errinterceptor"
"github.com/linkall-labs/vanus/internal/primitive/interceptor/memberinterceptor"
_ "github.com/linkall-labs/vanus/internal/primitive/transform"
"github.com/linkall-labs/vanus/internal/primitive/vanus"
"github.com/linkall-labs/vanus/observability"
"github.com/linkall-labs/vanus/observability/log"
Expand Down
1 change: 0 additions & 1 deletion cmd/trigger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sync"

"github.com/linkall-labs/vanus/internal/primitive"
_ "github.com/linkall-labs/vanus/internal/primitive/transform"
"github.com/linkall-labs/vanus/internal/trigger"
"github.com/linkall-labs/vanus/observability"
"github.com/linkall-labs/vanus/observability/log"
Expand Down
1 change: 0 additions & 1 deletion internal/controller/trigger/validation/suscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"testing"

_ "github.com/linkall-labs/vanus/internal/primitive/transform"
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"
. "github.com/smartystreets/goconvey/convey"
Expand Down
1 change: 0 additions & 1 deletion internal/gateway/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/linkall-labs/vanus/client/pkg/policy"
"github.com/linkall-labs/vanus/internal/convert"
"github.com/linkall-labs/vanus/internal/primitive"
_ "github.com/linkall-labs/vanus/internal/primitive/transform"
"github.com/linkall-labs/vanus/internal/primitive/vanus"
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"
Expand Down
15 changes: 15 additions & 0 deletions internal/primitive/transform/action/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,18 @@ var (
ErrExist = fmt.Errorf("action have exist")
ErrArgNumber = fmt.Errorf("action arg number invalid")
)

type NestAction interface {
Action
InitAction(actions []Action) error
}

type NestActionImpl struct {
CommonAction
Actions []Action
}

func (c *NestActionImpl) InitAction(actions []Action) error {
c.Actions = actions
return nil
}
1 change: 0 additions & 1 deletion internal/primitive/transform/action/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"

cetest "github.com/cloudevents/sdk-go/v2/test"
_ "github.com/linkall-labs/vanus/internal/primitive/transform"
"github.com/linkall-labs/vanus/internal/primitive/transform/context"
"github.com/linkall-labs/vanus/internal/primitive/transform/runtime"
. "github.com/smartystreets/goconvey/convey"
Expand Down
27 changes: 6 additions & 21 deletions internal/primitive/transform/action/array/foreach_array.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,27 @@ import (
"github.com/linkall-labs/vanus/internal/primitive/transform/arg"
"github.com/linkall-labs/vanus/internal/primitive/transform/common"
"github.com/linkall-labs/vanus/internal/primitive/transform/context"
"github.com/linkall-labs/vanus/internal/primitive/transform/runtime"
"github.com/pkg/errors"
)

// ["foreach_array","array root", function].
type foreachArrayAction struct {
action.CommonAction
actions []action.Action
action.NestActionImpl
}

func NewForeachArrayAction() action.Action {
a := &foreachArrayAction{}
a.CommonAction = action.CommonAction{
ActionName: "FOREACH_ARRAY",
FixedArgs: []arg.TypeList{[]arg.Type{arg.EventData}, []arg.Type{arg.Constant}},
VariadicArg: arg.TypeList{arg.Constant},
ActionName: "FOREACH_ARRAY",
FixedArgs: []arg.TypeList{[]arg.Type{arg.EventData}},
}
return a
}

func (a *foreachArrayAction) Init(args []arg.Arg) error {
a.TargetArg = args[0]
a.Args = args[:1]
a.Args = args
a.ArgTypes = []common.Type{common.Array}
for i := 1; i < len(args); i++ {
v, _ := args[i].Evaluate(nil)
commands, ok := v.([]interface{})
if !ok {
return errors.Errorf("arg %d %s is invalid", i, args[i].Original())
}
_action, err := runtime.NewAction(commands)
if err != nil {
return errors.Wrapf(err, "arg %d %s new action error", i, args[i].Original())
}
a.actions = append(a.actions, _action)
}
return nil
}

Expand All @@ -68,8 +53,8 @@ func (a *foreachArrayAction) Execute(ceCtx *context.EventContext) error {
newCtx := &context.EventContext{
Data: arrayValue[i],
}
for i := range a.actions {
err = a.actions[i].Execute(newCtx)
for i := range a.Actions {
err = a.Actions[i].Execute(newCtx)
if err != nil {
return errors.Wrapf(err, "action %dst execute error", i+1)
}
Expand Down
17 changes: 0 additions & 17 deletions internal/primitive/transform/action/array/init_test.go

This file was deleted.

1 change: 0 additions & 1 deletion internal/primitive/transform/action/common/length_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"testing"

cetest "github.com/cloudevents/sdk-go/v2/test"
_ "github.com/linkall-labs/vanus/internal/primitive/transform"
"github.com/linkall-labs/vanus/internal/primitive/transform/action/common"
"github.com/linkall-labs/vanus/internal/primitive/transform/context"
"github.com/linkall-labs/vanus/internal/primitive/transform/runtime"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"

cetest "github.com/cloudevents/sdk-go/v2/test"
_ "github.com/linkall-labs/vanus/internal/primitive/transform"
"github.com/linkall-labs/vanus/internal/primitive/transform/action/condition"
"github.com/linkall-labs/vanus/internal/primitive/transform/context"
"github.com/linkall-labs/vanus/internal/primitive/transform/runtime"
Expand Down
17 changes: 0 additions & 17 deletions internal/primitive/transform/action/datetime/init_test.go

This file was deleted.

17 changes: 0 additions & 17 deletions internal/primitive/transform/action/math/init_test.go

This file was deleted.

17 changes: 0 additions & 17 deletions internal/primitive/transform/action/source/init_test.go

This file was deleted.

17 changes: 0 additions & 17 deletions internal/primitive/transform/action/strings/init_test.go

This file was deleted.

17 changes: 0 additions & 17 deletions internal/primitive/transform/action/structs/init_test.go

This file was deleted.

59 changes: 43 additions & 16 deletions internal/primitive/transform/runtime/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
package runtime

import (
"fmt"
stdStrs "strings"
"strings"

"github.com/linkall-labs/vanus/internal/primitive/transform/action"
"github.com/linkall-labs/vanus/internal/primitive/transform/arg"
Expand All @@ -29,9 +28,9 @@ var actionMap = map[string]newAction{}

func AddAction(actionFn newAction) error {
a := actionFn()
name := stdStrs.ToUpper(a.Name())
name := strings.ToUpper(a.Name())
if _, exist := actionMap[name]; exist {
return fmt.Errorf("action %s has exist", name)
return errors.Errorf("action %s has exist", name)
}
actionMap[name] = actionFn
return nil
Expand All @@ -40,36 +39,64 @@ func AddAction(actionFn newAction) error {
func NewAction(command []interface{}) (action.Action, error) {
funcName, ok := command[0].(string)
if !ok {
return nil, fmt.Errorf("command name must be string")
return nil, errors.Errorf("command name must be string")
}
actionFn, exist := actionMap[stdStrs.ToUpper(funcName)]
actionFn, exist := actionMap[strings.ToUpper(funcName)]
if !exist {
return nil, fmt.Errorf("command %s not exist", funcName)
return nil, errors.Errorf("command %s not exist", funcName)
}
a := actionFn()
argNum := len(command) - 1
if argNum < a.Arity() {
return nil, fmt.Errorf("command %s arg number is not enough, it need %d but only have %d",
return nil, errors.Errorf("command %s arg number is not enough, it need %d but only have %d",
funcName, a.Arity(), argNum)
}
if argNum > a.Arity() && !a.IsVariadic() {
return nil, fmt.Errorf("command %s arg number is too many, it need %d but have %d", funcName, a.Arity(), argNum)
nestAction, isNestAction := a.(action.NestAction)
if !isNestAction {
if argNum > a.Arity() && !a.IsVariadic() {
return nil, errors.Errorf("command %s arg number is too many, it need %d but have %d", funcName, a.Arity(), argNum)
}
} else {
argNum = a.Arity()
}
args := make([]arg.Arg, argNum)
for i := 1; i < len(command); i++ {
_arg, err := arg.NewArg(command[i])
for i := 0; i < len(args); i++ {
index := i + 1
_arg, err := arg.NewArg(command[index])
if err != nil {
return nil, errors.Wrapf(err, "command %s arg %d is invalid", funcName, i)
return nil, errors.Wrapf(err, "command %s arg %d is invalid", funcName, index)
}
argType := a.ArgType(i - 1)
argType := a.ArgType(i)
if !argType.Contains(_arg) {
return nil, fmt.Errorf("command %s arg %d not support type %s", funcName, i, _arg.Type())
return nil, errors.Errorf("command %s arg %d not support type %s", funcName, index, _arg.Type())
}
args[i-1] = _arg
args[i] = _arg
}
err := a.Init(args)
if err != nil {
return nil, errors.Wrapf(err, "command %s init error", funcName)
}
if isNestAction {
actions := make([]action.Action, len(command)-1-argNum)
if len(actions) == 0 {
return nil, errors.Errorf("command %s arg number is not enough, lost function arg", funcName)
}
for i := 0; i < len(actions); i++ {
index := i + 1 + argNum
if arr, ok := command[index].([]interface{}); ok {
_a, err := NewAction(arr)
if err != nil {
return nil, errors.Wrapf(err, "action %s arg %d new action failed", funcName, index)
}
actions[i] = _a
} else {
return nil, errors.Errorf("arg %d is invalid", index)
}
}
err = nestAction.InitAction(actions)
if err != nil {
return nil, errors.Wrapf(err, "command %s init action error", funcName)
}
}
return a, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package transform_test
package runtime

import (
"testing"

ce "github.com/cloudevents/sdk-go/v2"
cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/linkall-labs/vanus/internal/primitive/transform/context"
"github.com/linkall-labs/vanus/internal/primitive/transform/runtime"
)

func actionBenchmark(command []interface{}) func(b *testing.B) {
a, err := runtime.NewAction(command)
a, err := NewAction(command)
if err != nil {
panic(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package transform
package runtime

import (
"github.com/linkall-labs/vanus/internal/primitive/transform/action"
"github.com/linkall-labs/vanus/internal/primitive/transform/action/array"
"github.com/linkall-labs/vanus/internal/primitive/transform/action/common"
"github.com/linkall-labs/vanus/internal/primitive/transform/action/condition"
Expand All @@ -24,11 +23,10 @@ import (
"github.com/linkall-labs/vanus/internal/primitive/transform/action/source"
"github.com/linkall-labs/vanus/internal/primitive/transform/action/strings"
"github.com/linkall-labs/vanus/internal/primitive/transform/action/structs"
"github.com/linkall-labs/vanus/internal/primitive/transform/runtime"
)

func init() {
for _, fn := range []func() action.Action{
for _, fn := range []newAction{
// struct
structs.NewCreateAction,
structs.NewDeleteAction,
Expand Down Expand Up @@ -63,7 +61,7 @@ func init() {
// source
source.NewDebeziumConvertToMongoDBSink,
} {
if err := runtime.AddAction(fn); err != nil {
if err := AddAction(fn); err != nil {
panic(err)
}
}
Expand Down

0 comments on commit 2fa3241

Please sign in to comment.