Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding bulk call for alpha to inform zero about the tablets #8089

Merged
merged 3 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,25 @@ func (n *node) regenerateChecksum() {
}
}

func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
// handleBulkTabletProposal applies a batch of tablets carried by a single
// proposal by running handleTablet on each of them in order.
//
// The server lock must already be held by the caller (checked via AssertLock).
// The checksum is regenerated once, after the whole batch has been processed.
//
// Handling is best-effort: a failure on one tablet is logged as a warning and
// the remaining tablets are still processed, so this function always returns nil.
func (n *node) handleBulkTabletProposal(tablets []*pb.Tablet) error {
	n.server.AssertLock()
	defer n.regenerateChecksum()

	for _, tablet := range tablets {
		if err := n.handleTablet(tablet); err != nil {
			// Deliberately not returned: one bad tablet must not block the rest.
			glog.Warningf("not able to handle tablet %s. Got err: %+v", tablet.GetPredicate(), err)
		}
	}

	return nil
}

// handleTablet checks whether the given tablet is already served by any group.
// If not, the tablet is added to the current group's predicate list.
//
// This function doesn't take any locks.
// It is the calling function's responsibility to manage concurrency.
func (n *node) handleTablet(tablet *pb.Tablet) error {
state := n.server.state
if tablet.GroupId == 0 {
return errors.Errorf("Tablet group id is zero: %+v", tablet)
}
Expand Down Expand Up @@ -339,6 +352,12 @@ func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
return nil
}

// handleTabletProposal applies a proposal that carries a single tablet.
// It delegates the actual work to handleTablet and returns its error unchanged.
func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
// NOTE(review): presumably verifies the caller holds the server lock — confirm against AssertLock.
n.server.AssertLock()
// Deferred so the checksum is regenerated after handleTablet returns, on success or error.
defer n.regenerateChecksum()
return n.handleTablet(tablet)
}

func (n *node) deleteNamespace(delNs uint64) error {
n.server.AssertLock()
state := n.server.state
Expand Down Expand Up @@ -431,6 +450,15 @@ func (n *node) applyProposal(e raftpb.Entry) (uint64, error) {
return key, err
}
}

if p.Tablets != nil && len(p.Tablets) > 0 {
if err := n.handleBulkTabletProposal(p.Tablets); err != nil {
span.Annotatef(nil, "While applying bulk tablet proposal: %v", err)
glog.Errorf("While applying bulk tablet proposal: %v", err)
return key, err
}
}

if p.License != nil {
// Check that the number of nodes in the cluster should be less than MaxNodes, otherwise
// reject the proposal.
Expand Down
77 changes: 69 additions & 8 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,8 @@ func (s *Server) ServingTablet(tablet string) *pb.Tablet {
defer s.RUnlock()

for _, group := range s.state.Groups {
for key, tab := range group.Tablets {
if key == tablet {
return tab
}
if tab, ok := group.Tablets[tablet]; ok {
return tab
}
}
return nil
Expand All @@ -339,10 +337,8 @@ func (s *Server) servingTablet(tablet string) *pb.Tablet {
s.AssertRLock()

for _, group := range s.state.Groups {
for key, tab := range group.Tablets {
if key == tablet {
return tab
}
if tab, ok := group.Tablets[tablet]; ok {
return tab
}
}
return nil
Expand Down Expand Up @@ -408,6 +404,71 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
return res, nil
}

// Inform is the bulk call through which a group tells Zero about a set of
// tablets in a single request, and learns which tablet entry Zero has on
// record for each of them.
//
// Every tablet in req.Tablets is classified as follows:
//   - it is already served and Force is not set: the tablet currently recorded
//     in Zero's state is returned as-is;
//   - otherwise, if ReadOnly is set: an empty placeholder tablet is returned and
//     nothing is proposed;
//   - otherwise: the tablet is collected and proposed, together with all other
//     such tablets, in one ZeroProposal.
//
// Reserved predicates are always proposed for group 1. Note that this rewrites
// GroupId on the request's tablet in place.
//
// An error is returned when the request is nil or has no tablets, when
// req.GroupId is zero, when the proposal fails with anything other than
// errTabletAlreadyServed, or when a proposed tablet is still not served by any
// group once the proposal has been applied.
func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletResponse, error) {
	ctx, span := otrace.StartSpan(ctx, "Zero.Inform")
	defer span.End()

	if req == nil || len(req.Tablets) == 0 {
		return nil, errors.Errorf("Tablets are empty in %+v", req)
	}
	if req.GroupId == 0 {
		return nil, errors.Errorf("Group ID is Zero in %+v", req)
	}

	// At most one response entry is produced per requested tablet.
	tablets := make([]*pb.Tablet, 0, len(req.Tablets))
	var unknownTablets []*pb.Tablet
	for _, t := range req.Tablets {
		tab := s.ServingTablet(t.Predicate)
		span.Annotatef(nil, "Tablet for %s: %+v", t.Predicate, tab)
		switch {
		case tab != nil && !t.Force:
			// Report the tablet as Zero knows it (including the group that
			// really serves it), not the caller's own request entry.
			tablets = append(tablets, tab)
		case t.ReadOnly:
			tablets = append(tablets, &pb.Tablet{})
		default:
			unknownTablets = append(unknownTablets, t)
		}
	}

	if len(unknownTablets) == 0 {
		return &pb.TabletResponse{Tablets: tablets}, nil
	}

	for _, t := range unknownTablets {
		if x.IsReservedPredicate(t.Predicate) {
			// Force all the reserved predicates to be allocated to group 1.
			// This is to make it easier to stream ACL updates to all alpha servers
			// since they only need to open one pipeline to receive updates for all
			// ACL predicates.
			// This will also make it easier to restore the reserved predicates after
			// a DropAll operation.
			t.GroupId = 1
		}
	}

	// Propose all the unknown tablets in one go.
	proposal := pb.ZeroProposal{Tablets: unknownTablets}
	if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
		span.Annotatef(nil, "While proposing tablet: %v", err)
		return nil, err
	}

	for _, t := range unknownTablets {
		tab := s.ServingTablet(t.Predicate)
		if tab == nil {
			// The proposal can go through while an individual tablet is still
			// rejected when it is applied. Report that to the caller instead of
			// asserting, which would bring the whole process down.
			err := errors.Errorf("Tablet for %s is not served by any group after proposal",
				t.Predicate)
			span.Annotatef(nil, "While informing tablets: %v", err)
			return nil, err
		}
		span.Annotatef(nil, "Now serving tablet for %s: %+v", t.Predicate, tab)
		tablets = append(tablets, tab)
	}

	return &pb.TabletResponse{Tablets: tablets}, nil
}

// RemoveNode removes the given node from the given group.
// It's the user's responsibility to ensure that node doesn't come back again
// before calling the api.
Expand Down
10 changes: 10 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ message ZeroProposal {
ZeroSnapshot snapshot = 11; // Used to make Zeros take a snapshot.
// 12 has already been used.
DeleteNsRequest delete_ns = 13; // Used to delete namespace.
repeated Tablet tablets = 14;
}

// MembershipState is used to pack together the current membership state of all
Expand Down Expand Up @@ -567,6 +568,7 @@ service Zero {

rpc Oracle(api.Payload) returns (stream OracleDelta) {}
rpc ShouldServe(Tablet) returns (Tablet) {}
rpc Inform(TabletRequest) returns (TabletResponse) {}
rpc AssignIds(Num) returns (AssignedIds) {}
rpc Timestamps(Num) returns (AssignedIds) {}
rpc CommitOrAbort(api.TxnContext) returns (api.TxnContext) {}
Expand Down Expand Up @@ -596,6 +598,14 @@ service Worker {
rpc TaskStatus(TaskStatusRequest) returns (TaskStatusResponse) {}
}

// TabletResponse is the reply of the Zero.Inform RPC.
message TabletResponse {
// Tablets returned by Zero for the request.
repeated Tablet tablets = 1;
}
// TabletRequest is the argument of the Zero.Inform RPC. It carries a batch of
// tablets so that they can be sent in a single call.
message TabletRequest {
// Tablets the request is about.
repeated Tablet tablets = 1;
uint32 group_id = 2 [(gogoproto.jsontag) = "groupId,omitempty"]; // Served by which group.
}

message SubscriptionRequest {
repeated bytes prefixes = 1;
repeated badgerpb3.Match matches = 2;
Expand Down
Loading