Skip to content

Commit

Permalink
feat: stream method--XRange(#141 ID 6)
Browse files Browse the repository at this point in the history
  • Loading branch information
qishenonly committed Jul 16, 2023
1 parent 5de7d34 commit e36d3e0
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 0 deletions.
34 changes: 34 additions & 0 deletions structure/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,40 @@ func (s *StreamStructure) XLen(name string) (int, error) {
return len(messages), nil
}

// XRange returns the messages in the stream
// with the []StreamMessage as the return value
func (s *StreamStructure) XRange(name string, start, stop int) ([]StreamMessage, error) {
// Get the stream
encodedStreams, err := s.db.Get([]byte(name))
if err != nil {
return nil, err
}

// Decode the streams
if err = s.decodeStreams(encodedStreams, s.streams); err != nil {
return nil, err
}

// Get the messages
messages := s.streams.Messages

// Create a new slice of StreamMessage
var result []StreamMessage

// Get the messages
if len(messages) >= stop {
messages = messages[start:stop]
// Convert []*StreamMessage to []StreamMessage
for _, msg := range messages {
result = append(result, *msg)
}
} else {
return nil, ErrAmountOfData
}

return result, nil
}

func (s *StreamStructure) encodeStreams(ss *Streams) ([]byte, error) {
// Encode the streams
data, err := json.Marshal(ss)
Expand Down
30 changes: 30 additions & 0 deletions structure/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,33 @@ func TestStreamStructure_XLen(t *testing.T) {
assert.Equal(t, 1, length)

}

func TestStreamStructure_XRange(t *testing.T) {
stc, _ := initStreamDB()
defer stc.db.Clean()

ok1, err := stc.XAdd("test", "1", map[string]interface{}{"name1": "flydb1"})
assert.Nil(t, err)
assert.True(t, ok1)

ok2, err := stc.XAdd("test", "2", map[string]interface{}{"name2": "flydb2"})
assert.Nil(t, err)
assert.True(t, ok2)

ok3, err := stc.XAdd("test", "3", map[string]interface{}{"name3": "flydb3"})
assert.Nil(t, err)
assert.True(t, ok3)

ok4, err := stc.XAdd("test1", "1", map[string]interface{}{"name11": "flydb11"})
assert.Nil(t, err)
assert.True(t, ok4)

items, err := stc.XRange("test", 1, 3)
assert.Nil(t, err)
assert.Equal(t, 2, len(items))

items, err = stc.XRange("test1", 1, 3)
assert.NotNil(t, err)
assert.Equal(t, 0, len(items))

}

0 comments on commit e36d3e0

Please sign in to comment.