Skip to content

Commit

Permalink
feat: adding bulk call for alpha to inform zero about the tablets (#8089
Browse files Browse the repository at this point in the history
)

* adding bulk call for alpha to inform zero about the tablets (#8088)

(cherry picked from commit c204d0a)
  • Loading branch information
aman-bansal authored and NamanJain8 committed Nov 30, 2021
1 parent 3bfd269 commit 38ed972
Show file tree
Hide file tree
Showing 5 changed files with 1,024 additions and 389 deletions.
34 changes: 31 additions & 3 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,25 @@ func (n *node) regenerateChecksum() {
}
}

func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
func (n *node) handleBulkTabletProposal(tablets []*pb.Tablet) error {
n.server.AssertLock()
state := n.server.state

defer n.regenerateChecksum()
for _, tablet := range tablets {
if err := n.handleTablet(tablet); err != nil {
glog.Warningf("not able to handle tablet %s. Got err: %+v", tablet.GetPredicate(), err)
}
}

return nil
}

// handleTablet will check if the given tablet is served by any group.
// If not the tablet will be added to the current group predicate list
//
// This function doesn't take any locks.
// It is the calling functions responsibility to manage the concurrency.
func (n *node) handleTablet(tablet *pb.Tablet) error {
state := n.server.state
if tablet.GroupId == 0 {
return errors.Errorf("Tablet group id is zero: %+v", tablet)
}
Expand Down Expand Up @@ -339,6 +352,12 @@ func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
return nil
}

func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
n.server.AssertLock()
defer n.regenerateChecksum()
return n.handleTablet(tablet)
}

func (n *node) deleteNamespace(delNs uint64) error {
n.server.AssertLock()
state := n.server.state
Expand Down Expand Up @@ -431,6 +450,15 @@ func (n *node) applyProposal(e raftpb.Entry) (uint64, error) {
return key, err
}
}

if p.Tablets != nil && len(p.Tablets) > 0 {
if err := n.handleBulkTabletProposal(p.Tablets); err != nil {
span.Annotatef(nil, "While applying bulk tablet proposal: %v", err)
glog.Errorf("While applying bulk tablet proposal: %v", err)
return key, err
}
}

if p.License != nil {
// Check that the number of nodes in the cluster should be less than MaxNodes, otherwise
// reject the proposal.
Expand Down
77 changes: 69 additions & 8 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,8 @@ func (s *Server) ServingTablet(tablet string) *pb.Tablet {
defer s.RUnlock()

for _, group := range s.state.Groups {
for key, tab := range group.Tablets {
if key == tablet {
return tab
}
if tab, ok := group.Tablets[tablet]; ok {
return tab
}
}
return nil
Expand All @@ -341,10 +339,8 @@ func (s *Server) servingTablet(tablet string) *pb.Tablet {
s.AssertRLock()

for _, group := range s.state.Groups {
for key, tab := range group.Tablets {
if key == tablet {
return tab
}
if tab, ok := group.Tablets[tablet]; ok {
return tab
}
}
return nil
Expand Down Expand Up @@ -410,6 +406,71 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
return res, nil
}

func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletResponse, error) {
ctx, span := otrace.StartSpan(ctx, "Zero.Inform")
defer span.End()
if req == nil || len(req.Tablets) == 0 {
return nil, errors.Errorf("Tablets are empty in %+v", req)
}

if req.GroupId == 0 {
return nil, errors.Errorf("Group ID is Zero in %+v", req)
}

tablets := make([]*pb.Tablet, 0)
unknownTablets := make([]*pb.Tablet, 0)
for _, t := range req.Tablets {
tab := s.ServingTablet(t.Predicate)
span.Annotatef(nil, "Tablet for %s: %+v", t.Predicate, tab)
switch {
case tab != nil && !t.Force:
tablets = append(tablets, t)
case t.ReadOnly:
tablets = append(tablets, &pb.Tablet{})
default:
unknownTablets = append(unknownTablets, t)
}
}

if len(unknownTablets) == 0 {
return &pb.TabletResponse{
Tablets: tablets,
}, nil
}

// Set the tablet to be served by this server's group.
var proposal pb.ZeroProposal
proposal.Tablets = make([]*pb.Tablet, 0)
for _, t := range unknownTablets {
if x.IsReservedPredicate(t.Predicate) {
// Force all the reserved predicates to be allocated to group 1.
// This is to make it easier to stream ACL updates to all alpha servers
// since they only need to open one pipeline to receive updates for all
// ACL predicates.
// This will also make it easier to restore the reserved predicates after
// a DropAll operation.
t.GroupId = 1
}
proposal.Tablets = append(proposal.Tablets, t)
}

if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
span.Annotatef(nil, "While proposing tablet: %v", err)
return nil, err
}

for _, t := range unknownTablets {
tab := s.ServingTablet(t.Predicate)
x.AssertTrue(tab != nil)
span.Annotatef(nil, "Now serving tablet for %s: %+v", t.Predicate, tab)
tablets = append(tablets, tab)
}

return &pb.TabletResponse{
Tablets: tablets,
}, nil
}

// RemoveNode removes the given node from the given group.
// It's the user's responsibility to ensure that node doesn't come back again
// before calling the api.
Expand Down
10 changes: 10 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ message ZeroProposal {
ZeroSnapshot snapshot = 11; // Used to make Zeros take a snapshot.
// 12 has already been used.
DeleteNsRequest delete_ns = 13; // Used to delete namespace.
repeated Tablet tablets = 14;
}

// MembershipState is used to pack together the current membership state of all
Expand Down Expand Up @@ -556,6 +557,7 @@ service Zero {

rpc Oracle(api.Payload) returns (stream OracleDelta) {}
rpc ShouldServe(Tablet) returns (Tablet) {}
rpc Inform(TabletRequest) returns (TabletResponse) {}
rpc AssignIds(Num) returns (AssignedIds) {}
rpc Timestamps(Num) returns (AssignedIds) {}
rpc CommitOrAbort(api.TxnContext) returns (api.TxnContext) {}
Expand Down Expand Up @@ -585,6 +587,14 @@ service Worker {
rpc TaskStatus(TaskStatusRequest) returns (TaskStatusResponse) {}
}

message TabletResponse {
repeated Tablet tablets = 1;
}
message TabletRequest {
repeated Tablet tablets = 1;
uint32 group_id = 2 [(gogoproto.jsontag) = "groupId,omitempty"]; // Served by which group.
}

message SubscriptionRequest {
repeated bytes prefixes = 1;
repeated badgerpb3.Match matches = 2;
Expand Down
Loading

0 comments on commit 38ed972

Please sign in to comment.