Skip to content

Commit

Permalink
chore: import siderolink as siderolink-launch subcommand
Browse files Browse the repository at this point in the history
This PR ensures that we can test our siderolink communication using the embedded siderolink-agent.
If `--with-siderolink` is provided during `talosctl cluster create`, talosctl will embed the proper kernel arguments and set up `siderolink-agent` as a separate process. It should be used in combination with `--skip-injecting-config` and `--with-apply-config` (the latter will use the newly generated IPv6 siderolink addresses, which talosctl passes to the agent as a "pre-bind").

Fixes #8392

Signed-off-by: Dmitriy Matrenichev <[email protected]>
  • Loading branch information
DmitriyMV committed Mar 23, 2024
1 parent ee51f04 commit 949ad11
Show file tree
Hide file tree
Showing 18 changed files with 534 additions and 11 deletions.
8 changes: 8 additions & 0 deletions .drone.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,13 @@ local integration_cloud_images = Step('cloud-images', depends_on=[integration_im

// CI step 'reproducibility-test': runs the 'reproducibility-test' target once load_artifacts has finished, with IMAGE_REGISTRY set to the local registry.
local integration_reproducibility_test = Step('reproducibility-test', target='reproducibility-test', depends_on=[load_artifacts], environment={ IMAGE_REGISTRY: local_registry });

// CI step 'e2e-siderolink': runs the privileged 'e2e-qemu' target after the
// default-hostname integration step, with the siderolink agent and maintenance
// mode switched on through the environment variables below.
// NOTE(review): how WITH_SIDEROLINK_AGENT / VIA_MAINTENANCE_MODE are consumed is
// decided by the e2e scripts, which are not visible here.
local integration_siderolink = Step('e2e-siderolink', target='e2e-qemu', privileged=true, depends_on=[integration_default_hostname], environment={
SHORT_INTEGRATION_TEST: 'yes',
WITH_SIDEROLINK_AGENT: 'true',
VIA_MAINTENANCE_MODE: 'true',
REGISTRY: local_registry,
});

local push_edge = {
name: 'push-edge',
image: 'autonomy/build-container:latest',
Expand Down Expand Up @@ -697,6 +704,7 @@ local integration_pipelines = [
integration_no_cluster_discovery,
integration_kubespan,
integration_default_hostname,
integration_siderolink,
]) + integration_trigger(['integration-misc']),
Pipeline('integration-extensions', default_pipeline_steps + integration_extensions) + integration_trigger(['integration-extensions']),
Pipeline('integration-cilium', default_pipeline_steps + [integration_cilium, integration_cilium_strict, integration_cilium_strict_kubespan]) + integration_trigger(['integration-cilium']),
Expand Down
171 changes: 170 additions & 1 deletion cmd/talosctl/cmd/mgmt/cluster/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"errors"
"fmt"
"math/big"
"net"
"net/netip"
"net/url"
"os"
"path/filepath"
stdruntime "runtime"
"slices"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -172,6 +174,7 @@ var (
diskEncryptionKeyTypes []string
withFirewall string
withUUIDHostnames bool
withSiderolinkAgent bool
)

// createCmd represents the cluster up command.
Expand Down Expand Up @@ -422,6 +425,7 @@ func create(ctx context.Context, flags *pflag.FlagSet) error {
provision.WithTPM2(tpm2Enabled),
provision.WithExtraUEFISearchPaths(extraUEFISearchPaths),
provision.WithTargetArch(targetArch),
provision.WithSiderolinkAgent(withSiderolinkAgent),
}

var configBundleOpts []bundle.Option
Expand Down Expand Up @@ -746,6 +750,40 @@ func create(ctx context.Context, flags *pflag.FlagSet) error {
extraKernelArgs = procfs.NewCmdline(extraBootKernelArgs)
}

wgNodeGen := makeNodeAddrGenerator()

if withSiderolinkAgent {
if extraKernelArgs == nil {
extraKernelArgs = procfs.NewCmdline("")
}

if extraKernelArgs.Get("siderolink.api") != nil || extraKernelArgs.Get("talos.events.sink") != nil || extraKernelArgs.Get("talos.logging.kernel") != nil {
return errors.New("siderolink kernel arguments are already set, cannot run with --with-siderolink")
}

wgHost := gatewayIPs[0].String()

ports, err := getDynamicPorts()
if err != nil {
return err
}

request.SiderolinkRequest.WireguardEndpoint = net.JoinHostPort(wgHost, ports.wgPort)
request.SiderolinkRequest.APIEndpoint = ":" + ports.apiPort
request.SiderolinkRequest.SinkEndpoint = ":" + ports.sinkPort
request.SiderolinkRequest.LogEndpoint = ":" + ports.logPort

agentNodeAddr := wgNodeGen.GetAgentNodeAddr()

apiLink := "grpc://" + net.JoinHostPort(wgHost, ports.apiPort) + "?jointoken=foo"
sinkURL := net.JoinHostPort(agentNodeAddr, ports.sinkPort)
kernelURL := "tcp://" + net.JoinHostPort(agentNodeAddr, ports.logPort)

extraKernelArgs.Append("siderolink.api", apiLink)
extraKernelArgs.Append("talos.events.sink", sinkURL)
extraKernelArgs.Append("talos.logging.kernel", kernelURL)
}

// Add talosconfig to provision options, so we'll have it to parse there
provisionOptions = append(provisionOptions, provision.WithTalosConfig(configBundle.TalosConfig()))

Expand All @@ -760,6 +798,17 @@ func create(ctx context.Context, flags *pflag.FlagSet) error {

nodeUUID := uuid.New()

if withSiderolinkAgent {
var generated netip.Addr

generated, err = wgNodeGen.GenerateRandomNodeAddr()
if err != nil {
return err
}

request.SiderolinkRequest.AddBind(nodeUUID, generated)
}

nodeReq := provision.NodeRequest{
Name: nodeName(clusterName, "controlplane", i+1, nodeUUID),
Type: machine.TypeControlPlane,
Expand Down Expand Up @@ -820,6 +869,17 @@ func create(ctx context.Context, flags *pflag.FlagSet) error {

nodeUUID := uuid.New()

if withSiderolinkAgent {
var generated netip.Addr

generated, err = wgNodeGen.GenerateRandomNodeAddr()
if err != nil {
return err
}

request.SiderolinkRequest.AddBind(nodeUUID, generated)
}

request.Nodes = append(request.Nodes,
provision.NodeRequest{
Name: nodeName(clusterName, "worker", i, nodeUUID),
Expand Down Expand Up @@ -855,7 +915,7 @@ func create(ctx context.Context, flags *pflag.FlagSet) error {
defer clusterAccess.Close() //nolint:errcheck

if applyConfigEnabled {
err = clusterAccess.ApplyConfig(ctx, request.Nodes, os.Stdout)
err = clusterAccess.ApplyConfig(ctx, request.Nodes, request.SiderolinkRequest, os.Stdout)
if err != nil {
return err
}
Expand Down Expand Up @@ -1153,6 +1213,7 @@ func init() {
createCmd.Flags().IntVar(&bandwidth, "with-network-bandwidth", 0, "specify bandwidth restriction (in kbps) on the bridge interface when creating a qemu cluster")
createCmd.Flags().StringVar(&withFirewall, firewallFlag, "", "inject firewall rules into the cluster, value is default policy - accept/block (QEMU only)")
createCmd.Flags().BoolVar(&withUUIDHostnames, "with-uuid-hostnames", false, "use machine UUIDs as default hostnames (QEMU only)")
createCmd.Flags().BoolVar(&withSiderolinkAgent, "with-siderolink", false, "enables the use of siderolink agent as configuration apply mechanism")

Cmd.AddCommand(createCmd)
}
Expand Down Expand Up @@ -1192,3 +1253,111 @@ func checkForDefinedGenFlag(flags *pflag.FlagSet) string {

return ""
}

// generatedPorts carries the port numbers picked by getDynamicPorts, already
// rendered as decimal strings so they can be fed straight into
// net.JoinHostPort or appended to a ":port" listen address.
//
// Fields, in declaration order:
//   - wgPort:   port probed on "udp" for WireGuard;
//   - apiPort:  port probed on "tcp" for the GRPC API;
//   - sinkPort: port probed on "tcp" for the Sink;
//   - logPort:  port probed on "tcp" for the Log.
type generatedPorts struct {
	wgPort, apiPort, sinkPort, logPort string
}

// getDynamicPorts picks four ports (one via "udp" for WireGuard, three via
// "tcp" for the GRPC API, the Sink and the Log) by calling getDynamicPort for
// each of them, in that order.
//
// A set of ports is accepted only if checkPortsDontOverlap reports no
// duplicates; otherwise the whole set is probed again, up to 10 times in
// total. Any getDynamicPort failure aborts immediately with a wrapped error.
// When all 10 attempts produce overlapping ports, the last overlap error is
// wrapped and returned.
func getDynamicPorts() (generatedPorts, error) {
	// Probe order matters: it fixes both the order of the getDynamicPort calls
	// and the meaning of each slot in picked.
	probes := [4]struct {
		network string
		errFmt  string
	}{
		{"udp", "failed to get dynamic port for WireGuard: %w"},
		{"tcp", "failed to get dynamic port for GRPC API: %w"},
		{"tcp", "failed to get dynamic port for Sink: %w"},
		{"tcp", "failed to get dynamic port for Log: %w"},
	}

	var (
		overlapErr error
		picked     [4]int
	)

	for attempt := 0; attempt < 10; attempt++ {
		for slot, probe := range probes {
			port, err := getDynamicPort(probe.network)
			if err != nil {
				return generatedPorts{}, fmt.Errorf(probe.errFmt, err)
			}

			picked[slot] = port
		}

		// Pass the values one by one (not picked[:]...) so the callee works on
		// its own argument slice and cannot reorder picked.
		overlapErr = checkPortsDontOverlap(picked[0], picked[1], picked[2], picked[3])
		if overlapErr == nil {
			return generatedPorts{
				wgPort:   strconv.Itoa(picked[0]),
				apiPort:  strconv.Itoa(picked[1]),
				sinkPort: strconv.Itoa(picked[2]),
				logPort:  strconv.Itoa(picked[3]),
			}, nil
		}
	}

	return generatedPorts{}, fmt.Errorf("failed to get non-overlapping dynamic ports in 10 attempts: %w", overlapErr)
}

// getDynamicPort asks the operating system for a free port on the loopback
// interface by binding to port 0, reads back the port that was assigned and
// closes the socket again before returning.
//
// network must be one of "tcp", "tcp4", "tcp6" (probed with net.Listen) or
// "udp", "udp4", "udp6" (probed with net.ListenPacket); any other value yields
// an "unsupported network" error.
//
// The socket is closed on every return path once it has been opened. A close
// failure is merged into the returned error through handleCloseErr; if parsing
// succeeded, the port is still returned alongside that error.
//
// Because the socket is released before the function returns, the port is only
// known to have been free at probing time.
func getDynamicPort(network string) (port int, err error) {
	// An IPv4 loopback literal cannot be bound on the IPv6-only networks, so
	// pick the loopback address that matches the requested family.
	loopback := "127.0.0.1:0"
	if strings.HasSuffix(network, "6") {
		loopback = "[::1]:0"
	}

	var (
		closeFn func() error
		addrFn  func() net.Addr
	)

	switch network {
	case "tcp", "tcp4", "tcp6":
		l, listenErr := net.Listen(network, loopback)
		if listenErr != nil {
			return 0, listenErr
		}

		addrFn, closeFn = l.Addr, l.Close
	case "udp", "udp4", "udp6":
		l, listenErr := net.ListenPacket(network, loopback)
		if listenErr != nil {
			return 0, listenErr
		}

		addrFn, closeFn = l.LocalAddr, l.Close
	default:
		return 0, fmt.Errorf("unsupported network: %s", network)
	}

	// From here on the socket is open: release it no matter how we leave,
	// including the port-parsing failure path.
	defer func() {
		err = handleCloseErr(err, closeFn())
	}()

	_, portStr, splitErr := net.SplitHostPort(addrFn().String())
	if splitErr != nil {
		return 0, splitErr
	}

	return strconv.Atoi(portStr)
}

func handleCloseErr(err error, closeErr error) error {
switch {
case err != nil && closeErr != nil:
return fmt.Errorf("error: %w, close error: %w", err, closeErr)
case err == nil && closeErr != nil:
return closeErr
case err != nil && closeErr == nil:
return err
default:
return nil
}
}

func checkPortsDontOverlap(ports ...int) error {
slices.Sort(ports)

if len(ports) != len(slices.Compact(ports)) {
return errors.New("generated ports overlap")
}

return nil
}
42 changes: 42 additions & 0 deletions cmd/talosctl/cmd/mgmt/cluster/wg_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//go:build linux

package cluster

import (
"fmt"
"net/netip"

"github.com/siderolabs/siderolink/pkg/wireguard"
)

// nodeAddrGenerator produces the addresses used for the siderolink setup on
// Linux: one fixed address for the agent and random addresses for nodes, all
// derived from the same network prefix.
type nodeAddrGenerator struct {
	// prefix is the network prefix obtained from wireguard.NetworkPrefix("").
	prefix netip.Prefix
	// nodeAddr is the address immediately following the prefix's own address;
	// it is what GetAgentNodeAddr reports.
	nodeAddr netip.Addr
}

// makeNodeAddrGenerator builds a generator around wireguard.NetworkPrefix("")
// and pins the agent address to the first address after the prefix address.
// NOTE(review): what an empty argument means to wireguard.NetworkPrefix is not
// visible here — confirm against the siderolink package.
func makeNodeAddrGenerator() nodeAddrGenerator {
	networkPrefix := wireguard.NetworkPrefix("")

	return nodeAddrGenerator{
		prefix:   networkPrefix,
		nodeAddr: networkPrefix.Addr().Next(),
	}
}

// GenerateRandomNodeAddr delegates to wireguard.GenerateRandomNodeAddr with the
// generator's prefix and returns the address part of the result. On failure it
// returns the zero netip.Addr and the wrapped error.
func (ng *nodeAddrGenerator) GenerateRandomNodeAddr() (netip.Addr, error) {
	generated, err := wireguard.GenerateRandomNodeAddr(ng.prefix)
	if err == nil {
		return generated.Addr(), nil
	}

	return netip.Addr{}, fmt.Errorf("failed to generate random node address: %w", err)
}

// GetAgentNodeAddr returns the agent address chosen at construction time in
// its textual form.
func (ng *nodeAddrGenerator) GetAgentNodeAddr() string {
	agentAddr := ng.nodeAddr

	return agentAddr.String()
}
26 changes: 26 additions & 0 deletions cmd/talosctl/cmd/mgmt/cluster/wg_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//go:build !linux

package cluster

import (
"errors"
"net/netip"
)

type nodeAddrGenerator struct{}

func (ng *nodeAddrGenerator) GenerateRandomNodeAddr() (netip.Addr, error) {
return netip.Addr{}, errors.New("unsupported platform")
}

func (ng *nodeAddrGenerator) GetAgentNodeAddr() string {
return ""
}

func makeNodeAddrGenerator() nodeAddrGenerator {
return nodeAddrGenerator{}
}
Loading

0 comments on commit 949ad11

Please sign in to comment.