Skip to content

Commit

Permalink
[jaeger-v2] Add StartTimeMin and StartTimeMax params to consumeTraces…
Browse files Browse the repository at this point in the history
… function. (#5288)

## Which problem is this PR solving?
- Fixes #5281 issue for not being able to consume traces

## Description of the changes
- Added `StartTimeMin` and `StartTimeMax` query params with 1h lookback
to `consumeTraces` function.
- Added logging to the data receivers with the OTelCol codes below as
the references.
-
https://github.com/open-telemetry/opentelemetry-collector/blob/main/otelcol/unmarshaler.go#L45
-
https://github.com/open-telemetry/opentelemetry-collector/blob/main/service/telemetry/telemetry.go#L118

## How was this change tested?
- Tested manually.

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: James Ryans <[email protected]>
  • Loading branch information
james-ryans authored Mar 23, 2024
1 parent d887b74 commit 53592b3
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 24 deletions.
38 changes: 24 additions & 14 deletions cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,50 +6,60 @@ package datareceivers
import (
"context"
"fmt"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/receivers/storagereceiver"
)

type jaegerStorageDataReceiver struct {
TraceStorage string
StorageConfig *jaegerstorage.Config
host *storagetest.StorageHost
receiver receiver.Traces
TelemetrySettings component.TelemetrySettings
TraceStorage string
StorageConfig *jaegerstorage.Config
host *storagetest.StorageHost
receiver receiver.Traces
}

func NewJaegerStorageDataReceiver(traceStorage string, storageConfig *jaegerstorage.Config) testbed.DataReceiver {
func NewJaegerStorageDataReceiver(
telemetrySettings component.TelemetrySettings,
traceStorage string,
storageConfig *jaegerstorage.Config,
) testbed.DataReceiver {
return &jaegerStorageDataReceiver{
TraceStorage: traceStorage,
StorageConfig: storageConfig,
TelemetrySettings: telemetrySettings,
TraceStorage: traceStorage,
StorageConfig: storageConfig,
}
}

func (dr *jaegerStorageDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error {
ctx := context.Background()

extSet := extension.CreateSettings{
ID: jaegerstorage.ID,
TelemetrySettings: dr.TelemetrySettings,
}
extFactory := jaegerstorage.NewFactory()
ext, err := extFactory.CreateExtension(ctx, extension.CreateSettings{
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
}, dr.StorageConfig)
ext, err := extFactory.CreateExtension(ctx, extSet, dr.StorageConfig)
if err != nil {
return err
}

rcvSet := receivertest.NewNopCreateSettings()
rcvSet := receiver.CreateSettings{
ID: storagereceiver.ID,
TelemetrySettings: dr.TelemetrySettings,
}
rcvFactory := storagereceiver.NewFactory()
rcvCfg := rcvFactory.CreateDefaultConfig().(*storagereceiver.Config)
rcvCfg.TraceStorage = dr.TraceStorage
rcvCfg.PullInterval = 100 * time.Millisecond
rcv, err := rcvFactory.CreateTracesReceiver(ctx, rcvSet, rcvCfg, tc)
if err != nil {
return err
Expand Down
14 changes: 13 additions & 1 deletion cmd/jaeger/internal/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/service/telemetry"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/exporters/storageexporter"
Expand Down Expand Up @@ -46,13 +48,23 @@ func (s *StorageIntegration) newDataReceiver(t *testing.T, factories otelcol.Fac
cfg, err := configProvider.Get(context.Background(), factories)
require.NoError(t, err)

tel, err := telemetry.New(context.Background(), telemetry.Settings{}, cfg.Service.Telemetry)
require.NoError(t, err)

storageCfg, ok := cfg.Extensions[jaegerstorage.ID].(*jaegerstorage.Config)
require.True(t, ok, "no jaeger storage extension found in the config")

exporterCfg, ok := cfg.Exporters[storageexporter.ID].(*storageexporter.Config)
require.True(t, ok, "no jaeger storage exporter found in the config")

receiver := datareceivers.NewJaegerStorageDataReceiver(exporterCfg.TraceStorage, storageCfg)
telemetrySettings := componenttest.NewNopTelemetrySettings()
telemetrySettings.Logger = tel.Logger()

receiver := datareceivers.NewJaegerStorageDataReceiver(
telemetrySettings,
exporterCfg.TraceStorage,
storageCfg,
)
return receiver
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
// componentType is the name of this extension in configuration.
const componentType = component.Type("jaeger_storage_receiver")

// ID is the identifier of this extension.
var ID = component.NewID(componentType)

func NewFactory() receiver.Factory {
return receiver.NewFactory(
componentType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ func (r *storageReceiver) consumeLoop(ctx context.Context) error {
}

func (r *storageReceiver) consumeTraces(ctx context.Context, serviceName string) error {
endTime := time.Now()
traces, err := r.spanReader.FindTraces(ctx, &spanstore.TraceQueryParameters{
ServiceName: serviceName,
ServiceName: serviceName,
StartTimeMin: endTime.Add(-1 * time.Hour),
StartTimeMax: endTime,
})
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage"
factoryMocks "github.com/jaegertracing/jaeger/storage/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanStoreMocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)

Expand Down Expand Up @@ -254,13 +253,11 @@ func TestReceiver_StartConsume(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
reader := new(spanStoreMocks.Reader)
reader.On("GetServices", mock.AnythingOfType("*context.cancelCtx")).Return(test.services, nil)
for _, service := range test.services {
reader.On(
"FindTraces",
mock.AnythingOfType("*context.cancelCtx"),
&spanstore.TraceQueryParameters{ServiceName: service},
).Return(test.traces, test.tracesErr)
}
reader.On(
"FindTraces",
mock.AnythingOfType("*context.cancelCtx"),
mock.AnythingOfType("*spanstore.TraceQueryParameters"),
).Return(test.traces, test.tracesErr)
r.receiver.spanReader = reader

require.NoError(t, r.receiver.Shutdown(ctx))
Expand Down

0 comments on commit 53592b3

Please sign in to comment.