Skip to content

Commit

Permalink
#2: Implementing delivery receipt storage for in-memory
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Dec 6, 2012
1 parent 7b553c7 commit ad5aa82
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
5 changes: 5 additions & 0 deletions api/src/main/scala/org/elasticmq/Message.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.elasticmq

import org.joda.time.DateTime
import java.util.UUID

trait Message extends MessageOperations {
def content: String
Expand All @@ -25,6 +26,10 @@ sealed case class DeliveryReceipt(receipt: String) {
override def toString = receipt
}

object DeliveryReceipt {
def generate = new DeliveryReceipt(UUID.randomUUID().toString)
}

case class MessageBuilder private (content: String, id: Option[MessageId], nextDelivery: NextDelivery) {
def withId(id: String) = this.copy(id = Some(MessageId(id)))
def withNextDelivery(nextDelivery: NextDelivery) = this.copy(nextDelivery = nextDelivery)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class InMemoryMessagesStorage(queueName: String, statistics: InMemoryMessageStat
None
} else if (messagesById.contains(MessageId(message.id))) {
// Putting the message again into the queue, with a new next delivery
message.deliveryReceipt.set(Some(DeliveryReceipt.generate.receipt))
message.nextDelivery.set(newNextDelivery.millis)
messageQueue.add(message)

Expand Down Expand Up @@ -106,7 +107,7 @@ class InMemoryMessagesStorage(queueName: String, statistics: InMemoryMessageStat
}

case class InMemoryMessage(id: String,
deliveryReceipt: Option[String],
deliveryReceipt: AtomicReference[Option[String]],
nextDelivery: AtomicLong,
content: String,
created: DateTime,
Expand All @@ -115,14 +116,14 @@ case class InMemoryMessage(id: String,

def compareTo(other: InMemoryMessage) = nextDelivery.get().compareTo(other.nextDelivery.get())

def toMessageData = MessageData(MessageId(id), deliveryReceipt.map(DeliveryReceipt(_)), content,
def toMessageData = MessageData(MessageId(id), deliveryReceipt.get().map(DeliveryReceipt(_)), content,
MillisNextDelivery(nextDelivery.get()), created)
}

object InMemoryMessage {
def from(message: MessageData) = InMemoryMessage(
message.id.id,
message.deliveryReceipt.map(_.receipt),
new AtomicReference(message.deliveryReceipt.map(_.receipt)),
new AtomicLong(message.nextDelivery.millis),
message.content,
message.created,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.elasticmq.storage

import org.joda.time.DateTime
import org.elasticmq.{MillisNextDelivery, MessageId, MillisVisibilityTimeout}
import org.elasticmq.data.MessageData

abstract class MessageCommandsTest extends StorageTest {
test("non-existent message should not be found") {
Expand Down Expand Up @@ -77,7 +78,7 @@ abstract class MessageCommandsTest extends StorageTest {
val lookupResult = execute(ReceiveMessageCommand(q1.name, 200L, MillisNextDelivery(234L)))

// Then
lookupResult must be (Some(createMessageData("xyz", "123", MillisNextDelivery(234L))))
withoutDeliveryReceipt(lookupResult) must be (Some(createMessageData("xyz", "123", MillisNextDelivery(234L))))
}

test("next delivery should be updated after receiving") {
Expand All @@ -93,7 +94,7 @@ abstract class MessageCommandsTest extends StorageTest {
val lookupResult = execute(LookupMessageCommand(q1.name, MessageId("xyz")))

// Then
lookupResult must be (Some(createMessageData("xyz", "123", MillisNextDelivery(567L))))
withoutDeliveryReceipt(lookupResult) must be (Some(createMessageData("xyz", "123", MillisNextDelivery(567L))))
}

test("receipt handle should be filled when receiving") {
Expand Down Expand Up @@ -205,4 +206,8 @@ abstract class MessageCommandsTest extends StorageTest {
// Then
execute(LookupMessageCommand(q1.name, MessageId("xyz"))) must be (None)
}

def withoutDeliveryReceipt(messageOpt: Option[MessageData]) = {
messageOpt.map(_.copy(deliveryReceipt = None))
}
}

0 comments on commit ad5aa82

Please sign in to comment.