Skip to content

Commit

Permalink
feat: introduce namespace (#522)
Browse files Browse the repository at this point in the history
* feat: support namespace

* feat: support namespace

* update

* fix lint and ut

* fix lint

* fix bugs

* update

* update

* fix lint and ut

* fix lint
  • Loading branch information
wenfengwang authored Mar 13, 2023
1 parent eee446a commit 20e543c
Show file tree
Hide file tree
Showing 29 changed files with 906 additions and 702 deletions.
74 changes: 36 additions & 38 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,47 +78,10 @@ func main() {
os.Exit(-1)
}

// TODO wait server ready
snowflakeCtrl := snowflake.NewSnowflakeController(cfg.GetSnowflakeConfig(), mem)
if err = snowflakeCtrl.Start(ctx); err != nil {
log.Error(ctx, "start Snowflake Controller failed", map[string]interface{}{
log.KeyError: err,
})
os.Exit(-1)
}

// namespace controller
namespaceCtrlStv := tenant.NewController(cfg.GetTenantConfig(), mem)
if err = namespaceCtrlStv.Start(); err != nil {
log.Error(ctx, "start namespace controller fail", map[string]interface{}{
log.KeyError: err,
})
os.Exit(-1)
}

segmentCtrl := eventbus.NewController(cfg.GetEventbusCtrlConfig(), mem)
if err = segmentCtrl.Start(ctx); err != nil {
log.Error(ctx, "start EventbusService Controller failed", map[string]interface{}{
log.KeyError: err,
})
os.Exit(-1)
}

// trigger controller
triggerCtrlStv := trigger.NewController(cfg.GetTriggerConfig(), mem)
if err = triggerCtrlStv.Start(); err != nil {
log.Error(ctx, "start trigger controller fail", map[string]interface{}{
log.KeyError: err,
})
os.Exit(-1)
}

if err = mem.Start(ctx); err != nil {
log.Error(ctx, "failed to start member", map[string]interface{}{
log.KeyError: err,
})
os.Exit(-2)
}

recoveryOpt := recovery.WithRecoveryHandlerContext(
func(ctx context.Context, p interface{}) error {
Expand Down Expand Up @@ -150,12 +113,12 @@ func main() {
reflection.Register(grpcServer)
}

ctrlpb.RegisterPingServerServer(grpcServer, segmentCtrl)
ctrlpb.RegisterSnowflakeControllerServer(grpcServer, snowflakeCtrl)
ctrlpb.RegisterNamespaceControllerServer(grpcServer, namespaceCtrlStv)
ctrlpb.RegisterEventbusControllerServer(grpcServer, segmentCtrl)
ctrlpb.RegisterEventlogControllerServer(grpcServer, segmentCtrl)
ctrlpb.RegisterSegmentControllerServer(grpcServer, segmentCtrl)
ctrlpb.RegisterPingServerServer(grpcServer, segmentCtrl)
ctrlpb.RegisterTriggerControllerServer(grpcServer, triggerCtrlStv)
log.Info(ctx, "the grpc server ready to work", nil)
wg := sync.WaitGroup{}
Expand All @@ -170,6 +133,41 @@ func main() {
wg.Done()
}()

if err = snowflakeCtrl.Start(ctx); err != nil {
log.Error(ctx, "start Snowflake Controller failed", map[string]interface{}{
log.KeyError: err,
})
os.Exit(-1)
}

if err = namespaceCtrlStv.Start(); err != nil {
log.Error(ctx, "start namespace controller fail", map[string]interface{}{
log.KeyError: err,
})
os.Exit(-1)
}

if err = segmentCtrl.Start(ctx); err != nil {
log.Error(ctx, "start EventbusService Controller failed", map[string]interface{}{
log.KeyError: err,
})
os.Exit(-1)
}

if err = triggerCtrlStv.Start(); err != nil {
log.Error(ctx, "start trigger controller fail", map[string]interface{}{
log.KeyError: err,
})
os.Exit(-1)
}

if err = mem.Start(ctx); err != nil {
log.Error(ctx, "failed to start member", map[string]interface{}{
log.KeyError: err,
})
os.Exit(-2)
}

exit := func() {
vanus.DestroySnowflake()
snowflakeCtrl.Stop()
Expand Down
124 changes: 83 additions & 41 deletions internal/controller/eventbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,6 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/vanus-labs/vanus/observability/log"
"github.com/vanus-labs/vanus/observability/metrics"
"github.com/vanus-labs/vanus/pkg/errors"
"github.com/vanus-labs/vanus/pkg/util"
ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller"
metapb "github.com/vanus-labs/vanus/proto/pkg/meta"

"github.com/vanus-labs/vanus/internal/controller/eventbus/eventlog"
"github.com/vanus-labs/vanus/internal/controller/eventbus/metadata"
"github.com/vanus-labs/vanus/internal/controller/eventbus/server"
Expand All @@ -47,6 +35,18 @@ import (
"github.com/vanus-labs/vanus/internal/kv/etcd"
"github.com/vanus-labs/vanus/internal/primitive"
"github.com/vanus-labs/vanus/internal/primitive/vanus"
"github.com/vanus-labs/vanus/observability/log"
"github.com/vanus-labs/vanus/observability/metrics"
"github.com/vanus-labs/vanus/pkg/cluster"
"github.com/vanus-labs/vanus/pkg/errors"
"github.com/vanus-labs/vanus/pkg/util"
ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller"
metapb "github.com/vanus-labs/vanus/proto/pkg/meta"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

var (
Expand All @@ -58,6 +58,7 @@ var (

const (
maximumEventlogNum = 64
mappingKey = "@%d_%s@" // @{namespace_id}_{eventbus}@
)

func NewController(cfg Config, mem member.Member) *controller {
Expand All @@ -76,33 +77,24 @@ func NewController(cfg Config, mem member.Member) *controller {
}

type controller struct {
cfg *Config
kvStore kv.Client
volumeMgr volume.Manager
eventlogMgr eventlog.Manager
ssMgr server.Manager
eventbusMap map[vanus.ID]*metadata.Eventbus
member member.Member
cancelCtx context.Context
cancelFunc context.CancelFunc
membershipMutex sync.Mutex
isLeader bool
readyNotify chan error
stopNotify chan error
mutex sync.Mutex
eventbusUpdatedCount int64
eventbusDeletedCount int64
}

func (ctrl *controller) GetEventbusWithHumanFriendly(
ctx context.Context, request *ctrlpb.GetEventbusWithHumanFriendlyRequest,
) (*metapb.Eventbus, error) {
for id, eventbus := range ctrl.eventbusMap {
if eventbus.Name == request.EventbusName {
return ctrl.getEventbus(id)
}
}
return nil, errors.ErrResourceNotFound.WithMessage("eventbus not found")
cfg *Config
kvStore kv.Client
volumeMgr volume.Manager
eventlogMgr eventlog.Manager
ssMgr server.Manager
eventbusMap map[vanus.ID]*metadata.Eventbus
eventbusNamespaceMapping sync.Map // string, *metadata.Eventbus
member member.Member
cancelCtx context.Context
cancelFunc context.CancelFunc
membershipMutex sync.Mutex
isLeader bool
readyNotify chan error
stopNotify chan error
mutex sync.Mutex
eventbusUpdatedCount int64
eventbusDeletedCount int64
clusterCli cluster.Cluster
}

func (ctrl *controller) Start(_ context.Context) error {
Expand All @@ -112,7 +104,12 @@ func (ctrl *controller) Start(_ context.Context) error {
}
ctrl.kvStore = store
ctrl.cancelCtx, ctrl.cancelFunc = context.WithCancel(context.Background())
go ctrl.member.RegisterMembershipChangedProcessor(ctrl.membershipChangedProcessor)
ctrl.member.RegisterMembershipChangedProcessor(ctrl.membershipChangedProcessor)
var endpoints = make([]string, 0, len(ctrl.cfg.Topology))
for _, v := range ctrl.cfg.Topology {
endpoints = append(endpoints, v)
}
ctrl.clusterCli = cluster.NewClusterController(endpoints, insecure.NewCredentials())
go ctrl.recordMetrics()
return nil
}
Expand All @@ -132,18 +129,33 @@ func (ctrl *controller) StopNotify() <-chan error {
func (ctrl *controller) CreateEventbus(
ctx context.Context, req *ctrlpb.CreateEventbusRequest,
) (*metapb.Eventbus, error) {
pb, err := ctrl.clusterCli.NamespaceService().GetNamespace(ctx, req.NamespaceId)
if err != nil {
return nil, err
}

if pb == nil {
return nil, errors.ErrResourceNotFound.WithMessage("namespace not found")
}

if err := isValidEventbusName(req.Name); err != nil {
return nil, err
}
eb, err := ctrl.createEventbus(ctx, req)
if err != nil {
return nil, err
}

// TODO async create
// create dead letter eventbus
pb, err = ctrl.clusterCli.NamespaceService().GetSystemNamespace(ctx)
if err != nil {
return nil, err
}
_, err = ctrl.createEventbus(context.Background(), &ctrlpb.CreateEventbusRequest{
Name: primitive.GetDeadLetterEventbusName(vanus.NewIDFromUint64(eb.Id)),
LogNumber: 1,
NamespaceId: pb.Id,
Description: "System DeadLetter Eventbus For " + req.Name,
})
if err != nil {
Expand Down Expand Up @@ -176,12 +188,33 @@ func isValidEventbusName(name string) error {
return nil
}

func (ctrl *controller) GetEventbusWithHumanFriendly(_ context.Context,
request *ctrlpb.GetEventbusWithHumanFriendlyRequest) (*metapb.Eventbus, error) {
meta, exist := ctrl.eventbusNamespaceMapping.Load(GetMappingKey(request.GetNamespaceId(), request.GetEventbusName()))
if !exist {
return nil, errors.ErrResourceNotFound.WithMessage("eventbus not found")
}
return metadata.Convert2ProtoEventbus(meta.(*metadata.Eventbus))[0], nil
}

func GetMappingKey(namespace uint64, name string) string {
return fmt.Sprintf(mappingKey, namespace, name)
}

func (ctrl *controller) CreateSystemEventbus(
ctx context.Context, req *ctrlpb.CreateEventbusRequest,
) (*metapb.Eventbus, error) {
if !strings.HasPrefix(req.Name, primitive.SystemEventbusNamePrefix) {
return nil, errors.ErrInvalidRequest.WithMessage("system eventbus must start with __")
}
pb, err := ctrl.clusterCli.NamespaceService().GetSystemNamespace(ctx)
if err != nil {
return nil, err
}

if req.NamespaceId != pb.Id {
return nil, errors.ErrInvalidRequest.WithMessage("invalid system namespace id")
}
return ctrl.createEventbus(ctx, req)
}

Expand All @@ -194,6 +227,10 @@ func (ctrl *controller) createEventbus(
return nil, errors.ErrResourceCanNotOp.WithMessage(
"the cluster isn't ready for create eventbus")
}
if req.NamespaceId == 0 {
return nil, errors.ErrInvalidRequest.WithMessage(
"namespace_id can't be 0")
}
logNum := req.LogNumber
if logNum == 0 {
logNum = 1
Expand All @@ -218,6 +255,7 @@ func (ctrl *controller) createEventbus(
Description: req.Description,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
NamespaceID: req.NamespaceId,
}
exist, err := ctrl.kvStore.Exists(ctx, metadata.GetEventbusMetadataKey(id))
if err != nil {
Expand All @@ -233,14 +271,17 @@ func (ctrl *controller) createEventbus(
}
eb.Eventlogs[idx] = el
}
ctrl.eventbusMap[eb.ID] = eb

{
data, _ := json.Marshal(eb)
if err := ctrl.kvStore.Set(ctx, metadata.GetEventbusMetadataKey(id), data); err != nil {
return nil, err
}
}

ctrl.eventbusMap[eb.ID] = eb
ctrl.eventbusNamespaceMapping.Store(GetMappingKey(eb.NamespaceID, eb.Name), eb)

return ctrl.getEventbus(eb.ID)
}

Expand Down Expand Up @@ -677,6 +718,7 @@ func (ctrl *controller) loadEventbus(ctx context.Context) error {
return err
}
ctrl.eventbusMap[busInfo.ID] = busInfo
ctrl.eventbusNamespaceMapping.Store(GetMappingKey(busInfo.NamespaceID, busInfo.Name), busInfo)
}
return nil
}
Expand Down
Loading

0 comments on commit 20e543c

Please sign in to comment.