Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Logstash support #171

Merged
merged 6 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions internal/diag.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func Run(params Params) error {
"common.k8s.elastic.co/type=beat", // 1.2.0
"common.k8s.elastic.co/type=agent", // 1.4.0
"common.k8s.elastic.co/type=maps", // 1.6.0
"common.k8s.elastic.co/type=logstash", // 2.8.0
}

selectors := make([]labels.Selector, len(operatorLabels))
Expand Down Expand Up @@ -207,6 +208,11 @@ LOOP:
"elasticmapsserver",
}))
}
if maxOperatorVersion.AtLeast(version.MustParseSemantic("2.8.0")) {
zipFile.Add(getResources(kubectl.GetByName, ns, namespaceFilters, []string{
"logstash",
}))
}

zipFile.Add(map[string]func(io.Writer) error{
archive.Path(ns, "secrets.json"): func(writer io.Writer) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/extraction/extraction.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type RemoteSource struct {
func (j *RemoteSource) sourceDirPrefix() string {
prefix := "api-diagnostics"
switch j.Typ {
case "kibana":
case "kibana", "logstash":
prefix = fmt.Sprintf("%s-%s", j.Typ, prefix)
case "agent":
prefix = ""
Expand Down
2 changes: 1 addition & 1 deletion internal/filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

var (
// ValidTypes are the valid types of Elastic resources that are supported by the filtering system.
ValidTypes = []string{"agent", "apm", "beat", "elasticsearch", "enterprisesearch", "kibana", "maps"}
ValidTypes = []string{"agent", "apm", "beat", "elasticsearch", "enterprisesearch", "kibana", "maps", "logstash"}
Empty = Filters{}
)

Expand Down
131 changes: 93 additions & 38 deletions internal/stackdiag.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ const (
// names used to identify different stack diagnostic job types (need to match the names of the corresponding CRDs)
elasticsearchJob = "elasticsearch"
kibanaJob = "kibana"
logstashJob = "logstash"
)

var (
// supportedStackDiagTypes is the list of stack apps supported by elastic/support-diagnostics
supportedStackDiagTypes = []string{elasticsearchJob, kibanaJob, logstashJob}

//go:embed job.tpl.yml
jobTemplate string
// jobPollingInterval is used to configure the informer used to be notified of Pod status changes.
Expand Down Expand Up @@ -136,15 +140,15 @@ func (ds *diagJobState) scheduleJob(typ, esName, resourceName string, tls bool)
return err
}

diagnosticType, shortType := diagnosticTypeForApplication(typ)
diagnosticType, svcSuffix := diagnosticTypeForApplication(typ)

buffer := new(bytes.Buffer)
err = tpl.Execute(buffer, map[string]interface{}{
"PodName": podName,
"DiagnosticImage": ds.diagnosticImage,
"Namespace": ds.ns,
"ESName": esName,
"SVCName": fmt.Sprintf("%s-%s-http", resourceName, shortType),
"SVCName": fmt.Sprintf("%s-%s", resourceName, svcSuffix),
"Type": diagnosticType,
"TLS": tls,
"OutputDir": podOutputDir,
Expand Down Expand Up @@ -198,14 +202,16 @@ func (ds *diagJobState) scheduleJob(typ, esName, resourceName string, tls bool)
return nil
}

// diagnosticTypeForApplication returns the diagnosticType as expected by the stack diagnostics tool and a short type
// matching the shorthand used by ECK in service names for the given application type.
// diagnosticTypeForApplication returns the diagnosticType as expected by the stack diagnostics tool and the suffix
// used by ECK in service names for the given application type.
func diagnosticTypeForApplication(typ string) (string, string) {
switch typ {
case elasticsearchJob:
return "api", "es"
return "api", "es-http"
case kibanaJob:
return "kibana-api", "kb"
return "kibana-api", "kb-http"
case logstashJob:
return "logstash-api", "ls-api"
}
panic("programming error: unknown type")
}
Expand Down Expand Up @@ -358,13 +364,11 @@ func (ds *diagJobState) detectImageErrors(pod *corev1.Pod) error {
func runStackDiagnostics(k *Kubectl, ns string, zipFile *archive.ZipFile, verbose bool, image string, jobTimeout time.Duration, stopCh chan struct{}, filters internal_filters.Filters) {
state := newDiagJobState(k, ns, verbose, image, jobTimeout, stopCh)

if err := scheduleJobs(k, ns, zipFile.AddError, state, elasticsearchJob, filters); err != nil {
zipFile.AddError(err)
return
}
if err := scheduleJobs(k, ns, zipFile.AddError, state, kibanaJob, filters); err != nil {
zipFile.AddError(err)
return
for _, typ := range supportedStackDiagTypes {
if err := scheduleJobs(k, ns, zipFile.AddError, state, typ, filters); err != nil {
zipFile.AddError(err)
return
}
}
// don't start extracting if there is nothing to do
if len(state.jobs) == 0 {
Expand All @@ -379,44 +383,95 @@ func scheduleJobs(k *Kubectl, ns string, recordErr func(error), state *diagJobSt
if err != nil {
return err // not recoverable
}
return resources.Visit(func(info *resource.Info, err error) error {
return resources.Visit(func(ressourceInfo *resource.Info, err error) error {
if err != nil {
// record error but continue trying for other resources
recordErr(err)
}

resourceName := info.Name
es, err := runtime.DefaultUnstructuredConverter.ToUnstructured(info.Object)
isTLS, esName, err := extractEsInfo(typ, ns, ressourceInfo, filters)
if err != nil {
recordErr(err)
return nil
}
disabled, found, err := unstructured.NestedBool(es, "spec", "http", "tls", "selfSignedCertificate", "disabled")
if esName != "" {
logger.Print("scheduleJob", "typ ", typ, "es ", esName)
recordErr(state.scheduleJob(typ, esName, ressourceInfo.Name, isTLS))
}
return nil
})
}

func extractEsInfo(typ string, ns string, resourceInfo *resource.Info, filters internal_filters.Filters) (bool, string, error) {
resourceName := resourceInfo.Name

if !filters.Empty() && !filters.Contains(resourceName, typ) {
return false, "", nil
}

es, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resourceInfo.Object)
if err != nil {
return false, "", err
}

var isTLS bool
switch typ {
case logstashJob:
// Logstash API SSL is not yet configurable via spec.http.tls, try to read the config as a best-effort
// To change after https://github.com/elastic/cloud-on-k8s/issues/6971 is fixed.
enabled, found, err := unstructured.NestedBool(es, "spec", "config", "api.ssl.enabled")
if err != nil {
recordErr(err)
return nil
return false, "", err
}
tls := !(found && disabled)
isTLS = found && enabled

if !filters.Empty() && !filters.Contains(resourceName, typ) {
return nil
default:
disabled, found, err := unstructured.NestedBool(es, "spec", "http", "tls", "selfSignedCertificate", "disabled")
if err != nil {
return false, "", err
}
isTLS = !(found && disabled)
}

esName := resourceName
if typ != "elasticsearch" {
val, found, err := unstructured.NestedString(es, "spec", "elasticsearchRef", "name")
if err != nil {
recordErr(err)
return nil
}
if !found || val == "" {
logger.Printf("Skipping %s/%s as it it not using elasticsearchRef", ns, resourceName)
return nil
}
esName = val
var esName string
switch typ {
case elasticsearchJob:
esName = resourceName
case kibanaJob:
name, found, err := unstructured.NestedString(es, "spec", "elasticsearchRef", "name")
if err != nil {
return false, "", err
}
if !found || name == "" {
logger.Printf("Skipping %s/%s as elasticsearchRef is not defined", ns, resourceName)
return false, "", nil
}
esName = name
case logstashJob:
esRefs, found, err := unstructured.NestedSlice(es, "spec", "elasticsearchRefs")
if err != nil {
return false, "", err
}
if !found || len(esRefs) == 0 {
logger.Printf("Skipping %s/%s as elasticsearchRefs is not defined", ns, resourceName)
return false, "", nil
}
esRef, ok := esRefs[0].(map[string]interface{})
if !ok {
logger.Printf("Skipping %s/%s as elasticsearchRefs[0] is invalid", ns, resourceName)
return false, "", nil
}
name, found, err := unstructured.NestedString(esRef, "name")
if err != nil {
return false, "", err
}
if !found || name == "" {
logger.Printf("Skipping %s/%s as name is not set in elasticsearchRefs[0]", ns, resourceName)
return false, "", nil
}
esName = name
thbkrkr marked this conversation as resolved.
Show resolved Hide resolved
default:
panic("unknown type while extracting es info")
}

recordErr(state.scheduleJob(typ, esName, resourceName, tls))
return nil
})
return isTLS, esName, nil
}