isolate input sources using fixed hash tag and backpressure #1688

Merged · 1 commit · Mar 4, 2024
@@ -51,10 +51,6 @@ spec:
  loggingRef: infra
  inputTail:
    storage.type: filesystem
  forwardOptions:
    Workers: 0
  syslogng_output:
    Workers: 0
  positiondb:
    hostPath:
      path: ""
@@ -63,6 +59,9 @@
      path: ""
  network:
    connectTimeout: 2
  metrics: {}
  image:
    tag: 2.1.8-debug
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: LoggingRoute
55 changes: 55 additions & 0 deletions docs/fluentbit-flow-control.md
@@ -0,0 +1,55 @@
## Flow control with durability in a multi-tenant setup

Resources:
- https://docs.fluentbit.io/manual/administration/backpressure
- https://docs.fluentbit.io/manual/administration/buffering-and-storage
- https://docs.fluentbit.io/manual/pipeline/inputs/tail#sqlite-and-write-ahead-logging
- https://docs.fluentbit.io/manual/administration/monitoring
- https://docs.fluentbit.io/manual/administration/troubleshooting#dump-internals-signal

### Context

Let's consider a setup with multiple separate inputs, each sending data to its own dedicated output (tenant ids are encoded in the tags).
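
For illustration only (the tags, paths, and hosts below are made-up examples, not the config the operator generates), such a setup could look like this in fluent-bit's classic config syntax:

```
[INPUT]
    Name  tail
    # tenant id encoded in the tag
    Tag   tenant-a.*
    Path  /var/log/containers/*_tenant-a_*.log

[INPUT]
    Name  tail
    Tag   tenant-b.*
    Path  /var/log/containers/*_tenant-b_*.log

[OUTPUT]
    Name   forward
    # matches only tenant-a records
    Match  tenant-a.*
    Host   aggregator-a.example

[OUTPUT]
    Name   forward
    Match  tenant-b.*
    Host   aggregator-b.example
```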

### Durability

According to the referenced resources we need `storage.type filesystem` for *every input*
where we want to avoid losing data. Enabling this option alone sets no limit
on how much data fluent-bit keeps on disk.

> Note: we also have to configure the position db, otherwise fluent-bit
> re-reads the same files from the beginning after a restart
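
A minimal sketch of such an input (the `Path` and `DB` values are placeholders):

```
[INPUT]
    Name          tail
    Path          /var/log/containers/*.log
    # buffer chunks on disk so they survive a fluent-bit restart
    storage.type  filesystem
    # position db: remembers file offsets across restarts
    DB            /tail-db/tail-containers-state.db
```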

### Memory limit

The default limit is `storage.max_chunks_up 128`, which is set on the *service* section and is therefore a global limit.
It only caps how many chunks fluent-bit can load and handle in memory at the same time,
even if fluent-bit writes all chunks to disk.
Without any further configuration fluent-bit writes chunks to disk indefinitely, and this setting only
affects the overall throughput.
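
For reference, this is where the limit lives (128 is the documented default, spelled out explicitly here; the `storage.path` value is a placeholder):

```
[SERVICE]
    # required for filesystem buffering
    storage.path           /buffers
    # global cap on chunks held in memory at once;
    # the rest stay "down" on disk
    storage.max_chunks_up  128
```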

### Disk usage limit

To limit the actual disk usage we need to set `storage.total_limit_size` for
every *output* individually. This sounds good, but the problem with this option is that it doesn't
cause any backpressure: once the limit is reached, fluent-bit simply discards the oldest data,
which obviously results in data loss, so this option should be used with care.
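
A sketch of such a per-output limit (the match pattern, host, and size are examples):

```
[OUTPUT]
    Name                      forward
    Match                     tenant-a.*
    Host                      aggregator-a.example
    # caps disk usage for chunks queued to this output;
    # on overflow the OLDEST chunks are deleted (data loss)
    storage.total_limit_size  1G
```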

### Backpressure

Backpressure can be enabled using `storage.pause_on_chunks_overlimit on` on the *input*, which is great, but with one important
caveat: the limit this setting uses as its trigger is `storage.max_chunks_up`, which is a global limit.
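
It is enabled per input, for example (the path is illustrative):

```
[INPUT]
    Name                               tail
    Path                               /var/log/containers/*_tenant-a_*.log
    storage.type                       filesystem
    # pause reading new data while this input's loaded
    # chunks are over the global storage.max_chunks_up
    storage.pause_on_chunks_overlimit  on
```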

Going back to our main scenario: when one of the outputs is down (the tenant is down), chunks for that output start to pile up
on disk and in memory. When there are more than `storage.max_chunks_up` chunks in memory globally, fluent-bit pauses the inputs that
try to load additional chunks. It's not clear how fluent-bit decides which input to pause, but based on our
observations (using `config/samples/multitenant-routing` for example) this works as expected: only the input that belongs
to the faulty output is paused, and when the output gets back online, the input is resumed immediately.

Fluent-bit's metrics also show that if an output is permanently down, the chunks waiting for that output
are not kept in memory, so other input/output pairs are not limited in their throughput.
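
To observe this behaviour, fluent-bit's built-in monitoring endpoint can expose the storage-layer metrics (see the monitoring link above); a sketch, using the default port:

```
[SERVICE]
    # enable the built-in HTTP monitoring endpoint
    HTTP_Server      On
    HTTP_Listen      0.0.0.0
    HTTP_Port        2020
    # adds per input/output chunk counters,
    # queryable under /api/v1/storage
    storage.metrics  on
```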

By configuring `storage.pause_on_chunks_overlimit` on the inputs we can make sure the disk usage is bounded.

As long as pods are not restarting, backpressure can prevent log loss, but keep in mind that while an input is paused,
data in log files that the container runtime deletes during the output's downtime is lost.
55 changes: 30 additions & 25 deletions pkg/resources/fluentbit/config.go
@@ -55,21 +55,14 @@ var fluentBitConfigTemplate = `
{{- end }}
{{- end }}

[INPUT]
    Name tail
{{- range $key, $value := .Input.Values }}
{{- if $value }}
    {{ $key }} {{$value}}
{{- end }}
{{- end }}
{{- range $id, $v := .Input.ParserN }}
{{- if $v }}
    Parse_{{ $id}} {{$v}}
{{- end }}
{{- end }}
{{- if .Input.MultilineParser }}
    multiline.parser {{- range $i, $v := .Input.MultilineParser }}{{ if $i }},{{ end}} {{ $v }}{{ end }}
{{- end }}
{{- if .Inputs }}
{{- range $input := .Inputs }}
# Tenant: {{ $input.Tenant }}
{{- template "input" $input }}
{{- end }}
{{- else }}
{{- template "input" .Input }}
{{- end }}

{{- if not .DisableKubernetesFilter }}
[FILTER]
@@ -111,11 +104,7 @@ var fluentBitConfigTemplate = `
{{- range $target := $out.Targets }}
[OUTPUT]
    Name forward
{{- if $target.AllNamespaces }}
    Match *
{{- else }}
    Match_Regex {{ $target.NamespaceRegex }}
{{- end }}
    Match {{ $target.Match }}
{{- if $out.Upstream.Enabled }}
    Upstream {{ $out.Upstream.Config.Path }}
{{- else }}
@@ -149,11 +138,7 @@ var fluentBitConfigTemplate = `
{{- range $target := $out.Targets }}
[OUTPUT]
    Name tcp
{{- if $target.AllNamespaces }}
    Match *
{{- else }}
    Match_Regex {{ $target.NamespaceRegex }}
{{- end }}
    Match {{ $target.Match }}
    Host {{ $target.Host }}
    Port {{ $target.Port }}
    Format json_lines
@@ -203,6 +188,26 @@ var fluentbitNetworkTemplate = `
{{- end }}
`

var fluentbitInputTemplate = `
{{- define "input" }}
[INPUT]
    Name tail
{{- range $key, $value := .Values }}
{{- if $value }}
    {{ $key }} {{$value}}
{{- end }}
{{- end }}
{{- range $id, $v := .ParserN }}
{{- if $v }}
    Parse_{{ $id}} {{$v}}
{{- end }}
{{- end }}
{{- if .MultilineParser }}
    multiline.parser {{- range $i, $v := .MultilineParser }}{{ if $i }},{{ end}} {{ $v }}{{ end }}
{{- end }}
{{- end }}
`

var upstreamConfigTemplate = `
[UPSTREAM]
    Name {{ .Config.Name }}
29 changes: 22 additions & 7 deletions pkg/resources/fluentbit/configsecret.go
@@ -38,6 +38,13 @@ type fluentbitInputConfig struct {
	MultilineParser []string
}

type fluentbitInputConfigWithTenant struct {
	Tenant          string
	Values          map[string]string
	ParserN         []string
	MultilineParser []string
}

type upstreamNode struct {
	Name string
	Host string
@@ -63,6 +70,7 @@ type fluentBitConfig struct {
	CoroStackSize           int32
	Output                  map[string]string
	Input                   fluentbitInputConfig
	Inputs                  []fluentbitInputConfigWithTenant
	DisableKubernetesFilter bool
	KubernetesFilter        map[string]string
	AwsFilter               map[string]string
@@ -86,8 +94,8 @@ type fluentForwardOutputConfig struct {
}

type forwardTargetConfig struct {
	AllNamespaces  bool
	NamespaceRegex string
	Match          string
	Host           string
	Port           int32
}
@@ -373,22 +381,25 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, error) {
		for _, a := range loggingResources.LoggingRoutes {
			tenants = append(tenants, a.Status.Tenants...)
		}
		if err := r.configureInputsForTenants(tenants, &input); err != nil {
			return nil, nil, errors.WrapIf(err, "configuring inputs for target tenants")
		}
		if err := r.configureOutputsForTenants(ctx, tenants, &input); err != nil {
			return nil, nil, errors.WrapIf(err, "configuring outputs for target tenants")
		}
	} else {
		// compatibility with existing configuration
		if input.FluentForwardOutput != nil {
			input.FluentForwardOutput.Targets = append(input.FluentForwardOutput.Targets, forwardTargetConfig{
				AllNamespaces: true,
				Host:          input.FluentForwardOutput.TargetHost,
				Port:          input.FluentForwardOutput.TargetPort,
				Match: "*",
				Host:  input.FluentForwardOutput.TargetHost,
				Port:  input.FluentForwardOutput.TargetPort,
			})
		} else if input.SyslogNGOutput != nil {
			input.SyslogNGOutput.Targets = append(input.SyslogNGOutput.Targets, forwardTargetConfig{
				AllNamespaces: true,
				Host:          input.SyslogNGOutput.Host,
				Port:          input.SyslogNGOutput.Port,
				Match: "*",
				Host:  input.SyslogNGOutput.Host,
				Port:  input.SyslogNGOutput.Port,
			})
		}
	}
@@ -455,6 +466,10 @@ func generateConfig(input fluentBitConfig) (string, error) {
	if err != nil {
		return "", errors.WrapIf(err, "parsing fluentbit network nested template")
	}
	tmpl, err = tmpl.Parse(fluentbitInputTemplate)
	if err != nil {
		return "", errors.WrapIf(err, "parsing fluentbit input nested template")
	}
	err = tmpl.Execute(output, input)
	if err != nil {
		return "", errors.WrapIf(err, "executing fluentbit config template")
78 changes: 53 additions & 25 deletions pkg/resources/fluentbit/tenants.go
@@ -16,12 +16,14 @@ package fluentbit

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"

	"emperror.dev/errors"
	"golang.org/x/exp/slices"
	"golang.org/x/exp/maps"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -71,31 +73,17 @@ func FindTenants(ctx context.Context, target metav1.LabelSelector, reader client.Reader) ([]Tenant, error) {
		}
	}

	sort.Slice(tenants, func(i, j int) bool {
	sort.SliceStable(tenants, func(i, j int) bool {
		return tenants[i].Name < tenants[j].Name
	})
	// Make sure our tenant list is stable
	slices.SortStableFunc(tenants, func(a, b Tenant) int {
		if a.Name < b.Name {
			return -1
		}
		if a.Name == b.Name {
			return 0
		}
		return 1
	})

	return tenants, nil
}

func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v1beta1.Tenant, input *fluentBitConfig) error {
	var errs error
	for _, t := range tenants {
		allNamespaces := len(t.Namespaces) == 0
		namespaceRegex := `.`
		if !allNamespaces {
			namespaceRegex = fmt.Sprintf("^[^_]+_(%s)_", strings.Join(t.Namespaces, "|"))
		}
		match := fmt.Sprintf("kubernetes.%s.*", hashFromTenantName(t.Name))
		logging := &v1beta1.Logging{}
		if err := r.resourceReconciler.Client.Get(ctx, types.NamespacedName{Name: t.Name}, logging); err != nil {
			return errors.WrapIf(err, "getting logging resource")
@@ -113,24 +101,64 @@ func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v1beta1.Tenant, input *fluentBitConfig) error {
				input.FluentForwardOutput = &fluentForwardOutputConfig{}
			}
			input.FluentForwardOutput.Targets = append(input.FluentForwardOutput.Targets, forwardTargetConfig{
				AllNamespaces:  allNamespaces,
				NamespaceRegex: namespaceRegex,
				Host:           aggregatorEndpoint(logging, fluentd.ServiceName),
				Port:           fluentd.ServicePort,
				Match: match,
				Host:  aggregatorEndpoint(logging, fluentd.ServiceName),
				Port:  fluentd.ServicePort,
			})
		} else if _, syslogNGSPec := loggingResources.GetSyslogNGSpec(); syslogNGSPec != nil {
			if input.SyslogNGOutput == nil {
				input.SyslogNGOutput = newSyslogNGOutputConfig()
			}
			input.SyslogNGOutput.Targets = append(input.SyslogNGOutput.Targets, forwardTargetConfig{
				AllNamespaces:  allNamespaces,
				NamespaceRegex: namespaceRegex,
				Host:           aggregatorEndpoint(logging, syslogng.ServiceName),
				Port:           syslogng.ServicePort,
				Match: match,
				Host:  aggregatorEndpoint(logging, syslogng.ServiceName),
				Port:  syslogng.ServicePort,
			})
		} else {
			errs = errors.Append(errs, errors.Errorf("logging %s does not provide any aggregator configured", t.Name))
		}
	}
	return errs
}

func (r *Reconciler) configureInputsForTenants(tenants []v1beta1.Tenant, input *fluentBitConfig) error {
	var errs error
	for _, t := range tenants {
		allNamespaces := len(t.Namespaces) == 0
		tenantValues := maps.Clone(input.Input.Values)
		if !allNamespaces {
			var paths []string
			for _, n := range t.Namespaces {
				paths = append(paths, fmt.Sprintf("/var/log/containers/*_%s_*.log", n))
			}
			tenantValues["Path"] = strings.Join(paths, ",")
		} else {
			tenantValues["Path"] = "/var/log/containers/*.log"
		}

		tenantValues["DB"] = fmt.Sprintf("/tail-db/tail-containers-state-%s.db", t.Name)
		tenantValues["Tag"] = fmt.Sprintf("kubernetes.%s.*", hashFromTenantName(t.Name))
		// This helps to make sure we apply backpressure on the input, see https://docs.fluentbit.io/manual/administration/backpressure
		tenantValues["storage.pause_on_chunks_overlimit"] = "on"
		input.Inputs = append(input.Inputs, fluentbitInputConfigWithTenant{
			Tenant:          t.Name,
			Values:          tenantValues,
			ParserN:         input.Input.ParserN,
			MultilineParser: input.Input.MultilineParser,
		})
	}
	// the regex will work only if we cut the prefix off. fluentbit doesn't care about the content, just the length
	input.KubernetesFilter["Kube_Tag_Prefix"] = `kubernetes.0000000000.var.log.containers.`
	return errs
}

func hashFromTenantName(input string) string {
	hasher := sha256.New()
	hasher.Write([]byte(input))
	hashBytes := hasher.Sum(nil)

	// Convert the hash to a hex string
	hashString := hex.EncodeToString(hashBytes)

	return hashString[0:10]
}