Skip to content

Commit

Permalink
fix AS queue implementation (#61)
Browse files Browse the repository at this point in the history
Co-authored-by: raphaelrobert <[email protected]>
  • Loading branch information
kkohbrok and raphaelrobert authored Aug 9, 2023
1 parent 4c0db33 commit 250b99c
Showing 1 changed file with 14 additions and 41 deletions.
55 changes: 14 additions & 41 deletions server/src/storage_provider/memory/auth_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,58 +374,31 @@ impl AsStorageProvider for MemoryAsStorage {
.ok_or(ReadAndDeleteError::QueueNotFound)?;

if number_of_messages == 0 {
// Converting usize to u64 should be safe since we don't consider architectures above 64.
// Converting usize to u64 should be safe since we don't consider
// architectures above 64.
return Ok((vec![], queue.queue.len() as u64));
}

// Client claims to have seen messages that are not even in the queue yet.
let queue_sequence_number = queue.sequence_number;
if sequence_number > queue_sequence_number {
tracing::warn!("Sequence number too high: Client requested sequence number {sequence_number}, but we only have {queue_sequence_number}.");
return Err(ReadAndDeleteError::SequenceNumberNotFound);
}

// Let's see what the sequence number at the front of the queue looks
// like.
match queue.queue.front() {
// Queue is empty, but client expects there to still be messages in
// the queue.
// TODO: Should we also just return an empty vector here?
None if sequence_number != queue_sequence_number => {
tracing::warn!("Queue is empty. Client requested sequence number {sequence_number}, but we only have {queue_sequence_number}.");
return Err(ReadAndDeleteError::SequenceNumberNotFound);
}
// No new messages. Queue is empty.
None => return Ok((vec![], 0)),
// Client expects messages that are not in the queue anymore.
// TODO: Should we just round the sequence number up to the nearest
// existing one at this point?
Some(message) if sequence_number < message.sequence_number => {
tracing::warn!("Client requests old messages: Client requested sequence number {sequence_number}, but we only have {queue_sequence_number}.");
return Err(ReadAndDeleteError::SequenceNumberNotFound);
}
// Everything is okay. Let's proceed by deleting and returning messages.
Some(_) => (),
};

let mut return_messages = vec![];
while let Some(first_message) = queue.queue.pop_front() {
match first_message.sequence_number {
// Delete messages until we reached the desired sequence number.
x if x < sequence_number => continue,
if first_message.sequence_number >= sequence_number {
// If we're above the "last seen" sequence number given by the
// client, add the popped message to the messages to be
// returned. Continue this until there are no more messages, or
// until the vector contains as many messages as desired by the
// client.
_ => return_messages.push(first_message),
// returned.
// Messages with a lower sequence number are simply dropped.
return_messages.push(first_message);
}
// Converting usize to u64 should be safe since we don't consider architectures above 64.
if return_messages.len() as u64 == number_of_messages {
// Continue this until there are no more messages, or until the
// vector contains as many messages as desired by the client.
// Converting usize to u64 should be safe since we don't consider
// architectures above 64.
if return_messages.len() as u64 >= number_of_messages {
break;
}
}
// Converting usize to u64 should be safe since we don't consider architectures above 64.

// Converting usize to u64 should be safe since we don't consider
// architectures above 64.
Ok((return_messages, queue.queue.len() as u64))
}

Expand Down

0 comments on commit 250b99c

Please sign in to comment.