Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using cache from informer for operator deleted #150

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 28 additions & 39 deletions operator/src/main/java/org/bf2/operator/operands/AdminServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,45 +117,6 @@ public void delete(ManagedKafka managedKafka, Context<ManagedKafka> context) {
}
}

private Resource<Route> adminRouteResource(ManagedKafka managedKafka) {
return openShiftClient.routes()
.inNamespace(adminServerNamespace(managedKafka))
.withName(adminServerName(managedKafka));
}

private Resource<Service> adminServiceResource(ManagedKafka managedKafka) {
return kubernetesClient.services()
.inNamespace(adminServerNamespace(managedKafka))
.withName(adminServerName(managedKafka));
}

private Resource<Deployment> adminDeploymentResource(ManagedKafka managedKafka){
return kubernetesClient.apps().deployments()
.inNamespace(adminServerNamespace(managedKafka))
.withName(adminServerName(managedKafka));
}

@Override
public boolean isDeleted(ManagedKafka managedKafka) {
Deployment deployment = adminDeploymentResource(managedKafka).get();
if (deployment != null ) {
return false;
}

Service service = adminServiceResource(managedKafka).get();
if (service != null) {
return false;
}

if (openShiftClient != null) {
Route route = adminRouteResource(managedKafka).get();
if (route != null) {
return false;
}
}
return true;
}

/* test */
protected Deployment deploymentFrom(ManagedKafka managedKafka, Deployment current) {
String adminServerName = adminServerName(managedKafka);
Expand Down Expand Up @@ -314,6 +275,16 @@ public boolean isError(ManagedKafka managedKafka) {
return false;
}

@Override
public boolean isDeleted(ManagedKafka managedKafka) {
boolean isDeleted = cachedDeployment(managedKafka) == null && cachedService(managedKafka) == null;
if (openShiftClient != null) {
isDeleted = isDeleted && cachedRoute(managedKafka) == null;
}
log.info("Admin Server isDeleted = {}", isDeleted);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you plan on going back and adjusting the log levels? Seems like the logs will fill up pretty quickly at an info level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's planned and I would love to use the same logging library across operator and sync. We should make a final decision.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same library and/or same usage of levels? Didn't @kornys have a pr at one point to switch the operator to jboss logging? Any chance we could resurrect that along with a tweak of the logging levels?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering why we should use the jboss one. Tbh I have never seen people using it. Just because it's the default for Quarkus?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's the major reason.

I'm not too concerned if we use different logging frameworks - the only issue will be making sure we know which formatting to use.

But we should try to be consistent with the levels.

return isDeleted;
}

private Deployment cachedDeployment(ManagedKafka managedKafka) {
return informerManager.getLocalDeployment(adminServerNamespace(managedKafka), adminServerName(managedKafka));
}
Expand All @@ -326,6 +297,24 @@ private Route cachedRoute(ManagedKafka managedKafka) {
return informerManager.getLocalRoute(adminServerNamespace(managedKafka), adminServerName(managedKafka));
}

private Resource<Route> adminRouteResource(ManagedKafka managedKafka) {
return openShiftClient.routes()
.inNamespace(adminServerNamespace(managedKafka))
.withName(adminServerName(managedKafka));
}

private Resource<Service> adminServiceResource(ManagedKafka managedKafka) {
return kubernetesClient.services()
.inNamespace(adminServerNamespace(managedKafka))
.withName(adminServerName(managedKafka));
}

private Resource<Deployment> adminDeploymentResource(ManagedKafka managedKafka){
return kubernetesClient.apps().deployments()
.inNamespace(adminServerNamespace(managedKafka))
.withName(adminServerName(managedKafka));
}

public String Uri(ManagedKafka managedKafka) {
Route route = cachedRoute(managedKafka);
return route != null ? route.getSpec().getHost() : null;
Expand Down
30 changes: 14 additions & 16 deletions operator/src/main/java/org/bf2/operator/operands/Canary.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,6 @@ public void delete(ManagedKafka managedKafka, Context<ManagedKafka> context) {
canaryDeploymentResource(managedKafka).delete();
}

@Override
public boolean isDeleted(ManagedKafka managedKafka) {
Deployment deployment = canaryDeploymentResource(managedKafka).get();
if (deployment != null ) {
return false;
}
return true;
}

private Resource<Deployment> canaryDeploymentResource(ManagedKafka managedKafka) {
return kubernetesClient.apps()
.deployments()
.inNamespace(canaryNamespace(managedKafka))
.withName(canaryName(managedKafka));
}

protected Deployment deploymentFrom(ManagedKafka managedKafka, Deployment current) {
String canaryName = canaryName(managedKafka);

Expand Down Expand Up @@ -167,6 +151,13 @@ public boolean isError(ManagedKafka managedKafka) {
return false;
}

@Override
public boolean isDeleted(ManagedKafka managedKafka) {
boolean isDeleted = cachedDeployment(managedKafka) == null;
log.info("Canary isDeleted = {}", isDeleted);
return isDeleted;
}

private Deployment cachedDeployment(ManagedKafka managedKafka) {
return informerManager.getLocalDeployment(canaryNamespace(managedKafka), canaryName(managedKafka));
}
Expand All @@ -178,4 +169,11 @@ public static String canaryName(ManagedKafka managedKafka) {
public static String canaryNamespace(ManagedKafka managedKafka) {
return managedKafka.getMetadata().getNamespace();
}

private Resource<Deployment> canaryDeploymentResource(ManagedKafka managedKafka) {
return kubernetesClient.apps()
.deployments()
.inNamespace(canaryNamespace(managedKafka))
.withName(canaryName(managedKafka));
}
}
123 changes: 43 additions & 80 deletions operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ public class KafkaCluster implements Operand<ManagedKafka> {
public void createOrUpdate(ManagedKafka managedKafka) {

if (isKafkaExternalCertificateEnabled) {
Secret currentTlsSecret = cachedSecret(managedKafka, kafkaTlsSecretName(managedKafka));
Secret tlsSecret = tlsSecretFrom(managedKafka, currentTlsSecret);
createOrUpdate(tlsSecret);
Secret currentKafkaTlsSecret = cachedSecret(managedKafka, kafkaTlsSecretName(managedKafka));
Secret kafkaTlsSecret = kafkaTlsSecretFrom(managedKafka, currentKafkaTlsSecret);
createOrUpdate(kafkaTlsSecret);
}

if (isKafkaAuthenticationEnabled) {
Expand All @@ -138,7 +138,7 @@ public void createOrUpdate(ManagedKafka managedKafka) {
ConfigMap zooKeeperMetricsConfigMap = configMapFrom(managedKafka, zookeeperMetricsConfigMapName(managedKafka), currentZooKeeperMetricsConfigMap);
createOrUpdate(zooKeeperMetricsConfigMap);

Kafka currentKafka = cachedDeployment(managedKafka);
Kafka currentKafka = cachedKafka(managedKafka);
Kafka kafka = kafkaFrom(managedKafka, currentKafka);
createOrUpdate(kafka);
}
Expand All @@ -147,84 +147,18 @@ public void createOrUpdate(ManagedKafka managedKafka) {
public void delete(ManagedKafka managedKafka, Context<ManagedKafka> context) {
kafkaResourceClient.delete(kafkaClusterNamespace(managedKafka), kafkaClusterName(managedKafka));

kafkaMetricsResource(managedKafka).delete();
zookeeperMetricsResource(managedKafka).delete();
configMapResource(managedKafka, kafkaMetricsConfigMapName(managedKafka)).delete();
configMapResource(managedKafka, zookeeperMetricsConfigMapName(managedKafka)).delete();

if (isKafkaExternalCertificateEnabled) {
tlsSecretResource(managedKafka).delete();
secretResource(managedKafka, kafkaTlsSecretName(managedKafka)).delete();
}
if (isKafkaAuthenticationEnabled) {
ssoSecretResource(managedKafka).delete();
ssoTlsSecretResource(managedKafka).delete();
secretResource(managedKafka, ssoClientSecretName(managedKafka)).delete();
secretResource(managedKafka, ssoTlsSecretName(managedKafka)).delete();
}
}

@Override
public boolean isDeleted(ManagedKafka managedKafka) {
Kafka kafka = kafkaResourceClient.getByName(kafkaClusterNamespace(managedKafka), kafkaClusterName(managedKafka));
if (kafka != null) {
return false;
}

ConfigMap configMap = kafkaMetricsResource(managedKafka).get();
if (configMap != null) {
return false;
}

configMap = zookeeperMetricsResource(managedKafka).get();
if (configMap != null) {
return false;
}

if (isKafkaExternalCertificateEnabled) {
Secret secret = tlsSecretResource(managedKafka).get();
if (secret != null ) {
return false;
}
}
if (isKafkaAuthenticationEnabled) {
Secret secret = ssoSecretResource(managedKafka).get();
if (secret != null) {
return false;
}
secret = ssoTlsSecretResource(managedKafka).get();
if (secret != null) {
return false;
}
}
return true;
}

private Resource<Secret> ssoTlsSecretResource(ManagedKafka managedKafka) {
return kubernetesClient.secrets()
.inNamespace(kafkaClusterNamespace(managedKafka))
.withName(ssoTlsSecretName(managedKafka));
}

private Resource<Secret> ssoSecretResource(ManagedKafka managedKafka) {
return kubernetesClient.secrets()
.inNamespace(kafkaClusterNamespace(managedKafka))
.withName(ssoClientSecretName(managedKafka));
}

private Resource<Secret> tlsSecretResource(ManagedKafka managedKafka) {
return kubernetesClient.secrets()
.inNamespace(kafkaClusterNamespace(managedKafka))
.withName(kafkaTlsSecretName(managedKafka));
}

private Resource<ConfigMap> zookeeperMetricsResource(ManagedKafka managedKafka) {
return kubernetesClient.configMaps()
.inNamespace(kafkaClusterNamespace(managedKafka))
.withName(zookeeperMetricsConfigMapName(managedKafka));
}

private Resource<ConfigMap> kafkaMetricsResource(ManagedKafka managedKafka) {
return kubernetesClient.configMaps()
.inNamespace(kafkaClusterNamespace(managedKafka))
.withName(kafkaMetricsConfigMapName(managedKafka));
}

private void createOrUpdate(Kafka kafka) {
// Kafka resource doesn't exist, has to be created
if (kafkaResourceClient.getByName(kafka.getMetadata().getNamespace(), kafka.getMetadata().getName()) == null) {
Expand Down Expand Up @@ -335,7 +269,7 @@ protected ConfigMap configMapFrom(ManagedKafka managedKafka, String name, Confi
}

/* test */
protected Secret tlsSecretFrom(ManagedKafka managedKafka, Secret current) {
protected Secret kafkaTlsSecretFrom(ManagedKafka managedKafka, Secret current) {

SecretBuilder builder = current != null ? new SecretBuilder(current) : new SecretBuilder();

Expand Down Expand Up @@ -651,7 +585,7 @@ private Map<String, String> getKafkaLabels() {

@Override
public boolean isInstalling(ManagedKafka managedKafka) {
Kafka kafka = cachedDeployment(managedKafka);
Kafka kafka = cachedKafka(managedKafka);
boolean isInstalling = kafka == null || kafka.getStatus() == null ||
(kafkaCondition(kafka).getType().equals("NotReady")
&& kafkaCondition(kafka).getStatus().equals("True")
Expand All @@ -662,7 +596,7 @@ && kafkaCondition(kafka).getStatus().equals("True")

@Override
public boolean isReady(ManagedKafka managedKafka) {
Kafka kafka = cachedDeployment(managedKafka);
Kafka kafka = cachedKafka(managedKafka);
boolean isReady = kafka != null && (kafka.getStatus() == null ||
(kafkaCondition(kafka).getType().equals("Ready") && kafkaCondition(kafka).getStatus().equals("True")));
log.info("KafkaCluster isReady = {}", isReady);
Expand All @@ -671,7 +605,7 @@ public boolean isReady(ManagedKafka managedKafka) {

@Override
public boolean isError(ManagedKafka managedKafka) {
Kafka kafka = cachedDeployment(managedKafka);
Kafka kafka = cachedKafka(managedKafka);
boolean isError = kafka != null && kafka.getStatus() != null
&& kafkaCondition(kafka).getType().equals("NotReady")
&& kafkaCondition(kafka).getStatus().equals("True")
Expand All @@ -680,11 +614,28 @@ && kafkaCondition(kafka).getStatus().equals("True")
return isError;
}

@Override
public boolean isDeleted(ManagedKafka managedKafka) {
boolean isDeleted = cachedKafka(managedKafka) == null &&
cachedConfigMap(managedKafka, kafkaMetricsConfigMapName(managedKafka)) == null &&
cachedConfigMap(managedKafka, zookeeperMetricsConfigMapName(managedKafka)) == null;

if (isKafkaExternalCertificateEnabled) {
isDeleted = isDeleted && cachedSecret(managedKafka, kafkaTlsSecretName(managedKafka)) == null;
}
if (isKafkaAuthenticationEnabled) {
isDeleted = isDeleted && cachedSecret(managedKafka, ssoClientSecretName(managedKafka)) == null &&
cachedSecret(managedKafka, ssoTlsSecretName(managedKafka)) == null;
}
log.info("KafkaCluster isDeleted = {}", isDeleted);
return isDeleted;
}

private Condition kafkaCondition(Kafka kafka) {
return kafka.getStatus().getConditions().get(0);
}

private Kafka cachedDeployment(ManagedKafka managedKafka) {
private Kafka cachedKafka(ManagedKafka managedKafka) {
return informerManager.getLocalKafka(kafkaClusterNamespace(managedKafka), kafkaClusterName(managedKafka));
}

Expand All @@ -696,6 +647,18 @@ private Secret cachedSecret(ManagedKafka managedKafka, String name) {
return informerManager.getLocalSecret(kafkaClusterNamespace(managedKafka), name);
}

private Resource<Secret> secretResource(ManagedKafka managedKafka, String name) {
return kubernetesClient.secrets()
.inNamespace(kafkaClusterNamespace(managedKafka))
.withName(name);
}

private Resource<ConfigMap> configMapResource(ManagedKafka managedKafka, String name) {
return kubernetesClient.configMaps()
.inNamespace(kafkaClusterNamespace(managedKafka))
.withName(name);
}

public static String kafkaClusterName(ManagedKafka managedKafka) {
return managedKafka.getMetadata().getName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public interface Operand<T extends CustomResource> {
/**
*
* @param customResource custom resource
* @return if resource is deleted
* @return if the operand instance is deleted
*/
boolean isDeleted(T customResource);
}