Skip to content

Commit

Permalink
feat: implement extension services
Browse files Browse the repository at this point in the history
Fixes #4694

User services run alongside Talos system services.
The root filesystem of every user service container should already be
present in the Talos root filesystem.

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Feb 22, 2022
1 parent 063a9e1 commit b2bf311
Show file tree
Hide file tree
Showing 38 changed files with 1,156 additions and 45 deletions.
14 changes: 8 additions & 6 deletions .drone.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -358,13 +358,13 @@ local integration_provision_tests_track_0 = Step("provision-tests-track-0", priv
local integration_provision_tests_track_1 = Step("provision-tests-track-1", privileged=true, depends_on=[integration_provision_tests_prepare], environment={"IMAGE_REGISTRY": local_registry});
local integration_provision_tests_track_2 = Step("provision-tests-track-2", privileged=true, depends_on=[integration_provision_tests_prepare], environment={"IMAGE_REGISTRY": local_registry});

local integration_gvisor = Step("e2e-gvisor", target="e2e-qemu", privileged=true, depends_on=[load_artifacts], environment={
local integration_extensions = Step("e2e-extensions", target="e2e-qemu", privileged=true, depends_on=[load_artifacts], environment={
"SHORT_INTEGRATION_TEST": "yes",
"WITH_CONFIG_PATCH": '[{"op":"add","path":"/machine/install/extensions","value":[{"image":"ghcr.io/talos-systems/gvisor:54b831d"},{"image":"ghcr.io/talos-systems/intel-ucode:54b831d"}]},{"op":"add","path":"/machine/sysctls","value":{"user.max_user_namespaces": "11255"}}]',
"WITH_TEST": "run_gvisor_test",
"WITH_CONFIG_PATCH": '[{"op":"add","path":"/machine/install/extensions","value":[{"image":"ghcr.io/talos-systems/gvisor:54b831d"},{"image":"ghcr.io/talos-systems/intel-ucode:54b831d"},{"image":"ghcr.io/talos-systems/hello-world-service:a05f558"}]},{"op":"add","path":"/machine/sysctls","value":{"user.max_user_namespaces": "11255"}}]',
"WITH_TEST": "run_extensions_test",
"IMAGE_REGISTRY": local_registry,
});
local integration_cilium = Step("e2e-cilium-1.9.10", target="e2e-qemu", privileged=true, depends_on=[integration_gvisor], environment={
local integration_cilium = Step("e2e-cilium-1.9.10", target="e2e-qemu", privileged=true, depends_on=[integration_extensions], environment={
"SHORT_INTEGRATION_TEST": "yes",
"CUSTOM_CNI_URL": "https://raw.githubusercontent.com/cilium/cilium/v1.9.10/install/kubernetes/quick-install.yaml",
"WITH_CONFIG_PATCH": '[{"op": "replace", "path": "/cluster/network/podSubnets", "value": ["10.0.0.0/8"]}]', # use Pod CIDRs as hardcoded in Cilium's quick-install
Expand Down Expand Up @@ -451,7 +451,8 @@ local integration_pipelines = [
Pipeline('integration-provision-0', default_pipeline_steps + [integration_provision_tests_prepare, integration_provision_tests_track_0]) + integration_trigger(['integration-provision', 'integration-provision-0']),
Pipeline('integration-provision-1', default_pipeline_steps + [integration_provision_tests_prepare, integration_provision_tests_track_1]) + integration_trigger(['integration-provision', 'integration-provision-1']),
Pipeline('integration-provision-2', default_pipeline_steps + [integration_provision_tests_prepare, integration_provision_tests_track_2]) + integration_trigger(['integration-provision', 'integration-provision-2']),
Pipeline('integration-misc', default_pipeline_steps + [integration_gvisor, integration_cilium, integration_uefi, integration_disk_image, integration_canal_reset, integration_no_cluster_discovery, integration_kubespan]) + integration_trigger(['integration-misc']),
Pipeline('integration-misc', default_pipeline_steps + [integration_extensions
, integration_cilium, integration_uefi, integration_disk_image, integration_canal_reset, integration_no_cluster_discovery, integration_kubespan]) + integration_trigger(['integration-misc']),
Pipeline('integration-qemu-encrypted-vip', default_pipeline_steps + [integration_qemu_encrypted_vip]) + integration_trigger(['integration-qemu-encrypted-vip']),
Pipeline('integration-qemu-race', default_pipeline_steps + [build_race, integration_qemu_race]) + integration_trigger(['integration-qemu-race']),
Pipeline('integration-qemu-csi', default_pipeline_steps + [integration_qemu_csi]) + integration_trigger(['integration-qemu-csi']),
Expand All @@ -462,7 +463,8 @@ local integration_pipelines = [
Pipeline('cron-integration-provision-0', default_pipeline_steps + [integration_provision_tests_prepare, integration_provision_tests_track_0], [default_cron_pipeline]) + cron_trigger(['thrice-daily', 'nightly']),
Pipeline('cron-integration-provision-1', default_pipeline_steps + [integration_provision_tests_prepare, integration_provision_tests_track_1], [default_cron_pipeline]) + cron_trigger(['thrice-daily', 'nightly']),
Pipeline('cron-integration-provision-2', default_pipeline_steps + [integration_provision_tests_prepare, integration_provision_tests_track_2], [default_cron_pipeline]) + cron_trigger(['thrice-daily', 'nightly']),
Pipeline('cron-integration-misc', default_pipeline_steps + [integration_gvisor, integration_cilium, integration_uefi, integration_disk_image, integration_canal_reset, integration_no_cluster_discovery, integration_kubespan], [default_cron_pipeline]) + cron_trigger(['thrice-daily', 'nightly']),
Pipeline('cron-integration-misc', default_pipeline_steps + [integration_extensions
, integration_cilium, integration_uefi, integration_disk_image, integration_canal_reset, integration_no_cluster_discovery, integration_kubespan], [default_cron_pipeline]) + cron_trigger(['thrice-daily', 'nightly']),
Pipeline('cron-integration-qemu-encrypted-vip', default_pipeline_steps + [integration_qemu_encrypted_vip], [default_cron_pipeline]) + cron_trigger(['thrice-daily', 'nightly']),
Pipeline('cron-integration-qemu-race', default_pipeline_steps + [build_race, integration_qemu_race], [default_cron_pipeline]) + cron_trigger(['nightly']),
Pipeline('cron-integration-qemu-csi', default_pipeline_steps + [integration_qemu_csi], [default_cron_pipeline]) + cron_trigger(['nightly']),
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ COPY --from=go-generate /src/pkg/machinery/resources/kubespan/ /pkg/machinery/re
COPY --from=go-generate /src/pkg/machinery/resources/network/ /pkg/machinery/resources/network/
COPY --from=go-generate /src/pkg/machinery/config/types/v1alpha1/ /pkg/machinery/config/types/v1alpha1/
COPY --from=go-generate /src/pkg/machinery/nethelpers/ /pkg/machinery/nethelpers/
COPY --from=go-generate /src/pkg/machinery/extensions/ /pkg/machinery/extensions/

# The base target provides a container that can be used to build all Talos
# assets.
Expand Down
7 changes: 7 additions & 0 deletions hack/release.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ Old behavior can be achieved by specifiying empty flag value: `--kubernetes-vers
description="""\
Pod Security Policy Kubernetes feature is deprecated and is going to be removed in Kubernetes 1.25.
Talos by default skips setting up PSP now (see machine configuration `.cluster.apiServer.disablePodSecurityPolicy`).
"""

[notes.extservices]
title = "Extension Services"
description = """\
Talos now provides a way to extend the set of system services Talos runs with extension services.
Extension services should be included in the Talos root filesystem (e.g. via system extensions).
"""

[make_deps]
Expand Down
9 changes: 8 additions & 1 deletion hack/test/e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,17 @@ function build_registry_mirrors {
fi
}

function run_gvisor_test {
# Smoke-test the system extensions installed on the test cluster, in order:
#   1. gVisor: apply the gVisor manifest and wait for the nginx-gvisor pod to become ready.
#   2. Firmware extension: intel-ucode must show up in a recursive listing of /lib/firmware.
#   3. Extension service: an HTTP GET to 172.20.1.2 must return a body containing "Hello".
#
# Relies on KUBECTL and TALOSCTL being set by the caller.
function run_extensions_test {
  echo "Testing gVisor..."
  ${KUBECTL} apply -f ${PWD}/hack/test/gvisor/manifest.yaml
  # give the pod time to be created before waiting on its readiness condition
  sleep 10
  ${KUBECTL} wait --for=condition=ready pod nginx-gvisor --timeout=1m

  echo "Testing firmware extension..."
  ${TALOSCTL} ls -lr /lib/firmware | grep intel-ucode

  echo "Testing extension service..."
  curl http://172.20.1.2/ | grep Hello
}

function run_csi_tests {
Expand Down
22 changes: 2 additions & 20 deletions internal/app/machined/pkg/controllers/k8s/kubelet_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/talos-systems/talos/pkg/machinery/resources/files"
"github.com/talos-systems/talos/pkg/machinery/resources/k8s"
"github.com/talos-systems/talos/pkg/machinery/resources/secrets"
"github.com/talos-systems/talos/pkg/machinery/resources/v1alpha1"
)

// ServiceManager is the interface to the v1alpha1 services subsystems.
Expand Down Expand Up @@ -64,14 +63,8 @@ func (ctrl *KubeletServiceController) Outputs() []controller.Output {
//
//nolint:gocyclo,cyclop
func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
// initially, wait for the cri to be up and for machine-id to be generated
// initially, wait for the machine-id to be generated
if err := r.UpdateInputs([]controller.Input{
{
Namespace: v1alpha1.NamespaceName,
Type: v1alpha1.ServiceType,
ID: pointer.ToString("cri"),
Kind: controller.InputWeak,
},
{
Namespace: files.NamespaceName,
Type: files.EtcFileStatusType,
Expand All @@ -98,18 +91,7 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt
return fmt.Errorf("error getting etc file status: %w", err)
}

svc, err := r.Get(ctx, resource.NewMetadata(v1alpha1.NamespaceName, v1alpha1.ServiceType, "cri", resource.VersionUndefined))
if err != nil {
if state.IsNotFoundError(err) {
continue
}

return fmt.Errorf("error getting service: %w", err)
}

if svc.(*v1alpha1.Service).Healthy() && svc.(*v1alpha1.Service).Running() {
break
}
break
}

// normal reconcile loop, ignore cri state
Expand Down
131 changes: 131 additions & 0 deletions internal/app/machined/pkg/controllers/runtime/extension_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package runtime

import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/cosi-project/runtime/pkg/controller"
"go.uber.org/zap"
"gopkg.in/yaml.v3"

"github.com/talos-systems/talos/internal/app/machined/pkg/system"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/services"
extservices "github.com/talos-systems/talos/pkg/machinery/extensions/services"
)

// ServiceManager is the interface to the v1alpha1 services subsystems.
//
// It is the minimal surface ExtensionServiceController needs: registering
// service definitions and starting them by ID.
type ServiceManager interface {
// Load registers the given services.
// NOTE(review): the returned strings are presumably the IDs of the loaded services - confirm against the implementation.
Load(services ...system.Service) []string
// Start starts the services identified by serviceIDs.
Start(serviceIDs ...string) error
}

// ExtensionServiceController creates extension services based on the extension service configuration found in the rootfs.
type ExtensionServiceController struct {
// V1Alpha1Services is used to load and start each discovered extension service.
V1Alpha1Services ServiceManager
// ConfigPath is the directory scanned for `.yaml` extension service specs.
ConfigPath string
}

// Name implements controller.Controller interface.
//
// It returns the fixed name of this controller.
func (ctrl *ExtensionServiceController) Name() string {
return "runtime.ExtensionServiceController"
}

// Inputs implements controller.Controller interface.
//
// The controller declares no inputs: nil is returned.
func (ctrl *ExtensionServiceController) Inputs() []controller.Input {
return nil
}

// Outputs implements controller.Controller interface.
//
// The controller declares no outputs: nil is returned.
func (ctrl *ExtensionServiceController) Outputs() []controller.Output {
return nil
}

// Run implements controller.Controller interface.
//
// After the first runtime event arrives (or the context is cancelled), Run
// makes a single pass over ConfigPath: every `.yaml` file is decoded into an
// extension service spec, validated, de-duplicated by spec name, and then
// loaded and started via V1Alpha1Services.
//
// Files which fail to load or validate, and specs whose name was already
// seen, are logged and skipped. A failure to start a service aborts the pass
// and is returned. A missing ConfigPath directory is not an error.
//
//nolint:gocyclo
func (ctrl *ExtensionServiceController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
	select {
	case <-ctx.Done():
		return nil
	case <-r.EventCh():
	}

	// the set of services is static, so a single scan of the directory is enough
	entries, err := os.ReadDir(ctrl.ConfigPath)

	switch {
	case err == nil:
		// directory listed successfully, process it below
	case os.IsNotExist(err):
		// no directory means no extension services at all
		logger.Debug("extension service directory is not found")

		return nil
	default:
		return err
	}

	// names of the specs accepted so far, used to reject duplicates
	seenNames := map[string]struct{}{}

	for _, entry := range entries {
		filename := entry.Name()

		if filepath.Ext(filename) != ".yaml" {
			logger.Debug("skipping config file", zap.String("filename", filename))

			continue
		}

		spec, err := ctrl.loadSpec(filepath.Join(ctrl.ConfigPath, filename))
		if err != nil {
			logger.Error("error loading extension service spec", zap.String("filename", filename), zap.Error(err))

			continue
		}

		if err = spec.Validate(); err != nil {
			logger.Error("error validating extension service spec", zap.String("filename", filename), zap.Error(err))

			continue
		}

		if _, duplicate := seenNames[spec.Name]; duplicate {
			logger.Error("duplicate service spec", zap.String("filename", filename), zap.String("name", spec.Name))

			continue
		}

		seenNames[spec.Name] = struct{}{}

		extension := &services.Extension{
			Spec: spec,
		}

		ctrl.V1Alpha1Services.Load(extension)

		if err = ctrl.V1Alpha1Services.Start(extension.ID(nil)); err != nil {
			return fmt.Errorf("error starting %q service: %w", spec.Name, err)
		}
	}

	return nil
}

// loadSpec opens the file at path and decodes its YAML contents into an
// extension service spec.
//
// An error from opening the file is returned as is; a decoding error is
// wrapped with context.
func (ctrl *ExtensionServiceController) loadSpec(path string) (*extservices.Spec, error) {
	specFile, err := os.Open(path)
	if err != nil {
		return nil, err
	}

	defer specFile.Close() //nolint:errcheck

	spec := new(extservices.Spec)

	decoder := yaml.NewDecoder(specFile)

	if err = decoder.Decode(spec); err != nil {
		return nil, fmt.Errorf("error unmarshalling extension service config: %w", err)
	}

	return spec, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package runtime_test

import (
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"

"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"

runtimecontrollers "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/runtime"
"github.com/talos-systems/talos/internal/app/machined/pkg/system"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/services"
)

// ExtensionServiceSuite tests ExtensionServiceController, embedding RuntimeSuite for the controller runtime setup.
type ExtensionServiceSuite struct {
RuntimeSuite
}

// serviceMock is a test double for the controller's ServiceManager which only records the services loaded into it.
type serviceMock struct {
// mu guards services.
mu sync.Mutex
// services holds every loaded service keyed by its ID.
services map[string]system.Service
}

// Load records each given service in the mock under its ID and returns the
// IDs in the order the services were passed.
func (mock *serviceMock) Load(services ...system.Service) []string {
	mock.mu.Lock()
	defer mock.mu.Unlock()

	loaded := make([]string, 0, len(services))

	for i := range services {
		id := services[i].ID(nil)

		mock.services[id] = services[i]
		loaded = append(loaded, id)
	}

	return loaded
}

// Start is a no-op in the mock: it ignores serviceIDs and always reports success.
func (mock *serviceMock) Start(serviceIDs ...string) error {
return nil
}

// getIDs returns the IDs of all services recorded so far, sorted
// lexicographically so the result can be compared deterministically.
func (mock *serviceMock) getIDs() []string {
	mock.mu.Lock()
	defer mock.mu.Unlock()

	registered := make([]string, 0, len(mock.services))

	for serviceID := range mock.services {
		registered = append(registered, serviceID)
	}

	sort.Strings(registered)

	return registered
}

// get returns the service recorded under id, or nil if no such service was loaded.
func (mock *serviceMock) get(id string) system.Service {
mock.mu.Lock()
defer mock.mu.Unlock()

return mock.services[id]
}

// TestReconcile runs the controller against the testdata/extservices/ fixtures
// and verifies that exactly one service, "ext-hello-world", ends up loaded and
// that its container entrypoint is "./hello-world".
func (suite *ExtensionServiceSuite) TestReconcile() {
	svcMock := &serviceMock{
		services: make(map[string]system.Service),
	}

	ctrl := &runtimecontrollers.ExtensionServiceController{
		V1Alpha1Services: svcMock,
		ConfigPath:       "testdata/extservices/",
	}

	suite.Require().NoError(suite.runtime.RegisterController(ctrl))

	suite.startRuntime()

	expectedIDs := []string{"ext-hello-world"}

	// the controller runs asynchronously, so poll until the expected set of services shows up
	checkRegistered := func() error {
		if registered := svcMock.getIDs(); !reflect.DeepEqual(registered, expectedIDs) {
			return retry.ExpectedError(fmt.Errorf("services registered: %q", registered))
		}

		return nil
	}

	suite.Assert().NoError(
		retry.Constant(10*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(checkRegistered),
	)

	helloSvc := svcMock.get("ext-hello-world")
	suite.Require().IsType(&services.Extension{}, helloSvc)

	extension := helloSvc.(*services.Extension) //nolint:errcheck,forcetypeassert

	suite.Assert().Equal("./hello-world", extension.Spec.Container.Entrypoint)
}

func TestExtensionServiceSuite(t *testing.T) {
suite.Run(t, new(ExtensionServiceSuite))
}
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
name: hello-world
container:
entrypoint: ./hello-world
args:
- --msg
- Talos Linux Extension Service
depends:
- network:
- addresses
restart: always
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: invalid
container:
entrypoint: ./hello-world
args:
- --msg
- Talos Linux Extension Service
depends:
- nothing: true
restart: random
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: hello-world
container:
entrypoint: ./duplicate
args:
- should not get registered
depends:
- network:
- addresses
restart: always
Loading

0 comments on commit b2bf311

Please sign in to comment.