Skip to content

Commit

Permalink
add integration test for start_message_move_task (#1046)
Browse files Browse the repository at this point in the history
  • Loading branch information
micossow authored Sep 9, 2024
1 parent 1a0e3de commit 9b92e00
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 8 deletions.
10 changes: 10 additions & 0 deletions integration-tests/conf/queue-storage.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ queues {
}
}

queueName2 {
defaultVisibilityTimeout = 1 second
delay = 0 seconds
receiveMessageWait = 0 seconds
deadLettersQueue {
name = "myDLQ"
maxReceiveCount = 3
}
}

myDLQ { }

fifoQueue {
Expand Down
51 changes: 43 additions & 8 deletions integration-tests/python/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ def stop(self):
def get_elasticmq_port(self):
return self.get_exposed_port(9321)

def create_sqs_client(self):
def create_sqs_resource(self):
port = self.get_elasticmq_port()
return boto3.resource('sqs', region_name='radom', endpoint_url=f'http://localhost:{port}', aws_access_key_id='test', aws_secret_access_key='test')
return boto3.resource('sqs', region_name='random', endpoint_url=f'http://localhost:{port}', aws_access_key_id='test', aws_secret_access_key='test')

def create_sqs_client(self):
port = self.get_elasticmq_port()
return boto3.client('sqs', region_name='random', endpoint_url=f'http://localhost:{port}', aws_access_key_id='test', aws_secret_access_key='test')

@pytest.fixture(scope="function")
def message_storage_container():
Expand All @@ -67,7 +70,7 @@ def queue_storage_container():
container.stop()

def test_messages_storage(message_storage_container):
sqs = message_storage_container.create_sqs_client()
sqs = message_storage_container.create_sqs_resource()
queue = sqs.create_queue(QueueName='simpleQueue', Attributes={'VisibilityTimeout': '1'})
assert queue is not None
queue.send_message(MessageBody='Hello 1')
Expand All @@ -85,7 +88,7 @@ def test_messages_storage(message_storage_container):

message_storage_container.start()

sqs = message_storage_container.create_sqs_client()
sqs = message_storage_container.create_sqs_resource()
queue = sqs.get_queue_by_name(QueueName='simpleQueue')
assert queue is not None
queue.send_message(MessageBody='Hello 4')
Expand All @@ -98,7 +101,7 @@ def test_messages_storage(message_storage_container):
assert not os.path.exists(os.path.join(os.getcwd(), ".data", "queues.conf"))

def test_queue_storage(queue_storage_container):
sqs = queue_storage_container.create_sqs_client()
sqs = queue_storage_container.create_sqs_resource()
queue = sqs.create_queue(QueueName='simpleQueue', Attributes={'VisibilityTimeout': '1'})
assert queue is not None
queue.send_message(MessageBody='Hello 1')
Expand All @@ -117,7 +120,7 @@ def test_queue_storage(queue_storage_container):

queue_storage_container.start()

sqs = queue_storage_container.create_sqs_client()
sqs = queue_storage_container.create_sqs_resource()
queue = sqs.get_queue_by_name(QueueName='simpleQueue')
assert queue is not None
queue.send_message(MessageBody='Hello 4')
Expand All @@ -130,8 +133,40 @@ def test_queue_storage(queue_storage_container):
assert os.path.exists(os.path.join(os.getcwd(), ".data", "queues.conf"))

def test_list_dead_letter_source_queues(queue_storage_container):
sqs = queue_storage_container.create_sqs_client()
sqs = queue_storage_container.create_sqs_resource()
queue = sqs.get_queue_by_name(QueueName='myDLQ')
queues = list(queue.dead_letter_source_queues.all())
print(queues)
assert len(queues) == 2
assert len(queues) == 3

def test_message_move_task(queue_storage_container):
sqs = queue_storage_container.create_sqs_resource()
queue = sqs.get_queue_by_name(QueueName='queueName2')
dlq = sqs.get_queue_by_name(QueueName='myDLQ')

# populate the queue with 3 messages
queue.send_message(MessageBody='Hello 1')
queue.send_message(MessageBody='Hello 2')
queue.send_message(MessageBody='Hello 3')

# receive from queue maxReceiveCount + 1 to make it move to DLQ
messages1 = queue.receive_messages(MaxNumberOfMessages=10)
assert len(messages1) == 3
time.sleep(1.5)
messages2 = queue.receive_messages(MaxNumberOfMessages=10)
assert len(messages2) == 3
time.sleep(1.5)
messages3 = queue.receive_messages(MaxNumberOfMessages=10)
assert len(messages3) == 3
time.sleep(1.5)
messages4 = queue.receive_messages(MaxNumberOfMessages=10)
assert len(messages4) == 0

# start the message move task
client = queue_storage_container.create_sqs_client()
client.start_message_move_task(SourceArn=dlq.attributes['QueueArn'])
time.sleep(1)

# receive again
messages5 = queue.receive_messages(MaxNumberOfMessages=10)
assert len(messages5) == 3
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,16 @@
"allDeclaredFields":true,
"queryAllPublicMethods":true
},
{
"name":"org.elasticmq.rest.sqs.StartMessageMoveTaskActionRequest",
"allDeclaredFields":true,
"queryAllPublicMethods":true
},
{
"name":"org.elasticmq.rest.sqs.StartMessageMoveTaskResponse",
"allDeclaredFields":true,
"queryAllPublicMethods":true
},
{
"name":"org.elasticmq.rest.sqs.TagQueueActionRequest",
"allDeclaredFields":true,
Expand Down

0 comments on commit 9b92e00

Please sign in to comment.