From 3c55d1b5dfe9fbdd94c706104e951353ba68c329 Mon Sep 17 00:00:00 2001 From: wenfeng Date: Thu, 15 Dec 2022 17:21:52 +0800 Subject: [PATCH] fix ut&lint Signed-off-by: wenfeng --- client/internal/vanus/eventlog/name_service.go | 2 +- internal/controller/eventbus/controller.go | 1 + internal/controller/trigger/controller.go | 5 ++--- internal/timer/timingwheel/bucket.go | 2 +- internal/timer/timingwheel/bucket_test.go | 2 +- internal/timer/timingwheel/timingwheel.go | 6 +++--- internal/timer/timingwheel/timingwheel_test.go | 8 ++++---- pkg/cluster/controller.go | 4 ++-- pkg/cluster/eventbus.go | 7 ++++--- pkg/cluster/mock_controller.go | 8 ++++---- 10 files changed, 23 insertions(+), 22 deletions(-) diff --git a/client/internal/vanus/eventlog/name_service.go b/client/internal/vanus/eventlog/name_service.go index 1fe325ba2..b30abceb3 100644 --- a/client/internal/vanus/eventlog/name_service.go +++ b/client/internal/vanus/eventlog/name_service.go @@ -17,7 +17,6 @@ package eventlog import ( // standard libraries. "context" - "github.com/linkall-labs/vanus/pkg/cluster" "math" "time" @@ -28,6 +27,7 @@ import ( // first-party libraries. "github.com/linkall-labs/vanus/observability/log" "github.com/linkall-labs/vanus/observability/tracing" + "github.com/linkall-labs/vanus/pkg/cluster" ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" metapb "github.com/linkall-labs/vanus/proto/pkg/meta" diff --git a/internal/controller/eventbus/controller.go b/internal/controller/eventbus/controller.go index 04a83232b..0c894fa45 100644 --- a/internal/controller/eventbus/controller.go +++ b/internal/controller/eventbus/controller.go @@ -121,6 +121,7 @@ func (ctrl *controller) CreateEventBus(ctx context.Context, } func isValidEventbusName(name string) error { + name = strings.ToLower(name) for _, v := range name { if v == '.' || v == '_' || v == '-' { continue diff --git a/internal/controller/trigger/controller.go b/internal/controller/trigger/controller.go index f1b134240..3ba5243a2 100644 --- a/internal/controller/trigger/controller.go +++ b/internal/controller/trigger/controller.go @@ -48,7 +48,6 @@ var ( const ( defaultGcSubscriptionInterval = time.Second * 10 - defaultSystemEventbusEventlog = 1 ) func NewController(config Config, member embedetcd.Member) *controller { @@ -521,7 +520,7 @@ func (ctrl *controller) initTriggerSystemEventbus() { } if err := ctrl.cl.EventbusService().CreateSystemEventbusIfNotExist(ctx, primitive.RetryEventbusName, - defaultSystemEventbusEventlog, "System Eventbus For Trigger Service"); err != nil { + "System Eventbus For Trigger Service"); err != nil { log.Error(context.Background(), "failed to create RetryEventbus, exit", map[string]interface{}{ log.KeyError: err, }) @@ -529,7 +528,7 @@ func (ctrl *controller) initTriggerSystemEventbus() { } if err := ctrl.cl.EventbusService().CreateSystemEventbusIfNotExist(ctx, primitive.DeadLetterEventbusName, - defaultSystemEventbusEventlog, "System Eventbus For Trigger Service"); err != nil { + "System Eventbus For Trigger Service"); err != nil { log.Error(context.Background(), "failed to create DeadLetterEventbus, exit", map[string]interface{}{ log.KeyError: err, }) diff --git a/internal/timer/timingwheel/bucket.go b/internal/timer/timingwheel/bucket.go index 35611fe29..c4903ada1 100644 --- a/internal/timer/timingwheel/bucket.go +++ b/internal/timer/timingwheel/bucket.go @@ -329,7 +329,7 @@ func (b *bucket) createEventbus(ctx context.Context) error { return nil } return b.timingwheel.ctrl.EventbusService().CreateSystemEventbusIfNotExist(ctx, b.eventbus, - 1, "System Eventbus For Timing Service") + "System Eventbus For Timing Service") } func (b *bucket) connectEventbus(ctx context.Context) { diff --git a/internal/timer/timingwheel/bucket_test.go b/internal/timer/timingwheel/bucket_test.go index ffbadf8c4..8ef2cbf0a 100644 --- a/internal/timer/timingwheel/bucket_test.go +++ b/internal/timer/timingwheel/bucket_test.go @@ -256,7 +256,7 @@ func TestBucket_createEventBus(t *testing.T) { tw.ctrl = mockCl mockSvc := cluster.NewMockEventbusService(mockCtrl) mockCl.EXPECT().EventbusService().Times(1).Return(mockSvc) - mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any(), Any()).Times(1).Return(nil) + mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any()).Times(1).Return(nil) err := bucket.createEventbus(ctx) So(err, ShouldBeNil) diff --git a/internal/timer/timingwheel/timingwheel.go b/internal/timer/timingwheel/timingwheel.go index 4e61ea262..63d18444b 100644 --- a/internal/timer/timingwheel/timingwheel.go +++ b/internal/timer/timingwheel/timingwheel.go @@ -20,6 +20,9 @@ import ( "encoding/json" stderr "errors" "fmt" + "io" + "sync" + "time" ce "github.com/cloudevents/sdk-go/v2" "github.com/linkall-labs/vanus/client" @@ -32,10 +35,7 @@ import ( "github.com/linkall-labs/vanus/pkg/errors" ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" "google.golang.org/grpc/credentials/insecure" - "io" "k8s.io/apimachinery/pkg/util/wait" - "sync" - "time" ) const ( diff --git a/internal/timer/timingwheel/timingwheel_test.go b/internal/timer/timingwheel/timingwheel_test.go index f4c66a517..e60a17d13 100644 --- a/internal/timer/timingwheel/timingwheel_test.go +++ b/internal/timer/timingwheel/timingwheel_test.go @@ -86,7 +86,7 @@ func TestTimingWheel_Start(t *testing.T) { tw.ctrl = mockCl mockSvc := cluster.NewMockEventbusService(mockCtrl) mockCl.EXPECT().EventbusService().AnyTimes().Return(mockSvc) - mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any(), Any()).AnyTimes().Return(nil) + mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any()).AnyTimes().Return(nil) mockSvc.EXPECT().IsExist(Any(), Any()).AnyTimes().Return(true) Convey("test timingwheel start bucket start success", func() { @@ -265,7 +265,7 @@ func TestTimingWheel_startReceivingStation(t *testing.T) { mockCl.EXPECT().EventbusService().AnyTimes().Return(mockSvc) Convey("test timingwheel start receiving station with create eventbus failed", func() { - mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any(), Any()).Times(1).Return(errors.New("test")) + mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any()).Times(1).Return(errors.New("test")) err := tw.startReceivingStation(ctx) So(err, ShouldNotBeNil) }) @@ -371,7 +371,7 @@ func TestTimingWheel_startDistributionStation(t *testing.T) { mockCl.EXPECT().EventbusService().AnyTimes().Return(mockSvc) Convey("test timingwheel start distribution station with create eventbus failed", func() { - mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any(), Any()).Times(1).Return(errors.New("test")) + mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any()).Times(1).Return(errors.New("test")) err := tw.startDistributionStation(ctx) So(err, ShouldNotBeNil) }) @@ -591,7 +591,7 @@ func TestTimingWheelElement_pushBack(t *testing.T) { Convey("push timing message failure causes start failed", func() { tw.SetLeader(true) - mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any(), Any()).Times(1).Return(errors.New("test")) + mockSvc.EXPECT().CreateSystemEventbusIfNotExist(Any(), Any(), Any()).Times(1).Return(errors.New("test")) tm := newTimingMsg(ctx, event(1000)) twe := tw.twList.Back().Value.(*timingWheelElement) result := twe.pushBack(ctx, tm) diff --git a/pkg/cluster/controller.go b/pkg/cluster/controller.go index 4442ca9b7..7393d7534 100644 --- a/pkg/cluster/controller.go +++ b/pkg/cluster/controller.go @@ -18,7 +18,6 @@ package cluster import ( "context" "errors" - "google.golang.org/protobuf/types/known/emptypb" "sync" "time" @@ -26,6 +25,7 @@ import ( "github.com/linkall-labs/vanus/pkg/cluster/raw_client" ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" "google.golang.org/grpc/credentials" + "google.golang.org/protobuf/types/known/emptypb" ) var ( @@ -51,7 +51,7 @@ type Cluster interface { type EventbusService interface { IsExist(ctx context.Context, name string) bool - CreateSystemEventbusIfNotExist(ctx context.Context, name string, logNum int, desc string) error + CreateSystemEventbusIfNotExist(ctx context.Context, name string, desc string) error Delete(ctx context.Context, name string) error RawClient() ctrlpb.EventBusControllerClient } diff --git a/pkg/cluster/eventbus.go b/pkg/cluster/eventbus.go index 3502c2aed..c0b938ebb 100644 --- a/pkg/cluster/eventbus.go +++ b/pkg/cluster/eventbus.go @@ -10,7 +10,8 @@ import ( ) var ( - systemEventbusPrefix = "__" + systemEventbusPrefix = "__" + defaultSystemEventbusEventlog = 1 ) type eventbusService struct { @@ -28,7 +29,7 @@ func (es *eventbusService) IsExist(ctx context.Context, name string) bool { return err == nil } -func (es *eventbusService) CreateSystemEventbusIfNotExist(ctx context.Context, name string, logNum int, desc string) error { +func (es *eventbusService) CreateSystemEventbusIfNotExist(ctx context.Context, name string, desc string) error { if es.IsExist(ctx, name) { return nil } @@ -36,7 +37,7 @@ func (es *eventbusService) CreateSystemEventbusIfNotExist(ctx context.Context, n // TODO 创建前需要等到Store就绪,而store的就绪在controller之后,创建又在controller就绪过程中 _, err := es.client.CreateSystemEventBus(ctx, &ctrlpb.CreateEventBusRequest{ Name: name, - LogNumber: int32(logNum), + LogNumber: int32(defaultSystemEventbusEventlog), Description: desc, }) return err diff --git a/pkg/cluster/mock_controller.go b/pkg/cluster/mock_controller.go index 131a81de9..ecf03ac26 100644 --- a/pkg/cluster/mock_controller.go +++ b/pkg/cluster/mock_controller.go @@ -172,17 +172,17 @@ func (m *MockEventbusService) EXPECT() *MockEventbusServiceMockRecorder { } // CreateSystemEventbusIfNotExist mocks base method. -func (m *MockEventbusService) CreateSystemEventbusIfNotExist(ctx context.Context, name string, logNum int, desc string) error { +func (m *MockEventbusService) CreateSystemEventbusIfNotExist(ctx context.Context, name, desc string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateSystemEventbusIfNotExist", ctx, name, logNum, desc) + ret := m.ctrl.Call(m, "CreateSystemEventbusIfNotExist", ctx, name, desc) ret0, _ := ret[0].(error) return ret0 } // CreateSystemEventbusIfNotExist indicates an expected call of CreateSystemEventbusIfNotExist. -func (mr *MockEventbusServiceMockRecorder) CreateSystemEventbusIfNotExist(ctx, name, logNum, desc interface{}) *gomock.Call { +func (mr *MockEventbusServiceMockRecorder) CreateSystemEventbusIfNotExist(ctx, name, desc interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSystemEventbusIfNotExist", reflect.TypeOf((*MockEventbusService)(nil).CreateSystemEventbusIfNotExist), ctx, name, logNum, desc) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSystemEventbusIfNotExist", reflect.TypeOf((*MockEventbusService)(nil).CreateSystemEventbusIfNotExist), ctx, name, desc) } // Delete mocks base method.