From 08c2456a6e03f0572a657a339efc333e388084b9 Mon Sep 17 00:00:00 2001 From: Harmen Date: Sun, 6 Nov 2022 08:59:27 +0100 Subject: [PATCH] fix a race in the stream code --- stream.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/stream.go b/stream.go index 2a1fe537..75848d02 100644 --- a/stream.go +++ b/stream.go @@ -64,7 +64,7 @@ func (s *streamKey) generateID(now time.Time) string { next = fmt.Sprintf("%d-%d", last[0], last[1]+1) } - lastID := s.lastID() + lastID := s.lastIDUnlocked() if streamCmp(lastID, next) >= 0 { last, _ := parseStreamID(lastID) next = fmt.Sprintf("%d-%d", last[0], last[1]+1) @@ -74,8 +74,16 @@ func (s *streamKey) generateID(now time.Time) string { return next } -// lastID doesn't lock the mutex +// lastID locks the mutex func (s *streamKey) lastID() string { + s.mu.Lock() + defer s.mu.Unlock() + + return s.lastIDUnlocked() +} + +// lastID doesn't lock the mutex +func (s *streamKey) lastIDUnlocked() string { if len(s.entries) == 0 { return "0-0" } @@ -209,7 +217,7 @@ func (s *streamKey) createGroup(group, id string) error { } if id == "$" { - id = s.lastID() + id = s.lastIDUnlocked() } s.groups[group] = &streamGroup{ stream: s, @@ -237,7 +245,7 @@ func (s *streamKey) add(entryID string, values []string, now time.Time) (string, if entryID == "0-0" { return "", errors.New(msgStreamIDZero) } - if streamCmp(s.lastID(), entryID) != -1 { + if streamCmp(s.lastIDUnlocked(), entryID) != -1 { return "", errors.New(msgStreamIDTooSmall) }