Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer groups on Kafka 0.9 #588

Merged
merged 4 commits into from
Jan 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,50 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse,
return response, nil
}

func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
response := new(JoinGroupResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
response := new(SyncGroupResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
response := new(LeaveGroupResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
response := new(HeartbeatResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()
Expand Down
48 changes: 48 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,52 @@ var brokerTestTable = []struct {
t.Error("Offset request got no response!")
}
}},

{[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := JoinGroupRequest{}
response, err := broker.JoinGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("JoinGroup request got no response!")
}
}},

{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := SyncGroupRequest{}
response, err := broker.SyncGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("SyncGroup request got no response!")
}
}},

{[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := LeaveGroupRequest{}
response, err := broker.LeaveGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("LeaveGroup request got no response!")
}
}},

{[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := HeartbeatRequest{}
response, err := broker.Heartbeat(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("Heartbeat request got no response!")
}
}},
}
94 changes: 94 additions & 0 deletions consumer_group_members.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package sarama

type ConsumerGroupMemberMetadata struct {
Version int16
Topics []string
UserData []byte
}

func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
pe.putInt16(m.Version)

if err := pe.putStringArray(m.Topics); err != nil {
return err
}

if err := pe.putBytes(m.UserData); err != nil {
return err
}

return nil
}

func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
if m.Version, err = pd.getInt16(); err != nil {
return
}

if m.Topics, err = pd.getStringArray(); err != nil {
return
}

if m.UserData, err = pd.getBytes(); err != nil {
return
}

return nil
}

type ConsumerGroupMemberAssignment struct {
Version int16
Topics map[string][]int32
UserData []byte
}

func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error {
pe.putInt16(m.Version)

if err := pe.putArrayLength(len(m.Topics)); err != nil {
return err
}

for topic, partitions := range m.Topics {
if err := pe.putString(topic); err != nil {
return err
}
if err := pe.putInt32Array(partitions); err != nil {
return err
}
}

if err := pe.putBytes(m.UserData); err != nil {
return err
}

return nil
}

func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) {
if m.Version, err = pd.getInt16(); err != nil {
return
}

var topicLen int
if topicLen, err = pd.getArrayLength(); err != nil {
return
}

m.Topics = make(map[string][]int32, topicLen)
for i := 0; i < topicLen; i++ {
var topic string
if topic, err = pd.getString(); err != nil {
return
}
if m.Topics[topic], err = pd.getInt32Array(); err != nil {
return
}
}

if m.UserData, err = pd.getBytes(); err != nil {
return
}

return nil
}
77 changes: 77 additions & 0 deletions consumer_group_members_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package sarama

import (
"bytes"
"reflect"
"testing"
)

var (
groupMemberMetadata = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
groupMemberAssignment = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 0, 0, 3, // Topic one, partition array length
0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 2, // Topic two, partition array length
0, 0, 0, 1, 0, 0, 0, 3, // 1, 3
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
)

func TestConsumerGroupMemberMetadata(t *testing.T) {
meta := &ConsumerGroupMemberMetadata{
Version: 1,
Topics: []string{"one", "two"},
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(meta)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberMetadata, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf)
}

meta2 := new(ConsumerGroupMemberMetadata)
err = decode(buf, meta2)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(meta, meta2) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", meta, meta2)
}
}

func TestConsumerGroupMemberAssignment(t *testing.T) {
amt := &ConsumerGroupMemberAssignment{
Version: 1,
Topics: map[string][]int32{
"one": []int32{0, 2, 4},
"two": []int32{1, 3},
},
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(amt)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberAssignment, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf)
}

amt2 := new(ConsumerGroupMemberAssignment)
err = decode(buf, amt2)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(amt, amt2) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", amt, amt2)
}
}
10 changes: 10 additions & 0 deletions join_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,13 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {

r.GroupProtocols[name] = metadata
}

func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
bin, err := encode(metadata)
if err != nil {
return err
}

r.AddGroupProtocol(name, bin)
return nil
}
12 changes: 12 additions & 0 deletions join_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ type JoinGroupResponse struct {
Members map[string][]byte
}

func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) {
members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members))
for id, bin := range r.Members {
meta := new(ConsumerGroupMemberMetadata)
if err := decode(bin, meta); err != nil {
return nil, err
}
members[id] = *meta
}
return members, nil
}

func (r *JoinGroupResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
pe.putInt32(r.GenerationId)
Expand Down
10 changes: 10 additions & 0 deletions sync_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,13 @@ func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment

r.GroupAssignments[memberId] = memberAssignment
}

func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error {
bin, err := encode(memberAssignment)
if err != nil {
return err
}

r.AddGroupAssignment(memberId, bin)
return nil
}
6 changes: 6 additions & 0 deletions sync_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ type SyncGroupResponse struct {
MemberAssignment []byte
}

func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
assignment := new(ConsumerGroupMemberAssignment)
err := decode(r.MemberAssignment, assignment)
return assignment, err
}

func (r *SyncGroupResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
return pe.putBytes(r.MemberAssignment)
Expand Down