Support Dynamic Cluster IP Addresses in Failure Scenarios (#495)
Adds a retry mechanism to RingPop bootstrap: if we encounter a bootstrap failure, we retry up to 5 more times before crashing the process, refreshing the bootstrap list before each retry.

We suspect (and were able to reproduce on onebox) that a node is unable to join a ringpop cluster if all of the supplied seed nodes are invalid.

Background: Our bootstrap logic relies on nodes in a Temporal cluster periodically writing their host:ports to a table. When a cluster cold-starts, all of the previously written IP addresses may no longer be valid, so no node can start until those heartbeats expire.
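For a rough picture of the read side, the bootstrap list is rebuilt from whatever heartbeats are still fresh. A minimal sketch, using the request and member fields visible in this diff (the real fetchCurrentBootstrapHostports below also pages through results, and how it assembles the host:port string is an assumption here; assumed imports: net, strconv):

	// Sketch only: collect host:ports for members whose heartbeat is recent enough.
	func currentSeeds(manager persistence.ClusterMetadataManager) ([]string, error) {
		resp, err := manager.GetClusterMembers(&persistence.GetClusterMembersRequest{
			LastHeartbeatWithin: healthyHostLastHeartbeatCutoff, // ignore stale heartbeats
		})
		if err != nil {
			return nil, err
		}
		seeds := make([]string, 0, len(resp.ActiveMembers))
		for _, member := range resp.ActiveMembers {
			seeds = append(seeds, net.JoinHostPort(member.RPCAddress.String(), strconv.Itoa(int(member.RPCPort))))
		}
		return seeds, nil
	}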

Furthermore, the node would write its own heartbeat, fail to start, immediately recycle, and potentially come back with a new IP address, meaning the heartbeat it just wrote is no longer valid. That hurts other nodes (and the node itself) in the same way, so the situation may never stabilize.

This fix retries refreshing the bootstrap list and joining the RingPop cluster up to 5 additional times without recycling the process. The node continues to write its heartbeats while it retries. This widens the window of time in which the node is discoverable by other nodes (and vice versa) and ensures that each retry uses the freshest bootstrap list possible.
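In code, the new retry loop in ringpop.go has roughly this shape (paraphrased from the diff below, not a drop-in replacement):

	// Refresh the seed list, attempt to join, back off, repeat; give up after maxBootstrapRetries.
	for retryCount := 0; ; {
		hostPorts, err := bootstrapHostPostRetriever() // fresh list on every attempt
		if err != nil {
			r.logger.Fatal("unable to read hostport bootstrap list", tag.Error(err))
		}

		_, err = r.Ringpop.Bootstrap(&swim.BootstrapOptions{
			ParallelismFactor: 10,
			JoinSize:          1,
			MaxJoinDuration:   r.maxJoinDuration,
			DiscoverProvider:  statichosts.New(hostPorts...),
		})
		if err == nil {
			return // joined the ring
		}
		if retryCount >= maxBootstrapRetries {
			r.logger.Fatal("exhausted all bootstrap retries", tag.Error(err))
		}
		retryCount++
		time.Sleep(bootstrapRetryBackoffInterval) // heartbeats keep being written meanwhile
	}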

Because this issue reproduces on onebox, we were able to write unit tests and verify locally that the retry logic works and that Bootstrap can be invoked on the same ringpop object multiple times without adverse repercussions (its internal initialization code is also idempotent). We also inspected the ringpop library code to validate that 1) our understanding of the problem is correct and 2) multiple bootstrap retries would work. This has not been explicitly verified on staging, but that can be done after the merge to master given the low risk.

The risk here is low: this addresses a situation where the cluster has already degenerated into an unstable state. It does not affect the happy path (e.g. first-time startup, single-node cluster startup, stable cluster startup). In the worst case, the fix doesn't solve the problem and the cluster remains unhealthy and fails to start.
mastermanu authored Jul 3, 2020
1 parent eb79edc commit 1d4a36c
Showing 3 changed files with 75 additions and 37 deletions.
52 changes: 42 additions & 10 deletions common/membership/ringpop.go
@@ -38,6 +38,11 @@ import (
"github.com/temporalio/temporal/common/log/tag"
)

const (
// Number of times we retry refreshing the bootstrap list and try to join the Ringpop cluster before giving up
maxBootstrapRetries = 5
)

type (
// RingPop is a simple wrapper
RingPop struct {
@@ -63,7 +68,10 @@ func NewRingPop(
}

// Start start ring pop
func (r *RingPop) Start(bootstrapHostPorts []string) {
func (r *RingPop) Start(
bootstrapHostPostRetriever func() ([]string, error),
bootstrapRetryBackoffInterval time.Duration,
) {
if !atomic.CompareAndSwapInt32(
&r.status,
common.DaemonStatusInitialized,
@@ -72,16 +80,40 @@ func (r *RingPop) Start(bootstrapHostPorts []string) {
return
}

bootParams := &swim.BootstrapOptions{
ParallelismFactor: 10,
JoinSize: 1,
MaxJoinDuration: r.maxJoinDuration,
DiscoverProvider: statichosts.New(bootstrapHostPorts...),
}
r.bootstrap(bootstrapHostPostRetriever, bootstrapRetryBackoffInterval)
}

func (r *RingPop) bootstrap(
bootstrapHostPostRetriever func() ([]string, error),
bootstrapRetryBackoffInterval time.Duration,
) {
retryCount := 0

for {
hostPorts, err := bootstrapHostPostRetriever()
if err != nil {
r.logger.Fatal("unable to bootstrap ringpop. unable to read hostport bootstrap list", tag.Error(err))
}

bootParams := &swim.BootstrapOptions{
ParallelismFactor: 10,
JoinSize: 1,
MaxJoinDuration: r.maxJoinDuration,
DiscoverProvider: statichosts.New(hostPorts...),
}

_, err = r.Ringpop.Bootstrap(bootParams)
if err == nil {
return
}

if retryCount >= maxBootstrapRetries {
r.logger.Fatal("unable to bootstrap ringpop. exhausted all retries", tag.Error(err))
}

_, err := r.Ringpop.Bootstrap(bootParams)
if err != nil {
r.logger.Fatal("unable to bootstrap ringpop", tag.Error(err))
r.logger.Error("unable to bootstrap ringpop. retrying", tag.Error(err))
retryCount = retryCount + 1
time.Sleep(bootstrapRetryBackoffInterval)
}
}

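On the caller side, Start now takes a retriever closure and a backoff interval instead of a precomputed host list, so every retry re-reads the membership table. The ringpopMonitor change below wires it up like this:

	rpo.rp.Start(
		func() ([]string, error) { return fetchCurrentBootstrapHostports(rpo.metadataManager) },
		healthyHostLastHeartbeatCutoff/2, // 10s between attempts, half the heartbeat cutoff
	)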
30 changes: 11 additions & 19 deletions common/membership/rpMonitor.go
@@ -49,7 +49,7 @@ const (
upsertMembershipRecordExpiryDefault = time.Hour * 48

// 10 second base reporting frequency + 5 second jitter + 5 second acceptable time skew
healthyHostLastHeartbeatCutoff = time.Second * 20
healthyHostLastHeartbeatCutoff = time.Second * 20
)

type ringpopMonitor struct {
@@ -111,12 +111,14 @@ func (rpo *ringpopMonitor) Start() {
// TODO - Note this presents a small race condition as we write our identity before we bootstrap ringpop.
// This is a current limitation of the current structure of the ringpop library as
// we must know our seed nodes before bootstrapping
bootstrapHostPorts, err := rpo.startHeartbeatAndFetchBootstrapHosts(broadcastAddress)
err = rpo.startHeartbeat(broadcastAddress)
if err != nil {
rpo.logger.Fatal("unable to initialize membership heartbeats", tag.Error(err))
}

rpo.rp.Start(bootstrapHostPorts)
rpo.rp.Start(
func() ([]string, error) { return fetchCurrentBootstrapHostports(rpo.metadataManager) },
healthyHostLastHeartbeatCutoff/2)

labels, err := rpo.rp.Labels()
if err != nil {
@@ -178,7 +180,7 @@ func SplitHostPortTyped(hostPort string) (net.IP, uint16, error) {
return broadcastAddress, uint16(broadcastPort), nil
}

func (rpo *ringpopMonitor) startHeartbeatAndFetchBootstrapHosts(broadcastHostport string) ([]string, error) {
func (rpo *ringpopMonitor) startHeartbeat(broadcastHostport string) error {
// Start by cleaning up expired records to avoid growth
err := rpo.metadataManager.PruneClusterMembership(&persistence.PruneClusterMembershipRequest{MaxRecordsPruned: 10})

@@ -187,13 +189,13 @@ func (rpo *ringpopMonitor) startHeartbeatAndFetchBootstrapHosts(broadcastHostpor
// Parse and validate broadcast hostport
broadcastAddress, broadcastPort, err := SplitHostPortTyped(broadcastHostport)
if err != nil {
return nil, err
return err
}

// Parse and validate existing service name
role, err := ServiceNameToServiceTypeEnum(rpo.serviceName)
if err != nil {
return nil, err
return err
}

req := &persistence.UpsertClusterMembershipRequest{
@@ -211,25 +213,16 @@ func (rpo *ringpopMonitor) startHeartbeatAndFetchBootstrapHosts(broadcastHostpor
// For bootstrapping, we filter to a much shorter duration on the
// read side by filtering on the last time a heartbeat was seen.
err = rpo.upsertMyMembership(req)

if err == nil {
rpo.logger.Info("Membership heartbeat upserted successfully",
tag.Address(broadcastAddress.String()),
tag.Port(int(broadcastPort)),
tag.HostID(rpo.hostID.String()))
}

if err != nil {
return nil, err
}

bootstrapHostPorts, err := fetchCurrentBootstrapHostports(rpo.metadataManager)
if err != nil {
return nil, err
rpo.startHeartbeatUpsertLoop(req)
}

rpo.startHeartbeatUpsertLoop(req)
return bootstrapHostPorts, err
return err
}

func fetchCurrentBootstrapHostports(manager persistence.ClusterMetadataManager) ([]string, error) {
@@ -239,7 +232,6 @@ func fetchCurrentBootstrapHostports(manager persistence.ClusterMetadataManager)
var nextPageToken []byte

for {
// Get active hosts in last 5 minutes - Limit page size to 1000.
resp, err := manager.GetClusterMembers(
&persistence.GetClusterMembersRequest{
LastHeartbeatWithin: healthyHostLastHeartbeatCutoff,
@@ -278,7 +270,7 @@ func (rpo *ringpopMonitor) startHeartbeatUpsertLoop(request *persistence.UpsertC
}

jitter := math.Round(rand.Float64() * 5)
time.Sleep(time.Second * time.Duration(10 + jitter))
time.Sleep(time.Second * time.Duration(10+jitter))
}
}

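The intervals line up deliberately; a quick back-of-the-envelope using the constants in this diff (actual attempt spacing also depends on MaxJoinDuration):

	// healthyHostLastHeartbeatCutoff = 10s base reporting period + up to 5s jitter + 5s clock skew = 20s.
	// bootstrapRetryBackoffInterval  = healthyHostLastHeartbeatCutoff / 2 = 10s, roughly one heartbeat period per retry.
	// With maxBootstrapRetries = 5, a node makes up to 6 join attempts over ~50s of backoff,
	// continuing to upsert its own heartbeat and re-read everyone else's throughout.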
30 changes: 22 additions & 8 deletions common/membership/rp_cluster_test.go
@@ -35,8 +35,6 @@ import (

"github.com/pborman/uuid"
"github.com/uber/ringpop-go"
"github.com/uber/ringpop-go/discovery/statichosts"
"github.com/uber/ringpop-go/swim"
"github.com/uber/tchannel-go"

"github.com/temporalio/temporal/common/log/loggerimpl"
@@ -113,10 +111,6 @@ func NewTestRingpopCluster(
cluster.seedNode = cluster.hostAddrs[0]
}
logger.Info("seedNode", tag.Name(cluster.seedNode))
bOptions := new(swim.BootstrapOptions)
bOptions.DiscoverProvider = statichosts.New(cluster.seedNode) // just use the first addr as the seed
bOptions.MaxJoinDuration = time.Duration(time.Second * 2)
bOptions.JoinSize = 1

seedAddress, seedPort, err := SplitHostPortTyped(cluster.seedNode)
seedMember := &persistence.ClusterMember{
@@ -127,8 +121,28 @@
LastHeartbeat: time.Now().UTC(),
}

mockMgr.EXPECT().GetClusterMembers(gomock.Any()).
Return(&persistence.GetClusterMembersResponse{ActiveMembers: []*persistence.ClusterMember{seedMember}}, nil).AnyTimes()
firstGetClusterMemberCall := true
mockMgr.EXPECT().GetClusterMembers(gomock.Any()).DoAndReturn(
func(_ *persistence.GetClusterMembersRequest) (*persistence.GetClusterMembersResponse, error) {
res := &persistence.GetClusterMembersResponse{ActiveMembers: []*persistence.ClusterMember{seedMember}}

if firstGetClusterMemberCall {
// The first time GetClusterMembers is invoked, we simulate returning a stale/bad heartbeat.
// All subsequent calls only return the single "good" seed member
// This ensures that we exercise the retry path in bootstrap properly.
badSeedMember := &persistence.ClusterMember{
HostID: uuid.NewUUID(),
RPCAddress: seedAddress,
RPCPort: seedPort + 1,
SessionStart: time.Now().UTC(),
LastHeartbeat: time.Now().UTC(),
}
res = &persistence.GetClusterMembersResponse{ActiveMembers: []*persistence.ClusterMember{seedMember, badSeedMember}}
}

firstGetClusterMemberCall = false
return res, nil
}).AnyTimes()

for i := 0; i < size; i++ {
resolver := func() (string, error) {
