Skip to content

Commit

Permalink
fix: pick etcd adverised addresses from 'current' addresses
Browse files Browse the repository at this point in the history
Fixes #7947

This way etcd advertised address can be picked from the `external IPs`
of the machine.

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Dec 1, 2023
1 parent 6b5bc8b commit c6835de
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 26 deletions.
45 changes: 34 additions & 11 deletions internal/app/machined/pkg/controllers/etcd/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func (ctrl *SpecController) Inputs() []controller.Input {
{
Namespace: network.NamespaceName,
Type: network.NodeAddressType,
ID: optional.Some(network.FilteredNodeAddressID(network.NodeAddressRoutedID, k8s.NodeAddressFilterNoK8s)),
Kind: controller.InputWeak,
},
}
Expand Down Expand Up @@ -94,7 +93,7 @@ func (ctrl *SpecController) Run(ctx context.Context, r controller.Runtime, logge
return fmt.Errorf("error getting hostname status: %w", err)
}

nodeAddrs, err := safe.ReaderGet[*network.NodeAddress](
nodeRoutedAddrs, err := safe.ReaderGet[*network.NodeAddress](
ctx,
r,
resource.NewMetadata(
Expand All @@ -112,10 +111,29 @@ func (ctrl *SpecController) Run(ctx context.Context, r controller.Runtime, logge
return fmt.Errorf("error getting addresses: %w", err)
}

addrs := nodeAddrs.TypedSpec().IPs()
nodeCurrentAddrs, err := safe.ReaderGet[*network.NodeAddress](
ctx,
r,
resource.NewMetadata(
network.NamespaceName,
network.NodeAddressType,
network.FilteredNodeAddressID(network.NodeAddressCurrentID, k8s.NodeAddressFilterNoK8s),
resource.VersionUndefined,
),
)
if err != nil {
if state.IsNotFoundError(err) {
continue
}

return fmt.Errorf("error getting addresses: %w", err)
}

routedAddrs := nodeRoutedAddrs.TypedSpec().IPs()
currentAddrs := nodeCurrentAddrs.TypedSpec().IPs()

// need at least a single address
if len(addrs) == 0 {
if len(routedAddrs) == 0 {
continue
}

Expand All @@ -137,7 +155,7 @@ func (ctrl *SpecController) Run(ctx context.Context, r controller.Runtime, logge
defaultListenAddress := netip.AddrFrom4([4]byte{0, 0, 0, 0})
loopbackAddress := netip.AddrFrom4([4]byte{127, 0, 0, 1})

for _, ip := range addrs {
for _, ip := range routedAddrs {
if ip.Is6() {
defaultListenAddress = netip.IPv6Unspecified()
loopbackAddress = netip.MustParseAddr("::1")
Expand All @@ -152,20 +170,25 @@ func (ctrl *SpecController) Run(ctx context.Context, r controller.Runtime, logge
listenClientIPs []netip.Addr
)

advertisedIPs, err = net.FilterIPs(addrs, advertisedCIDRs)
if err != nil {
return fmt.Errorf("error filtering IPs: %w", err)
}

if len(etcdConfig.TypedSpec().AdvertiseValidSubnets) == 0 {
advertisedIPs, err = net.FilterIPs(routedAddrs, advertisedCIDRs)
if err != nil {
return fmt.Errorf("error filtering IPs: %w", err)
}

// if advertise subnet is not set, advertise the first address
if len(advertisedIPs) > 0 {
advertisedIPs = advertisedIPs[:1]
}
} else {
advertisedIPs, err = net.FilterIPs(currentAddrs, advertisedCIDRs)
if err != nil {
return fmt.Errorf("error filtering IPs: %w", err)
}
}

if len(listenCIDRs) > 0 {
listenPeerIPs, err = net.FilterIPs(addrs, listenCIDRs)
listenPeerIPs, err = net.FilterIPs(routedAddrs, listenCIDRs)
if err != nil {
return fmt.Errorf("error filtering IPs: %w", err)
}
Expand Down
36 changes: 21 additions & 15 deletions internal/app/machined/pkg/controllers/etcd/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import (
"testing"
"time"

"github.com/cosi-project/runtime/pkg/safe"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/ctest"
Expand All @@ -26,6 +24,7 @@ func TestSpecSuite(t *testing.T) {

suite.Run(t, &SpecSuite{
DefaultSuite: ctest.DefaultSuite{
Timeout: 3 * time.Second,
AfterSetup: func(suite *ctest.DefaultSuite) {
suite.Require().NoError(suite.Runtime().RegisterController(&etcdctrl.SpecController{}))
},
Expand All @@ -43,20 +42,32 @@ func (suite *SpecSuite) TestReconcile() {
hostnameStatus.TypedSpec().Domainname = "some.domain"
suite.Require().NoError(suite.State().Create(suite.Ctx(), hostnameStatus))

addresses := network.NewNodeAddress(
routedAddresses := network.NewNodeAddress(
network.NamespaceName,
network.FilteredNodeAddressID(network.NodeAddressRoutedID, k8s.NodeAddressFilterNoK8s),
)

addresses.TypedSpec().Addresses = []netip.Prefix{
routedAddresses.TypedSpec().Addresses = []netip.Prefix{
netip.MustParsePrefix("10.0.0.5/24"),
netip.MustParsePrefix("192.168.1.1/24"),
netip.MustParsePrefix("192.168.1.50/32"),
netip.MustParsePrefix("2001:0db8:85a3:0000:0000:8a2e:0370:7334/64"),
netip.MustParsePrefix("2002:0db8:85a3:0000:0000:8a2e:0370:7335/64"),
}

suite.Require().NoError(suite.State().Create(suite.Ctx(), addresses))
suite.Require().NoError(suite.State().Create(suite.Ctx(), routedAddresses))

currentAddrs := network.NewNodeAddress(
network.NamespaceName,
network.FilteredNodeAddressID(network.NodeAddressCurrentID, k8s.NodeAddressFilterNoK8s),
)

currentAddrs.TypedSpec().Addresses = append(
[]netip.Prefix{netip.MustParsePrefix("1.3.5.7/32")},
routedAddresses.TypedSpec().Addresses...,
)

suite.Require().NoError(suite.State().Create(suite.Ctx(), currentAddrs))

for _, tt := range []struct {
name string
Expand Down Expand Up @@ -116,6 +127,7 @@ func (suite *SpecSuite) TestReconcile() {
Image: "foo/bar:v1.0.0",
AdvertiseValidSubnets: []string{
"192.168.0.0/16",
"1.3.5.7/32",
},
},
expected: etcd.SpecSpec{
Expand All @@ -124,6 +136,7 @@ func (suite *SpecSuite) TestReconcile() {
AdvertisedAddresses: []netip.Addr{
netip.MustParseAddr("192.168.1.1"),
netip.MustParseAddr("192.168.1.50"),
netip.MustParseAddr("1.3.5.7"),
},
ListenPeerAddresses: []netip.Addr{
netip.IPv6Unspecified(),
Expand Down Expand Up @@ -197,16 +210,9 @@ func (suite *SpecSuite) TestReconcile() {

suite.Require().NoError(suite.State().Create(suite.Ctx(), etcdConfig))

suite.AssertWithin(3*time.Second, 100*time.Millisecond, ctest.WrapRetry(func(assert *assert.Assertions, require *require.Assertions) {
etcdSpec, err := safe.StateGet[*etcd.Spec](suite.Ctx(), suite.State(), etcd.NewSpec(etcd.NamespaceName, etcd.SpecID).Metadata())
if err != nil {
assert.NoError(err)

return
}

assert.Equal(tt.expected, *etcdSpec.TypedSpec(), "spec %v", *etcdSpec.TypedSpec())
}))
ctest.AssertResource(suite, etcd.SpecID, func(etcdSpec *etcd.Spec, asrt *assert.Assertions) {
asrt.Equal(tt.expected, *etcdSpec.TypedSpec(), "spec %v", *etcdSpec.TypedSpec())
})

suite.Require().NoError(suite.State().Destroy(suite.Ctx(), etcdConfig.Metadata()))
})
Expand Down

0 comments on commit c6835de

Please sign in to comment.