Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix fluentdconfig and syslogngconfig deletion #1672

Merged
merged 9 commits into from
Feb 28, 2024
6 changes: 6 additions & 0 deletions charts/logging-operator/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ rules:
- get
- patch
- update
- apiGroups:
- logging.banzaicloud.io
resources:
- loggings/finalizers
verbs:
- update
- apiGroups:
- logging.banzaicloud.io
resources:
Expand Down
6 changes: 6 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ rules:
- get
- patch
- update
- apiGroups:
- logging.banzaicloud.io
resources:
- loggings/finalizers
verbs:
- update
- apiGroups:
- logging.banzaicloud.io
resources:
Expand Down
18 changes: 18 additions & 0 deletions config/samples/fluentdconfig.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: v1
kind: Namespace
metadata:
name: logging
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: Logging
metadata:
name: fluentd-config
spec:
controlNamespace: logging
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: FluentdConfig
metadata:
name: fluentd-config
namespace: logging
spec: {}
97 changes: 84 additions & 13 deletions controllers/logging/logging_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -52,23 +54,26 @@ import (
)

// NewLoggingReconciler returns a new LoggingReconciler instance
func NewLoggingReconciler(client client.Client, log logr.Logger) *LoggingReconciler {
func NewLoggingReconciler(client client.Client, eventRecorder record.EventRecorder, log logr.Logger) *LoggingReconciler {
return &LoggingReconciler{
Client: client,
Log: log,
Client: client,
EventRecorder: eventRecorder,
Log: log,
}
}

// LoggingReconciler reconciles a Logging object
type LoggingReconciler struct {
client.Client
Log logr.Logger
EventRecorder record.EventRecorder
Log logr.Logger
}

// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=loggings;fluentbitagents;flows;clusterflows;outputs;clusteroutputs;nodeagents;fluentdconfigs;syslogngconfigs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=loggings/status;fluentbitagents/status;flows/status;clusterflows/status;outputs/status;clusteroutputs/status;nodeagents/status;fluentdconfigs/status;syslogngconfigs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=syslogngflows;syslogngclusterflows;syslogngoutputs;syslogngclusteroutputs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=syslogngflows/status;syslogngclusterflows/status;syslogngoutputs/status;syslogngclusteroutputs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=logging.banzaicloud.io,resources=loggings/finalizers,verbs=update
// +kubebuilder:rbac:groups="",resources=configmaps;secrets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=extensions;apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=extensions;networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -127,7 +132,8 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return reconcile.Result{}, errors.WrapIfWithDetails(err, "failed to get logging resources", "logging", logging)
}

r.dynamicDefaults(ctx, log, loggingResources.GetSyslogNGSpec())
_, syslogNGSPec := loggingResources.GetSyslogNGSpec()
r.dynamicDefaults(ctx, log, syslogNGSPec)

// metrics
defer func() {
Expand Down Expand Up @@ -176,7 +182,7 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

var loggingDataProvider loggingdataprovider.LoggingDataProvider

fluentdSpec := loggingResources.GetFluentdSpec()
fluentdExternal, fluentdSpec := loggingResources.GetFluentd()
if fluentdSpec != nil {
fluentdConfig, secretList, err := r.clusterConfigurationFluentd(loggingResources)
if err != nil {
Expand All @@ -187,12 +193,12 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
} else {
log.V(1).Info("flow configuration", "config", fluentdConfig)

reconcilers = append(reconcilers, fluentd.New(r.Client, r.Log, &logging, fluentdSpec, &fluentdConfig, secretList, reconcilerOpts).Reconcile)
reconcilers = append(reconcilers, fluentd.New(r.Client, r.Log, &logging, fluentdSpec, fluentdExternal, &fluentdConfig, secretList, reconcilerOpts).Reconcile)
}
loggingDataProvider = fluentd.NewDataProvider(r.Client, &logging, fluentdSpec)
loggingDataProvider = fluentd.NewDataProvider(r.Client, &logging, fluentdSpec, fluentdExternal)
}

syslogNGSpec := loggingResources.GetSyslogNGSpec()
syslogNGExternal, syslogNGSpec := loggingResources.GetSyslogNGSpec()
if syslogNGSpec != nil {
syslogNGConfig, secretList, err := r.clusterConfigurationSyslogNG(loggingResources)
if err != nil {
Expand All @@ -203,9 +209,9 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
} else {
log.V(1).Info("flow configuration", "config", syslogNGConfig)

reconcilers = append(reconcilers, syslogng.New(r.Client, r.Log, &logging, syslogNGSpec, syslogNGConfig, secretList, reconcilerOpts).Reconcile)
reconcilers = append(reconcilers, syslogng.New(r.Client, r.Log, &logging, syslogNGSpec, syslogNGExternal, syslogNGConfig, secretList, reconcilerOpts).Reconcile)
}
loggingDataProvider = syslogng.NewDataProvider(r.Client, &logging)
loggingDataProvider = syslogng.NewDataProvider(r.Client, &logging, syslogNGExternal)
}

switch len(loggingResources.Fluentbits) {
Expand Down Expand Up @@ -260,7 +266,7 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
log.Error(errors.New("nodeagent definition conflict"), problem)
}
}
reconcilers = append(reconcilers, nodeagent.New(r.Client, r.Log, &logging, fluentdSpec, agents, reconcilerOpts, fluentd.NewDataProvider(r.Client, &logging, fluentdSpec)).Reconcile)
reconcilers = append(reconcilers, nodeagent.New(r.Client, r.Log, &logging, fluentdSpec, agents, reconcilerOpts, fluentd.NewDataProvider(r.Client, &logging, fluentdSpec, fluentdExternal)).Reconcile)
}

for _, rec := range reconcilers {
Expand All @@ -274,9 +280,73 @@ func (r *LoggingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
}

if shouldReturn, err := r.fluentdConfigFinalizer(ctx, &logging, fluentdExternal); shouldReturn || err != nil {
return ctrl.Result{}, err
}

if shouldReturn, err := r.syslogNGConfigFinalizer(ctx, &logging, syslogNGExternal); shouldReturn || err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

func (r *LoggingReconciler) fluentdConfigFinalizer(ctx context.Context, logging *loggingv1beta1.Logging, externalFluentd *loggingv1beta1.FluentdConfig) (bool, error) {
fluentdConfigFinalizer := "fluentdconfig.logging.banzaicloud.io/finalizer"

if logging.DeletionTimestamp.IsZero() {
if externalFluentd != nil && !controllerutil.ContainsFinalizer(logging, fluentdConfigFinalizer) {
r.Log.Info("adding fluentdconfig finalizer")
controllerutil.AddFinalizer(logging, fluentdConfigFinalizer)
if err := r.Update(ctx, logging); err != nil {
return true, err
}
}
} else if externalFluentd != nil {
msg := fmt.Sprintf("refused to delete logging resource while fluentdConfig %s exists", client.ObjectKeyFromObject(externalFluentd))
r.EventRecorder.Event(logging, corev1.EventTypeWarning, "DeletionRefused", msg)
return false, errors.New(msg)
}

if controllerutil.ContainsFinalizer(logging, fluentdConfigFinalizer) && externalFluentd == nil {
r.Log.Info("removing fluentdconfig finalizer")
controllerutil.RemoveFinalizer(logging, fluentdConfigFinalizer)
if err := r.Update(ctx, logging); err != nil {
OverOrion marked this conversation as resolved.
Show resolved Hide resolved
return true, err
}
}

return false, nil
}

func (r *LoggingReconciler) syslogNGConfigFinalizer(ctx context.Context, logging *loggingv1beta1.Logging, externalSyslogNG *loggingv1beta1.SyslogNGConfig) (bool, error) {
syslogNGConfigFinalizer := "syslogngconfig.logging.banzaicloud.io/finalizer"

if logging.DeletionTimestamp.IsZero() {
if externalSyslogNG != nil && !controllerutil.ContainsFinalizer(logging, syslogNGConfigFinalizer) {
r.Log.Info("adding syslogngconfig finalizer")
controllerutil.AddFinalizer(logging, syslogNGConfigFinalizer)
if err := r.Update(ctx, logging); err != nil {
return true, err
}
}
} else if externalSyslogNG != nil {
msg := fmt.Sprintf("refused to delete logging resource while syslogNGConfig %s exists", client.ObjectKeyFromObject(externalSyslogNG))
r.EventRecorder.Event(logging, corev1.EventTypeWarning, "DeletionRefused", msg)
return false, errors.New(msg)
}

if controllerutil.ContainsFinalizer(logging, syslogNGConfigFinalizer) && externalSyslogNG == nil {
r.Log.Info("removing syslogngconfig finalizer")
controllerutil.RemoveFinalizer(logging, syslogNGConfigFinalizer)
if err := r.Update(ctx, logging); err != nil {
return true, err
}
}

return false, nil
}

func (r *LoggingReconciler) dynamicDefaults(ctx context.Context, log logr.Logger, syslogNGSpec *v1beta1.SyslogNGSpec) {
nodes := corev1.NodeList{}
if err := r.Client.List(ctx, &nodes); err != nil {
Expand Down Expand Up @@ -371,6 +441,7 @@ func (r *LoggingReconciler) clusterConfigurationSyslogNG(resources model.Logging
Path: syslogng.OutputSecretPath,
}

_, syslogngSpec := resources.GetSyslogNGSpec()
in := syslogngconfig.Input{
Name: resources.Logging.Name,
Namespace: resources.Logging.Namespace,
Expand All @@ -380,7 +451,7 @@ func (r *LoggingReconciler) clusterConfigurationSyslogNG(resources model.Logging
Flows: resources.SyslogNG.Flows,
SecretLoaderFactory: &slf,
SourcePort: syslogng.ServicePort,
SyslogNGSpec: resources.GetSyslogNGSpec(),
SyslogNGSpec: syslogngSpec,
}
var b strings.Builder
if err := syslogngconfig.RenderConfigInto(in, &b); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions controllers/logging/logging_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

controllers "github.com/kube-logging/logging-operator/controllers/logging"
"github.com/kube-logging/logging-operator/pkg/resources/fluentd"
"github.com/kube-logging/logging-operator/pkg/resources/model"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/output"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

var (
Expand Down Expand Up @@ -1410,7 +1411,7 @@ func beforeEachWithError(t *testing.T, errors chan<- error) func() {
})
g.Expect(err).NotTo(gomega.HaveOccurred())

flowReconciler := controllers.NewLoggingReconciler(mgr.GetClient(), ctrl.Log.WithName("controllers").WithName("Flow"))
flowReconciler := controllers.NewLoggingReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("logging-operator"), ctrl.Log.WithName("controllers").WithName("Flow"))

var stopped bool
wrappedReconciler := duplicateRequest(t, flowReconciler, &stopped, errors)
Expand Down
7 changes: 2 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,7 @@ func main() {
os.Exit(1)
}

if !PSPEnabled(mgr.GetConfig()) {
setupLog.Info("WARNING PodSecurityPolicies are disabled. Can be enabled manually with PSP_ENABLED=1")
}

loggingReconciler := loggingControllers.NewLoggingReconciler(mgr.GetClient(), ctrl.Log.WithName("logging"))
loggingReconciler := loggingControllers.NewLoggingReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("logging-operator"), ctrl.Log.WithName("logging"))

if err := (&extensionsControllers.EventTailerReconciler{
Client: mgr.GetClient(),
Expand Down Expand Up @@ -224,6 +220,7 @@ func main() {

// +kubebuilder:scaffold:builder
setupLog.Info("starting manager")

if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
Expand Down
5 changes: 3 additions & 2 deletions pkg/resources/fluentbit/configsecret.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er
return nil, nil, errs
}

fluentdSpec := loggingResources.GetFluentdSpec()
_, fluentdSpec := loggingResources.GetFluentd()

if fluentdSpec != nil {
fluentbitTargetHost := r.fluentbitSpec.TargetHost
Expand Down Expand Up @@ -361,7 +361,8 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er
}
}

if loggingResources.GetSyslogNGSpec() != nil {
_, syslogNGSPec := loggingResources.GetSyslogNGSpec()
if syslogNGSPec != nil {
input.SyslogNGOutput = newSyslogNGOutputConfig()
input.SyslogNGOutput.Host = aggregatorEndpoint(r.Logging, syslogng.ServiceName)
input.SyslogNGOutput.Port = syslogng.ServicePort
Expand Down
5 changes: 3 additions & 2 deletions pkg/resources/fluentbit/tenants.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v
continue
}

if loggingResources.GetFluentdSpec() != nil {
_, fluentdSpec := loggingResources.GetFluentd()
if fluentdSpec != nil {
if input.FluentForwardOutput == nil {
input.FluentForwardOutput = &fluentForwardOutputConfig{}
}
Expand All @@ -117,7 +118,7 @@ func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v
Host: aggregatorEndpoint(logging, fluentd.ServiceName),
Port: fluentd.ServicePort,
})
} else if loggingResources.GetSyslogNGSpec() != nil {
} else if _, syslogNGSPec := loggingResources.GetSyslogNGSpec(); syslogNGSPec != nil {
if input.SyslogNGOutput == nil {
input.SyslogNGOutput = newSyslogNGOutputConfig()
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/resources/fluentd/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,25 @@ import (
)

type DataProvider struct {
client client.Client
logging *v1beta1.Logging
fluentdSpec *v1beta1.FluentdSpec
client client.Client
logging *v1beta1.Logging
fluentdSpec *v1beta1.FluentdSpec
fluentdConfig *v1beta1.FluentdConfig
}

func NewDataProvider(client client.Client, logging *v1beta1.Logging, fluentdSpec *v1beta1.FluentdSpec) *DataProvider {
func NewDataProvider(client client.Client, logging *v1beta1.Logging, fluentdSpec *v1beta1.FluentdSpec, fluentdConfig *v1beta1.FluentdConfig) *DataProvider {
return &DataProvider{
client: client,
logging: logging,
fluentdSpec: fluentdSpec,
client: client,
logging: logging,
fluentdSpec: fluentdSpec,
fluentdConfig: fluentdConfig,
}
}

func (p *DataProvider) GetReplicaCount(ctx context.Context) (*int32, error) {
if p.fluentdSpec != nil {
sts := &v1.StatefulSet{}
om := p.logging.FluentdObjectMeta(StatefulSetName, ComponentFluentd, *p.fluentdSpec)
om := p.logging.FluentdObjectMeta(StatefulSetName, ComponentFluentd, *p.fluentdSpec, p.fluentdConfig)
err := p.client.Get(ctx, types.NamespacedName{Namespace: om.Namespace, Name: om.Name}, sts)
if err != nil {
return nil, errors.WrapIf(client.IgnoreNotFound(err), "getting fluentd statefulset")
Expand Down
10 changes: 6 additions & 4 deletions pkg/resources/fluentd/fluentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ const (

// Reconciler holds info what resource to reconcile
type Reconciler struct {
Logging *v1beta1.Logging
fluentdSpec *v1beta1.FluentdSpec
Logging *v1beta1.Logging
fluentdSpec *v1beta1.FluentdSpec
fluentdConfig *v1beta1.FluentdConfig
*reconciler.GenericResourceReconciler
config *string
secrets *secret.MountSecrets
Expand Down Expand Up @@ -112,10 +113,11 @@ func (r *Reconciler) getServiceAccount() string {
}

func New(client client.Client, log logr.Logger,
logging *v1beta1.Logging, fluentdSpec *v1beta1.FluentdSpec, config *string, secrets *secret.MountSecrets, opts reconciler.ReconcilerOpts) *Reconciler {
logging *v1beta1.Logging, fluentdSpec *v1beta1.FluentdSpec, fluentdConfig *v1beta1.FluentdConfig, config *string, secrets *secret.MountSecrets, opts reconciler.ReconcilerOpts) *Reconciler {
return &Reconciler{
Logging: logging,
fluentdSpec: fluentdSpec,
fluentdConfig: fluentdConfig,
GenericResourceReconciler: reconciler.NewGenericReconciler(client, log, opts),
config: config,
secrets: secrets,
Expand Down Expand Up @@ -318,7 +320,7 @@ func (r *Reconciler) reconcileDrain(ctx context.Context) (*reconcile.Result, err
}
}

replicaCount, err := NewDataProvider(r.Client, r.Logging, r.fluentdSpec).GetReplicaCount(ctx)
replicaCount, err := NewDataProvider(r.Client, r.Logging, r.fluentdSpec, r.fluentdConfig).GetReplicaCount(ctx)
if err != nil {
return nil, errors.WrapIf(err, "get replica count for fluentd")
}
Expand Down
Loading
Loading