Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester Consumer #942

Merged
merged 12 commits into from
Jul 26, 2018
Merged

Ingester Consumer #942

merged 12 commits into from
Jul 26, 2018

Conversation

davit-y
Copy link
Contributor

@davit-y davit-y commented Jul 19, 2018

Which problem is this PR solving?

Short description of the changes

  • Export consumer to be initialized and started in a main file
  • Add ConsumerBuilder to pkg/kafka/config
  • Split pkg/kafka/config into Consumer and Producer

@davit-y davit-y mentioned this pull request Jul 19, 2018
9 tasks
@davit-y davit-y changed the title Ingester Consumer [WIP] Ingester Consumer Jul 19, 2018
@davit-y davit-y changed the title [WIP] Ingester Consumer Ingester Consumer Jul 19, 2018
@davit-y davit-y changed the base branch from add-ingester to master July 20, 2018 15:42
@codecov
Copy link

codecov bot commented Jul 20, 2018

Codecov Report

Merging #942 into master will not change coverage.
The diff coverage is 100%.

Impacted file tree graph

@@          Coverage Diff          @@
##           master   #942   +/-   ##
=====================================
  Coverage     100%   100%           
=====================================
  Files         138    138           
  Lines        6343   6362   +19     
=====================================
+ Hits         6343   6362   +19
Impacted Files Coverage Δ
cmd/ingester/app/consumer/consumer_metrics.go 100% <100%> (ø) ⬆️
cmd/ingester/app/consumer/processor_factory.go 100% <100%> (ø) ⬆️
cmd/ingester/app/consumer/consumer.go 100% <100%> (ø) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 8f7e497...10eda27. Read the comment docs.

config.ConsumerBuilder
}

// Consumer uses sarama to consume messages from kafka and handle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this sentence doesn't sound right

type Configuration struct {
Brokers []string
// Consumer is an interface to features of Sarama that are necessary for the consumer
type Consumer interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a big fan of having the interface definition inside the config file. Having said that, I'm not a big fan of it's original location either. Keep this here for now but try to come up with a better location to define this.

@@ -15,22 +15,52 @@
package config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we split this file into consumer.go and producer.go?

}

// ProducerBuilder builds a new kafka producer
type ProducerBuilder interface {
NewProducer() (sarama.AsyncProducer, error)
}

// ConsumerBuilder builds a new kafka producer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/producer/consumer

Brokers []string
Topic string
GroupID string
Consumer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe I'm missing something but I don't see need to embed the Consumer interface in this config

)

const (
configPrefix = "ingester-consumer"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this variable? I know all our other configs follow this format but I don't see a reason to do so here. Do you foresee a need for us to consume from different kafka topics? This might be more a question for Prithvi

io.Closer
// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
saramaConsumer, err := params.ConsumerBuilder.NewConsumer()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does params.NewConsumer() not work?

@@ -0,0 +1,48 @@
// Copyright (c) 2018 The Jaeger Authors.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mb, I meant pkg/kafka/config/consumer.go and pkg/kafka/config/producer.go

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, keep as is, I prefer producer.Configuration over config.ProducerConfiguration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I actually did it the first way, then noticed that it would be inconsistent with ES and Cassandra and changed it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need the path of this file to be pkg/kafka/config/consumer/config.go; having this be pkg/kafka/consumer/config.go conveys the same information

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make the change once the producer PR #957 is merged

@davit-y davit-y mentioned this pull request Jul 24, 2018
@davit-y davit-y force-pushed the ingester-consumer branch 2 times, most recently from f9330f3 to 53b46cf Compare July 25, 2018 05:57

import (
"github.com/Shopify/sarama"
)

// Builder builds a new kafka producer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why move this from 26-30 to here? (I feel that this unnecessarily makes the code review bigger)

producer sarama.AsyncProducer
marshaller Marshaller
producer.Builder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the producer changes required for this commit?
Could they be better factored into a separate commit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won had commented about splitting the config file into consumer and producer packages, would it be better to put everything into config.go in this PR and creating a new PR for the split?

}

// Close closes the Consumer and underlying sarama consumer
func (c *Consumer) Close() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason for moving this?

Copy link
Contributor Author

@davit-y davit-y Jul 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the producer PR you recommended this folder structure

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I mean moving Close from L106-L110 to here.

ProcessorFactory ProcessorFactory
Factory metrics.Factory
Logger *zap.Logger
SaramaConsumer consumer.Consumer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seeing that this consumer is an interface that we control, perhaps internalConsumer is an apt variable name

metricsFactory metrics.Factory
logger *zap.Logger
baseProcessor processor.SpanProcessor
parallelism int
}

func (c *processorFactory) new(partition int32, minOffset int64) processor.SpanProcessor {
// NewFactory constructs a new ProcessorFactory
func NewFactory(params FactoryParams) (ProcessorFactory, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return a pointer to ProcessorFactor instead. I'd rename this to NewProcessorFactory, and do the same for L30

Davit Yeghshatyan added 2 commits July 26, 2018 14:27
Signed-off-by: Davit Yeghshatyan <[email protected]>
Signed-off-by: Davit Yeghshatyan <[email protected]>
Davit Yeghshatyan added 10 commits July 26, 2018 14:27
Signed-off-by: Davit Yeghshatyan <[email protected]>
Signed-off-by: Davit Yeghshatyan <[email protected]>
Signed-off-by: Davit Yeghshatyan <[email protected]>
Signed-off-by: Davit Yeghshatyan <[email protected]>
Removed to reduce the scope of this PR.
Will be added once producer and consumer are merged.

Signed-off-by: Davit Yeghshatyan <[email protected]>
Signed-off-by: Davit Yeghshatyan <[email protected]>
Signed-off-by: Davit Yeghshatyan <[email protected]>
@vprithvi vprithvi merged commit d1c2da6 into jaegertracing:master Jul 26, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants