Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaeger-v2] Add support for Badger #5112

Merged
merged 42 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
d904638
add support for badger in jaegerV2
akagami-harsh Jan 17, 2024
1d03e46
fix
akagami-harsh Jan 17, 2024
cdec3f7
revert ui_config path
akagami-harsh Jan 17, 2024
9384dc4
add .nocover
akagami-harsh Jan 18, 2024
4f00e75
remove datarecivers
akagami-harsh Jan 18, 2024
43ec3c7
change name to badger_primary
akagami-harsh Jan 18, 2024
40c6e9f
remove unnecessary fields
akagami-harsh Jan 18, 2024
0f35c2e
merge main
akagami-harsh Jan 18, 2024
4849ada
run go mod tidy
akagami-harsh Jan 18, 2024
dbd89fb
Merge branch 'main' into implement-badger-for-v2
akagami-harsh Jan 19, 2024
71b9df8
fix
akagami-harsh Jan 19, 2024
137ff81
Update cmd/jaeger/badger_config.yaml
akagami-harsh Jan 20, 2024
cde4a0e
fix
akagami-harsh Jan 20, 2024
046c433
Merge branch 'main' into implement-badger-for-v2
akagami-harsh Jan 20, 2024
4563f42
fix
akagami-harsh Jan 20, 2024
d07168e
fix
akagami-harsh Jan 21, 2024
e2df247
Update submodule to latest commit
akagami-harsh Jan 21, 2024
babae5b
add test to badger factory
akagami-harsh Jan 21, 2024
d2fe9a4
fix
akagami-harsh Jan 21, 2024
f939154
remove unrelated changes
akagami-harsh Jan 21, 2024
74e94c6
Merge branch 'main' of https://github.com/akagami-harsh/jaeger into i…
akagami-harsh Jan 21, 2024
0735463
fix
akagami-harsh Jan 21, 2024
d388571
update submodules
akagami-harsh Jan 22, 2024
c0c65c7
Update cmd/jaeger/badger_config.yaml
akagami-harsh Jan 22, 2024
90f4671
Update cmd/jaeger/badger_config.yaml
akagami-harsh Jan 22, 2024
27097f0
Update cmd/jaeger/internal/extension/jaegerstorage/config.go
akagami-harsh Jan 22, 2024
a24be8f
Update cmd/jaeger/internal/extension/jaegerstorage/config.go
akagami-harsh Jan 22, 2024
6b70cf5
Update cmd/jaeger/badger_config.yaml
akagami-harsh Jan 22, 2024
10aa09b
fix
akagami-harsh Jan 22, 2024
bf5f528
Merge branch 'implement-badger-for-v2' of https://github.com/akagami-…
akagami-harsh Jan 22, 2024
e3a0740
assign map after error check
akagami-harsh Jan 22, 2024
f2408a6
fix
akagami-harsh Jan 22, 2024
f580d97
add test badger extension
akagami-harsh Jan 22, 2024
aa4e691
Merge branch 'main' into implement-badger-for-v2
akagami-harsh Jan 22, 2024
101015c
fix
akagami-harsh Jan 22, 2024
2c48857
Merge branch 'implement-badger-for-v2' of https://github.com/akagami-…
akagami-harsh Jan 22, 2024
2014840
fix
akagami-harsh Jan 22, 2024
76ae325
Update plugin/storage/badger/factory_test.go
akagami-harsh Jan 22, 2024
424ec72
run make fmt
akagami-harsh Jan 22, 2024
48a5ca2
fix lint
akagami-harsh Jan 22, 2024
014c198
log kafka logs
yurishkuro Jan 23, 2024
f9da1a0
pin kafka to patch number
yurishkuro Jan 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions cmd/jaeger/badger_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
service:
extensions: [jaeger_storage]
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
jaeger_query:
trace_storage: memstore
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
ui_config: ./config-ui.json
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved

jaeger_storage:
badger_primary:
memstore:
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
directory_key: "/tmp/jaeger/"
directory_value: "/tmp/jaeger/"
ephemeral: false
maintenance_interval: 5
metrics_update_interval: 10

receivers:
otlp:
protocols:
grpc:
http:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: memstore
1 change: 0 additions & 1 deletion cmd/jaeger/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ extensions:
memstore_archive:
max_traces: 100000


receivers:
otlp:
protocols:
Expand Down
1 change: 1 addition & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/.nocover
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FIXME
30 changes: 30 additions & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2024 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storagereceiver

import (
"github.com/asaskevich/govalidator"

badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
)

type Config struct {
Badger badgerCfg.NamespaceConfig `mapstructure:"badger"`
}

func (cfg *Config) Validate() error {
_, err := govalidator.ValidateStruct(cfg)
return err
}
43 changes: 43 additions & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2024 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storagereceiver

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

const componentType = component.Type("jaeger_storage_receiver")

func NewFactory() receiver.Factory {
return receiver.NewFactory(
componentType,
createDefaultConfig,
receiver.WithTraces(createTraces, component.StabilityLevelDevelopment),
)
}

func createDefaultConfig() component.Config {
return &Config{}
}

func createTraces(ctx context.Context, set receiver.CreateSettings, config component.Config, nextConsumer consumer.Traces) (receiver.Traces, error) {
cfg := config.(*Config)

return newReceiver(cfg, set.TelemetrySettings, nextConsumer)
}
146 changes: 146 additions & 0 deletions cmd/jaeger/integration/receivers/storagereceiver/reciver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright (c) 2024 The Jaeger Authors.
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storagereceiver

import (
"context"
"fmt"

jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

type storageReceiver struct {
cancelConsumeLoop context.CancelFunc
config *Config
logger *zap.Logger
consumedTraces map[model.TraceID]*consumedTrace
nextConsumer consumer.Traces
spanReader spanstore.Reader
}

type consumedTrace struct {
spanIDs map[model.SpanID]struct{}
}

func newReceiver(config *Config, otel component.TelemetrySettings, nextConsumer consumer.Traces) (*storageReceiver, error) {
f, err := badger.NewFactoryWithConfig(
config.Badger,
metrics.NullFactory,
otel.Logger,
)
if err != nil {
return nil, fmt.Errorf("failed to init storage factory: %w", err)
}

spanReader, err := f.CreateSpanReader()
if err != nil {
return nil, fmt.Errorf("failed to create span reader: %w", err)
}

return &storageReceiver{
config: config,
logger: otel.Logger,
consumedTraces: make(map[model.TraceID]*consumedTrace),
nextConsumer: nextConsumer,
spanReader: spanReader,
}, nil
}

func (r *storageReceiver) Start(_ context.Context, host component.Host) error {
ctx, cancel := context.WithCancel(context.Background())
r.cancelConsumeLoop = cancel

go func() {
if err := r.consumeLoop(ctx); err != nil {
host.ReportFatalError(err)
}
}()

return nil
}

func (r *storageReceiver) consumeLoop(ctx context.Context) error {
services := []string{"", "customers", "OTLPResourceNoServiceName"}

for {
for _, svc := range services {
if err := r.consumeTraces(ctx, svc); err != nil {
r.logger.Error("Error from consumer", zap.Error(err))
}
if ctx.Err() != nil {
r.logger.Error("Consumer stopped", zap.Error(ctx.Err()))
return ctx.Err()
}
}
}
}

func (r *storageReceiver) consumeTraces(ctx context.Context, serviceName string) error {
traces, err := r.spanReader.FindTraces(ctx, &spanstore.TraceQueryParameters{
ServiceName: serviceName,
})
if err != nil {
return err
}

cnt := 0
for _, trace := range traces {
cnt += len(trace.Spans)
traceID := trace.Spans[0].TraceID
if _, ok := r.consumedTraces[traceID]; !ok {
r.consumedTraces[traceID] = &consumedTrace{
spanIDs: make(map[model.SpanID]struct{}),
}
}
if len(trace.Spans) > len(r.consumedTraces[traceID].spanIDs) {
r.consumeSpans(ctx, r.consumedTraces[traceID], trace.Spans)
}
}

return nil
}

func (r *storageReceiver) consumeSpans(ctx context.Context, tc *consumedTrace, spans []*model.Span) error {
for _, span := range spans {
if _, ok := tc.spanIDs[span.SpanID]; !ok {
tc.spanIDs[span.SpanID] = struct{}{}
td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{
{
Spans: []*model.Span{span},
Process: span.Process,
},
})
if err != nil {
return err
}
r.nextConsumer.ConsumeTraces(ctx, td)
}
}

return nil
}

func (r *storageReceiver) Shutdown(_ context.Context) error {
r.cancelConsumeLoop()
return nil
}
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func Command() *cobra.Command {

settings := otelcol.CollectorSettings{
BuildInfo: info,
Factories: components,
Factories: Components,
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
}

cmd := otelcol.NewCommand(settings)
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,6 @@ func (b builders) build() (otelcol.Factories, error) {
return factories, nil
}

func components() (otelcol.Factories, error) {
func Components() (otelcol.Factories, error) {
return defaultBuilders().build()
}
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func TestComponents(t *testing.T) {
factories, err := components()
factories, err := Components()

require.NoError(t, err)

Expand Down
8 changes: 7 additions & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"reflect"

memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
)

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger_primary"`
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
Expand All @@ -22,6 +24,10 @@ type MemoryStorage struct {
Name string `mapstructure:"name"`
memoryCfg.Configuration
}
type BadgerStorage struct {
Name string `mapstructure:"name"`
badgerCfg.NamespaceConfig
}
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved

func (cfg *Config) Validate() error {
emptyCfg := createDefaultConfig().(*Config)
Expand Down
17 changes: 17 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
)
Expand Down Expand Up @@ -70,6 +71,22 @@
s.logger.With(zap.String("storage_name", name)),
)
}

for name, b := range s.config.Badger {
if _, ok := s.factories[name]; ok {
return fmt.Errorf("duplicate badger storage name %s", name)
}
var err error
s.factories[name], err = badger.NewFactoryWithConfig(
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
b,
metrics.NullFactory,
s.logger.With(zap.String("storage_name", name)),
)
if err != nil {
return fmt.Errorf("failed to initialize badger storage: %w", err)
}

Check warning on line 87 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L76-L87

Added lines #L76 - L87 were not covered by tests
}
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved

// TODO add support for other backends
return nil
}
Expand Down
Loading
Loading