Skip to content

Commit

Permalink
Changing Headers definition from byte to int8 for KafkaRecord (#506)
Browse files Browse the repository at this point in the history
  • Loading branch information
milovacb authored Apr 28, 2023
1 parent 14212e8 commit 771b391
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 11 deletions.
45 changes: 37 additions & 8 deletions events/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

package events

import (
"encoding/json"
)

type KafkaEvent struct {
EventSource string `json:"eventSource"`
EventSourceARN string `json:"eventSourceArn"`
Expand All @@ -10,12 +14,37 @@ type KafkaEvent struct {
}

type KafkaRecord struct {
Topic string `json:"topic"`
Partition int64 `json:"partition"`
Offset int64 `json:"offset"`
Timestamp MilliSecondsEpochTime `json:"timestamp"`
TimestampType string `json:"timestampType"`
Key string `json:"key,omitempty"`
Value string `json:"value,omitempty"`
Headers []map[string][]byte `json:"headers"`
Topic string `json:"topic"`
Partition int64 `json:"partition"`
Offset int64 `json:"offset"`
Timestamp MilliSecondsEpochTime `json:"timestamp"`
TimestampType string `json:"timestampType"`
Key string `json:"key,omitempty"`
Value string `json:"value,omitempty"`
Headers []map[string]JSONNumberBytes `json:"headers"`
}

// JSONNumberBytes represents array of bytes in Headers field.
type JSONNumberBytes []byte

// MarshalJSON converts byte array into array of signed integers.
func (b JSONNumberBytes) MarshalJSON() ([]byte, error) {
signedNumbers := make([]int8, len(b))
for i, value := range b {
signedNumbers[i] = int8(value)
}
return json.Marshal(signedNumbers)
}

// UnmarshalJSON converts a given json with potential negative values into byte array.
func (b *JSONNumberBytes) UnmarshalJSON(data []byte) error {
var signedNumbers []int8
if err := json.Unmarshal(data, &signedNumbers); err != nil {
return err
}
*b = make(JSONNumberBytes, len(signedNumbers))
for i, value := range signedNumbers {
(*b)[i] = byte(value)
}
return nil
}
13 changes: 11 additions & 2 deletions events/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ func TestKafkaEventMarshaling(t *testing.T) {
t.Errorf("could not unmarshal event. details: %v", err)
}

// expected values for header
var headerValues [5]byte
headerValues[0] = 118
headerValues[1] = 220 // -36 + 256
headerValues[2] = 0
headerValues[3] = 127
headerValues[4] = 128 // -128 + 256

assert.Equal(t, inputEvent.BootstrapServers, "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092")
assert.Equal(t, inputEvent.EventSource, "aws:kafka")
assert.Equal(t, inputEvent.EventSourceARN, "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4")
Expand All @@ -33,8 +41,9 @@ func TestKafkaEventMarshaling(t *testing.T) {
for _, header := range record.Headers {
for key, value := range header {
assert.Equal(t, key, "headerKey")
headerValue := string(value)
assert.Equal(t, headerValue, "headerValue")
for i, headerValue := range value {
assert.Equal(t, headerValue, headerValues[i])
}
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion events/testdata/kafka-event.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
"value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj",
"headers": [
{
"headerKey": "aGVhZGVyVmFsdWU="
"headerKey": [
118,
-36,
0,
127,
-128
]
}
]
}
Expand Down

0 comments on commit 771b391

Please sign in to comment.