Skip to content

Commit

Permalink
Fix fabric8io#3228: Add support for Dynamic informers in KubernetesCl…
Browse files Browse the repository at this point in the history
…ient

Introduce a new method in SharedInformerFactory which will only accept
CustomResourceDefinitionContext, types would automatically be assumed
to be GenericKubernetesResource and GenericKubernetesResourceList.

Till now we were deciding whether a resource is namespaced or not
depending upon whether it implements Namespaced interface or not. But
having dynamic informers required change to consider `scope` field in
CustomResourceDefinitionContext as well, hence I introduced a new class
SharedInformerOperationsImpl which overrides `isResourceNamespaced()` so
that it can be configured from calling methods.
  • Loading branch information
rohanKanojia committed Jun 10, 2021
1 parent fb5e638 commit a6e3cdc
Show file tree
Hide file tree
Showing 11 changed files with 363 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* Add DSL support for `user.openshift.io/v1` Identity in OpenShiftClient DSL
* Add DSL support for OpenShift Whereabouts CNI Model `whereabouts.cni.cncf.io` to OpenShiftClient DSL
* Add DSL support for OpenShift Kube Storage Version Migrator resources in OpenShiftClient DSL
* Fix #3228: Add support for Dynamic informers for custom resources in KubernetesClient

#### _**Note**_: Breaking changes in the API
##### DSL Changes:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;

public class SharedInformerOperationsImpl<T extends HasMetadata, L extends KubernetesResourceList<T>, R extends Resource<T>> extends HasMetadataOperation<T, L, R> {
private final boolean resourceNamespaced;

public SharedInformerOperationsImpl(OperationContext context, boolean resourceNamespaced) {
super(context);
this.resourceNamespaced = resourceNamespaced;
}

@Override
public boolean isResourceNamespaced() {
return resourceNamespaced;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public boolean isStatusSubresource() {
return statusSubresource;
}

public boolean isNamespaceScoped() {
if (scope != null) {
return scope.equals("Namespaced");
}
return false;
}

@SuppressWarnings("rawtypes")
public static CustomResourceDefinitionBuilder v1beta1CRDFromCustomResourceType(Class<? extends CustomResource> customResource) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,28 @@

import static io.fabric8.kubernetes.client.utils.KubernetesResourceUtil.inferListType;

import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.api.model.GenericKubernetesResourceList;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.Handlers;
import io.fabric8.kubernetes.client.ResourceHandler;
import io.fabric8.kubernetes.client.SharedInformerOperationsImpl;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer;
import io.fabric8.kubernetes.client.utils.ApiVersionUtil;
import io.fabric8.kubernetes.client.utils.Utils;
import io.fabric8.kubernetes.internal.KubernetesDeserializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -57,8 +65,6 @@ public class SharedInformerFactory extends BaseOperation {

private final ExecutorService informerExecutor;

private final BaseOperation baseOperation;

private final ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners = new ConcurrentLinkedQueue<>();

private boolean allowShutdown = true;
Expand All @@ -82,7 +88,6 @@ public SharedInformerFactory(ExecutorService threadPool, OkHttpClient okHttpClie
super(new OperationContext().withOkhttpClient(okHttpClient).withConfig(configuration));
initOperationContext(configuration);
this.informerExecutor = threadPool;
this.baseOperation = this.newInstance(context);
this.namespace = null;
}

Expand Down Expand Up @@ -122,7 +127,7 @@ public SharedInformerFactory withName(String name) {
* @return the shared index informer
*/
public synchronized <T extends HasMetadata> SharedIndexInformer<T> sharedIndexInformerFor(Class<T> apiTypeClass, long resyncPeriodInMillis) {
return sharedIndexInformerFor(apiTypeClass, inferListType(apiTypeClass), null, resyncPeriodInMillis);
return sharedIndexInformerFor(apiTypeClass, inferListType(apiTypeClass), null, resyncPeriodInMillis, Utils.isResourceNamespaced(apiTypeClass));
}

/**
Expand All @@ -137,7 +142,7 @@ public synchronized <T extends HasMetadata> SharedIndexInformer<T> sharedIndexIn
* @return the shared index informer
*/
public synchronized <T extends HasMetadata> SharedIndexInformer<T> sharedIndexInformerFor(Class<T> apiTypeClass, OperationContext operationContext, long resyncPeriodInMillis) {
return sharedIndexInformerFor(apiTypeClass, inferListType(apiTypeClass), operationContext, resyncPeriodInMillis);
return sharedIndexInformerFor(apiTypeClass, inferListType(apiTypeClass), operationContext, resyncPeriodInMillis, Utils.isResourceNamespaced(apiTypeClass));
}

/**
Expand Down Expand Up @@ -165,7 +170,28 @@ public synchronized <T extends HasMetadata> SharedIndexInformer<T> sharedIndexIn
.withApiGroupName(customResourceContext.getGroup())
.withApiGroupVersion(customResourceContext.getVersion())
.withPlural(customResourceContext.getPlural())
.withIsNamespaceConfiguredFromGlobalConfig(context.isNamespaceFromGlobalConfig()), resyncPeriodInMillis);
.withIsNamespaceConfiguredFromGlobalConfig(context.isNamespaceFromGlobalConfig()), resyncPeriodInMillis, Utils.isResourceNamespaced(apiTypeClass));
}

/**
* Constructs and returns a shared index informer with resync period specified for a Custom Resource. You
* can use it for scenarios where you don't have a POJO for your custom type by specifying group, version and plural in
* {@link CustomResourceDefinitionContext}
*
* <b>Note:</b>It watches for events in <b>ALL NAMESPACES</b>.
*
* @param genericResourceContext object containing details about resource like apiGroup, version and plural, etc.
* @param resyncPeriodInMillis resync period in milliseconds.
* @return {@link SharedIndexInformer} for GenericKubernetesResource
*/
public synchronized SharedIndexInformer<GenericKubernetesResource> sharedIndexInformerForCustomResource(CustomResourceDefinitionContext genericResourceContext, long resyncPeriodInMillis) {
validateCustomResourceProvided(genericResourceContext);
return sharedIndexInformerFor(GenericKubernetesResource.class, GenericKubernetesResourceList.class, new OperationContext()
.withApiGroupName(genericResourceContext.getGroup())
.withApiGroupVersion(genericResourceContext.getVersion())
.withPlural(genericResourceContext.getPlural())
.withNamespace(this.namespace)
.withIsNamespaceConfiguredFromGlobalConfig(context.isNamespaceFromGlobalConfig()), resyncPeriodInMillis, genericResourceContext.isNamespaceScoped());
}

/**
Expand All @@ -178,7 +204,7 @@ public synchronized <T extends HasMetadata> SharedIndexInformer<T> sharedIndexIn
*/
public synchronized <T extends CustomResource<?,?>> SharedIndexInformer<T> sharedIndexInformerForCustomResource(
Class<T> apiTypeClass, OperationContext operationContext, long resyncPeriodInMillis) {
return sharedIndexInformerFor(apiTypeClass, inferListType(apiTypeClass), operationContext, resyncPeriodInMillis);
return sharedIndexInformerFor(apiTypeClass, inferListType(apiTypeClass), operationContext, resyncPeriodInMillis, Utils.isResourceNamespaced(apiTypeClass));
}

/**
Expand Down Expand Up @@ -208,7 +234,7 @@ public synchronized <T extends CustomResource<?,?>> SharedIndexInformer<T> share
* @return the shared index informer
*/
public synchronized <T extends CustomResource<?,?>, L extends KubernetesResourceList<T>> SharedIndexInformer<T> sharedIndexInformerForCustomResource(Class<T> apiTypeClass, Class<L> apiListTypeClass, long resyncPeriodInMillis) {
return sharedIndexInformerFor(apiTypeClass, apiListTypeClass, null, resyncPeriodInMillis);
return sharedIndexInformerFor(apiTypeClass, apiListTypeClass, null, resyncPeriodInMillis, Utils.isResourceNamespaced(apiTypeClass));
}

/**
Expand All @@ -224,8 +250,8 @@ public synchronized <T extends CustomResource<?,?>, L extends KubernetesResource
* @param <L> the type's list parameter (should extend {@link io.fabric8.kubernetes.api.model.KubernetesResourceList}
* @return the shared index informer
*/
private synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>> SharedIndexInformer<T> sharedIndexInformerFor(Class<T> apiTypeClass, Class<L> apiListTypeClass, OperationContext operationContext, long resyncPeriodInMillis) {
ListerWatcher<T, L> listerWatcher = listerWatcherFor(apiTypeClass, apiListTypeClass);
private synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>> SharedIndexInformer<T> sharedIndexInformerFor(Class<T> apiTypeClass, Class<L> apiListTypeClass, OperationContext operationContext, long resyncPeriodInMillis, boolean isResourceNamespaced) {
ListerWatcher<T, L> listerWatcher = listerWatcherFor(apiTypeClass, apiListTypeClass, isResourceNamespaced);
OperationContext context = this.context.withApiGroupName(HasMetadata.getGroup(apiTypeClass))
.withApiGroupVersion(HasMetadata.getVersion(apiTypeClass))
.withPlural(HasMetadata.getPlural(apiTypeClass))
Expand All @@ -248,19 +274,19 @@ private synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>
return informer;
}

private <T extends HasMetadata, L extends KubernetesResourceList<T>> ListerWatcher<T, L> listerWatcherFor(Class<T> apiTypeClass, Class<L> apiListTypeClass) {
private <T extends HasMetadata, L extends KubernetesResourceList<T>> ListerWatcher<T, L> listerWatcherFor(Class<T> apiTypeClass, Class<L> apiListTypeClass, boolean isResourceNamespaced) {

return new ListerWatcher<T, L>() {
@Override
public L list(ListOptions params, String namespace, OperationContext context) {
BaseOperation<T, L, ?> listBaseOperation = getConfiguredBaseOperation(namespace, context, apiTypeClass, apiListTypeClass);
BaseOperation<T, L, ?> listBaseOperation = getConfiguredBaseOperation(namespace, context, apiTypeClass, apiListTypeClass, isResourceNamespaced);
registerKindToKubernetesDeserializer(apiTypeClass);
return listBaseOperation.list();
}

@Override
public Watch watch(ListOptions params, String namespace, OperationContext context, Watcher<T> resourceWatcher) {
BaseOperation<T, L, ?> watchBaseOperation = getConfiguredBaseOperation(namespace, context, apiTypeClass, apiListTypeClass);
BaseOperation<T, L, ?> watchBaseOperation = getConfiguredBaseOperation(namespace, context, apiTypeClass, apiListTypeClass, isResourceNamespaced);
registerKindToKubernetesDeserializer(apiTypeClass);
return watchBaseOperation.watch(params.getResourceVersion(), resourceWatcher);
}
Expand Down Expand Up @@ -372,23 +398,23 @@ private static <T> boolean isKeyOfType(String key, Class<T> apiTypeClass) {
return key.contains(plural);
}

private <T extends HasMetadata, L extends KubernetesResourceList<T>> BaseOperation<T, L, ?> getConfiguredBaseOperation(String namespace, OperationContext context, Class<T> apiTypeClass, Class<L> apiListTypeClass) {
BaseOperation<T, L, ?> baseOperationWithContext;
private <T extends HasMetadata, L extends KubernetesResourceList<T>> BaseOperation<T, L, ?> getConfiguredBaseOperation(String namespace, OperationContext context, Class<T> apiTypeClass, Class<L> apiListTypeClass, boolean isNamespacedScoped) {
SharedInformerOperationsImpl<T, L, Resource<T>> sharedInformerOperations;
// Avoid adding Namespace if it's picked from Global Configuration
if (context.isNamespaceFromGlobalConfig()) {
// SharedInformer default behavior is to watch in all namespaces
// unless we specify namespace explicitly in OperationContext
baseOperationWithContext = baseOperation.newInstance(context
sharedInformerOperations = new SharedInformerOperationsImpl<>(context
.withConfig(new ConfigBuilder(config)
.withNamespace(null)
.build())
.withNamespace(null));
.withNamespace(null), isNamespacedScoped);
} else {
baseOperationWithContext = baseOperation.newInstance(context.withNamespace(namespace));
sharedInformerOperations = new SharedInformerOperationsImpl<>(context.withNamespace(namespace), isNamespacedScoped);
}
baseOperationWithContext.setType(apiTypeClass);
baseOperationWithContext.setListType(apiListTypeClass);
return baseOperationWithContext;
sharedInformerOperations.setType(apiTypeClass);
sharedInformerOperations.setListType(apiListTypeClass);
return sharedInformerOperations;
}

private void initOperationContext(Config configuration) {
Expand All @@ -402,4 +428,15 @@ private <T extends HasMetadata> void registerKindToKubernetesDeserializer(Class<
KubernetesDeserializer.registerCustomKind(HasMetadata.getApiVersion(apiTypeClass), apiTypeClass.getSimpleName(), apiTypeClass);
}
}

private void validateCustomResourceProvided(CustomResourceDefinitionContext genericResourceContext) {
if (Utils.isNullOrEmpty(genericResourceContext.getKind())) {
throw new IllegalArgumentException("CustomResourceDefinitionContext.kind: required value.");
}
ResourceHandler<HasMetadata, ?> resourceHandler = Handlers.get(genericResourceContext.getKind(), ApiVersionUtil.joinApiGroupAndVersion(genericResourceContext.getGroup(), genericResourceContext.getVersion()));

if (resourceHandler != null) {
throw new IllegalArgumentException("Using sharedIndexInformerDynamicResource for core type. Please use sharedIndexInformerFor(Class<T>, long) instead.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,18 @@ public static String trimGroupOrNull(String apiVersion) {
}
return null;
}

/**
* Join group and version strings to form apiVersion key in Kubernetes objects
*
* @param group ApiGroup for resource
* @param version ApiVersion for resource
* @return version if group is null or empty, joined string otherwise.
*/
public static String joinApiGroupAndVersion(String group, String version) {
if (Utils.isNullOrEmpty(group)) {
return version;
}
return group + "/" + version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,15 @@ void fromCrdV1beta1OldVersionStyle() {
.hasFieldOrPropertyWithValue("plural", "foobars")
.hasFieldOrPropertyWithValue("kind", "Foobar");
}

@Test
void isNamespaceScoped() {
// Given
CustomResourceDefinitionContext crdc1 = new CustomResourceDefinitionContext.Builder().withScope("Namespaced").build();
CustomResourceDefinitionContext crdc2 = new CustomResourceDefinitionContext.Builder().withScope("Cluster").build();

// When + Then
assertThat(crdc1.isNamespaceScoped()).isTrue();
assertThat(crdc2.isNamespaceScoped()).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Kind;
Expand All @@ -33,6 +34,8 @@

import static io.fabric8.kubernetes.client.informers.SharedInformerFactory.getInformerKey;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

class SharedInformerFactoryTest {
private OkHttpClient mockClient;
Expand Down Expand Up @@ -98,6 +101,44 @@ void testInformersCreatedWithSameNameButDifferentCRDContext() {
.hasSize(2);
}

@Test
void testSharedIndexInformerForCustomResourceNoType() {
// Given
SharedInformerFactory sharedInformerFactory = new SharedInformerFactory(executorService, mockClient, config);
CustomResourceDefinitionContext context = new CustomResourceDefinitionContext.Builder()
.withKind("Dummy")
.withScope("Namespaced")
.withVersion("v1")
.withGroup("demos.fabric8.io")
.withPlural("dummies")
.build();

// When
sharedInformerFactory.inNamespace("ns1").sharedIndexInformerForCustomResource(context, 10 * 1000L);

// Then
assertThat(sharedInformerFactory.getInformers())
.hasSize(1)
.containsKey("demos.fabric8.io/v1/dummies/ns1");
}

@Test
void testSharedIndexInformerForCustomResourceThrowsIllegalArgumentExceptionOnCoreType() {
// Given
SharedInformerFactory sharedInformerFactory = new SharedInformerFactory(executorService, mockClient, config);
CustomResourceDefinitionContext context = new CustomResourceDefinitionContext.Builder()
.withKind("Service")
.withScope("Namespaced")
.withVersion("v1")
.withGroup("")
.withPlural("services")
.build();

// When + Then
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> sharedInformerFactory.sharedIndexInformerForCustomResource(context, 10 * 1000L));
assertEquals("Using sharedIndexInformerDynamicResource for core type. Please use sharedIndexInformerFor(Class<T>, long) instead.", exception.getMessage());
}

@Test
void testGetExistingSharedIndexInformer() {
// Given
Expand Down
Loading

0 comments on commit a6e3cdc

Please sign in to comment.