Skip to content

Commit

Permalink
Changes to the pulsarexporter configuration to prepare with using ups…
Browse files Browse the repository at this point in the history
…tream (open-telemetry#2650)

* Changes to the pulsarexporter configuration to prepare with using upstream

* fieldalignment fix
  • Loading branch information
atoulme authored Mar 3, 2023
1 parent 66a862b commit ceaae86
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 15 deletions.
55 changes: 46 additions & 9 deletions internal/exporter/pulsarexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"
)

type Authentication struct {
Expand All @@ -47,16 +48,16 @@ type Config struct {
type Producer struct {
Properties map[string]string `mapstructure:"producer_properties"`
MaxReconnectToBroker *uint `mapstructure:"max_reconnect_broker"`
HashingScheme string `mapstructure:"hashing_scheme"`
CompressionLevel string `mapstructure:"compression_level"`
SendTimeout *time.Duration `mapstructure:"send_timeout"`
BatcherBuilderType string `mapstructure:"batch_builder_type"`
CompressionType string `mapstructure:"compression_type"`
CompressionLevel string `mapstructure:"compression_level"`
HashingScheme string `mapstructure:"hashing_scheme"`
MaxPendingMessages int `mapstructure:"max_pending_messages"`
BatcherBuilderType int `mapstructure:"batch_builder_type"`
PartitionsAutoDiscoveryInterval time.Duration `mapstructure:"partitions_auto_discovery_interval"`
BatchingMaxPublishDelay time.Duration `mapstructure:"batching_max_publish_delay"`
BatchingMaxMessages uint `mapstructure:"batching_max_messages"`
BatchingMaxSize uint `mapstructure:"batching_max_size"`
SendTimeout time.Duration `mapstructure:"send_timeout"`
DisableBlockIfQueueFull bool `mapstructure:"disable_block_if_queue_full"`
DisableBatching bool `mapstructure:"disable_batching"`
}
Expand Down Expand Up @@ -92,12 +93,27 @@ func (cfg *Config) getClientOptions() (pulsar.ClientOptions, error) {
return options, nil
}

func (cfg *Config) getProducerOptions() (pulsar.ProducerOptions, error) {
func (cfg *Config) getProducerOptions(logger *zap.Logger) (pulsar.ProducerOptions, error) {
// Properties are not used. Issue a warning that these are no longer used.
if cfg.Producer.Properties != nil {
logger.Warn("`producer.properties` is no longer used and will be removed in a subsequent release. Please remove this property from the configuration.")
}
if cfg.Producer.BatcherBuilderType == "1" {
logger.Warn("`producer.batch_builder_type` value 1 is deprecated and should use the value `key_based` instead.")
}
if cfg.Producer.BatcherBuilderType == "0" {
logger.Warn("`producer.batch_builder_type` value 0 is deprecated and should use the value `default` instead.")
}
timeout := cfg.Timeout
if cfg.Producer.SendTimeout != nil {
logger.Warn("`producer.send_timeout` is deprecated and will be removed in a subsequent release. Please use `timeout` instead")
timeout = *cfg.Producer.SendTimeout
}

producerOptions := pulsar.ProducerOptions{
Topic: cfg.Topic,
DisableBatching: cfg.Producer.DisableBatching,
SendTimeout: cfg.Producer.SendTimeout,
SendTimeout: timeout,
DisableBlockIfQueueFull: cfg.Producer.DisableBlockIfQueueFull,
MaxPendingMessages: cfg.Producer.MaxPendingMessages,
BatchingMaxPublishDelay: cfg.Producer.BatchingMaxPublishDelay,
Expand All @@ -107,6 +123,12 @@ func (cfg *Config) getProducerOptions() (pulsar.ProducerOptions, error) {
MaxReconnectToBroker: cfg.Producer.MaxReconnectToBroker,
}

batchBuilderType, err := stringToBatchBuilderType(cfg.Producer.BatcherBuilderType)
if err != nil {
return producerOptions, err
}
producerOptions.BatcherBuilderType = batchBuilderType

compressionType, err := stringToCompressionType(cfg.Producer.CompressionType)
if err != nil {
return producerOptions, err
Expand Down Expand Up @@ -139,7 +161,7 @@ func stringToCompressionType(compressionType string) (pulsar.CompressionType, er
case "zstd":
return pulsar.ZSTD, nil
default:
return pulsar.NoCompression, fmt.Errorf("producer.compressionType should be one of 'none', 'lz4', 'zlib', or 'zstd'. configured value %v. Assiging default value as nocompression", compressionType)
return pulsar.NoCompression, fmt.Errorf("producer.compressionType should be one of 'none', 'lz4', 'zlib', or 'zstd'. configured value %v. Assigning default value as none", compressionType)
}
}

Expand All @@ -152,7 +174,7 @@ func stringToCompressionLevel(compressionLevel string) (pulsar.CompressionLevel,
case "better":
return pulsar.Better, nil
default:
return pulsar.Default, fmt.Errorf("producer.compressionLevel should be one of 'none', 'lz4', 'zlib', or 'zstd'. configured value %v. Assiging default value as default", compressionLevel)
return pulsar.Default, fmt.Errorf("producer.compressionLevel should be one of 'default', 'faster', or 'better'. configured value %v. Assigning default value as default", compressionLevel)
}
}

Expand All @@ -163,6 +185,21 @@ func stringToHashingScheme(hashingScheme string) (pulsar.HashingScheme, error) {
case "murmur3_32hash":
return pulsar.Murmur3_32Hash, nil
default:
return pulsar.JavaStringHash, fmt.Errorf("producer.hashingScheme should be one of 'none', 'lz4', 'zlib', or 'zstd'. configured value %v, Assiging default value as java_string_hash", hashingScheme)
return pulsar.JavaStringHash, fmt.Errorf("producer.hashingScheme should be one of 'java_string_hash' or 'murmur3_32hash'. configured value %v, Assigning default value as java_string_hash", hashingScheme)
}
}

func stringToBatchBuilderType(builderType string) (pulsar.BatcherBuilderType, error) {
switch builderType {
case "0":
return pulsar.DefaultBatchBuilder, nil
case "1":
return pulsar.KeyBasedBatchBuilder, nil
case "default":
return pulsar.DefaultBatchBuilder, nil
case "key_based":
return pulsar.KeyBasedBatchBuilder, nil
default:
return pulsar.DefaultBatchBuilder, fmt.Errorf("producer.batchBuilderType should be one of 'default' or 'key_based'. configured value %v. Assigning default value as default", builderType)
}
}
8 changes: 4 additions & 4 deletions internal/exporter/pulsarexporter/pulsar_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
producer, err := newPulsarProducer(config)
producer, err := newPulsarProducer(config, set.Logger)

if err != nil {
return nil, err
Expand All @@ -55,7 +55,7 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m
}, nil
}

func newPulsarProducer(config Config) (pulsar.Producer, error) {
func newPulsarProducer(config Config, logger *zap.Logger) (pulsar.Producer, error) {
// Get pulsar client options
clientOptions, clientOptionsErr := config.getClientOptions()
if clientOptionsErr != nil {
Expand All @@ -68,8 +68,8 @@ func newPulsarProducer(config Config) (pulsar.Producer, error) {
return nil, clientErr
}

// Get pulsar pruducer options
producerOptions, producerOptionsErr := config.getProducerOptions()
// Get pulsar producer options
producerOptions, producerOptionsErr := config.getProducerOptions(logger)
if producerOptionsErr != nil {
return nil, producerOptionsErr
}
Expand Down
4 changes: 2 additions & 2 deletions internal/exporter/pulsarexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ exporters:
pulsar:
topic: otlp_metrics
broker: pulsar+ssl://localhost:6651
timeout: 0
auth:
tls:
ca_file: "/path/to/cacert"
cert_file: "/path/to/cert"
key_file: "/path/to/key"
insecure_skip_verify: true
producer:
send_timeout: 0
disable_block_if_queue_full: false
max_pending_messages: 100
hashing_scheme: java_string_hash
compression_type: zstd
compression_level: default
batch_builder_type: 1
batch_builder_type: key_based
disable_batching: false
# unit is nanoseconds (10^-9), set to 10 milliseconds in nanoseconds
batching_max_publish_delay: 10000000
Expand Down

0 comments on commit ceaae86

Please sign in to comment.