Add local storage and retry logic for Azure Metrics Exporter + flush telemetry on exit #845

Merged
merged 10 commits on Jan 30, 2020
@@ -12,11 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging

import requests

from opencensus.common import utils as common_utils
from opencensus.ext.azure.common import Options, utils
from opencensus.ext.azure.common.protocol import (
@@ -25,6 +22,8 @@
    Envelope,
    MetricData,
)
from opencensus.ext.azure.common.storage import LocalFileStorage
from opencensus.ext.azure.common.transport import TransportMixin
from opencensus.ext.azure.metrics_exporter import standard_metrics
from opencensus.metrics import transport
from opencensus.metrics.export.metric_descriptor import MetricDescriptorType
@@ -35,49 +34,65 @@
logger = logging.getLogger(__name__)


class MetricsExporter(object):
class MetricsExporter(TransportMixin):
Member:

MetricsExporter isn't a BaseExporter because we're using the export thread from get_exporter_thread instead of BaseExporter's worker?

It looks like there's a fair amount of duplicated code here, and that BaseExporter should be refactored or made specific to tracing.
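
For context on the two mechanisms being compared: a rough sketch of what a get_exporter_thread-style worker does, assuming producers expose get_metrics() and the exporter exposes export_metrics(). The class name and details here are illustrative, not the actual opencensus.metrics.transport implementation.

```python
import threading
import time


class PeriodicExportWorker(threading.Thread):
    """Illustrative stand-in for the thread created by
    transport.get_exporter_thread(); not the actual opencensus code."""

    def __init__(self, producers, exporter, interval):
        super(PeriodicExportWorker, self).__init__()
        self.daemon = True          # don't block interpreter shutdown
        self.producers = producers  # assumed to expose get_metrics()
        self.exporter = exporter    # assumed to expose export_metrics()
        self.interval = interval    # seconds between export passes

    def run(self):
        while True:
            # Gather current metrics from every producer and hand them to
            # the exporter once per interval.
            metrics = []
            for producer in self.producers:
                metrics.extend(producer.get_metrics())
            self.exporter.export_metrics(metrics)
            time.sleep(self.interval)
```

BaseExporter's worker, by comparison, drains a queue that callers push telemetry onto, which is roughly where the duplication noted above comes from.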

Contributor Author:

Yes, it seems like MetricsExporter and AzureLogHandler have very similar behaviour in terms of the exporter thread. I would like to eventually have all of them be a BaseExporter. Were there any discussions between you and Reiley about why this shouldn't be the case (why it was like this in the first place)? In any case, I'd like to keep those refactoring changes separate from a feature PR like this one, but I will open a ticket for it.

Contributor Author:

[#852]

Member:

Were there any discussions between you and Reiley about why this shouldn't be the case (why it was like this in the first place)?

It looks like BaseExporter was factored out in #642, but that PR didn't touch the StackdriverStatsExporter, which influenced the design of the other stats exporters. In any case this can be a problem for another PR.

"""Metrics exporter for Microsoft Azure Monitor."""

def __init__(self, options=None):
if options is None:
options = Options()
self.options = options
def __init__(self, **options):
self.options = Options(**options)
utils.validate_instrumentation_key(self.options.instrumentation_key)
if self.options.max_batch_size <= 0:
raise ValueError('Max batch size must be at least 1.')
self.export_interval = self.options.export_interval
self.max_batch_size = self.options.max_batch_size
self.storage = LocalFileStorage(
path=self.options.storage_path,
max_size=self.options.storage_max_size,
maintenance_period=self.options.storage_maintenance_period,
retention_period=self.options.storage_retention_period,
)
super(MetricsExporter, self).__init__()

    def export_metrics(self, metrics):
        if metrics:
            envelopes = []
            for metric in metrics:
                # No support for histogram aggregations
                type_ = metric.descriptor.type
                if type_ != MetricDescriptorType.CUMULATIVE_DISTRIBUTION:
                    md = metric.descriptor
                    # Each time series will be uniquely identified by its
                    # label values
                    for time_series in metric.time_series:
                        # Using stats, time_series should only have one point
                        # which contains the aggregated value
                        data_point = self.create_data_points(
                            time_series, md)[0]
                        # The timestamp is when the metric was recorded
                        time_stamp = time_series.points[0].timestamp
                        # Get the properties using label keys from metric and
                        # label values of the time series
                        properties = self.create_properties(time_series, md)
                        envelopes.append(self.create_envelope(data_point,
                                                              time_stamp,
                                                              properties))
            # Send data in batches of max_batch_size
            if envelopes:
                batched_envelopes = list(common_utils.window(
                    envelopes, self.max_batch_size))
                for batch in batched_envelopes:
                    self._transmit_without_retry(batch)

    def create_data_points(self, time_series, metric_descriptor):
        envelopes = []
        for metric in metrics:
            envelopes.extend(self.metric_to_envelopes(metric))
        # Send data in batches of max_batch_size
        batched_envelopes = list(common_utils.window(
            envelopes, self.max_batch_size))
        for batch in batched_envelopes:
            result = self._transmit(batch)
            if result > 0:
                self.storage.put(batch, result)

        # If there is still room to transmit envelopes, transmit from storage
        # if available
        if len(envelopes) < self.options.max_batch_size:
            self._transmit_from_storage()
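
One detail worth calling out in the new export_metrics above: the meaning of _transmit's return value is not shown in this file (it comes from TransportMixin), but from the way it is used here it appears to be a positive retry interval in seconds for retryable failures and a non-positive value otherwise, with the same number doubling as the lease period passed to storage.put. A minimal model of that flow, under that assumption:

```python
# Sketch only -- not the actual TransportMixin/LocalFileStorage code; the
# return-value convention is inferred from this diff, not documented here.
def send_batch(exporter, batch):
    retry_after = exporter._transmit(batch)  # assumed: > 0 means "retry later"
    if retry_after > 0:
        # Park the failed batch on disk and lease it for `retry_after`
        # seconds so a later export pass can pick it up and resend it.
        exporter.storage.put(batch, retry_after)
```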

    def metric_to_envelopes(self, metric):
        envelopes = []
        # No support for histogram aggregations
        type_ = metric.descriptor.type
        if type_ != MetricDescriptorType.CUMULATIVE_DISTRIBUTION:
            md = metric.descriptor
            # Each time series will be uniquely identified by its
            # label values
            for time_series in metric.time_series:
                # Using stats, time_series should only have one
                # point which contains the aggregated value
                data_point = self._create_data_points(
                    time_series, md)[0]
                # The timestamp is when the metric was recorded
                time_stamp = time_series.points[0].timestamp
                # Get the properties using label keys from metric
                # and label values of the time series
                properties = self._create_properties(time_series, md)
                envelopes.append(self._create_envelope(data_point,
                                                       time_stamp,
                                                       properties))
        return envelopes

    def _create_data_points(self, time_series, metric_descriptor):
        """Convert a metric's OC time series to list of Azure data points."""
        data_points = []
        for point in time_series.points:
@@ -88,7 +103,7 @@ def create_data_points(self, time_series, metric_descriptor):
            data_points.append(data_point)
        return data_points

    def create_properties(self, time_series, metric_descriptor):
    def _create_properties(self, time_series, metric_descriptor):
        properties = {}
        # We construct a properties map from the label keys and values. We
        # assume the ordering is already correct
@@ -100,7 +115,7 @@ def create_properties(self, time_series, metric_descriptor):
            properties[metric_descriptor.label_keys[i].key] = value
        return properties

    def create_envelope(self, data_point, time_stamp, properties):
    def _create_envelope(self, data_point, time_stamp, properties):
        envelope = Envelope(
            iKey=self.options.instrumentation_key,
            tags=dict(utils.azure_monitor_context),
@@ -114,125 +129,13 @@ def create_envelope(self, data_point, time_stamp, properties):
        envelope.data = Data(baseData=data, baseType="MetricData")
        return envelope

    def _transmit_without_retry(self, envelopes):
        # Contains logic from transport._transmit
        # TODO: Remove this function from exporter and consolidate with
        # transport._transmit to cover all exporter use cases. Uses cases
        # pertain to properly handling failures and implementing a retry
        # policy for this exporter.
        # TODO: implement retry policy
        """
        Transmit the data envelopes to the ingestion service.
        Does not perform retry logic. For partial success and
        non-retryable failure, simply outputs result to logs.
        This function should never throw exception.
        """
        try:
            response = requests.post(
                url=self.options.endpoint,
                data=json.dumps(envelopes),
                headers={
                    'Accept': 'application/json',
                    'Content-Type': 'application/json; charset=utf-8',
                },
                timeout=self.options.timeout,
            )
        except Exception as ex:
            # No retry policy, log output
            logger.warning('Transient client side error %s.', ex)
            return

        text = 'N/A'
        data = None
        # Handle the possible results from the response
        if response is None:
            logger.warning('Error: cannot read response.')
            return
        try:
            status_code = response.status_code
        except Exception as ex:
            logger.warning('Error while reading response status code %s.', ex)
            return
        try:
            text = response.text
        except Exception as ex:
            logger.warning('Error while reading response body %s.', ex)
            return
        try:
            data = json.loads(text)
        except Exception as ex:
            logger.warning('Error while loading ' +
                           'json from response body %s.', ex)
            return
        if status_code == 200:
            logger.info('Transmission succeeded: %s.', text)
            return
        # Check for retryable partial content
        if status_code == 206:
            if data:
                try:
                    retryable_envelopes = []
                    for error in data['errors']:
                        if error['statusCode'] in (
                                429, # Too Many Requests
                                500, # Internal Server Error
                                503, # Service Unavailable
                        ):
                            retryable_envelopes.append(
                                envelopes[error['index']])
                        else:
                            logger.error(
                                'Data drop %s: %s %s.',
                                error['statusCode'],
                                error['message'],
                                envelopes[error['index']],
                            )
                    # show the envelopes that can be retried manually for
                    # visibility
                    if retryable_envelopes:
                        logger.warning(
                            'Error while processing data. Data dropped. ' +
                            'Consider manually retrying for envelopes: %s.',
                            retryable_envelopes
                        )
                    return
                except Exception:
                    logger.exception(
                        'Error while processing %s: %s.',
                        status_code,
                        text
                    )
            return
        # Check for non-retryable result
        if status_code in (
                206, # Partial Content
                429, # Too Many Requests
                500, # Internal Server Error
                503, # Service Unavailable
        ):
            # server side error (retryable)
            logger.warning(
                'Transient server side error %s: %s. ' +
                'Consider manually trying.',
                status_code,
                text,
            )
        else:
            # server side error (non-retryable)
            logger.error(
                'Non-retryable server side error %s: %s.',
                status_code,
                text,
            )


def new_metrics_exporter(**options):
    options_ = Options(**options)
    exporter = MetricsExporter(options=options_)
    exporter = MetricsExporter(**options)
    producers = [stats_module.stats]
    if options_.enable_standard_metrics:
    if exporter.options.enable_standard_metrics:
        producers.append(standard_metrics.producer)
    transport.get_exporter_thread(producers,
                                  exporter,
                                  interval=options_.export_interval)
                                  interval=exporter.options.export_interval)
    return exporter
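
To make the new surface concrete, a minimal usage sketch. The option names mirror the Options fields read in __init__ above; the instrumentation key, paths, and numeric values are placeholders, and the units in the comments are assumptions rather than something this diff documents.

```python
from opencensus.ext.azure import metrics_exporter

# Placeholder values throughout; only the option names come from this PR.
exporter = metrics_exporter.new_metrics_exporter(
    instrumentation_key='00000000-0000-0000-0000-000000000000',
    export_interval=15.0,                    # assumed: seconds between export passes
    max_batch_size=100,                      # envelopes per transmission
    storage_path='/tmp/opencensus-metrics',  # where retryable batches are parked
    storage_max_size=50 * 1024 * 1024,       # assumed: bytes of local storage
    storage_retention_period=7 * 24 * 60 * 60,  # assumed: seconds before stored batches expire
)
# Metrics recorded through the opencensus stats module are now exported by the
# background thread; batches that fail with a retryable status are written to
# local storage and retransmitted on a later pass.
```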