Skip to content

Commit

Permalink
Merge pull request spegel-org#325 from XenitAB/refactor/bootstrap
Browse files Browse the repository at this point in the history
Refactor bootstrap to exit on error
  • Loading branch information
phillebaba authored Jan 19, 2024
2 parents d29a138 + 7a44c7b commit 76d0743
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- [#319](https://github.com/XenitAB/spegel/pull/319) Move metrics definitions to separate package.
- [#322](https://github.com/XenitAB/spegel/pull/322) Refactor type of router resolve.
- [#325](https://github.com/XenitAB/spegel/pull/325) Refactor bootstrap to exit on error.

### Deprecated

Expand Down
18 changes: 12 additions & 6 deletions internal/routing/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

type Bootstrapper interface {
Run(ctx context.Context, id string) error
GetAddress() (*peer.AddrInfo, error)
Get() (*peer.AddrInfo, error)
}

type KubernetesBootstrapper struct {
Expand All @@ -27,20 +27,26 @@ type KubernetesBootstrapper struct {
}

func NewKubernetesBootstrapper(cs kubernetes.Interface, namespace, name string) Bootstrapper {
k := &KubernetesBootstrapper{
return &KubernetesBootstrapper{
leaderElectionNamespace: namespace,
leaderElectioName: name,
cs: cs,
initCh: make(chan interface{}),
}
return k
}

func (k *KubernetesBootstrapper) Run(ctx context.Context, id string) error {
lockCfg := resourcelock.ResourceLockConfig{
Identity: id,
}
rl, err := resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock, k.leaderElectionNamespace, k.leaderElectioName, k.cs.CoreV1(), k.cs.CoordinationV1(), lockCfg)
rl, err := resourcelock.New(
resourcelock.LeasesResourceLock,
k.leaderElectionNamespace,
k.leaderElectioName,
k.cs.CoreV1(),
k.cs.CoordinationV1(),
lockCfg,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -71,11 +77,11 @@ func (k *KubernetesBootstrapper) Run(ctx context.Context, id string) error {
},
},
}
go leaderelection.RunOrDie(ctx, leCfg)
leaderelection.RunOrDie(ctx, leCfg)
return nil
}

func (k *KubernetesBootstrapper) GetAddress() (*peer.AddrInfo, error) {
func (k *KubernetesBootstrapper) Get() (*peer.AddrInfo, error) {
<-k.initCh
k.mx.RLock()
defer k.mx.RUnlock()
Expand Down
37 changes: 19 additions & 18 deletions internal/routing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,14 @@ import (
)

type P2PRouter struct {
b Bootstrapper
bootstrapper Bootstrapper
host host.Host
kdht *dht.IpfsDHT
rd *routing.RoutingDiscovery
registryPort uint16
}

func NewP2PRouter(ctx context.Context, addr string, b Bootstrapper, registryPortStr string) (Router, error) {
log := logr.FromContextOrDiscard(ctx).WithName("p2p")

func NewP2PRouter(ctx context.Context, addr string, bootstrapper Bootstrapper, registryPortStr string) (*P2PRouter, error) {
registryPort, err := strconv.ParseUint(registryPortStr, 10, 16)
if err != nil {
return nil, err
Expand Down Expand Up @@ -81,16 +79,9 @@ func NewP2PRouter(ctx context.Context, addr string, b Bootstrapper, registryPort
return nil, fmt.Errorf("expected single host address but got %d %s", len(addrs), strings.Join(addrs, ", "))
}

self := fmt.Sprintf("%s/p2p/%s", host.Addrs()[0].String(), host.ID().Pretty())
log.Info("starting p2p router", "id", self)

err = b.Run(ctx, self)
if err != nil {
return nil, err
}

log := logr.FromContextOrDiscard(ctx).WithName("p2p")
bootstrapPeerOpt := dht.BootstrapPeersFunc(func() []peer.AddrInfo {
addrInfo, err := b.GetAddress()
addrInfo, err := bootstrapper.Get()
if err != nil {
log.Error(err, "could not get bootstrap addresses")
return nil
Expand All @@ -112,26 +103,36 @@ func NewP2PRouter(ctx context.Context, addr string, b Bootstrapper, registryPort
if err != nil {
return nil, fmt.Errorf("could not create distributed hash table: %w", err)
}
if err = kdht.Bootstrap(ctx); err != nil {
return nil, fmt.Errorf("could not boostrap distributed hash table: %w", err)
}
rd := routing.NewRoutingDiscovery(kdht)

return &P2PRouter{
b: b,
bootstrapper: bootstrapper,
host: host,
kdht: kdht,
rd: rd,
registryPort: uint16(registryPort),
}, nil
}

func (r *P2PRouter) Run(ctx context.Context) error {
self := fmt.Sprintf("%s/p2p/%s", r.host.Addrs()[0].String(), r.host.ID().Pretty())
logr.FromContextOrDiscard(ctx).WithName("p2p").Info("starting p2p router", "id", self)
if err := r.kdht.Bootstrap(ctx); err != nil {
return fmt.Errorf("could not boostrap distributed hash table: %w", err)
}
err := r.bootstrapper.Run(ctx, self)
if err != nil {
return err
}
return nil
}

func (r *P2PRouter) Close() error {
return r.host.Close()
}

func (r *P2PRouter) HasMirrors() (bool, error) {
addrInfo, err := r.b.GetAddress()
addrInfo, err := r.bootstrapper.Get()
if err != nil {
return false, err
}
Expand Down
10 changes: 9 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
return err
}

// Metrics
metrics.Register()
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(metrics.DefaultGatherer, promhttp.HandlerOpts{}))
Expand All @@ -137,6 +138,7 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
return metricsSrv.Shutdown(shutdownCtx)
})

// Router
_, registryPort, err := net.SplitHostPort(args.RegistryAddr)
if err != nil {
return err
Expand All @@ -146,15 +148,21 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
if err != nil {
return err
}
g.Go(func() error {
return router.Run(ctx)
})
g.Go(func() error {
<-ctx.Done()
return router.Close()
})

// State tracking
g.Go(func() error {
state.Track(ctx, ociClient, router, args.ResolveLatestTag)
return nil
})

// Registry
reg := registry.NewRegistry(ociClient, router, args.LocalAddr, args.MirrorResolveRetries, args.MirrorResolveTimeout, args.ResolveLatestTag)
regSrv := reg.Server(args.RegistryAddr, log)
g.Go(func() error {
Expand All @@ -170,7 +178,7 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
return regSrv.Shutdown(shutdownCtx)
})

log.Info("running registry", "addr", args.RegistryAddr)
log.Info("running Spegel", "registry", args.RegistryAddr, "router", args.RouterAddr)
err = g.Wait()
if err != nil {
return err
Expand Down

0 comments on commit 76d0743

Please sign in to comment.