Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(test): added test to show expected behaviour #289

Merged
merged 3 commits into from
Apr 2, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 86 additions & 11 deletions src/consumeroffset/kafka_offset_collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (cm *ConsumerGroupTopicListerMock) ListTopics() (map[string]sarama.TopicDet
}, nil
}

func (cm *ConsumerGroupTopicListerMock) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
func (cm *ConsumerGroupTopicListerMock) ListConsumerGroupOffsets(_ string, topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
offsetFetchResponse := &sarama.OffsetFetchResponse{
Blocks: map[string]map[int32]*sarama.OffsetFetchResponseBlock{},
}
Expand Down Expand Up @@ -87,7 +87,6 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
name string
inactiveConsumerGroupOffset bool
consumerGroupOffsetByTopic bool
consumerGroup string
cGroupEntities map[string]map[string]float64
topicEntities map[string]map[string]float64
numEntities int
Expand All @@ -96,7 +95,6 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
name: "Only active consumers",
inactiveConsumerGroupOffset: false,
consumerGroupOffsetByTopic: false,
consumerGroup: consumerGroupOne,
cGroupEntities: map[string]map[string]float64{
consumerGroupOne: {
// comes from: 25 - 10 + 30 - 10
Expand All @@ -113,7 +111,6 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
name: "Only active consumers with topic Aggregation",
inactiveConsumerGroupOffset: false,
consumerGroupOffsetByTopic: true,
consumerGroup: consumerGroupOne,
cGroupEntities: map[string]map[string]float64{
consumerGroupOne: {
// comes from: 25 - 10 + 30 - 10
Expand All @@ -138,7 +135,6 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
name: "With inactive consumers",
inactiveConsumerGroupOffset: true,
consumerGroupOffsetByTopic: false,
consumerGroup: consumerGroupOne,
cGroupEntities: map[string]map[string]float64{
consumerGroupOne: {
// comes from: (25 - 10 + 30 - 10) + (25 - 10 + 30 - 10)
Expand All @@ -155,7 +151,6 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
name: "With inactive consumers and topic aggregation",
inactiveConsumerGroupOffset: true,
consumerGroupOffsetByTopic: true,
consumerGroup: consumerGroupOne,
cGroupEntities: map[string]map[string]float64{
consumerGroupOne: {
// comes from: (25 - 10 + 30 - 10) + (25 - 10 + 30 - 10)
Expand Down Expand Up @@ -194,7 +189,7 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen

collectOffsetsForConsumerGroup(
&ConsumerGroupTopicListerMock{},
tc.consumerGroup,
consumerGroupOne,
members,
kafkaIntegration,
&TopicOffsetGetterMock{},
Expand All @@ -204,10 +199,10 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
for _, entity := range kafkaIntegration.Entities {
switch entity.Metadata.Namespace {
case nrConsumerGroupEntity:
if entity.Metrics[0].Metrics["consumerGroup"] == tc.consumerGroup {
assert.Equal(t, tc.cGroupEntities[tc.consumerGroup]["totalLag"], entity.Metrics[0].Metrics["consumerGroup.totalLag"])
assert.Equal(t, tc.cGroupEntities[tc.consumerGroup]["maxLag"], entity.Metrics[0].Metrics["consumerGroup.maxLag"])
assert.Equal(t, tc.cGroupEntities[tc.consumerGroup]["activeConsumers"], entity.Metrics[0].Metrics["consumerGroup.activeConsumers"])
if entity.Metrics[0].Metrics["consumerGroup"] == consumerGroupOne {
assert.Equal(t, tc.cGroupEntities[consumerGroupOne]["totalLag"], entity.Metrics[0].Metrics["consumerGroup.totalLag"])
assert.Equal(t, tc.cGroupEntities[consumerGroupOne]["maxLag"], entity.Metrics[0].Metrics["consumerGroup.maxLag"])
assert.Equal(t, tc.cGroupEntities[consumerGroupOne]["activeConsumers"], entity.Metrics[0].Metrics["consumerGroup.activeConsumers"])
}
case nrConsumerGroupTopicEntity:
topicName := fmt.Sprintf("%v", entity.Metrics[0].Metrics["topic"])
Expand All @@ -220,13 +215,93 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
// this entity only for topicOne that has member clientID
assert.Equal(t, testClientID, entity.Metrics[0].Metrics["clientID"])
assert.Equal(t, topicOne, entity.Metrics[0].Metrics["topic"])
default:
assert.Fail(t, "not expected")
}
assert.NotEmpty(t, entity)
}
})
}
}

func TestNoActiveConsumersForConsumerGroup(t *testing.T) { // nolint: funlen
// MemberAssignment mock created as in sarama's consumer_group_member_test.go
members := map[string]*sarama.GroupMemberDescription{
testClientID: {
ClientId: testClientID,
ClientHost: "a-host",
MemberMetadata: nil,
MemberAssignment: []byte{
0, 0, // Version
0, 0, 0, 0, // Topic array length
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
},
},
}
paologallinaharbur marked this conversation as resolved.
Show resolved Hide resolved

args.GlobalArgs = &args.ParsedArguments{}

expectedTopicEntities := map[string]map[string]float64{
topicOne: {
// comes from: 25 - 10 + 30 - 10
"totalLag": 35,
// comes from max: 30 - 10
"maxLag": 20,
"activeConsumers": 0,
},
topicTwo: {
// comes from: 25 - 10 + 30 - 10
"totalLag": 35,
// comes from max: 30 - 10
"maxLag": 20,
"activeConsumers": 0,
},
}

expectedCGroupEntities := map[string]map[string]float64{
consumerGroupOne: {
// comes from: (25 - 10 + 30 - 10) + (25 - 10 + 30 - 10)
"totalLag": 70,
// comes from max: 30 - 10
"maxLag": 20,
"activeConsumers": 0,
},
}
expectedNumEntities := 3

args.GlobalArgs.InactiveConsumerGroupOffset = true
args.GlobalArgs.ConsumerGroupOffsetByTopic = true

kafkaIntegration, _ := integration.New("test", "test")

collectOffsetsForConsumerGroup(
&ConsumerGroupTopicListerMock{},
consumerGroupOne,
members,
kafkaIntegration,
&TopicOffsetGetterMock{},
)

assert.Equal(t, expectedNumEntities, len(kafkaIntegration.Entities))
for _, entity := range kafkaIntegration.Entities {
switch entity.Metadata.Namespace {
case nrConsumerGroupEntity:
if entity.Metrics[0].Metrics["consumerGroup"] == consumerGroupOne {
assert.Equal(t, expectedCGroupEntities[consumerGroupOne]["totalLag"], entity.Metrics[0].Metrics["consumerGroup.totalLag"])
assert.Equal(t, expectedCGroupEntities[consumerGroupOne]["maxLag"], entity.Metrics[0].Metrics["consumerGroup.maxLag"])
assert.Equal(t, expectedCGroupEntities[consumerGroupOne]["activeConsumers"], entity.Metrics[0].Metrics["consumerGroup.activeConsumers"])
}
case nrConsumerGroupTopicEntity:
topicName := fmt.Sprintf("%v", entity.Metrics[0].Metrics["topic"])
assert.Equal(t, expectedTopicEntities[topicName]["totalLag"], entity.Metrics[0].Metrics["consumerGroup.totalLag"])
assert.Equal(t, expectedTopicEntities[topicName]["maxLag"], entity.Metrics[0].Metrics["consumerGroup.maxLag"])
assert.Equal(t, expectedTopicEntities[topicName]["activeConsumers"], entity.Metrics[0].Metrics["consumerGroup.activeConsumers"])
default:
assert.Fail(t, "not expected")
}
}
}
paologallinaharbur marked this conversation as resolved.
Show resolved Hide resolved

func TestCollectOffsetsForConsumerGroup_Error(t *testing.T) { // nolint: funlen
// MemberAssignment mock created as in sarama's consumer_group_member_test.go
members := map[string]*sarama.GroupMemberDescription{
Expand Down
Loading