diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8a24d42a08b..f3bb1acc9a3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -113,6 +113,7 @@ fields added to events containing the Beats version. {pull}37553[37553] *Metricbeat* +- Fix Azure Monitor 429 error by causing metricbeat to retry the request again. {pull}38294[38294] - Fix fields not being parsed correctly in postgresql/database {issue}25301[25301] {pull}37720[37720] *Osquerybeat* diff --git a/x-pack/metricbeat/module/azure/mock_service.go b/x-pack/metricbeat/module/azure/mock_service.go index 65f606dde12..9626952fa6d 100644 --- a/x-pack/metricbeat/module/azure/mock_service.go +++ b/x-pack/metricbeat/module/azure/mock_service.go @@ -29,8 +29,8 @@ func (client *MockService) GetResourceDefinitions(id []string, group []string, r return args.Get(0).([]*armresources.GenericResourceExpanded), args.Error(1) } -// GetMetricDefinitions is a mock function for the azure service -func (client *MockService) GetMetricDefinitions(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error) { +// GetMetricDefinitionsWithRetry is a mock function for the azure service +func (client *MockService) GetMetricDefinitionsWithRetry(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error) { args := client.Called(resourceId, namespace) return args.Get(0).(armmonitor.MetricDefinitionCollection), args.Error(1) } diff --git a/x-pack/metricbeat/module/azure/monitor/client_helper.go b/x-pack/metricbeat/module/azure/monitor/client_helper.go index 9d69f67f687..5fa5b9964e6 100644 --- a/x-pack/metricbeat/module/azure/monitor/client_helper.go +++ b/x-pack/metricbeat/module/azure/monitor/client_helper.go @@ -20,12 +20,24 @@ const missingNamespace = "no metric definitions were found for resource %s and n // mapMetrics should validate and map the metric related configuration to relevant azure monitor api parameters func mapMetrics(client *azure.Client, 
resources []*armresources.GenericResourceExpanded, resourceConfig azure.ResourceConfig) ([]azure.Metric, error) { var metrics []azure.Metric + for _, resource := range resources { + + // We use this map to avoid calling the metrics definition function for the same namespace and same resource + // multiple times. + namespaceMetrics := make(map[string]armmonitor.MetricDefinitionCollection) + for _, metric := range resourceConfig.Metrics { - // get all metrics supported by the namespace provided - metricDefinitions, err := client.AzureMonitorService.GetMetricDefinitions(*resource.ID, metric.Namespace) - if err != nil { - return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s %w", *resource.ID, metric.Namespace, err) + + var err error + + metricDefinitions, exists := namespaceMetrics[metric.Namespace] + if !exists { + metricDefinitions, err = client.AzureMonitorService.GetMetricDefinitionsWithRetry(*resource.ID, metric.Namespace) + if err != nil { + return nil, err + } + namespaceMetrics[metric.Namespace] = metricDefinitions } if len(metricDefinitions.Value) == 0 { diff --git a/x-pack/metricbeat/module/azure/monitor/client_helper_test.go b/x-pack/metricbeat/module/azure/monitor/client_helper_test.go index d5c89bbbd78..782d941166b 100644 --- a/x-pack/metricbeat/module/azure/monitor/client_helper_test.go +++ b/x-pack/metricbeat/module/azure/monitor/client_helper_test.go @@ -88,7 +88,7 @@ func TestMapMetric(t *testing.T) { client := azure.NewMockClient() t.Run("return error when no metric definitions were found", func(t *testing.T) { m := &azure.MockService{} - m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(armmonitor.MetricDefinitionCollection{}, fmt.Errorf("invalid resource ID")) + m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(armmonitor.MetricDefinitionCollection{}, fmt.Errorf("invalid resource ID")) client.AzureMonitorService = m metric, err := mapMetrics(client, 
[]*armresources.GenericResourceExpanded{resource}, resourceConfig) assert.Error(t, err) @@ -97,7 +97,7 @@ func TestMapMetric(t *testing.T) { }) t.Run("return all metrics when all metric names and aggregations were configured", func(t *testing.T) { m := &azure.MockService{} - m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(metricDefinitions, nil) + m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(metricDefinitions, nil) client.AzureMonitorService = m metricConfig.Name = []string{"*"} resourceConfig.Metrics = []azure.MetricConfig{metricConfig} @@ -112,7 +112,7 @@ func TestMapMetric(t *testing.T) { }) t.Run("return all metrics when specific metric names and aggregations were configured", func(t *testing.T) { m := &azure.MockService{} - m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(metricDefinitions, nil) + m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(metricDefinitions, nil) client.AzureMonitorService = m metricConfig.Name = []string{"TotalRequests", "Capacity"} metricConfig.Aggregations = []string{"Average"} diff --git a/x-pack/metricbeat/module/azure/monitor_service.go b/x-pack/metricbeat/module/azure/monitor_service.go index 823a9cdf22a..70d79729920 100644 --- a/x-pack/metricbeat/module/azure/monitor_service.go +++ b/x-pack/metricbeat/module/azure/monitor_service.go @@ -6,8 +6,13 @@ package azure import ( "context" + "errors" "fmt" + "net/http" "strings" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/elastic/elastic-agent-libs/logp" @@ -195,8 +200,43 @@ func (service *MonitorService) GetMetricNamespaces(resourceId string) (armmonito return metricNamespaceCollection, nil } -// GetMetricDefinitions will return all supported metrics based on the resource id and namespace -func (service *MonitorService) GetMetricDefinitions(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error) { +// sleepIfPossible will check for the error 
429 in the azure response, and look for the Retry-After header.
+// If the header is present, then metricbeat will sleep for that duration, otherwise it will return an error.
+//
+// Parameters:
+//   - err: the error returned while fetching a page of metric definitions.
+//   - resourceId, namespace: only used to build the error and log messages.
+//
+// It returns nil once the wait is over (the caller may then retry), or an error that wraps err
+// when retrying is not possible. Retry-After is accepted as a number of seconds or as an HTTP-date.
+//
+// NOTE(review): time.Sleep does not observe service.context, so a long Retry-After also delays
+// shutdown - confirm this is acceptable.
+func (service *MonitorService) sleepIfPossible(err error, resourceId string, namespace string) error {
+	errorMsg := "no metric definitions were found for resource " + resourceId + " and namespace " + namespace
+
+	var respError *azcore.ResponseError
+	if !errors.As(err, &respError) {
+		// Keep the original error in the chain, otherwise the real cause is lost.
+		return fmt.Errorf("%s, failed to cast error to azcore.ResponseError: %w", errorMsg, err)
+	}
+	// Only TooManyRequests (429) is retried, every other status is returned with context.
+	if respError.StatusCode != http.StatusTooManyRequests {
+		return fmt.Errorf("%s, %w", errorMsg, err)
+	}
+
+	// Check if the error has the header Retry After.
+	// RawResponse is a pointer, so guard against nil before reading the headers.
+	retryAfter := ""
+	if respError.RawResponse != nil {
+		retryAfter = respError.RawResponse.Header.Get("Retry-After")
+	}
+	if retryAfter == "" {
+		return fmt.Errorf("%s %w, failed to find Retry-After header", errorMsg, err)
+	}
+
+	// Retry-After is either a delay in seconds or an HTTP-date: try the seconds form first.
+	duration, errD := time.ParseDuration(retryAfter + "s")
+	if errD != nil {
+		retryAt, errT := http.ParseTime(retryAfter)
+		if errT != nil {
+			return fmt.Errorf("%s, failed to parse duration %s from header retry after: %w", errorMsg, retryAfter, err)
+		}
+		// A date in the past yields a negative duration, which time.Sleep treats as "do not wait".
+		duration = time.Until(retryAt)
+	}
+
+	service.log.Infof("%s, metricbeat will try again after %s", errorMsg, duration)
+	time.Sleep(duration)
+	service.log.Infof("%s, metricbeat finished sleeping and will try again now", errorMsg)
+
+	return nil
+}
+
+// GetMetricDefinitionsWithRetry will return all supported metrics based on the resource id and namespace
+// It will check for an error when moving the pager to the next page, and retry if possible.
+func (service *MonitorService) GetMetricDefinitionsWithRetry(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error) {
 	opts := &armmonitor.MetricDefinitionsClientListOptions{}
 
 	if namespace != "" {
@@ -210,9 +250,12 @@ func (service *MonitorService) GetMetricDefinitions(resourceId string, namespace
 	for pager.More() {
 		nextPage, err := pager.NextPage(service.context)
 		if err != nil {
-			return armmonitor.MetricDefinitionCollection{}, err
+			// NOTE(review): retries are unbounded while the service keeps answering 429 with a
+			// Retry-After header - consider capping the number of attempts.
+			retryError := service.sleepIfPossible(err, resourceId, namespace)
+			if retryError != nil {
+				// Return the error built by sleepIfPossible: it wraps err and names the resource and namespace.
+				return armmonitor.MetricDefinitionCollection{}, retryError
+			}
+			continue
 		}
-
 		metricDefinitionCollection.Value = append(metricDefinitionCollection.Value, nextPage.Value...)
 	}
 
diff --git a/x-pack/metricbeat/module/azure/service_interface.go b/x-pack/metricbeat/module/azure/service_interface.go
index 39a7da63621..cb524c7f6ea 100644
--- a/x-pack/metricbeat/module/azure/service_interface.go
+++ b/x-pack/metricbeat/module/azure/service_interface.go
@@ -13,7 +13,7 @@ import (
 type Service interface {
 	GetResourceDefinitionById(id string) (armresources.GenericResource, error)
 	GetResourceDefinitions(id []string, group []string, rType string, query string) ([]*armresources.GenericResourceExpanded, error)
-	GetMetricDefinitions(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error)
+	GetMetricDefinitionsWithRetry(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error)
 	GetMetricNamespaces(resourceId string) (armmonitor.MetricNamespaceCollection, error)
 	GetMetricValues(resourceId string, namespace string, timegrain string, timespan string, metricNames []string, aggregations string, filter string) ([]armmonitor.Metric, string, error)
 }
diff --git a/x-pack/metricbeat/module/azure/storage/client_helper.go b/x-pack/metricbeat/module/azure/storage/client_helper.go
index 393607be7ae..e60b9472a57 100644
--- a/x-pack/metricbeat/module/azure/storage/client_helper.go
+++ 
b/x-pack/metricbeat/module/azure/storage/client_helper.go @@ -41,13 +41,13 @@ func mapMetrics(client *azure.Client, resources []*armresources.GenericResourceE } // get all metric definitions supported by the namespace provided - metricDefinitions, err := client.AzureMonitorService.GetMetricDefinitions(resourceID, namespace) + metricDefinitions, err := client.AzureMonitorService.GetMetricDefinitionsWithRetry(resourceID, namespace) if err != nil { - return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s %w", resourceID, namespace, err) + return nil, err } if len(metricDefinitions.Value) == 0 { - return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s %w", resourceID, namespace, err) + return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s", resourceID, namespace) } var filteredMetricDefinitions []armmonitor.MetricDefinition diff --git a/x-pack/metricbeat/module/azure/storage/client_helper_test.go b/x-pack/metricbeat/module/azure/storage/client_helper_test.go index ecdf4941ac9..14121c3a0b3 100644 --- a/x-pack/metricbeat/module/azure/storage/client_helper_test.go +++ b/x-pack/metricbeat/module/azure/storage/client_helper_test.go @@ -119,17 +119,17 @@ func TestMapMetric(t *testing.T) { client := azure.NewMockClient() t.Run("return error when no metric definitions were found", func(t *testing.T) { m := &azure.MockService{} - m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(emptyMetricDefinitions, nil) + m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(emptyMetricDefinitions, nil) client.AzureMonitorService = m metric, err := mapMetrics(client, []*armresources.GenericResourceExpanded{resource}, resourceConfig) assert.Error(t, err) - assert.Equal(t, err.Error(), "no metric definitions were found for resource 123 and namespace Microsoft.Storage/storageAccounts %!w()") + assert.Equal(t, err.Error(), "no metric definitions were 
found for resource 123 and namespace Microsoft.Storage/storageAccounts") assert.Equal(t, metric, []azure.Metric(nil)) m.AssertExpectations(t) }) t.Run("return mapped metrics correctly", func(t *testing.T) { m := &azure.MockService{} - m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(metricDefinitions, nil) + m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(metricDefinitions, nil) client.AzureMonitorService = m metrics, err := mapMetrics(client, []*armresources.GenericResourceExpanded{resource}, resourceConfig) assert.NoError(t, err)