Skip to content

Commit

Permalink
fix ut&lint
Browse files Browse the repository at this point in the history
Signed-off-by: wenfeng <[email protected]>
  • Loading branch information
wenfengwang committed Dec 16, 2022
1 parent 41af6b1 commit 3c55d1b
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 22 deletions.
2 changes: 1 addition & 1 deletion client/internal/vanus/eventlog/name_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package eventlog
import (
// standard libraries.
"context"
"github.com/linkall-labs/vanus/pkg/cluster"
"math"
"time"

Expand All @@ -28,6 +27,7 @@ import (
// first-party libraries.
"github.com/linkall-labs/vanus/observability/log"
"github.com/linkall-labs/vanus/observability/tracing"
"github.com/linkall-labs/vanus/pkg/cluster"
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"

Expand Down
1 change: 1 addition & 0 deletions internal/controller/eventbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (ctrl *controller) CreateEventBus(ctx context.Context,
}

func isValidEventbusName(name string) error {
name = strings.ToLower(name)
for _, v := range name {
if v == '.' || v == '_' || v == '-' {
continue
Expand Down
5 changes: 2 additions & 3 deletions internal/controller/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ var (

const (
defaultGcSubscriptionInterval = time.Second * 10
defaultSystemEventbusEventlog = 1
)

func NewController(config Config, member embedetcd.Member) *controller {
Expand Down Expand Up @@ -521,15 +520,15 @@ func (ctrl *controller) initTriggerSystemEventbus() {
}

if err := ctrl.cl.EventbusService().CreateSystemEventbusIfNotExist(ctx, primitive.RetryEventbusName,
defaultSystemEventbusEventlog, "System Eventbus For Trigger Service"); err != nil {
"System Eventbus For Trigger Service"); err != nil {
log.Error(context.Background(), "failed to create RetryEventbus, exit", map[string]interface{}{
log.KeyError: err,
})
os.Exit(-1)
}

if err := ctrl.cl.EventbusService().CreateSystemEventbusIfNotExist(ctx, primitive.DeadLetterEventbusName,
defaultSystemEventbusEventlog, "System Eventbus For Trigger Service"); err != nil {
"System Eventbus For Trigger Service"); err != nil {
log.Error(context.Background(), "failed to create DeadLetterEventbus, exit", map[string]interface{}{
log.KeyError: err,
})
Expand Down
2 changes: 1 addition & 1 deletion internal/timer/timingwheel/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (b *bucket) createEventbus(ctx context.Context) error {
return nil
}
return b.timingwheel.ctrl.EventbusService().CreateSystemEventbusIfNotExist(ctx, b.eventbus,
1, "System Eventbus For Timing Service")
"System Eventbus For Timing Service")
}

func (b *bucket) connectEventbus(ctx context.Context) {
Expand Down
2 changes: 1 addition & 1 deletion internal/timer/timingwheel/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestBucket_createEventBus(t *testing.T) {
tw.ctrl = mockCl
mockSvc := cluster.NewMockEventbusService(mockCtrl)
mockCl.EXPECT().EventbusService().Times(1).Return(mockSvc)
mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any(), Any()).Times(1).Return(nil)
mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any()).Times(1).Return(nil)

err := bucket.createEventbus(ctx)
So(err, ShouldBeNil)
Expand Down
6 changes: 3 additions & 3 deletions internal/timer/timingwheel/timingwheel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"encoding/json"
stderr "errors"
"fmt"
"io"
"sync"
"time"

ce "github.com/cloudevents/sdk-go/v2"
"github.com/linkall-labs/vanus/client"
Expand All @@ -32,10 +35,7 @@ import (
"github.com/linkall-labs/vanus/pkg/errors"
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
"google.golang.org/grpc/credentials/insecure"
"io"
"k8s.io/apimachinery/pkg/util/wait"
"sync"
"time"
)

const (
Expand Down
8 changes: 4 additions & 4 deletions internal/timer/timingwheel/timingwheel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestTimingWheel_Start(t *testing.T) {
tw.ctrl = mockCl
mockSvc := cluster.NewMockEventbusService(mockCtrl)
mockCl.EXPECT().EventbusService().AnyTimes().Return(mockSvc)
mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any(), Any()).AnyTimes().Return(nil)
mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any()).AnyTimes().Return(nil)
mockSvc.EXPECT().IsExist(Any(), Any()).AnyTimes().Return(true)

Convey("test timingwheel start bucket start success", func() {
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestTimingWheel_startReceivingStation(t *testing.T) {
mockCl.EXPECT().EventbusService().AnyTimes().Return(mockSvc)

Convey("test timingwheel start receiving station with create eventbus failed", func() {
mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any(), Any()).Times(1).Return(errors.New("test"))
mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any()).Times(1).Return(errors.New("test"))
err := tw.startReceivingStation(ctx)
So(err, ShouldNotBeNil)
})
Expand Down Expand Up @@ -371,7 +371,7 @@ func TestTimingWheel_startDistributionStation(t *testing.T) {
mockCl.EXPECT().EventbusService().AnyTimes().Return(mockSvc)

Convey("test timingwheel start distribution station with create eventbus failed", func() {
mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any(), Any()).Times(1).Return(errors.New("test"))
mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any()).Times(1).Return(errors.New("test"))
err := tw.startDistributionStation(ctx)
So(err, ShouldNotBeNil)
})
Expand Down Expand Up @@ -591,7 +591,7 @@ func TestTimingWheelElement_pushBack(t *testing.T) {

Convey("push timing message failure causes start failed", func() {
tw.SetLeader(true)
mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any(), Any()).Times(1).Return(errors.New("test"))
mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any()).Times(1).Return(errors.New("test"))
tm := newTimingMsg(ctx, event(1000))
twe := tw.twList.Back().Value.(*timingWheelElement)
result := twe.pushBack(ctx, tm)
Expand Down
4 changes: 2 additions & 2 deletions pkg/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ package cluster
import (
"context"
"errors"
"google.golang.org/protobuf/types/known/emptypb"
"sync"
"time"

"github.com/linkall-labs/vanus/observability/log"
"github.com/linkall-labs/vanus/pkg/cluster/raw_client"
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/types/known/emptypb"
)

var (
Expand All @@ -51,7 +51,7 @@ type Cluster interface {

type EventbusService interface {
IsExist(ctx context.Context, name string) bool
CreateSystemEventbusIfNotExist(ctx context.Context, name string, logNum int, desc string) error
CreateSystemEventbusIfNotExist(ctx context.Context, name string, desc string) error
Delete(ctx context.Context, name string) error
RawClient() ctrlpb.EventBusControllerClient
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/cluster/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

var (
systemEventbusPrefix = "__"
systemEventbusPrefix = "__"
defaultSystemEventbusEventlog = 1
)

type eventbusService struct {
Expand All @@ -28,15 +29,15 @@ func (es *eventbusService) IsExist(ctx context.Context, name string) bool {
return err == nil
}

func (es *eventbusService) CreateSystemEventbusIfNotExist(ctx context.Context, name string, logNum int, desc string) error {
func (es *eventbusService) CreateSystemEventbusIfNotExist(ctx context.Context, name string, desc string) error {
if es.IsExist(ctx, name) {
return nil
}

// TODO 创建前需要等到Store就绪,而store的就绪在controller之后,创建又在controller就绪过程中
_, err := es.client.CreateSystemEventBus(ctx, &ctrlpb.CreateEventBusRequest{
Name: name,
LogNumber: int32(logNum),
LogNumber: int32(defaultSystemEventbusEventlog),
Description: desc,
})
return err
Expand Down
8 changes: 4 additions & 4 deletions pkg/cluster/mock_controller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3c55d1b

Please sign in to comment.