diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c286502..b1470c71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [#319](https://github.com/XenitAB/spegel/pull/319) Move metrics definitions to separate package. - [#322](https://github.com/XenitAB/spegel/pull/322) Refactor type of router resolve. +- [#325](https://github.com/XenitAB/spegel/pull/325) Refactor bootstrap to exit on error. ### Deprecated diff --git a/internal/routing/bootstrap.go b/internal/routing/bootstrap.go index 3113eda8..b3b32b1a 100644 --- a/internal/routing/bootstrap.go +++ b/internal/routing/bootstrap.go @@ -14,7 +14,7 @@ import ( type Bootstrapper interface { Run(ctx context.Context, id string) error - GetAddress() (*peer.AddrInfo, error) + Get() (*peer.AddrInfo, error) } type KubernetesBootstrapper struct { @@ -27,20 +27,26 @@ type KubernetesBootstrapper struct { } func NewKubernetesBootstrapper(cs kubernetes.Interface, namespace, name string) Bootstrapper { - k := &KubernetesBootstrapper{ + return &KubernetesBootstrapper{ leaderElectionNamespace: namespace, leaderElectioName: name, cs: cs, initCh: make(chan interface{}), } - return k } func (k *KubernetesBootstrapper) Run(ctx context.Context, id string) error { lockCfg := resourcelock.ResourceLockConfig{ Identity: id, } - rl, err := resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock, k.leaderElectionNamespace, k.leaderElectioName, k.cs.CoreV1(), k.cs.CoordinationV1(), lockCfg) + rl, err := resourcelock.New( + resourcelock.LeasesResourceLock, + k.leaderElectionNamespace, + k.leaderElectioName, + k.cs.CoreV1(), + k.cs.CoordinationV1(), + lockCfg, + ) if err != nil { return err } @@ -71,11 +77,11 @@ func (k *KubernetesBootstrapper) Run(ctx context.Context, id string) error { }, }, } - go leaderelection.RunOrDie(ctx, leCfg) + leaderelection.RunOrDie(ctx, leCfg) return nil } -func (k *KubernetesBootstrapper) GetAddress() (*peer.AddrInfo, error) { +func (k *KubernetesBootstrapper) Get() (*peer.AddrInfo, error) { <-k.initCh k.mx.RLock() defer k.mx.RUnlock() diff --git a/internal/routing/p2p.go b/internal/routing/p2p.go index 1be9585e..658e7339 100644 --- a/internal/routing/p2p.go +++ b/internal/routing/p2p.go @@ -25,16 +25,14 @@ import ( ) type P2PRouter struct { - b Bootstrapper + bootstrapper Bootstrapper host host.Host kdht *dht.IpfsDHT rd *routing.RoutingDiscovery registryPort uint16 } -func NewP2PRouter(ctx context.Context, addr string, b Bootstrapper, registryPortStr string) (Router, error) { - log := logr.FromContextOrDiscard(ctx).WithName("p2p") - +func NewP2PRouter(ctx context.Context, addr string, bootstrapper Bootstrapper, registryPortStr string) (*P2PRouter, error) { registryPort, err := strconv.ParseUint(registryPortStr, 10, 16) if err != nil { return nil, err @@ -81,16 +79,9 @@ func NewP2PRouter(ctx context.Context, addr string, b Bootstrapper, registryPort return nil, fmt.Errorf("expected single host address but got %d %s", len(addrs), strings.Join(addrs, ", ")) } - self := fmt.Sprintf("%s/p2p/%s", host.Addrs()[0].String(), host.ID().Pretty()) - log.Info("starting p2p router", "id", self) - - err = b.Run(ctx, self) - if err != nil { - return nil, err - } - + log := logr.FromContextOrDiscard(ctx).WithName("p2p") bootstrapPeerOpt := dht.BootstrapPeersFunc(func() []peer.AddrInfo { - addrInfo, err := b.GetAddress() + addrInfo, err := bootstrapper.Get() if err != nil { log.Error(err, "could not get bootstrap addresses") return nil @@ -112,13 +103,10 @@ func NewP2PRouter(ctx context.Context, addr string, b Bootstrapper, registryPort if err != nil { return nil, fmt.Errorf("could not create distributed hash table: %w", err) } - if err = kdht.Bootstrap(ctx); err != nil { - return nil, fmt.Errorf("could not boostrap distributed hash table: %w", err) - } rd := routing.NewRoutingDiscovery(kdht) return &P2PRouter{ - b: b, + bootstrapper: bootstrapper, host: host, kdht: kdht, rd: rd, @@ -126,12 +114,25 @@ func NewP2PRouter(ctx context.Context, addr string, b Bootstrapper, registryPort }, nil } +func (r *P2PRouter) Run(ctx context.Context) error { + self := fmt.Sprintf("%s/p2p/%s", r.host.Addrs()[0].String(), r.host.ID().Pretty()) + logr.FromContextOrDiscard(ctx).WithName("p2p").Info("starting p2p router", "id", self) + if err := r.kdht.Bootstrap(ctx); err != nil { + return fmt.Errorf("could not boostrap distributed hash table: %w", err) + } + err := r.bootstrapper.Run(ctx, self) + if err != nil { + return err + } + return nil +} + func (r *P2PRouter) Close() error { return r.host.Close() } func (r *P2PRouter) HasMirrors() (bool, error) { - addrInfo, err := r.b.GetAddress() + addrInfo, err := r.bootstrapper.Get() if err != nil { return false, err } diff --git a/main.go b/main.go index 28c27283..af92fb32 100644 --- a/main.go +++ b/main.go @@ -117,6 +117,7 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) { return err } + // Metrics metrics.Register() mux := http.NewServeMux() mux.Handle("/metrics", promhttp.HandlerFor(metrics.DefaultGatherer, promhttp.HandlerOpts{})) @@ -137,6 +138,7 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) { return metricsSrv.Shutdown(shutdownCtx) }) + // Router _, registryPort, err := net.SplitHostPort(args.RegistryAddr) if err != nil { return err @@ -146,15 +148,21 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) { if err != nil { return err } + g.Go(func() error { + return router.Run(ctx) + }) g.Go(func() error { <-ctx.Done() return router.Close() }) + + // State tracking g.Go(func() error { state.Track(ctx, ociClient, router, args.ResolveLatestTag) return nil }) + // Registry reg := registry.NewRegistry(ociClient, router, args.LocalAddr, args.MirrorResolveRetries, args.MirrorResolveTimeout, args.ResolveLatestTag) regSrv := reg.Server(args.RegistryAddr, log) g.Go(func() error { @@ -170,7 +178,7 @@ func registryCommand(ctx context.Context, args *RegistryCmd) (err error) { return regSrv.Shutdown(shutdownCtx) }) - log.Info("running registry", "addr", args.RegistryAddr) + log.Info("running Spegel", "registry", args.RegistryAddr, "router", args.RouterAddr) err = g.Wait() if err != nil { return err