diff --git a/client/pkg/api/client.go b/client/pkg/api/client.go index 721655c24..955e8b03f 100644 --- a/client/pkg/api/client.go +++ b/client/pkg/api/client.go @@ -29,6 +29,7 @@ type Eventbus interface { GetLog(ctx context.Context, logID uint64, opts ...LogOption) (Eventlog, error) ListLog(ctx context.Context, opts ...LogOption) ([]Eventlog, error) + CheckHealth(ctx context.Context) error Close(ctx context.Context) } @@ -46,6 +47,7 @@ type Eventlog interface { LatestOffset(ctx context.Context) (int64, error) Length(ctx context.Context) (int64, error) QueryOffsetByTime(ctx context.Context, timestamp int64) (int64, error) + CheckHealth(ctx context.Context) error } func Append(ctx context.Context, w BusWriter, events []*ce.Event, opts ...WriteOption) (eids []string, err error) { diff --git a/client/pkg/api/mock_client.go b/client/pkg/api/mock_client.go index 6c61a5192..d0593c6cf 100644 --- a/client/pkg/api/mock_client.go +++ b/client/pkg/api/mock_client.go @@ -35,6 +35,20 @@ func (m *MockEventbus) EXPECT() *MockEventbusMockRecorder { return m.recorder } +// CheckHealth mocks base method. +func (m *MockEventbus) CheckHealth(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckHealth", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// CheckHealth indicates an expected call of CheckHealth. +func (mr *MockEventbusMockRecorder) CheckHealth(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckHealth", reflect.TypeOf((*MockEventbus)(nil).CheckHealth), ctx) +} + // Close mocks base method. func (m *MockEventbus) Close(ctx context.Context) { m.ctrl.T.Helper() @@ -234,6 +248,20 @@ func (m *MockEventlog) EXPECT() *MockEventlogMockRecorder { return m.recorder } +// CheckHealth mocks base method. +func (m *MockEventlog) CheckHealth(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckHealth", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// CheckHealth indicates an expected call of CheckHealth. +func (mr *MockEventlogMockRecorder) CheckHealth(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckHealth", reflect.TypeOf((*MockEventlog)(nil).CheckHealth), ctx) +} + // EarliestOffset mocks base method. func (m *MockEventlog) EarliestOffset(ctx context.Context) (int64, error) { m.ctrl.T.Helper() @@ -252,7 +280,7 @@ func (mr *MockEventlogMockRecorder) EarliestOffset(ctx interface{}) *gomock.Call // ID mocks base method. func (m *MockEventlog) ID() uint64 { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "VolumeID") + ret := m.ctrl.Call(m, "ID") ret0, _ := ret[0].(uint64) return ret0 } @@ -260,7 +288,7 @@ func (m *MockEventlog) ID() uint64 { // ID indicates an expected call of ID. func (mr *MockEventlogMockRecorder) ID() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "VolumeID", reflect.TypeOf((*MockEventlog)(nil).ID)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*MockEventlog)(nil).ID)) } // LatestOffset mocks base method. diff --git a/client/pkg/eventbus/eventbus.go b/client/pkg/eventbus/eventbus.go index 6e92e78b8..09bdf6b72 100644 --- a/client/pkg/eventbus/eventbus.go +++ b/client/pkg/eventbus/eventbus.go @@ -231,6 +231,23 @@ func (b *eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Ev } } +func (b *eventbus) CheckHealth(ctx context.Context) error { + _, span := b.tracer.Start(ctx, "pkg.eventbus.checkhealth") + defer span.End() + if len(b.writableLogs) == 0 { + b.refreshWritableLogs(ctx) + if len(b.writableLogs) == 0 { + return errors.ErrNotWritable.WithMessage("no writable log") + } + } + for _, el := range b.writableLogs { + if err := el.CheckHealth(ctx); err != nil { + return err + } + } + return nil +} + func (b *eventbus) ID() uint64 { return b.cfg.ID } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 4898c4e05..2e8eb412e 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -53,7 +53,7 @@ const ( ErrorCode_INTERNAL ErrorCode = 9500 ErrorCode_TRIGGER_WORKER ErrorCode = 9501 ErrorCode_INVALID_SEGMENT ErrorCode = 9502 - ErrorCode_INVAILD_HEAETBEAT ErrorCode = 9503 + ErrorCode_INVALID_HEARTBEAT ErrorCode = 9503 ErrorCode_VOLUME_NO_SERVER ErrorCode = 9504 ErrorCode_JSON_MARSHAL ErrorCode = 9505 ErrorCode_JSON_UNMARSHAL ErrorCode = 9506 @@ -82,7 +82,7 @@ const ( ErrorCode_NOT_LEADER ErrorCode = 9700 ErrorCode_NO_CONTROLLER_LEADER ErrorCode = 9701 ErrorCode_NOT_RAFT_LEADER ErrorCode = 9702 - ErrorCodeNotReady ErrorCode = 9704 + ErrorCode_NOT_READY ErrorCode = 9704 // ErrorCode_RESERVE 98xx @@ -129,7 +129,7 @@ var ( ErrInternal = New("internal error").WithGRPCCode(ErrorCode_INTERNAL) ErrTriggerWorker = New("trigger worker error").WithGRPCCode(ErrorCode_TRIGGER_WORKER) ErrInvalidSegment = New("invalid segment").WithGRPCCode(ErrorCode_INVALID_SEGMENT) - ErrInvalidHeartBeat = New("invalid heartbeat").WithGRPCCode(ErrorCode_INVAILD_HEAETBEAT) + ErrInvalidHeartBeat = New("invalid heartbeat").WithGRPCCode(ErrorCode_INVALID_HEARTBEAT) ErrVolumeInstanceNoServer = New("no segment server was bound to volume instance").WithGRPCCode(ErrorCode_VOLUME_NO_SERVER) ErrJSONMarshal = New("json marshal").WithGRPCCode(ErrorCode_JSON_MARSHAL) ErrJSONUnMarshal = New("json unmarshal").WithGRPCCode(ErrorCode_JSON_UNMARSHAL) @@ -158,7 +158,7 @@ var ( // ErrNotLeader not leader ErrNotLeader = New("not leader").WithGRPCCode(ErrorCode_NOT_LEADER) - ErrNotReady = New("not ready").WithGRPCCode(ErrorCodeNotReady) + ErrNotReady = New("not ready").WithGRPCCode(ErrorCode_NOT_READY) ErrNoControllerLeader = New("no leader controller found").WithGRPCCode(ErrorCode_NO_CONTROLLER_LEADER) ErrNotRaftLeader = New("the node is not raft leader").WithGRPCCode(ErrorCode_NOT_RAFT_LEADER)