Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cmd/telemetrygen]: Allow float values for rate #33984

Merged
merged 10 commits into from
Jul 16, 2024
27 changes: 27 additions & 0 deletions .chloggen/telemetrygen-float-rate.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: telemetrygen

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "telemetrygen `--rate` flag changed from Int64 to Float64"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33984]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 2 additions & 2 deletions cmd/telemetrygen/internal/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (v *KeyValue) Type() string {

type Config struct {
WorkerCount int
Rate int64
Rate float64
TotalDuration time.Duration
ReportingInterval time.Duration
SkipSettingGRPCLogger bool
Expand Down Expand Up @@ -115,7 +115,7 @@ func (c *Config) GetTelemetryAttributes() []attribute.KeyValue {
// CommonFlags registers common config flags.
func (c *Config) CommonFlags(fs *pflag.FlagSet) {
fs.IntVar(&c.WorkerCount, "workers", 1, "Number of workers (goroutines) to run")
fs.Int64Var(&c.Rate, "rate", 0, "Approximately how many metrics per second each worker should generate. Zero means no throttling.")
fs.Float64Var(&c.Rate, "rate", 0, "Approximately how many metrics/spans/logs per second each worker should generate. Zero means no throttling.")
fs.DurationVar(&c.TotalDuration, "duration", 0, "For how long to run the test")
fs.DurationVar(&c.ReportingInterval, "interval", 1*time.Second, "Reporting interval")

Expand Down
7 changes: 4 additions & 3 deletions cmd/telemetrygen/internal/logs/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,14 @@ func (w worker) simulateLogs(res *resource.Resource, exporter exporter, telemetr
lattrs.PutStr(string(attr.Key), telemetryAttributes[i].Value.AsString())
}

if err := exporter.export(logs); err != nil {
w.logger.Fatal("exporter failed", zap.Error(err))
}
if err := limiter.Wait(context.Background()); err != nil {
w.logger.Fatal("limiter wait failed, retry", zap.Error(err))
}

if err := exporter.export(logs); err != nil {
w.logger.Fatal("exporter failed", zap.Error(err))
}

i++
if w.numLogs != 0 && i >= int64(w.numLogs) {
break
Expand Down
22 changes: 22 additions & 0 deletions cmd/telemetrygen/internal/logs/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,28 @@ func TestRateOfLogs(t *testing.T) {
assert.True(t, len(exp.logs) <= 20, "there should have been less than 20 logs, had %d", len(exp.logs))
}

func TestRateOfLogsLessThan1(t *testing.T) {
cfg := &Config{
Config: common.Config{
Rate: 0.3,
TotalDuration: time.Second * 3,
WorkerCount: 1,
},
SeverityText: "Info",
SeverityNumber: 9,
}
exp := &mockExporter{}

// test
require.NoError(t, Run(cfg, exp, zap.NewNop()))

// verify
// the minimum acceptable number of logs for the rate of .3/sec for 3 seconds
assert.True(t, len(exp.logs) >= 1, "there should have been 1 or more logs, had %d", len(exp.logs))
// the maximum acceptable number of logs for the rate of .3/sec for 3 seconds
assert.True(t, len(exp.logs) <= 2, "there should have been less than 3 logs, had %d", len(exp.logs))
damemi marked this conversation as resolved.
Show resolved Hide resolved
}

func TestUnthrottled(t *testing.T) {
cfg := &Config{
Config: common.Config{
Expand Down
7 changes: 4 additions & 3 deletions cmd/telemetrygen/internal/metrics/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,14 @@ func (w worker) simulateMetrics(res *resource.Resource, exporterFunc func() (sdk
ScopeMetrics: []metricdata.ScopeMetrics{{Metrics: metrics}},
}

if err := exporter.Export(context.Background(), &rm); err != nil {
w.logger.Fatal("exporter failed", zap.Error(err))
}
if err := limiter.Wait(context.Background()); err != nil {
w.logger.Fatal("limiter wait failed, retry", zap.Error(err))
}

if err := exporter.Export(context.Background(), &rm); err != nil {
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
w.logger.Fatal("exporter failed", zap.Error(err))
}

i++
if w.numMetrics != 0 && i >= int64(w.numMetrics) {
break
Expand Down
25 changes: 25 additions & 0 deletions cmd/telemetrygen/internal/metrics/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,31 @@ func TestRateOfMetrics(t *testing.T) {
assert.True(t, len(m.rms) <= 20, "there should have been less than 20 metrics, had %d", len(m.rms))
}

func TestRateOfMetricsLessThan1(t *testing.T) {
// arrange
cfg := &Config{
Config: common.Config{
Rate: 0.3,
TotalDuration: time.Second * 3,
WorkerCount: 1,
},
MetricType: metricTypeSum,
}
m := &mockExporter{}
expFunc := func() (sdkmetric.Exporter, error) {
return m, nil
}

// act
require.NoError(t, Run(cfg, expFunc, zap.NewNop()))

// assert
// the minimum acceptable number of metrics for the rate of .3/sec for 3 seconds
assert.True(t, len(m.rms) >= 1, "there should have been more than 1 metrics, had %d", len(m.rms))
// the maximum acceptable number of metrics for the rate of .3/sec for 3 seconds
assert.True(t, len(m.rms) <= 2, "there should have been less than 2 metrics, had %d", len(m.rms))
}

func TestUnthrottled(t *testing.T) {
// arrange
cfg := &Config{
Expand Down
12 changes: 8 additions & 4 deletions cmd/telemetrygen/internal/traces/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (w worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
spanStart := time.Now()
spanEnd := spanStart.Add(w.spanDuration)

if err := limiter.Wait(context.Background()); err != nil {
w.logger.Fatal("limiter waited failed, retry", zap.Error(err))
}

ctx, sp := tracer.Start(context.Background(), "lets-go", trace.WithAttributes(
semconv.NetPeerIPKey.String(fakeIP),
semconv.PeerServiceKey.String("telemetrygen-server"),
Expand All @@ -74,6 +78,10 @@ func (w worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
var endTimestamp trace.SpanEventOption

for j := 0; j < w.numChildSpans; j++ {
if err := limiter.Wait(context.Background()); err != nil {
w.logger.Fatal("limiter waited failed, retry", zap.Error(err))
}

_, child := tracer.Start(childCtx, "okey-dokey-"+strconv.Itoa(j), trace.WithAttributes(
semconv.NetPeerIPKey.String(fakeIP),
semconv.PeerServiceKey.String("telemetrygen-client"),
Expand All @@ -83,10 +91,6 @@ func (w worker) simulateTraces(telemetryAttributes []attribute.KeyValue) {
)
child.SetAttributes(telemetryAttributes...)

if err := limiter.Wait(context.Background()); err != nil {
w.logger.Fatal("limiter waited failed, retry", zap.Error(err))
}

endTimestamp = trace.WithTimestamp(spanEnd)
child.SetStatus(w.statusCode, "")
child.End(endTimestamp)
Expand Down
30 changes: 30 additions & 0 deletions cmd/telemetrygen/internal/traces/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,36 @@ func TestRateOfSpans(t *testing.T) {
assert.True(t, len(syncer.spans) <= 20, "there should have been less than 20 spans, had %d", len(syncer.spans))
}

func TestRateOfSpansLessThan1(t *testing.T) {
// prepare
syncer := &mockSyncer{}

tracerProvider := sdktrace.NewTracerProvider()
sp := sdktrace.NewSimpleSpanProcessor(syncer)
tracerProvider.RegisterSpanProcessor(sp)
otel.SetTracerProvider(tracerProvider)

cfg := &Config{
Config: common.Config{
Rate: 0.3,
TotalDuration: time.Second * 3,
WorkerCount: 1,
},
}

// sanity check
require.Len(t, syncer.spans, 0)

// test
require.NoError(t, Run(cfg, zap.NewNop()))

// verify
// the minimum acceptable number of spans for the rate of .3/sec for 3 seconds
assert.True(t, len(syncer.spans) >= 1, "there should have been more than 1 spans, had %d", len(syncer.spans))
// the minimum acceptable number of spans for the rate of .3/sec for 3 seconds
codeboten marked this conversation as resolved.
Show resolved Hide resolved
assert.True(t, len(syncer.spans) <= 2, "there should have been less than 2 spans, had %d", len(syncer.spans))
}

func TestSpanDuration(t *testing.T) {
// prepare
syncer := &mockSyncer{}
Expand Down