Skip to content

Commit

Permalink
feat: add eventbus api for checkhealth
Browse files Browse the repository at this point in the history
Signed-off-by: jyjiangkai <[email protected]>
  • Loading branch information
hwjiangkai committed Sep 7, 2023
1 parent 8ac9ade commit 4b2b1bd
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
2 changes: 2 additions & 0 deletions client/pkg/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Eventbus interface {

GetLog(ctx context.Context, logID uint64, opts ...LogOption) (Eventlog, error)
ListLog(ctx context.Context, opts ...LogOption) ([]Eventlog, error)
CheckHealth(ctx context.Context) error
Close(ctx context.Context)
}

Expand All @@ -46,6 +47,7 @@ type Eventlog interface {
LatestOffset(ctx context.Context) (int64, error)
Length(ctx context.Context) (int64, error)
QueryOffsetByTime(ctx context.Context, timestamp int64) (int64, error)
CheckHealth(ctx context.Context) error
}

func Append(ctx context.Context, w BusWriter, events []*ce.Event, opts ...WriteOption) (eids []string, err error) {
Expand Down
32 changes: 30 additions & 2 deletions client/pkg/api/mock_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions client/pkg/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,23 @@ func (b *eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Ev
}
}

func (b *eventbus) CheckHealth(ctx context.Context) error {
_, span := b.tracer.Start(ctx, "pkg.eventbus.checkhealth")
defer span.End()
if len(b.writableLogs) == 0 {
b.refreshWritableLogs(ctx)
if len(b.writableLogs) == 0 {
return errors.ErrNotWritable.WithMessage("no writable log")
}
}
for _, el := range b.writableLogs {
if err := el.CheckHealth(ctx); err != nil {
return err
}
}
return nil
}

func (b *eventbus) ID() uint64 {
return b.cfg.ID
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const (
ErrorCode_INTERNAL ErrorCode = 9500
ErrorCode_TRIGGER_WORKER ErrorCode = 9501
ErrorCode_INVALID_SEGMENT ErrorCode = 9502
ErrorCode_INVAILD_HEAETBEAT ErrorCode = 9503
ErrorCode_INVALID_HEARTBEAT ErrorCode = 9503
ErrorCode_VOLUME_NO_SERVER ErrorCode = 9504
ErrorCode_JSON_MARSHAL ErrorCode = 9505
ErrorCode_JSON_UNMARSHAL ErrorCode = 9506
Expand Down Expand Up @@ -82,7 +82,7 @@ const (
ErrorCode_NOT_LEADER ErrorCode = 9700
ErrorCode_NO_CONTROLLER_LEADER ErrorCode = 9701
ErrorCode_NOT_RAFT_LEADER ErrorCode = 9702
ErrorCodeNotReady ErrorCode = 9704
ErrorCode_NOT_READY ErrorCode = 9704

// ErrorCode_RESERVE 98xx

Expand Down Expand Up @@ -129,7 +129,7 @@ var (
ErrInternal = New("internal error").WithGRPCCode(ErrorCode_INTERNAL)
ErrTriggerWorker = New("trigger worker error").WithGRPCCode(ErrorCode_TRIGGER_WORKER)
ErrInvalidSegment = New("invalid segment").WithGRPCCode(ErrorCode_INVALID_SEGMENT)
ErrInvalidHeartBeat = New("invalid heartbeat").WithGRPCCode(ErrorCode_INVAILD_HEAETBEAT)
ErrInvalidHeartBeat = New("invalid heartbeat").WithGRPCCode(ErrorCode_INVALID_HEARTBEAT)
ErrVolumeInstanceNoServer = New("no segment server was bound to volume instance").WithGRPCCode(ErrorCode_VOLUME_NO_SERVER)
ErrJSONMarshal = New("json marshal").WithGRPCCode(ErrorCode_JSON_MARSHAL)
ErrJSONUnMarshal = New("json unmarshal").WithGRPCCode(ErrorCode_JSON_UNMARSHAL)
Expand Down Expand Up @@ -158,7 +158,7 @@ var (

// ErrNotLeader not leader
ErrNotLeader = New("not leader").WithGRPCCode(ErrorCode_NOT_LEADER)
ErrNotReady = New("not ready").WithGRPCCode(ErrorCodeNotReady)
ErrNotReady = New("not ready").WithGRPCCode(ErrorCode_NOT_READY)
ErrNoControllerLeader = New("no leader controller found").WithGRPCCode(ErrorCode_NO_CONTROLLER_LEADER)
ErrNotRaftLeader = New("the node is not raft leader").WithGRPCCode(ErrorCode_NOT_RAFT_LEADER)

Expand Down

0 comments on commit 4b2b1bd

Please sign in to comment.