Skip to content

Commit

Permalink
Merge branch 'main' into default-odigos-attributes-kratos-profile
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirdavid1 authored Oct 20, 2024
2 parents bd91f94 + 2985c5c commit 3e0d9cb
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 67 deletions.
44 changes: 44 additions & 0 deletions autoscaler/controllers/common/deployedcondition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package common

import (
"encoding/json"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func GetCollectorsGroupDeployedConditionsPatch(err error) string {

status := metav1.ConditionTrue
if err != nil {
status = metav1.ConditionFalse
}

message := "Gateway collector is deployed in the cluster"
if err != nil {
message = err.Error()
}

reason := "GatewayDeployedCreatedSuccessfully"
if err != nil {
// in the future, we can be more specific and break it down to
// more detailed reasons about what exactly failed
reason = "GatewayDeployedCreationFailed"
}

patch := map[string]interface{}{
"status": map[string]interface{}{
"conditions": []metav1.Condition{{
Type: "Deployed",
Status: status,
Reason: reason,
Message: message,
LastTransitionTime: metav1.NewTime(time.Now()),
}},
},
}

patchData, _ := json.Marshal(patch)
// marshal error is ignored as it is not expected to happen
return string(patchData)
}
14 changes: 13 additions & 1 deletion autoscaler/controllers/datacollection/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -56,17 +57,28 @@ func (dm *DelayManager) RunSyncDaemonSetWithDelayAndSkipNewCalls(delay time.Dura

// Finish the function execution after the delay
time.AfterFunc(delay, func() {
var err error
logger := log.FromContext(ctx)

dm.mu.Lock()
defer dm.mu.Unlock()
defer dm.finishProgress()
var err error
defer func() {
statusPatchString := common.GetCollectorsGroupDeployedConditionsPatch(err)
statusErr := c.Status().Patch(ctx, collection, client.RawPatch(types.MergePatchType, []byte(statusPatchString)))
if statusErr != nil {
logger.Error(statusErr, "Failed to patch collectors group status")
// just log the error, do not fail the reconciliation
}
}()

for i := 0; i < retries; i++ {
_, err = syncDaemonSet(ctx, dests, collection, c, scheme, secrets, version)
if err == nil {
return
}
}

log.FromContext(ctx).Error(err, "Failed to sync DaemonSet")
})
}
Expand Down
45 changes: 3 additions & 42 deletions autoscaler/controllers/gateway/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@ package gateway

import (
"context"
"encoding/json"
"time"

appsv1 "k8s.io/api/apps/v1"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
commonconf "github.com/odigos-io/odigos/autoscaler/controllers/common"
"github.com/odigos-io/odigos/common"
odigoscommon "github.com/odigos-io/odigos/common"
k8sconsts "github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/env"
"github.com/odigos-io/odigos/k8sutils/pkg/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -26,42 +23,6 @@ var (
}
)

func getCollectorsGroupDeployedConditionsPatch(err error) string {

status := metav1.ConditionTrue
if err != nil {
status = metav1.ConditionFalse
}

message := "Gateway collector is deployed in the cluster"
if err != nil {
message = err.Error()
}

reason := "GatewayDeployedCreatedSuccessfully"
if err != nil {
// in the future, we can be more specific and break it down to
// more detailed reasons about what exactly failed
reason = "GatewayDeployedCreationFailed"
}

patch := map[string]interface{}{
"status": map[string]interface{}{
"conditions": []metav1.Condition{{
Type: "Deployed",
Status: status,
Reason: reason,
Message: message,
LastTransitionTime: metav1.NewTime(time.Now()),
}},
},
}

patchData, _ := json.Marshal(patch)
// marshal error is ignored as it is not expected to happen
return string(patchData)
}

func Sync(ctx context.Context, k8sClient client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string) error {
logger := log.FromContext(ctx)

Expand Down Expand Up @@ -95,7 +56,7 @@ func Sync(ctx context.Context, k8sClient client.Client, scheme *runtime.Scheme,
}

err = syncGateway(&dests, &processors, &gatewayCollectorGroup, ctx, k8sClient, scheme, imagePullSecrets, odigosVersion, &odigosConfig)
statusPatchString := getCollectorsGroupDeployedConditionsPatch(err)
statusPatchString := commonconf.GetCollectorsGroupDeployedConditionsPatch(err)
statusErr := k8sClient.Status().Patch(ctx, &gatewayCollectorGroup, client.RawPatch(types.MergePatchType, []byte(statusPatchString)))
if statusErr != nil {
logger.Error(statusErr, "Failed to patch collectors group status")
Expand All @@ -106,7 +67,7 @@ func Sync(ctx context.Context, k8sClient client.Client, scheme *runtime.Scheme,

func syncGateway(dests *odigosv1.DestinationList, processors *odigosv1.ProcessorList,
gateway *odigosv1.CollectorsGroup, ctx context.Context,
c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, odigosConfig *common.OdigosConfiguration) error {
c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, odigosConfig *odigoscommon.OdigosConfiguration) error {
logger := log.FromContext(ctx)
logger.V(0).Info("Syncing gateway")

Expand Down
14 changes: 7 additions & 7 deletions destinations/data/qryn.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ kind: Destination
metadata:
type: qryn-oss
displayName: qryn
category: self-hosted
category: self hosted
spec:
image: qryn.svg
signals:
Expand All @@ -14,6 +14,12 @@ spec:
logs:
supported: true
fields:
- name: QRYN_OSS_URL
displayName: API Url
componentType: input
componentProps:
type: text
required: true
- name: QRYN_OSS_PASSWORD
displayName: Basic auth password
componentType: input
Expand All @@ -25,12 +31,6 @@ spec:
componentType: input
componentProps:
type: text
- name: QRYN_OSS_URL
displayName: API Url
componentType: input
componentProps:
type: text
required: true
- name: QRYN_OSS_RESOURCE_TO_TELEMETRY_CONVERSION
displayName: Convert container attributes to labels
componentType: dropdown
Expand Down
3 changes: 2 additions & 1 deletion docs/mint.json
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
"backends/dynatrace",
"backends/elasticsearch",
"backends/gcs",
"backends/gigapipe",
"backends/googlecloud",
"backends/grafanacloudloki",
"backends/grafanacloudprometheus",
Expand Down Expand Up @@ -288,7 +289,7 @@
}
],
"footerSocials": {
"X": "https://twitter.com/odigosio",
"x": "https://twitter.com/odigosio",
"github": "https://github.com/odigos-io/odigos",
"slack": "https://odigos.slack.com/join/shared_invite/zt-1d7egaz29-Rwv2T8kyzc3mWP8qKobz~A#/shared-invite/email"
}
Expand Down
115 changes: 99 additions & 16 deletions k8sutils/pkg/describe/odigos.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ type clusterCollectorResources struct {
LatestRevisionPods *corev1.PodList
}

type nodeCollectorResources struct {
CollectorsGroup *odigosv1.CollectorsGroup
DaemonSet *appsv1.DaemonSet
}

type odigosResources struct {
ClusterCollector clusterCollectorResources
NodeCollector nodeCollectorResources
Destinations *odigosv1.DestinationList
InstrumentationConfigs *odigosv1.InstrumentationConfigList
}

func getClusterCollectorResources(ctx context.Context, kubeClient kubernetes.Interface, odigosClient odigosclientset.OdigosV1alpha1Interface, odigosNs string) (clusterCollector clusterCollectorResources, err error) {

clusterCollector = clusterCollectorResources{}
Expand Down Expand Up @@ -77,19 +89,41 @@ func getClusterCollectorResources(ctx context.Context, kubeClient kubernetes.Int
return
}

func getRelevantOdigosResources(ctx context.Context, kubeClient kubernetes.Interface, odigosClient odigosclientset.OdigosV1alpha1Interface, odigosNs string) (clusterCollector clusterCollectorResources, destinations *odigosv1.DestinationList, instrumentationConfigs *odigosv1.InstrumentationConfigList, err error) {
func getNodeCollectorResources(ctx context.Context, kubeClient kubernetes.Interface, odigosClient odigosclientset.OdigosV1alpha1Interface, odigosNs string) (nodeCollector nodeCollectorResources, err error) {

nodeCollector = nodeCollectorResources{}

nodeCollector.CollectorsGroup, err = odigosClient.CollectorsGroups(odigosNs).Get(ctx, consts.OdigosNodeCollectorCollectorGroupName, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return
}

nodeCollector.DaemonSet, err = kubeClient.AppsV1().DaemonSets(odigosNs).Get(ctx, consts.OdigosNodeCollectorDaemonSetName, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return
}

return
}

func getRelevantOdigosResources(ctx context.Context, kubeClient kubernetes.Interface, odigosClient odigosclientset.OdigosV1alpha1Interface, odigosNs string) (odigos odigosResources, err error) {

odigos.ClusterCollector, err = getClusterCollectorResources(ctx, kubeClient, odigosClient, odigosNs)
if err != nil {
return
}

clusterCollector, err = getClusterCollectorResources(ctx, kubeClient, odigosClient, odigosNs)
odigos.NodeCollector, err = getNodeCollectorResources(ctx, kubeClient, odigosClient, odigosNs)
if err != nil {
return
}

destinations, err = odigosClient.Destinations(odigosNs).List(ctx, metav1.ListOptions{})
odigos.Destinations, err = odigosClient.Destinations(odigosNs).List(ctx, metav1.ListOptions{})
if err != nil {
return
}

instrumentationConfigs, err = odigosClient.InstrumentationConfigs("").List(ctx, metav1.ListOptions{})
odigos.InstrumentationConfigs, err = odigosClient.InstrumentationConfigs("").List(ctx, metav1.ListOptions{})
if err != nil {
return
}
Expand All @@ -109,8 +143,8 @@ func printOdigosPipelineStatus(numInstrumentationConfigs, numDestinations int, e
}
}

func printClusterGatewayStatus(clusterCollector clusterCollectorResources, expectingPipeline bool, sb *strings.Builder) {
describeText(sb, 1, "Cluster Gateway:")
func printClusterCollectorStatus(clusterCollector clusterCollectorResources, expectingPipeline bool, sb *strings.Builder) {
describeText(sb, 1, "Cluster Collector:")
if clusterCollector.CollectorsGroup == nil {
describeText(sb, 2, wrapTextSuccessOfFailure("Collectors Group Not Created", !expectingPipeline))
return
Expand All @@ -129,13 +163,16 @@ func printClusterGatewayStatus(clusterCollector clusterCollectorResources, expec
describeText(sb, 2, wrapTextInRed("Deployed: Status Unavailable"))
} else {
if deployedCondition.Status == metav1.ConditionTrue {
describeText(sb, 2, wrapTextInGreen("Deployed: True"))
describeText(sb, 2, wrapTextInGreen("Deployed: true"))
} else {
describeText(sb, 2, wrapTextInRed("Deployed: False"))
describeText(sb, 2, wrapTextInRed("Deployed: false"))
describeText(sb, 2, wrapTextInRed(fmt.Sprintf("Reason: %s", deployedCondition.Message)))
}
}

ready := clusterCollector.CollectorsGroup.Status.Ready
describeText(sb, 2, wrapTextSuccessOfFailure(fmt.Sprintf("Ready: %t", ready), ready))

if clusterCollector.LatestRevisionPods == nil || clusterCollector.Deployment == nil {
describeText(sb, 2, wrapTextInRed("Number of Replicas: Status Unavailable"))
} else {
Expand Down Expand Up @@ -173,23 +210,69 @@ func printClusterGatewayStatus(clusterCollector clusterCollectorResources, expec
}
}

func printOdigosPipeline(clusterCollector clusterCollectorResources, destinations *odigosv1.DestinationList, instrumentationConfigs *odigosv1.InstrumentationConfigList, sb *strings.Builder) {
func printNodeCollectorStatus(nodeCollector nodeCollectorResources, expectingNodeCollector bool, sb *strings.Builder) {
describeText(sb, 1, "Node Collector:")
if nodeCollector.CollectorsGroup == nil {
describeText(sb, 2, wrapTextSuccessOfFailure("Collectors Group Not Created", !expectingNodeCollector))
return
}

describeText(sb, 2, wrapTextSuccessOfFailure("Collectors Group Created", expectingNodeCollector))

var deployedCondition *metav1.Condition
for _, condition := range nodeCollector.CollectorsGroup.Status.Conditions {
if condition.Type == "Deployed" {
deployedCondition = &condition
break
}
}
if deployedCondition == nil {
describeText(sb, 2, wrapTextInRed("Deployed: Status Unavailable"))
} else {
if deployedCondition.Status == metav1.ConditionTrue {
describeText(sb, 2, wrapTextInGreen("Deployed: True"))
} else {
describeText(sb, 2, wrapTextInRed("Deployed: False"))
describeText(sb, 2, wrapTextInRed(fmt.Sprintf("Reason: %s", deployedCondition.Message)))
}
}

ready := nodeCollector.CollectorsGroup.Status.Ready
describeText(sb, 2, wrapTextSuccessOfFailure(fmt.Sprintf("Ready: %t", ready), ready))

// this is copied from k8sutils/pkg/describe/describe.go
// I hope the info is accurate since there can be many edge cases
describeText(sb, 2, "Desired Number of Nodes Scheduled: %d", nodeCollector.DaemonSet.Status.DesiredNumberScheduled)
currentMeetsDesired := nodeCollector.DaemonSet.Status.DesiredNumberScheduled == nodeCollector.DaemonSet.Status.CurrentNumberScheduled
describeText(sb, 2, wrapTextSuccessOfFailure(fmt.Sprintf("Current Number of Nodes Scheduled: %d", nodeCollector.DaemonSet.Status.CurrentNumberScheduled), currentMeetsDesired))
updatedMeetsDesired := nodeCollector.DaemonSet.Status.DesiredNumberScheduled == nodeCollector.DaemonSet.Status.UpdatedNumberScheduled
describeText(sb, 2, wrapTextSuccessOfFailure(fmt.Sprintf("Number of Nodes Scheduled with Up-to-date Pods: %d", nodeCollector.DaemonSet.Status.UpdatedNumberScheduled), updatedMeetsDesired))
availableMeetsDesired := nodeCollector.DaemonSet.Status.DesiredNumberScheduled == nodeCollector.DaemonSet.Status.NumberAvailable
describeText(sb, 2, wrapTextSuccessOfFailure(fmt.Sprintf("Number of Nodes Scheduled with Available Pods: %d", nodeCollector.DaemonSet.Status.NumberAvailable), availableMeetsDesired))
noMisscheduled := nodeCollector.DaemonSet.Status.NumberMisscheduled == 0
describeText(sb, 2, wrapTextSuccessOfFailure(fmt.Sprintf("Number of Nodes Misscheduled: %d", nodeCollector.DaemonSet.Status.NumberMisscheduled), noMisscheduled))
}

func printOdigosPipeline(odigosResources odigosResources, sb *strings.Builder) {
describeText(sb, 0, "Odigos Pipeline:")
numDestinations := len(destinations.Items)
numInstrumentationConfigs := len(instrumentationConfigs.Items)
numDestinations := len(odigosResources.Destinations.Items)
numInstrumentationConfigs := len(odigosResources.InstrumentationConfigs.Items)
// odigos will only initiate pipeline if there are any sources or destinations
expectingPipeline := numDestinations > 0 || numInstrumentationConfigs > 0

printOdigosPipelineStatus(numInstrumentationConfigs, numDestinations, expectingPipeline, sb)
printClusterGatewayStatus(clusterCollector, expectingPipeline, sb)
printClusterCollectorStatus(odigosResources.ClusterCollector, expectingPipeline, sb)
sb.WriteString("\n")
expectingNodeCollector := odigosResources.ClusterCollector.CollectorsGroup != nil && odigosResources.ClusterCollector.CollectorsGroup.Status.Ready && numInstrumentationConfigs > 0
printNodeCollectorStatus(odigosResources.NodeCollector, expectingNodeCollector, sb)
}

func printDescribeOdigos(odigosVersion string, clusterCollector clusterCollectorResources, destinations *odigosv1.DestinationList, instrumentationConfigs *odigosv1.InstrumentationConfigList) string {
func printDescribeOdigos(odigosVersion string, odigosResources odigosResources) string {
var sb strings.Builder

printOdigosVersion(odigosVersion, &sb)
sb.WriteString("\n")
printOdigosPipeline(clusterCollector, destinations, instrumentationConfigs, &sb)
printOdigosPipeline(odigosResources, &sb)

return sb.String()
}
Expand All @@ -201,10 +284,10 @@ func DescribeOdigos(ctx context.Context, kubeClient kubernetes.Interface, odigos
return fmt.Sprintf("Error: %v\n", err)
}

clusterCollector, destinations, instrumentationConfigs, err := getRelevantOdigosResources(ctx, kubeClient, odigosClient, odigosNs)
odigosResources, err := getRelevantOdigosResources(ctx, kubeClient, odigosClient, odigosNs)
if err != nil {
return fmt.Sprintf("Error: %v\n", err)
}

return printDescribeOdigos(odigosVersion, clusterCollector, destinations, instrumentationConfigs)
return printDescribeOdigos(odigosVersion, odigosResources)
}
Loading

0 comments on commit 3e0d9cb

Please sign in to comment.