Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: pass configured timeouts to plugins. #109

Merged
merged 5 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ ginkgo-tests:
--coverprofile coverprofile \
--succinct \
--skip-package $(SKIPPED_PKGS) \
-r .; \
-r && \
$(GO_CMD) tool cover -html=$(COVERAGE_PATH)/coverprofile -o $(COVERAGE_PATH)/coverage.html

test-ulimits:
Expand Down
34 changes: 33 additions & 1 deletion pkg/adaptation/adaptation.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Adaptation struct {
serverOpts []ttrpc.ServerOpt
listener net.Listener
plugins []*plugin
syncLock sync.RWMutex
}

var (
Expand Down Expand Up @@ -135,6 +136,7 @@ func New(name, version string, syncFn SyncFn, updateFn UpdateFn, opts ...Option)
pluginPath: DefaultPluginPath,
dropinPath: DefaultPluginConfigPath,
socketPath: DefaultSocketPath,
syncLock: sync.RWMutex{},
}

for _, o := range opts {
Expand Down Expand Up @@ -464,6 +466,8 @@ func (r *Adaptation) acceptPluginConnections(l net.Listener) error {
continue
}

r.requestPluginSync()

err = r.syncFn(ctx, p.synchronize)
if err != nil {
log.Infof(ctx, "failed to synchronize plugin: %v", err)
Expand All @@ -472,9 +476,10 @@ func (r *Adaptation) acceptPluginConnections(l net.Listener) error {
r.plugins = append(r.plugins, p)
r.sortPlugins()
r.Unlock()

log.Infof(ctx, "plugin %q connected and synchronized", p.name())
}

r.finishedPluginSync()
}
}()

Expand Down Expand Up @@ -545,3 +550,30 @@ func (r *Adaptation) sortPlugins() {
}
}
}

func (r *Adaptation) requestPluginSync() {
r.syncLock.Lock()
}

func (r *Adaptation) finishedPluginSync() {
r.syncLock.Unlock()
}

type PluginSyncBlock struct {
r *Adaptation
}

// BlockPluginSync blocks plugins from being synchronized/fully registered.
func (r *Adaptation) BlockPluginSync() *PluginSyncBlock {
r.syncLock.RLock()
return &PluginSyncBlock{r: r}
}

// Unblock a plugin sync. block put in place by BlockPluginSync. Safe to call
// multiple times but only from a single goroutine.
func (b *PluginSyncBlock) Unblock() {
if b != nil && b.r != nil {
b.r.syncLock.RUnlock()
b.r = nil
}
}
102 changes: 84 additions & 18 deletions pkg/adaptation/adaptation_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,12 +582,12 @@ var _ = Describe("Plugin container creation adjustments", func() {
s.Startup()

podReq := &api.RunPodSandboxRequest{Pod: pod}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq := &api.CreateContainerRequest{
Pod: pod,
Container: ctr,
}
reply, err := runtime.runtime.CreateContainer(ctx, ctrReq)
reply, err := runtime.CreateContainer(ctx, ctrReq)
Expect(err).To(BeNil())
Expect(stripAdjustment(reply.Adjust)).Should(Equal(stripAdjustment(expected)))
},
Expand Down Expand Up @@ -792,12 +792,12 @@ var _ = Describe("Plugin container creation adjustments", func() {
s.Startup()

podReq := &api.RunPodSandboxRequest{Pod: pod}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq := &api.CreateContainerRequest{
Pod: pod,
Container: ctr,
}
reply, err := runtime.runtime.CreateContainer(ctx, ctrReq)
reply, err := runtime.CreateContainer(ctx, ctrReq)
if shouldFail {
Expect(err).ToNot(BeNil())
} else {
Expand Down Expand Up @@ -983,21 +983,21 @@ var _ = Describe("Plugin container updates during creation", func() {
s.Startup()

podReq := &api.RunPodSandboxRequest{Pod: pod0}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq := &api.CreateContainerRequest{
Pod: pod0,
Container: ctr0,
}
_, err := runtime.runtime.CreateContainer(ctx, ctrReq)
_, err := runtime.CreateContainer(ctx, ctrReq)
Expect(err).To(BeNil())

podReq = &api.RunPodSandboxRequest{Pod: pod1}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq = &api.CreateContainerRequest{
Pod: pod1,
Container: ctr1,
}
reply, err = runtime.runtime.CreateContainer(ctx, ctrReq)
reply, err = runtime.CreateContainer(ctx, ctrReq)
Expect(err).To(BeNil())

Expect(len(reply.Update)).To(Equal(1))
Expand Down Expand Up @@ -1137,21 +1137,21 @@ var _ = Describe("Plugin container updates during creation", func() {
s.Startup()

podReq := &api.RunPodSandboxRequest{Pod: pod0}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq := &api.CreateContainerRequest{
Pod: pod0,
Container: ctr0,
}
_, err := runtime.runtime.CreateContainer(ctx, ctrReq)
_, err := runtime.CreateContainer(ctx, ctrReq)
Expect(err).To(BeNil())

podReq = &api.RunPodSandboxRequest{Pod: pod1}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq = &api.CreateContainerRequest{
Pod: pod1,
Container: ctr1,
}
reply, err = runtime.runtime.CreateContainer(ctx, ctrReq)
reply, err = runtime.CreateContainer(ctx, ctrReq)
if which == "both" {
Expect(err).ToNot(BeNil())
} else {
Expand Down Expand Up @@ -1341,12 +1341,12 @@ var _ = Describe("Solicited container updates by plugins", func() {
s.Startup()

podReq := &api.RunPodSandboxRequest{Pod: pod0}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq := &api.CreateContainerRequest{
Pod: pod0,
Container: ctr0,
}
_, err := runtime.runtime.CreateContainer(ctx, ctrReq)
_, err := runtime.CreateContainer(ctx, ctrReq)
Expect(err).To(BeNil())

updReq := &api.UpdateContainerRequest{
Expand Down Expand Up @@ -1374,7 +1374,7 @@ var _ = Describe("Solicited container updates by plugins", func() {
},
},
}
reply, err = runtime.runtime.UpdateContainer(ctx, updReq)
reply, err = runtime.UpdateContainer(ctx, updReq)

Expect(len(reply.Update)).To(Equal(1))
Expect(err).To(BeNil())
Expand Down Expand Up @@ -1580,12 +1580,12 @@ var _ = Describe("Solicited container updates by plugins", func() {
s.Startup()

podReq := &api.RunPodSandboxRequest{Pod: pod0}
Expect(runtime.runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq := &api.CreateContainerRequest{
Pod: pod0,
Container: ctr0,
}
_, err := runtime.runtime.CreateContainer(ctx, ctrReq)
_, err := runtime.CreateContainer(ctx, ctrReq)
Expect(err).To(BeNil())

updReq := &api.UpdateContainerRequest{
Expand Down Expand Up @@ -1613,7 +1613,7 @@ var _ = Describe("Solicited container updates by plugins", func() {
},
},
}
reply, err = runtime.runtime.UpdateContainer(ctx, updReq)
reply, err = runtime.UpdateContainer(ctx, updReq)
if which == "both" {
Expect(err).ToNot(BeNil())
} else {
Expand Down Expand Up @@ -1869,6 +1869,72 @@ var _ = Describe("Unsolicited container update requests", func() {
})
})

var _ = Describe("Plugin configuration request", func() {
var (
s = &Suite{}
)

AfterEach(func() {
s.Cleanup()
})

BeforeEach(func() {
s.Prepare(&mockRuntime{}, &mockPlugin{idx: "00", name: "test"})
})

It("should pass runtime version information to plugins", func() {
var (
runtimeName = "test-runtime"
runtimeVersion = "1.2.3"
)

s.runtime.name = runtimeName
s.runtime.version = runtimeVersion

s.Startup()

Expect(s.plugins[0].RuntimeName()).To(Equal(runtimeName))
Expect(s.plugins[0].RuntimeVersion()).To(Equal(runtimeVersion))
})

When("unchanged", func() {
It("should pass default timeout information to plugins", func() {
var (
registerTimeout = nri.DefaultPluginRegistrationTimeout
requestTimeout = nri.DefaultPluginRequestTimeout
)

s.Startup()
Expect(s.plugins[0].stub.RegistrationTimeout()).To(Equal(registerTimeout))
Expect(s.plugins[0].stub.RequestTimeout()).To(Equal(requestTimeout))
})
})

When("reconfigured", func() {
var (
registerTimeout = nri.DefaultPluginRegistrationTimeout + 5*time.Millisecond
requestTimeout = nri.DefaultPluginRequestTimeout + 7*time.Millisecond
)

BeforeEach(func() {
nri.SetPluginRegistrationTimeout(registerTimeout)
nri.SetPluginRequestTimeout(requestTimeout)
s.Prepare(&mockRuntime{}, &mockPlugin{idx: "00", name: "test"})
})

AfterEach(func() {
nri.SetPluginRegistrationTimeout(nri.DefaultPluginRegistrationTimeout)
nri.SetPluginRequestTimeout(nri.DefaultPluginRequestTimeout)
})

It("should pass configured timeout information to plugins", func() {
s.Startup()
Expect(s.plugins[0].stub.RegistrationTimeout()).To(Equal(registerTimeout))
Expect(s.plugins[0].stub.RequestTimeout()).To(Equal(requestTimeout))
})
})
})

// Notes:
//
// XXX FIXME KLUDGE
Expand Down
12 changes: 7 additions & 5 deletions pkg/adaptation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import (

const (
// DefaultPluginRegistrationTimeout is the default timeout for plugin registration.
DefaultPluginRegistrationTimeout = 5 * time.Second
DefaultPluginRegistrationTimeout = api.DefaultPluginRegistrationTimeout
// DefaultPluginRequestTimeout is the default timeout for plugins to handle a request.
DefaultPluginRequestTimeout = 2 * time.Second
DefaultPluginRequestTimeout = api.DefaultPluginRequestTimeout
)

var (
Expand Down Expand Up @@ -384,9 +384,11 @@ func (p *plugin) configure(ctx context.Context, name, version, config string) er
defer cancel()

rpl, err := p.stub.Configure(ctx, &ConfigureRequest{
Config: config,
RuntimeName: name,
RuntimeVersion: version,
Config: config,
RuntimeName: name,
RuntimeVersion: version,
RegistrationTimeout: getPluginRegistrationTimeout().Milliseconds(),
RequestTimeout: getPluginRequestTimeout().Milliseconds(),
})
if err != nil {
return fmt.Errorf("failed to configure plugin: %w", err)
Expand Down
Loading
Loading