Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Update MetricProducer interface to match spec #3044

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions exporters/prometheus/test/collector_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
#include <thread>

using opentelemetry::exporter::metrics::PrometheusCollector;
using opentelemetry::sdk::metrics::MetricProducer;
using opentelemetry::sdk::metrics::ResourceMetrics;
namespace metric_api = opentelemetry::metrics;
namespace metric_sdk = opentelemetry::sdk::metrics;
namespace metric_exporter = opentelemetry::exporter::metrics;

class MockMetricProducer : public opentelemetry::sdk::metrics::MetricProducer
class MockMetricProducer : public MetricProducer
{
TestDataPoints test_data_points_;

Expand All @@ -26,13 +27,12 @@ class MockMetricProducer : public opentelemetry::sdk::metrics::MetricProducer
: sleep_ms_{sleep_ms}
{}

bool Collect(nostd::function_ref<bool(ResourceMetrics &)> callback) noexcept override
MetricProducer::Result Produce() noexcept override
{
std::this_thread::sleep_for(sleep_ms_);
data_sent_size_++;
ResourceMetrics data = test_data_points_.CreateSumPointData();
callback(data);
return true;
return {data, MetricProducer::Status::kSuccess};
}

size_t GetDataCount() { return data_sent_size_; }
Expand Down Expand Up @@ -70,15 +70,13 @@ class MockMetricReader : public opentelemetry::sdk::metrics::MetricReader
*/
TEST(PrometheusCollector, BasicTests)
{
MockMetricReader *reader = new MockMetricReader();
MockMetricProducer *producer = new MockMetricProducer();
reader->SetMetricProducer(producer);
PrometheusCollector collector(reader, true, false);
MockMetricReader reader;
MockMetricProducer producer;
reader.SetMetricProducer(&producer);
PrometheusCollector collector(&reader, true, false);
auto data = collector.Collect();

// Collection size should be the same as the size
// of the records collection produced by MetricProducer.
ASSERT_EQ(data.size(), 2);
delete reader;
delete producer;
}
38 changes: 27 additions & 11 deletions sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <vector>

#include "opentelemetry/nostd/function_ref.h"
#include "opentelemetry/nostd/variant.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/version.h"

Expand Down Expand Up @@ -70,27 +71,42 @@ struct ResourceMetrics
};

/**
* MetricProducer is the interface that is used to make metric data available to the
* OpenTelemetry exporters. Implementations should be stateful, in that each call to
* `Collect` will return any metric generated since the last call was made.
* MetricProducer defines the interface which bridges to third-party metric sources MUST implement,
* so they can be plugged into an OpenTelemetry MetricReader as a source of aggregated metric data.
*
* <p>Implementations must be thread-safe.
* Implementations must be thread-safe, and should accept configuration for the
* AggregationTemporality of produced metrics.
*/

class MetricProducer
{
public:
MetricProducer() = default;
virtual ~MetricProducer() = default;

MetricProducer(const MetricProducer &) = delete;
MetricProducer(const MetricProducer &&) = delete;
void operator=(const MetricProducer &) = delete;
void operator=(const MetricProducer &&) = delete;

enum class Status
{
kSuccess,
kFailure,
kTimeout,
};

struct Result
{
ResourceMetrics points_;
Status status_;
};

/**
* The callback to be called for each metric exporter. This will only be those
* metrics that have been produced since the last time this method was called.
*
* @return a status of completion of method.
* Produce returns a batch of Metric Points, with a single instrumentation scope that identifies
* the MetricProducer. Implementations may return successfully collected points even if there is a
* partial failure.
*/
virtual bool Collect(
nostd::function_ref<bool(ResourceMetrics &metric_data)> callback) noexcept = 0;
virtual Result Produce() noexcept = 0;
};

} // namespace metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class MetricCollector : public MetricProducer, public CollectorHandle
*
* @return a status of completion of method.
*/
bool Collect(nostd::function_ref<bool(ResourceMetrics &metric_data)> callback) noexcept override;
Result Produce() noexcept override;

bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

Expand Down
12 changes: 10 additions & 2 deletions sdk/src/metrics/metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ bool MetricReader::Collect(
if (!metric_producer_)
{
OTEL_INTERNAL_LOG_WARN(
"MetricReader::Collect Cannot invoke Collect(). No MetricProducer registered for "
"MetricReader::Collect Cannot invoke Produce(). No MetricProducer registered for "
"collection!")
return false;
}
Expand All @@ -36,7 +36,15 @@ bool MetricReader::Collect(
OTEL_INTERNAL_LOG_WARN("MetricReader::Collect invoked while Shutdown in progress!");
}

return metric_producer_->Collect(callback);
auto result = metric_producer_->Produce();

// According to the spec,
// When the Produce operation fails, the MetricProducer MAY return successfully collected
// results and a failed reasons list to the caller.
// So we invoke the callback with whatever points we get back, even if the overall operation may
// have failed.
auto success = callback(result.points_);
punya marked this conversation as resolved.
Show resolved Hide resolved
return (result.status_ == MetricProducer::Status::kSuccess) && success;
}

bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept
Expand Down
9 changes: 4 additions & 5 deletions sdk/src/metrics/state/metric_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace sdk
{
namespace metrics
{
using opentelemetry::sdk::resource::Resource;

MetricCollector::MetricCollector(opentelemetry::sdk::metrics::MeterContext *context,
std::shared_ptr<MetricReader> metric_reader)
Expand All @@ -38,14 +39,13 @@ AggregationTemporality MetricCollector::GetAggregationTemporality(
return metric_reader_->GetAggregationTemporality(instrument_type);
}

bool MetricCollector::Collect(
nostd::function_ref<bool(ResourceMetrics &metric_data)> callback) noexcept
MetricProducer::Result MetricCollector::Produce() noexcept
{
if (!meter_context_)
{
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Collect] - Error during collecting."
<< "The metric context is invalid");
return false;
return {{}, MetricProducer::Status::kFailure};
}
ResourceMetrics resource_metrics;
meter_context_->ForEachMeter([&](const std::shared_ptr<Meter> &meter) noexcept {
Expand All @@ -61,8 +61,7 @@ bool MetricCollector::Collect(
return true;
});
resource_metrics.resource_ = &meter_context_->GetResource();
callback(resource_metrics);
return true;
return {resource_metrics, MetricProducer::Status::kSuccess};
}

bool MetricCollector::ForceFlush(std::chrono::microseconds timeout) noexcept
Expand Down
2 changes: 1 addition & 1 deletion sdk/test/metrics/metric_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ TEST(MetricReaderTest, BasicTests)
std::shared_ptr<MeterContext> meter_context2(new MeterContext());
std::shared_ptr<MetricProducer> metric_producer{
new MetricCollector(meter_context2.get(), std::move(metric_reader2))};
metric_producer->Collect([](ResourceMetrics & /* metric_data */) { return true; });
metric_producer->Produce();
}
5 changes: 2 additions & 3 deletions sdk/test/metrics/periodic_exporting_metric_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,12 @@ class MockMetricProducer : public MetricProducer
: sleep_ms_{sleep_ms}
{}

bool Collect(nostd::function_ref<bool(ResourceMetrics &)> callback) noexcept override
MetricProducer::Result Produce() noexcept override
{
std::this_thread::sleep_for(sleep_ms_);
data_sent_size_++;
ResourceMetrics data;
callback(data);
return true;
return {data, MetricProducer::Status::kSuccess};
}

size_t GetDataCount() { return data_sent_size_; }
Expand Down