Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CPLB virtual address fall back for kube API URL #772

Merged
merged 4 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 1 addition & 42 deletions phase/configure_k0s.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (p *ConfigureK0s) generateDefaultConfig() (string, error) {
func (p *ConfigureK0s) Run() error {
controllers := p.Config.Spec.Hosts.Controllers().Filter(func(h *cluster.Host) bool {
return !h.Reset && len(h.Metadata.K0sNewConfig) > 0
})
})
return p.parallelDo(controllers, p.configureK0s)
}

Expand Down Expand Up @@ -270,19 +270,6 @@ func (p *ConfigureK0s) configureK0s(h *cluster.Host) error {
return nil
}

func addUnlessExist(slice *[]string, s string) {
var found bool
for _, v := range *slice {
if v == s {
found = true
break
}
}
if !found {
*slice = append(*slice, s)
}
}

func (p *ConfigureK0s) configFor(h *cluster.Host) (string, error) {
var cfg dig.Mapping

Expand All @@ -298,40 +285,12 @@ func (p *ConfigureK0s) configFor(h *cluster.Host) (string, error) {
cfg = p.newBaseConfig.Dup()
}

var sans []string

var addr string
if h.PrivateAddress != "" {
addr = h.PrivateAddress
} else {
addr = h.Address()
}
cfg.DigMapping("spec", "api")["address"] = addr
addUnlessExist(&sans, addr)

oldsans := cfg.Dig("spec", "api", "sans")
switch oldsans := oldsans.(type) {
case []interface{}:
for _, v := range oldsans {
if s, ok := v.(string); ok {
addUnlessExist(&sans, s)
}
}
case []string:
for _, v := range oldsans {
addUnlessExist(&sans, v)
}
}

var controllers cluster.Hosts = p.Config.Spec.Hosts.Controllers()
for _, c := range controllers {
addUnlessExist(&sans, c.Address())
if c.PrivateAddress != "" {
addUnlessExist(&sans, c.PrivateAddress)
}
}
addUnlessExist(&sans, "127.0.0.1")
cfg.DigMapping("spec", "api")["sans"] = sans

if cfg.Dig("spec", "storage", "etcd", "peerAddress") != nil || h.PrivateAddress != "" {
cfg.DigMapping("spec", "storage", "etcd")["peerAddress"] = addr
Expand Down
44 changes: 1 addition & 43 deletions phase/get_kubeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ package phase

import (
"fmt"
"strings"

"github.com/alessio/shellescape"
"github.com/k0sproject/dig"
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster"
"github.com/k0sproject/rig/exec"
"gopkg.in/yaml.v2"
"k8s.io/client-go/tools/clientcmd"
)

Expand All @@ -31,24 +28,6 @@ var readKubeconfig = func(h *cluster.Host) (string, error) {
return output, nil
}

var k0sConfig = func(h *cluster.Host) (dig.Mapping, error) {
cfgContent, err := h.Configurer.ReadFile(h, h.Configurer.K0sConfigPath())
if err != nil {
return nil, fmt.Errorf("read k0s config from host: %w", err)
}

var cfg dig.Mapping
if err := yaml.Unmarshal([]byte(cfgContent), &cfg); err != nil {
return nil, fmt.Errorf("unmarshal k0s config: %w", err)
}

if err != nil {
return nil, fmt.Errorf("parse k0s config: %w", err)
}

return cfg, nil
}

func (p *GetKubeconfig) DryRun() error {
p.DryMsg(p.Config.Spec.Hosts.Controllers()[0], "get admin kubeconfig")
return nil
Expand All @@ -58,34 +37,13 @@ func (p *GetKubeconfig) DryRun() error {
func (p *GetKubeconfig) Run() error {
h := p.Config.Spec.Hosts.Controllers()[0]

cfg, err := k0sConfig(h)
if err != nil {
return err
}

output, err := readKubeconfig(h)
if err != nil {
return fmt.Errorf("read kubeconfig from host: %w", err)
}

if p.APIAddress == "" {
// the controller admin.conf is aways pointing to localhost, thus we need to change the address
// something usable from outside
address := h.Address()
if a, ok := cfg.Dig("spec", "api", "externalAddress").(string); ok && a != "" {
address = a
}

port := 6443
if p, ok := cfg.Dig("spec", "api", "port").(int); ok && p != 0 {
port = p
}

if strings.Contains(address, ":") {
p.APIAddress = fmt.Sprintf("https://[%s]:%d", address, port)
} else {
p.APIAddress = fmt.Sprintf("https://%s:%d", address, port)
}
p.APIAddress = p.Config.Spec.KubeAPIURL()
}

cfgString, err := kubeConfig(output, p.Config.Metadata.Name, p.APIAddress)
Expand Down
6 changes: 0 additions & 6 deletions phase/get_kubeconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ func TestGetKubeconfig(t *testing.T) {
defer func() { readKubeconfig = origReadKubeconfig }()
readKubeconfig = fakeReader

origK0sConfig := k0sConfig
defer func() { k0sConfig = origK0sConfig }()
k0sConfig = func(h *cluster.Host) (dig.Mapping, error) {
return cfg.Spec.K0s.Config, nil
}

p := GetKubeconfig{GenericPhase: GenericPhase{Config: cfg}}
require.NoError(t, p.Run())
conf, err := clientcmd.Load([]byte(cfg.Metadata.Kubeconfig))
Expand Down
8 changes: 1 addition & 7 deletions phase/initialize_k0s.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func (p *InitializeK0s) Run() error {
}
return nil
})

if err != nil {
return err
}
Expand All @@ -116,18 +115,13 @@ func (p *InitializeK0s) Run() error {
return err
}

port := 6443
if p, ok := p.Config.Spec.K0s.Config.Dig("spec", "api", "port").(int); ok {
port = p
}
log.Infof("%s: waiting for kubernetes api to respond", h)
if err := retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, port)); err != nil {
if err := retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, p.Config)); err != nil {
return err
}

return nil
})

if err != nil {
return err
}
Expand Down
9 changes: 2 additions & 7 deletions phase/install_controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (p *InstallControllers) After() error {

// Run the phase
func (p *InstallControllers) Run() error {
url := p.Config.Spec.KubeAPIURL()
url := p.Config.Spec.InternalKubeAPIURL()
healthz := fmt.Sprintf("%s/healthz", url)

err := p.parallelDo(p.hosts, func(h *cluster.Host) error {
Expand Down Expand Up @@ -191,11 +191,6 @@ func (p *InstallControllers) Run() error {
}

func (p *InstallControllers) waitJoined(h *cluster.Host) error {
port := 6443
if p, ok := p.Config.Spec.K0s.Config.Dig("spec", "api", "port").(int); ok {
port = p
}

log.Infof("%s: waiting for kubernetes api to respond", h)
return retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, port))
return retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, p.Config))
}
2 changes: 1 addition & 1 deletion phase/install_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (p *InstallWorkers) After() error {

// Run the phase
func (p *InstallWorkers) Run() error {
url := p.Config.Spec.KubeAPIURL()
url := p.Config.Spec.InternalKubeAPIURL()
healthz := fmt.Sprintf("%s/healthz", url)

err := p.parallelDo(p.hosts, func(h *cluster.Host) error {
Expand Down
6 changes: 1 addition & 5 deletions phase/upgrade_controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,9 @@ func (p *UpgradeControllers) Run() error {
if err != nil {
return err
}
port := 6443
if p, ok := p.Config.Spec.K0s.Config.Dig("spec", "api", "port").(int); ok {
port = p
}

if p.IsWet() {
if err := retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, port)); err != nil {
if err := retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, p.Config)); err != nil {
return fmt.Errorf("kube api did not become ready: %w", err)
}
}
Expand Down
75 changes: 62 additions & 13 deletions pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package cluster

import (
"fmt"
"strings"

"github.com/creasty/defaults"
"github.com/jellydator/validation"
"github.com/k0sproject/dig"
)

// Spec defines cluster config spec section
Expand Down Expand Up @@ -80,24 +82,71 @@ func (s *Spec) Validate() error {
)
}

// KubeAPIURL returns an url to the cluster's kube api
func (s *Spec) KubeAPIURL() string {
var caddr string
func (s *Spec) clusterExternalAddress() string {
if a := s.K0s.Config.DigString("spec", "api", "externalAddress"); a != "" {
caddr = a
} else {
leader := s.K0sLeader()
if leader.PrivateAddress != "" {
caddr = leader.PrivateAddress
} else {
caddr = leader.Address()
return a
}

if cplb, ok := s.K0s.Config.Dig("spec", "network", "controlPlaneLoadBalancing").(dig.Mapping); ok {
if enabled, ok := cplb.Dig("enabled").(bool); ok && enabled {
vrrpAddresses := cplb.Dig("virtualServers").([]string)
if len(vrrpAddresses) > 0 {
return vrrpAddresses[0]
}
}
}

cport := 6443
return s.K0sLeader().Address()
}

func (s *Spec) clusterInternalAddress() string {
leader := s.K0sLeader()
if leader.PrivateAddress != "" {
return leader.PrivateAddress
} else {
return leader.Address()
}
}

const defaultAPIPort = 6443

func (s *Spec) APIPort() int {
if p, ok := s.K0s.Config.Dig("spec", "api", "port").(int); ok {
cport = p
return p
}
return defaultAPIPort
}

return fmt.Sprintf("https://%s:%d", caddr, cport)
// KubeAPIURL returns an external url to the cluster's kube API
func (s *Spec) KubeAPIURL() string {
return fmt.Sprintf("https://%s:%d", formatIPV6(s.clusterExternalAddress()), s.APIPort())
}

// InternalKubeAPIURL returns a cluster internal url to the cluster's kube API
func (s *Spec) InternalKubeAPIURL() string {
return fmt.Sprintf("https://%s:%d", formatIPV6(s.clusterInternalAddress()), s.APIPort())
}

// NodeInternalKubeAPIURL returns a cluster internal url to the node's kube API
func (s *Spec) NodeInternalKubeAPIURL(h *Host) string {
addr := "127.0.0.1"

// spec.api.onlyBindToAddress was introduced in k0s 1.30. Setting it to true will make the API server only
// listen on the IP address configured by the `address` option.
if onlyBindAddr, ok := s.K0s.Config.Dig("spec", "api", "onlyBindToAddress").(bool); ok && onlyBindAddr {
if h.PrivateAddress != "" {
addr = h.PrivateAddress
} else {
addr = h.Address()
}
}

return fmt.Sprintf("https://%s:%d", formatIPV6(addr), s.APIPort())
}

func formatIPV6(address string) string {
if strings.Contains(address, ":") {
return fmt.Sprintf("[%s]", address)
}
return address
}
5 changes: 3 additions & 2 deletions pkg/node/statusfunc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"time"

"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1"
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster"
"github.com/k0sproject/rig/exec"

Expand Down Expand Up @@ -195,8 +196,8 @@ func ServiceStoppedFunc(h *cluster.Host, service string) retryFunc {
}

// KubeAPIReadyFunc returns a function that returns an error unless the host's local kube api responds to /version
func KubeAPIReadyFunc(h *cluster.Host, port int) retryFunc {
func KubeAPIReadyFunc(h *cluster.Host, config *v1beta1.Cluster) retryFunc {
// If the anon-auth is disabled on kube api the version endpoint will give 401
// thus we need to accept both 200 and 401 as valid statuses when checking kube api
return HTTPStatusFunc(h, fmt.Sprintf("https://localhost:%d/version", port), 200, 401)
return HTTPStatusFunc(h, fmt.Sprintf("%s/version", config.Spec.NodeInternalKubeAPIURL(h)), 200, 401)
}
Loading