-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[receiver/gelfreceiver] Added Support for GELF log receiver. #33858
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,9 +2,7 @@ | |
|
||
module github.com/open-telemetry/opentelemetry-collector-contrib/cmd/otelcontribcol | ||
|
||
go 1.21.0 | ||
|
||
toolchain go1.21.11 | ||
go 1.22.0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please revert |
||
|
||
require ( | ||
github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/s3provider v0.102.0 | ||
|
@@ -141,6 +139,7 @@ require ( | |
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filestatsreceiver v0.102.0 | ||
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver v0.102.0 | ||
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/fluentforwardreceiver v0.102.0 | ||
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/gelfreceiver v0.0.0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this should be v0.102.0 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, and same in the builder yaml. If anything, you should defer adding this receiver to cmd/otelcontribcol to a second PR. |
||
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver v0.102.0 | ||
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver v0.102.0 | ||
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/haproxyreceiver v0.102.0 | ||
|
@@ -1095,6 +1094,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/a | |
|
||
replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zookeeperreceiver => ../../receiver/zookeeperreceiver | ||
|
||
replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/gelfreceiver => ../../receiver/gelfreceiver | ||
|
||
replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver => ../../receiver/wavefrontreceiver | ||
|
||
replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbreceiver => ../../receiver/mongodbreceiver | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,113 @@ | ||||||
// Copyright The OpenTelemetry Authors | ||||||
// SPDX-License-Identifier: Apache-2.0 | ||||||
|
||||||
package gelfinternal // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp" | ||||||
|
||||||
import ( | ||||||
"fmt" | ||||||
"net" | ||||||
"sync" | ||||||
|
||||||
"go.opentelemetry.io/collector/component" | ||||||
|
||||||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" | ||||||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" | ||||||
) | ||||||
|
||||||
const ( | ||||||
operatorType = "gelf_input" | ||||||
defaultReaders = 1 | ||||||
defaultProcessors = 1 | ||||||
defaultUDPMaxQueueLength = 100 | ||||||
defaultListenAddress = "0.0.0.0:31250" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use localhost as the default binding instead of |
||||||
defaultProtocol = "udp" | ||||||
MaxUDPSize = 64 * 1024 | ||||||
) | ||||||
|
||||||
func init() { | ||||||
operator.Register(operatorType, func() operator.Builder { return NewConfig() }) | ||||||
} | ||||||
|
||||||
// NewConfig creates a new UDP input config with default values | ||||||
func NewConfig() *Config { | ||||||
return NewConfigWithID(operatorType) | ||||||
} | ||||||
|
||||||
// NewConfigWithID creates a new UDP input config with default values | ||||||
func NewConfigWithID(operatorID string) *Config { | ||||||
return &Config{ | ||||||
InputConfig: helper.NewInputConfig(operatorID, operatorType), | ||||||
BaseConfig: BaseConfig{ | ||||||
ListenAddress: string(defaultListenAddress), | ||||||
Protocol: string(defaultProtocol), | ||||||
AsyncReaders: defaultReaders, | ||||||
AsyncProcessors: defaultProcessors, | ||||||
UDPMaxQueueLength: defaultUDPMaxQueueLength, | ||||||
}, | ||||||
} | ||||||
} | ||||||
|
||||||
// Config is the configuration of a udp input operator. | ||||||
type Config struct { | ||||||
helper.InputConfig `mapstructure:",squash"` | ||||||
BaseConfig `mapstructure:",squash"` | ||||||
} | ||||||
|
||||||
// BaseConfig is the details configuration of a udp input operator. | ||||||
type BaseConfig struct { | ||||||
ListenAddress string `mapstructure:"listen_address,omitempty"` | ||||||
Protocol string `mapstructure:"protocol,omitempty"` | ||||||
AsyncReaders int `mapstructure:"async_readers,omitempty"` | ||||||
AsyncProcessors int `mapstructure:"async_processors,omitempty"` | ||||||
UDPMaxQueueLength int `mapstructure:"udp_max_queue_length,omitempty"` | ||||||
} | ||||||
|
||||||
// Build will build a udp input operator. | ||||||
func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error) { | ||||||
inputOperator, err := c.InputConfig.Build(set) | ||||||
if err != nil { | ||||||
return nil, err | ||||||
} | ||||||
|
||||||
if c.ListenAddress == "" { | ||||||
return nil, fmt.Errorf("missing required parameter 'listen_address'") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
|
||||||
if _, _, err := net.SplitHostPort(c.ListenAddress); err != nil { | ||||||
return nil, fmt.Errorf("invalid listen_address: %w", err) | ||||||
} | ||||||
|
||||||
if c.Protocol != "udp" { | ||||||
return nil, fmt.Errorf("supported protocols - udp, invalid protocol: %s", c.Protocol) | ||||||
} | ||||||
if c.AsyncReaders <= 0 && c.AsyncReaders < 20 { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are you sure of these conditions? |
||||||
return nil, fmt.Errorf("invalid async_reader: %d", c.AsyncReaders) | ||||||
} | ||||||
if c.AsyncProcessors <= 0 && c.AsyncProcessors < 20 { | ||||||
return nil, fmt.Errorf("invalid async_processors: %d", c.AsyncProcessors) | ||||||
} | ||||||
if c.UDPMaxQueueLength <= 0 && c.UDPMaxQueueLength < 65535 { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
return nil, fmt.Errorf("expecting queue length greater than 0 and less than 65535, invalid udp_max_queue_length: %d", c.UDPMaxQueueLength) | ||||||
} | ||||||
|
||||||
udpInput := &Input{ | ||||||
InputOperator: inputOperator, | ||||||
address: c.ListenAddress, | ||||||
protocol: c.Protocol, | ||||||
udpMessageQueue: make(chan UDPMessage, c.UDPMaxQueueLength), | ||||||
readBufferPool: sync.Pool{ | ||||||
New: func() any { | ||||||
buffer := make([]byte, MaxUDPSize) | ||||||
return &buffer | ||||||
}, | ||||||
}, | ||||||
buffer: make(map[string]*MapGelfMessage), | ||||||
lastBuffer: make(map[string]*MapGelfMessage), | ||||||
asyncReaders: c.AsyncReaders, | ||||||
asyncProcessors: c.AsyncProcessors, | ||||||
} | ||||||
|
||||||
// fmt.Println("GELF receiver config validated.") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
return udpInput, nil | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please peg to a specific version (commit hash works in absence of tags) instead of using latest