tmc e2e: Split SyncerFixture to provide a way to only maintain the heartbeat
and import APIs without effectively syncing.

And change all the candidate tests to use the new function.

Signed-off-by: David Festal <[email protected]>
davidfestal committed Feb 6, 2023
1 parent 669dd8d commit 8a9580d
Showing 5 changed files with 188 additions and 94 deletions.
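For orientation, a minimal sketch of how a candidate test switches from the full syncer fixture to the new heartbeat-only entry point (the fixture options are copied from the test diffs below; server and path stand in for the test's own kcp server and workspace path):

    // Before: the full syncer fixture, which also syncs resources to the physical cluster.
    //   syncerFixture := framework.NewSyncerFixture(t, server, path,
    //       framework.WithExtraResources("services"),
    //   ).Start(t)

    // After: the SyncTarget is brought to and kept in a Ready state
    // (heartbeat + API import), but no resource is ever synced downstream.
    syncerFixture := framework.NewSyncerFixture(t, server, path,
        framework.WithExtraResources("services"),
    ).SimulateStartWithoutSyncing(t)
    syncTargetName := syncerFixture.SyncerConfig.SyncTargetName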
17 changes: 12 additions & 5 deletions pkg/syncer/syncer.go
@@ -42,6 +42,7 @@ import (
"k8s.io/klog/v2"

workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned"
kcpclusterclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
kcpfeatures "github.com/kcp-dev/kcp/pkg/features"
@@ -394,6 +395,14 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
go startSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName)
}

StartHeartbeatKeeper(ctx, kcpSyncTargetClient, cfg.SyncTargetName, cfg.SyncTargetUID)

return nil
}

func StartHeartbeatKeeper(ctx context.Context, kcpSyncTargetClient kcpclientset.Interface, syncTargetName, syncTargetUID string) {
logger := klog.FromContext(ctx)

// Attempt to heartbeat every interval
go wait.UntilWithContext(ctx, func(ctx context.Context) {
var heartbeatTime time.Time
@@ -402,20 +411,18 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
// Attempt to heartbeat every second until successful. Errors are logged instead of being returned so the
// poll error can be safely ignored.
_ = wait.PollImmediateInfiniteWithContext(ctx, 1*time.Second, func(ctx context.Context) (bool, error) {
patchBytes := []byte(fmt.Sprintf(`[{"op":"test","path":"/metadata/uid","value":%q},{"op":"replace","path":"/status/lastSyncerHeartbeatTime","value":%q}]`, cfg.SyncTargetUID, time.Now().Format(time.RFC3339)))
syncTarget, err = kcpSyncTargetClient.WorkloadV1alpha1().SyncTargets().Patch(ctx, cfg.SyncTargetName, types.JSONPatchType, patchBytes, metav1.PatchOptions{}, "status")
patchBytes := []byte(fmt.Sprintf(`[{"op":"test","path":"/metadata/uid","value":%q},{"op":"replace","path":"/status/lastSyncerHeartbeatTime","value":%q}]`, syncTargetUID, time.Now().Format(time.RFC3339)))
syncTarget, err := kcpSyncTargetClient.WorkloadV1alpha1().SyncTargets().Patch(ctx, syncTargetName, types.JSONPatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
logger.Error(err, "failed to set status.lastSyncerHeartbeatTime")
return false, nil //nolint:nilerr
return false, nil
}

heartbeatTime = syncTarget.Status.LastSyncerHeartbeatTime.Time
return true, nil
})
logger.V(5).Info("Heartbeat set", "heartbeatTime", heartbeatTime)
}, heartbeatInterval)

return nil
}

type filteredGVRSource struct {
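The heartbeat logic extracted above into StartHeartbeatKeeper can now be driven without running a full syncer. Below is a minimal sketch of a standalone call, mirroring how the e2e fixture uses it further down; upstreamConfig, syncTargetPath, syncTargetName and syncTargetUID are assumed to be in scope, and error handling is abbreviated:

    // Build a cluster-aware kcp client and scope it to the workspace that holds the SyncTarget.
    kcpClusterClient, err := kcpclusterclientset.NewForConfig(upstreamConfig)
    if err != nil {
        return err
    }
    kcpSyncTargetClient := kcpClusterClient.Cluster(syncTargetPath)

    // Patches status.lastSyncerHeartbeatTime on the SyncTarget at every heartbeat
    // interval until ctx is cancelled; this helper never syncs any resource.
    syncer.StartHeartbeatKeeper(ctx, kcpSyncTargetClient, syncTargetName, syncTargetUID)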
112 changes: 95 additions & 17 deletions test/e2e/framework/syncer.go
@@ -33,6 +33,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@@ -44,6 +45,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"

apiresourcev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apiresource/v1alpha1"
@@ -52,6 +54,7 @@
"github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions"
workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
workloadcliplugin "github.com/kcp-dev/kcp/pkg/cliplugins/workload/plugin"
"github.com/kcp-dev/kcp/pkg/syncer"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
@@ -133,13 +136,35 @@ func WithDownstreamPreparation(prepare func(config *rest.Config, isFakePCluster
func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
t.Helper()

useDeployedSyncer := len(TestConfig.PClusterKubeconfig()) > 0

artifactDir, _, err := ScratchDirs(t)
if err != nil {
t.Errorf("failed to create temp dir for syncer artifacts: %v", err)
}

downstreamConfig, downstreamKubeconfigPath, downstreamKubeClient, syncerConfig, syncerID := sf.createAndApplySyncTarget(t, useDeployedSyncer, artifactDir)

ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

sf.startSyncer(ctx, t, useDeployedSyncer, downstreamKubeClient, syncerConfig, syncerID, artifactDir, downstreamKubeconfigPath)

startedSyncer := sf.buildStartedSyncerFixture(ctx, t, downstreamConfig, downstreamKubeClient, syncerConfig, syncerID)

// The sync target becoming ready indicates the syncer is healthy and has
// successfully sent a heartbeat to kcp.
startedSyncer.WaitForClusterReady(ctx, t)

return startedSyncer
}

func (sf *syncerFixture) createAndApplySyncTarget(t *testing.T, useDeployedSyncer bool, artifactDir string) (downstreamConfig *rest.Config, downstreamKubeconfigPath string, downstreamKubeClient kubernetesclient.Interface, syncerConfig *syncer.SyncerConfig, syncerID string) {
// Write the upstream logical cluster config to disk for the workspace plugin
upstreamRawConfig, err := sf.upstreamServer.RawConfig()
require.NoError(t, err)
_, kubeconfigPath := WriteLogicalClusterConfig(t, upstreamRawConfig, "base", sf.syncTargetPath)

useDeployedSyncer := len(TestConfig.PClusterKubeconfig()) > 0

syncerImage := TestConfig.SyncerImage()
if useDeployedSyncer {
require.NotZero(t, len(syncerImage), "--syncer-image must be specified if testing with a deployed syncer")
@@ -169,8 +194,6 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
}
syncerYAML := RunKcpCliPlugin(t, kubeconfigPath, pluginArgs)

var downstreamConfig *rest.Config
var downstreamKubeconfigPath string
if useDeployedSyncer {
// The syncer will target the pcluster identified by `--pcluster-kubeconfig`.
downstreamKubeconfigPath = TestConfig.PClusterKubeconfig()
@@ -200,11 +223,6 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
// Apply the yaml output from the plugin to the downstream server
KubectlApply(t, downstreamKubeconfigPath, syncerYAML)

artifactDir, _, err := ScratchDirs(t)
if err != nil {
t.Errorf("failed to create temp dir for syncer artifacts: %v", err)
}

// collect both in deployed and in-process mode
t.Cleanup(func() {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(wait.ForeverTestTimeout))
@@ -266,7 +284,6 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
// Extract the configuration for an in-process syncer from the resources that were
// applied to the downstream server. This maximizes the parity between the
// configuration of a deployed and in-process syncer.
var syncerID string
for _, doc := range strings.Split(string(syncerYAML), "\n---\n") {
var manifest struct {
metav1.ObjectMeta `json:"metadata"`
@@ -280,14 +297,15 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
}
require.NotEmpty(t, syncerID, "failed to extract syncer namespace from yaml produced by plugin:\n%s", string(syncerYAML))

syncerConfig := syncerConfigFromCluster(t, downstreamConfig, syncerID, syncerID)

ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)
syncerConfig = syncerConfigFromCluster(t, downstreamConfig, syncerID, syncerID)

downstreamKubeClient, err := kubernetesclient.NewForConfig(downstreamConfig)
downstreamKubeClient, err = kubernetesclient.NewForConfig(downstreamConfig)
require.NoError(t, err)

return
}

func (sf *syncerFixture) startSyncer(ctx context.Context, t *testing.T, useDeployedSyncer bool, downstreamKubeClient kubernetesclient.Interface, syncerConfig *syncer.SyncerConfig, syncerID, artifactDir, downstreamKubeconfigPath string) {
if useDeployedSyncer {
t.Cleanup(func() {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(wait.ForeverTestTimeout))
@@ -333,7 +351,7 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
}

t.Logf("Deleting syncer resources for sync target %s|%s", syncerConfig.SyncTargetPath, syncerConfig.SyncTargetName)
err = downstreamKubeClient.CoreV1().Namespaces().Delete(ctx, syncerID, metav1.DeleteOptions{})
err := downstreamKubeClient.CoreV1().Namespaces().Delete(ctx, syncerID, metav1.DeleteOptions{})
if err != nil {
t.Errorf("failed to delete Namespace %q: %v", syncerID, err)
}
@@ -456,7 +474,9 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
require.NoError(t, err)
}
}
}

func (sf *syncerFixture) buildStartedSyncerFixture(ctx context.Context, t *testing.T, downstreamConfig *rest.Config, downstreamKubeClient kubernetesclient.Interface, syncerConfig *syncer.SyncerConfig, syncerID string) *StartedSyncerFixture {
rawConfig, err := sf.upstreamServer.RawConfig()
require.NoError(t, err)

@@ -493,7 +513,7 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
upsyncerVWConfig = rest.AddUserAgent(rest.CopyConfig(upsyncerVWConfig), t.Name())
require.NoError(t, err)

startedSyncer := &StartedSyncerFixture{
return &StartedSyncerFixture{
SyncerConfig: syncerConfig,
SyncerID: syncerID,
SyncTargetClusterName: syncTargetClusterName,
@@ -503,6 +523,64 @@ func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
SyncerVirtualWorkspaceConfig: syncerVWConfig,
UpsyncerVirtualWorkspaceConfig: upsyncerVWConfig,
}
}

// SimulateStartWithoutSyncing simulates the start of a Syncer and performs what the Syncer does to bring the SyncTarget
// to a Ready status and keep it there (heartbeat and API schema compatibility).
// No resource will be effectively synced after calling this method.
func (sf *syncerFixture) SimulateStartWithoutSyncing(t *testing.T) *StartedSyncerFixture {
t.Helper()

artifactDir, _, err := ScratchDirs(t)
if err != nil {
t.Errorf("failed to create temp dir for syncer artifacts: %v", err)
}

downstreamConfig, _, downstreamKubeClient, syncerConfig, syncerID := sf.createAndApplySyncTarget(t, false, artifactDir)

ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)

kcpBootstrapClusterClient, err := kcpclientset.NewForConfig(syncerConfig.UpstreamConfig)
require.NoError(t, err)
kcpSyncTargetClient := kcpBootstrapClusterClient.Cluster(syncerConfig.SyncTargetPath)

// Artificially consider that all the resources to sync are authorized on the physical cluster side.
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
syncTarget, err := kcpSyncTargetClient.WorkloadV1alpha1().SyncTargets().Get(ctx, syncerConfig.SyncTargetName, metav1.GetOptions{})
require.NoError(t, err)
conditions.Set(syncTarget, conditions.TrueCondition(workloadv1alpha1.SyncerAuthorized))
_, err = kcpSyncTargetClient.WorkloadV1alpha1().SyncTargets().UpdateStatus(ctx, syncTarget, metav1.UpdateOptions{})
return err
})
require.NoError(t, err)

// Import the resource schemas of the resources to sync from the physical cluster, to enable the compatibility check in KCP.
resources := syncerConfig.ResourcesToSync.List()
kcpSyncTargetInformerFactory := kcpinformers.NewSharedScopedInformerFactoryWithOptions(kcpSyncTargetClient, 10*time.Hour, kcpinformers.WithTweakListOptions(
func(listOptions *metav1.ListOptions) {
listOptions.FieldSelector = fields.OneTermEqualSelector("metadata.name", syncerConfig.SyncTargetName).String()
},
))
kcpImporterInformerFactory := kcpinformers.NewSharedScopedInformerFactoryWithOptions(kcpSyncTargetClient, 10*time.Hour)
apiImporter, err := syncer.NewAPIImporter(
syncerConfig.UpstreamConfig, syncerConfig.DownstreamConfig,
kcpSyncTargetInformerFactory.Workload().V1alpha1().SyncTargets(),
kcpImporterInformerFactory.Apiresource().V1alpha1().APIResourceImports(),
resources,
syncerConfig.SyncTargetPath, syncerConfig.SyncTargetName, types.UID(syncerConfig.SyncTargetUID))
require.NoError(t, err)

kcpImporterInformerFactory.Start(ctx.Done())
kcpSyncTargetInformerFactory.Start(ctx.Done())
kcpSyncTargetInformerFactory.WaitForCacheSync(ctx.Done())

go apiImporter.Start(klog.NewContext(ctx, klog.FromContext(ctx).WithValues("resources", resources)), 5*time.Second)

// Start the heartbeat keeper to have the SyncTarget always ready during the e2e test.
syncer.StartHeartbeatKeeper(ctx, kcpSyncTargetClient, syncerConfig.SyncTargetName, syncerConfig.SyncTargetUID)

startedSyncer := sf.buildStartedSyncerFixture(ctx, t, downstreamConfig, downstreamKubeClient, syncerConfig, syncerID)

// The sync target becoming ready indicates the syncer is healthy and has
// successfully sent a heartbeat to kcp.
2 changes: 1 addition & 1 deletion test/e2e/reconciler/namespace/controller_test.go
@@ -95,7 +95,7 @@ func TestNamespaceScheduler(t *testing.T) {
// controller just needs ready clusters which can be accomplished without a syncer by having the
// heartbeater update the sync target so the heartbeat controller can set the cluster ready.
syncerFixture := framework.NewSyncerFixture(t, server, server.path,
framework.WithExtraResources("services")).Start(t)
framework.WithExtraResources("services")).SimulateStartWithoutSyncing(t)
syncTargetName := syncerFixture.SyncerConfig.SyncTargetName

t.Logf("Bind to location workspace")
4 changes: 2 additions & 2 deletions test/e2e/reconciler/scheduling/api_compatibility_test.go
@@ -58,14 +58,14 @@ func TestSchedulingOnSupportedAPI(t *testing.T) {
framework.WithSyncTargetName(firstSyncTargetName),
framework.WithSyncedUserWorkspaces(userWS),
framework.WithAPIExports(""),
).Start(t)
).SimulateStartWithoutSyncing(t)

secondSyncTargetName := fmt.Sprintf("secondsynctarget-%d", +rand.Intn(1000000))
t.Logf("Creating a SyncTarget with global kubernetes APIExports and syncer in %s", locationPath)
_ = framework.NewSyncerFixture(t, source, locationPath,
framework.WithSyncTargetName(secondSyncTargetName),
framework.WithSyncedUserWorkspaces(userWS),
).Start(t)
).SimulateStartWithoutSyncing(t)

placementName := "placement-test-supportedapi"
t.Logf("Bind to location workspace")
