Skip to content

Commit

Permalink
Fix raft bootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Jul 3, 2024
1 parent 53ad30b commit b7d0a45
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,22 @@ spec:
- "-server.http-listen-port={{ $values.service.port }}"
- "-memberlist.cluster-label={{ .Release.Namespace }}-{{ include "pyroscope.fullname" .}}"
- "-memberlist.join=dns+{{ include "pyroscope.fullname" .}}-memberlist.{{ .Release.Namespace }}.svc{{ .Values.pyroscope.cluster_domain }}:{{ .Values.pyroscope.memberlist.port }}"
- "-metastore.address=dns:///{{ include "pyroscope.fullname" .}}-metastore-grpc.{{ .Release.Namespace }}.svc{{ .Values.pyroscope.cluster_domain }}:9095"
- "-metastore.raft.bootstrap-peers=dns:///{{ include "pyroscope.fullname" .}}-metastore-raft.{{ .Release.Namespace }}.svc{{ .Values.pyroscope.cluster_domain }}:9099"
- "-metastore.raft.bind-address=:9099"
- "-metastore.raft.advertise-address=($HOSTNAME).{{ include "pyroscope.fullname" .}}-metastore-headless.{{ .Release.Namespace }}.svc{{ .Values.pyroscope.cluster_domain }}:9099"
- "-metastore.raft.server-id=($HOSTNAME).{{ include "pyroscope.fullname" .}}-metastore-headless.{{ .Release.Namespace }}.svc{{ .Values.pyroscope.cluster_domain }}:9099"
- "-metastore.raft.bootstrap-peers=dnssrvnoa+_raft._tcp.{{ include "pyroscope.fullname" .}}-metastore-headless.{{ .Release.Namespace }}.svc{{ .Values.pyroscope.cluster_domain }}:9099"
- "-metastore.address=dns:///_grpc._tcp.{{ include "pyroscope.fullname" .}}-metastore-headless.{{ .Release.Namespace }}.svc{{ .Values.pyroscope.cluster_domain }}:9095"
- "-config.file=/etc/pyroscope/config.yaml"
- "-runtime-config.file=/etc/pyroscope/overrides/overrides.yaml"
{{- range $key, $value := $extraArgs }}
- "-{{ $key }}={{ $value }}"
{{- end }}
{{- with $values.extraEnvVars }}
env:
- name: HOSTNAME
valueFrom:
fieldRef:
fieldPath: metadata.name
{{- range $key, $value := . }}
- name: {{ $key }}
{{- if kindIs "map" $value }}
Expand Down

This file was deleted.

11 changes: 11 additions & 0 deletions operations/pyroscope/helm/pyroscope/templates/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ spec:
targetPort: {{ $values.service.port_name }}
protocol: TCP
name: {{ $values.service.port_name }}
{{- if eq $component "metastore" }}
- name: grpc
port: 9095
protocol: TCP
targetPort: 9095
- name: raft
port: 9099
protocol: TCP
targetPort: 9099
publishNotReadyAddresses: true
{{- end }}
selector:
{{- include "pyroscope.selectorLabels" . | nindent 4 }}
app.kubernetes.io/component: {{ $component | quote }}
Expand Down
7 changes: 2 additions & 5 deletions pkg/metastore/metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,10 @@ func (cfg *RaftConfig) RegisterFlags(f *flag.FlagSet) {
const prefix = "metastore.raft."
f.StringVar(&cfg.Dir, prefix+"dir", "./data-metastore/raft", "")
f.Var((*flagext.StringSlice)(&cfg.BootstrapPeers), prefix+"bootstrap-peers", "")
f.StringVar(&cfg.ServerID, prefix+"server-id", "localhost", "")
f.StringVar(&cfg.BindAddress, prefix+"bind-address", ":9099", "")
f.StringVar(&cfg.BindAddress, prefix+"bind-address", "localhost:9099", "")
f.StringVar(&cfg.ServerID, prefix+"server-id", "localhost:9099", "")
f.StringVar(&cfg.AdvertiseAddress, prefix+"advertise-address", "localhost:9099", "")
f.DurationVar(&cfg.ApplyTimeout, prefix+"apply-timeout", 5*time.Second, "")
if len(cfg.BootstrapPeers) == 0 {
cfg.BootstrapPeers = []string{cfg.AdvertiseAddress + "/" + cfg.ServerID}
}
}

type Metastore struct {
Expand Down
128 changes: 55 additions & 73 deletions pkg/metastore/metastore_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import (
"errors"
"fmt"
"net"
"net/url"
"slices"
"strconv"
"strings"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/dns"
"github.com/hashicorp/raft"
)

Expand All @@ -23,7 +22,7 @@ func (m *Metastore) bootstrap() error {
if len(peers) == 0 {
return fmt.Errorf("no peers found")
}
if raft.ServerID(m.config.Raft.ServerID) != peers[0].ID {
if raft.ServerAddress(m.config.Raft.AdvertiseAddress) != peers[0].Address {
_ = level.Info(m.logger).Log("msg", "skipping raft bootstrap",
"local", m.config.Raft.ServerID,
"peers", fmt.Sprint(peers))
Expand All @@ -39,89 +38,72 @@ func (m *Metastore) bootstrap() error {
}

func (m *Metastore) bootstrapPeers() ([]raft.Server, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// If no bootstrap peers are provided, we bootstrap a single-node cluster.
if len(m.config.Raft.BootstrapPeers) == 0 {
m.config.Raft.BootstrapPeers = []string{m.config.Raft.AdvertiseAddress}
}
// Otherwise, we build the list of peers, resolving the names if needed.
//
// Note that raft requires stable node IDs, therefore we're using
// the node FQDN:port for both purposes: as the identifier and as the
// address. This requires a DNS SRV record lookup without further
// resolution of A records (dnssrvnoa+).
//
// Alternatively, peers may be specified explicitly in the
// "{addr}</{node_id}>" format, where node_id is the optional node
// identifier.
var peers []raft.Server
for _, addr := range m.config.Raft.BootstrapPeers {
parsed, err := peerFromAddress(ctx, addr)
if err != nil {
return nil, err
var resolve []string
for _, peer := range m.config.Raft.BootstrapPeers {
if strings.Contains(peer, "+") {
resolve = append(resolve, peer)
} else {
peers = append(peers, parsePeer(peer))
}
peers = append(peers, parsed...)
}
slices.SortFunc(peers, func(a, b raft.Server) int {
return strings.Compare(string(a.ID), string(b.ID))
})
return peers, nil
}

func peerFromAddress(ctx context.Context, addr string) ([]raft.Server, error) {
if name, found := strings.CutPrefix(addr, "dns+"); found {
return lookupPeers(ctx, name)
}
p, err := url.Parse(addr)
if err != nil {
return nil, err
}
if p.Scheme == "dns" {
// dns://[authority]/{host[:port]}
// DNS scheme uses the URL host for authority, and the actual target
// is in the path token, and the path (name) may have a leading slash.
return lookupPeers(ctx, strings.TrimPrefix(p.Path, "/"))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
prov := dns.NewProvider(m.logger, m.reg, dns.GolangResolverType)
if err := prov.Resolve(ctx, resolve); err != nil {
return nil, fmt.Errorf("failed to resolve bootstrap peers: %w", err)
}
return parsePeer(addr)
}

func parsePeer(raw ...string) ([]raft.Server, error) {
peers := make([]raft.Server, 0, len(raw))
for _, str := range raw {
// The string may be "{addr}" or "{addr}/{node}".
parts := strings.SplitN(str, "/", 2)
var addr string
var node string
if len(parts) == 2 {
addr = parts[0]
node = parts[1]
} else {
addr = str
}
host, _, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
if node == "" {
node = host
}
for _, peer := range prov.Addresses() {
peers = append(peers, raft.Server{
Suffrage: raft.Voter,
ID: raft.ServerID(node),
Address: raft.ServerAddress(addr),
ID: raft.ServerID(peer),
Address: raft.ServerAddress(peer),
})
}
// Finally, we sort the peers: the first one is to bootstrap the cluster.
slices.SortFunc(peers, func(a, b raft.Server) int {
return strings.Compare(string(a.ID), string(b.ID))
})
return peers, nil
}

func lookupPeers(ctx context.Context, addr string) ([]raft.Server, error) {
host, port, err := net.SplitHostPort(addr)
func parsePeer(raw string) raft.Server {
// The string may be "{addr}" or "{addr}/{node_id}".
parts := strings.SplitN(raw, "/", 2)
var addr string
var node string
if len(parts) == 2 {
addr = parts[0]
node = parts[1]
} else {
addr = raw
}
host, _, err := net.SplitHostPort(addr)
if err != nil {
host, port = addr, ""
// No port specified.
host = addr
}
_, recs, err := net.DefaultResolver.LookupSRV(ctx, "", "", host)
if len(recs) == 0 && err != nil {
return nil, err
if node == "" {
// No node_id specified.
node = host
}
var peers []raft.Server
for _, r := range recs {
// The SRV record may have a port, but we prefer the one from the URL.
rPort := port
if rPort == "" {
rPort = strconv.Itoa(int(r.Port))
}
peers = append(peers, raft.Server{
Suffrage: raft.Voter,
ID: raft.ServerID(r.Target),
Address: raft.ServerAddress(net.JoinHostPort(r.Target, rPort)),
})
return raft.Server{
Suffrage: raft.Voter,
ID: raft.ServerID(node),
Address: raft.ServerAddress(addr),
}
return peers, nil
}
110 changes: 0 additions & 110 deletions pkg/metastore/metastore_test.go

This file was deleted.

5 changes: 2 additions & 3 deletions pkg/metastore/raftleader/raftleader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ func (hs *HealthObserver) Register(r *raft.Raft, service string) {
hs.mu.Lock()
defer hs.mu.Unlock()
k := serviceKey{raft: r, service: service}
svc, ok := hs.registered[k]
if ok {
if _, ok := hs.registered[k]; ok {
return
}
svc = &raftService{
svc := &raftService{
server: hs.server,
logger: log.With(hs.logger, "service", service),
service: service,
Expand Down
Loading

0 comments on commit b7d0a45

Please sign in to comment.