Skip to content

Commit

Permalink
feat(test): added test to show expected behaviour (#289)
Browse files Browse the repository at this point in the history
This test shows that if there are not active consumers "ka-consumer" is
not generated and we get merely "ka-consumer-group" and
"ka-consumer-group-topic"
  • Loading branch information
paologallinaharbur authored Apr 2, 2024
1 parent 1efe6a9 commit decc406
Showing 1 changed file with 88 additions and 11 deletions.
99 changes: 88 additions & 11 deletions src/consumeroffset/kafka_offset_collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (cm *ConsumerGroupTopicListerMock) ListTopics() (map[string]sarama.TopicDet
}, nil
}

func (cm *ConsumerGroupTopicListerMock) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
func (cm *ConsumerGroupTopicListerMock) ListConsumerGroupOffsets(_ string, topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
offsetFetchResponse := &sarama.OffsetFetchResponse{
Blocks: map[string]map[int32]*sarama.OffsetFetchResponseBlock{},
}
Expand Down Expand Up @@ -87,7 +87,6 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
name string
inactiveConsumerGroupOffset bool
consumerGroupOffsetByTopic bool
consumerGroup string
cGroupEntities map[string]map[string]float64
topicEntities map[string]map[string]float64
numEntities int
Expand All @@ -96,7 +95,6 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
name: "Only active consumers",
inactiveConsumerGroupOffset: false,
consumerGroupOffsetByTopic: false,
consumerGroup: consumerGroupOne,
cGroupEntities: map[string]map[string]float64{
consumerGroupOne: {
// comes from: 25 - 10 + 30 - 10
Expand All @@ -113,7 +111,6 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
name: "Only active consumers with topic Aggregation",
inactiveConsumerGroupOffset: false,
consumerGroupOffsetByTopic: true,
consumerGroup: consumerGroupOne,
cGroupEntities: map[string]map[string]float64{
consumerGroupOne: {
// comes from: 25 - 10 + 30 - 10
Expand All @@ -138,7 +135,6 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
name: "With inactive consumers",
inactiveConsumerGroupOffset: true,
consumerGroupOffsetByTopic: false,
consumerGroup: consumerGroupOne,
cGroupEntities: map[string]map[string]float64{
consumerGroupOne: {
// comes from: (25 - 10 + 30 - 10) + (25 - 10 + 30 - 10)
Expand All @@ -155,7 +151,6 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
name: "With inactive consumers and topic aggregation",
inactiveConsumerGroupOffset: true,
consumerGroupOffsetByTopic: true,
consumerGroup: consumerGroupOne,
cGroupEntities: map[string]map[string]float64{
consumerGroupOne: {
// comes from: (25 - 10 + 30 - 10) + (25 - 10 + 30 - 10)
Expand Down Expand Up @@ -194,7 +189,7 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen

collectOffsetsForConsumerGroup(
&ConsumerGroupTopicListerMock{},
tc.consumerGroup,
consumerGroupOne,
members,
kafkaIntegration,
&TopicOffsetGetterMock{},
Expand All @@ -204,10 +199,10 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
for _, entity := range kafkaIntegration.Entities {
switch entity.Metadata.Namespace {
case nrConsumerGroupEntity:
if entity.Metrics[0].Metrics["consumerGroup"] == tc.consumerGroup {
assert.Equal(t, tc.cGroupEntities[tc.consumerGroup]["totalLag"], entity.Metrics[0].Metrics["consumerGroup.totalLag"])
assert.Equal(t, tc.cGroupEntities[tc.consumerGroup]["maxLag"], entity.Metrics[0].Metrics["consumerGroup.maxLag"])
assert.Equal(t, tc.cGroupEntities[tc.consumerGroup]["activeConsumers"], entity.Metrics[0].Metrics["consumerGroup.activeConsumers"])
if entity.Metrics[0].Metrics["consumerGroup"] == consumerGroupOne {
assert.Equal(t, tc.cGroupEntities[consumerGroupOne]["totalLag"], entity.Metrics[0].Metrics["consumerGroup.totalLag"])
assert.Equal(t, tc.cGroupEntities[consumerGroupOne]["maxLag"], entity.Metrics[0].Metrics["consumerGroup.maxLag"])
assert.Equal(t, tc.cGroupEntities[consumerGroupOne]["activeConsumers"], entity.Metrics[0].Metrics["consumerGroup.activeConsumers"])
}
case nrConsumerGroupTopicEntity:
topicName := fmt.Sprintf("%v", entity.Metrics[0].Metrics["topic"])
Expand All @@ -220,13 +215,95 @@ func TestCollectOffsetsForConsumerGroup(t *testing.T) { // nolint: funlen
// this entity only for topicOne that has member clientID
assert.Equal(t, testClientID, entity.Metrics[0].Metrics["clientID"])
assert.Equal(t, topicOne, entity.Metrics[0].Metrics["topic"])
default:
assert.Fail(t, "not expected")
}
assert.NotEmpty(t, entity)
}
})
}
}

func TestNoActiveConsumersForConsumerGroup(t *testing.T) { // nolint: funlen
// MemberAssignment mock created as in sarama's consumer_group_member_test.go
members := map[string]*sarama.GroupMemberDescription{
testClientID: {
ClientId: testClientID,
ClientHost: "a-host",
MemberMetadata: nil,
// In the schema below there are no consumer assigned to any of the topics.
MemberAssignment: []byte{
0, 0, // Version
0, 0, 0, 0, // Topic array length
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
},
},
}

args.GlobalArgs = &args.ParsedArguments{}

expectedTopicEntities := map[string]map[string]float64{
topicOne: {
// comes from: 25 - 10 + 30 - 10
"totalLag": 35,
// comes from max: 30 - 10
"maxLag": 20,
"activeConsumers": 0,
},
topicTwo: {
// comes from: 25 - 10 + 30 - 10
"totalLag": 35,
// comes from max: 30 - 10
"maxLag": 20,
"activeConsumers": 0,
},
}

expectedCGroupEntities := map[string]map[string]float64{
consumerGroupOne: {
// comes from: (25 - 10 + 30 - 10) + (25 - 10 + 30 - 10)
"totalLag": 70,
// comes from max: 30 - 10
"maxLag": 20,
"activeConsumers": 0,
},
}
expectedNumEntities := 3

args.GlobalArgs.InactiveConsumerGroupOffset = true
args.GlobalArgs.ConsumerGroupOffsetByTopic = true

kafkaIntegration, _ := integration.New("test", "test")

collectOffsetsForConsumerGroup(
&ConsumerGroupTopicListerMock{},
consumerGroupOne,
members,
kafkaIntegration,
&TopicOffsetGetterMock{},
)

assert.Equal(t, expectedNumEntities, len(kafkaIntegration.Entities))
for _, entity := range kafkaIntegration.Entities {
switch entity.Metadata.Namespace {
case nrConsumerGroupEntity:
if entity.Metrics[0].Metrics["consumerGroup"] == consumerGroupOne {
assert.Equal(t, expectedCGroupEntities[consumerGroupOne]["totalLag"], entity.Metrics[0].Metrics["consumerGroup.totalLag"])
assert.Equal(t, expectedCGroupEntities[consumerGroupOne]["maxLag"], entity.Metrics[0].Metrics["consumerGroup.maxLag"])
assert.Equal(t, expectedCGroupEntities[consumerGroupOne]["activeConsumers"], entity.Metrics[0].Metrics["consumerGroup.activeConsumers"])
}
case nrConsumerGroupTopicEntity:
topicName := fmt.Sprintf("%v", entity.Metrics[0].Metrics["topic"])
assert.Equal(t, expectedTopicEntities[topicName]["totalLag"], entity.Metrics[0].Metrics["consumerGroup.totalLag"])
assert.Equal(t, expectedTopicEntities[topicName]["maxLag"], entity.Metrics[0].Metrics["consumerGroup.maxLag"])
assert.Equal(t, expectedTopicEntities[topicName]["activeConsumers"], entity.Metrics[0].Metrics["consumerGroup.activeConsumers"])
default:
// We do not expect any "ka-consumer" or "ka-partition-consumer" since no consumers are active.
assert.Fail(t, "not expected")
}
}
}

func TestCollectOffsetsForConsumerGroup_Error(t *testing.T) { // nolint: funlen
// MemberAssignment mock created as in sarama's consumer_group_member_test.go
members := map[string]*sarama.GroupMemberDescription{
Expand Down

0 comments on commit decc406

Please sign in to comment.