S3: Adding tests of EventBridge notification ObjectCreated:Copy #7407

Merged
32 changes: 18 additions & 14 deletions moto/s3/models.py
@@ -2048,7 +2048,9 @@ def put_object(
lock_legal_status: Optional[str] = None,
lock_until: Optional[str] = None,
checksum_value: Optional[str] = None,
# arguments to handle notification
request_method: Optional[str] = "PUT",
disable_notification: Optional[bool] = False,
) -> FakeKey:
if storage is not None and storage not in STORAGE_CLASS:
raise InvalidStorageClass(storage=storage)
@@ -2097,21 +2099,22 @@ def put_object(
keys = [new_key]
bucket.keys.setlist(key_name, keys)

# Send event notification
if request_method == "POST":
notify_event_name = (
notifications.S3NotificationEvent.OBJECT_CREATED_POST_EVENT
)
else: # PUT request
notify_event_name = (
notifications.S3NotificationEvent.OBJECT_CREATED_PUT_EVENT
if not disable_notification:
# Send event notification
if request_method == "POST":
notify_event_name = (
notifications.S3NotificationEvent.OBJECT_CREATED_POST_EVENT
)
else: # PUT request
notify_event_name = (
notifications.S3NotificationEvent.OBJECT_CREATED_PUT_EVENT
)
notifications.send_event(
self.account_id,
notify_event_name,
bucket,
new_key,
)
notifications.send_event(
self.account_id,
notify_event_name,
bucket,
new_key,
)

return new_key
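
For context, here is a hypothetical call-site sketch of the two new keyword arguments. The `backend` object, the positional arguments and their values are illustrative assumptions, not code from this diff:

```python
# Hypothetical sketch only -- names and values are assumptions, not part of this PR.
# A plain upload keeps the default behaviour and emits ObjectCreated:Put:
backend.put_object(bucket_name, "report.csv", b"data")

# A browser-style form upload can flag itself so ObjectCreated:Post is emitted instead:
backend.put_object(bucket_name, "form-upload.txt", b"data", request_method="POST")

# An internal write (e.g. the copy path below) can opt out of the Put/Post event entirely:
backend.put_object(bucket_name, "copied.txt", b"data", disable_notification=True)
```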

@@ -2706,6 +2709,7 @@ def copy_object(
lock_mode=lock_mode,
lock_legal_status=lock_legal_status,
lock_until=lock_until,
disable_notification=True, # avoid sending PutObject events here
)
self.tagger.copy_tags(src_key.arn, new_key.arn)
if mdirective != "REPLACE":
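The rest of `copy_object` is collapsed in this view; presumably it goes on to emit the copy-specific notification itself, roughly along these lines (a sketch that assumes `S3NotificationEvent` defines an `OBJECT_CREATED_COPY_EVENT` member, which is not shown in this diff):

```python
# Sketch of the expected follow-up inside copy_object (not visible in this hunk);
# OBJECT_CREATED_COPY_EVENT is assumed from the PR title, not copied from the source.
notifications.send_event(
    self.account_id,
    notifications.S3NotificationEvent.OBJECT_CREATED_COPY_EVENT,
    bucket,
    new_key,
)
```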
118 changes: 67 additions & 51 deletions tests/test_s3/test_s3_eventbridge_integration.py
@@ -1,5 +1,6 @@
import json
from io import BytesIO
from typing import Any, Dict, List
from unittest import SkipTest
from uuid import uuid4

@@ -14,18 +15,31 @@
REDUCED_PART_SIZE = 256


@mock_aws
def test_put_object_notification_ObjectCreated_PUT():
def _setup_bucket_notification_eventbridge(
bucket_name: str = str(uuid4()),
rule_name: str = "test-rule",
log_group_name: str = "/test-group",
) -> Dict[str, str]:
"""Setups S3, EventBridge and CloudWatchLogs"""
# Setup S3
s3_res = boto3.resource("s3", region_name=REGION_NAME)
s3_res.create_bucket(Bucket=bucket_name)

    # Create clients and enable EventBridge notifications on the bucket
s3_client = boto3.client("s3", region_name=REGION_NAME)
events_client = boto3.client("events", region_name=REGION_NAME)
logs_client = boto3.client("logs", region_name=REGION_NAME)
s3_client.put_bucket_notification_configuration(
Bucket=bucket_name,
NotificationConfiguration={"EventBridgeConfiguration": {}},
)

rule_name = "test-rule"
# Setup EventBridge Rule
events_client = boto3.client("events", region_name=REGION_NAME)
events_client.put_rule(
Name=rule_name, EventPattern=json.dumps({"account": [ACCOUNT_ID]})
)
log_group_name = "/test-group"

# Create a log group and attach it to the events target.
logs_client = boto3.client("logs", region_name=REGION_NAME)
logs_client.create_log_group(logGroupName=log_group_name)
events_client.put_targets(
Rule=rule_name,
@@ -37,23 +51,31 @@ def test_put_object_notification_ObjectCreated_PUT():
],
)

# Create S3 bucket
bucket_name = str(uuid4())
s3_res.create_bucket(Bucket=bucket_name)
return {
"bucket_name": bucket_name,
"event_rule_name": rule_name,
"log_group_name": log_group_name,
}

# Put Notification
s3_client.put_bucket_notification_configuration(
Bucket=bucket_name,
NotificationConfiguration={"EventBridgeConfiguration": {}},

def _get_send_events(log_group_name: str = "/test-group") -> List[Dict[str, Any]]:
logs_client = boto3.client("logs", region_name=REGION_NAME)
return sorted(
logs_client.filter_log_events(logGroupName=log_group_name)["events"],
key=lambda item: item["timestamp"],
)
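
For orientation, the tests below read individual fields out of the EventBridge message that the CloudWatch Logs target records. A rough sketch of the message shape being asserted (only the keys checked in the tests are taken from this diff; the placeholder values are illustrative):

```python
# Approximate shape of json.loads(event["message"]) as asserted in the tests below;
# the bucket and key values are placeholders, not captured output.
expected_event_shape = {
    "detail-type": "Object Created",
    "source": "aws.s3",
    "account": ACCOUNT_ID,
    "region": REGION_NAME,
    "detail": {
        "bucket": {"name": "<bucket_name>"},
        "object": {"key": "<object_key>"},
        "reason": "ObjectCreated",
    },
}
```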


@mock_aws
def test_put_object_notification_ObjectCreated_PUT():
    resource_names = _setup_bucket_notification_eventbridge()
bucket_name = resource_names["bucket_name"]
s3_client = boto3.client("s3", region_name=REGION_NAME)

# Put Object
s3_client.put_object(Bucket=bucket_name, Key="keyname", Body="bodyofnewobject")

events = sorted(
logs_client.filter_log_events(logGroupName=log_group_name)["events"],
key=lambda item: item["eventId"],
)
events = _get_send_events()
assert len(events) == 1
event_message = json.loads(events[0]["message"])
assert event_message["detail-type"] == "Object Created"
@@ -70,36 +92,8 @@ def test_put_object_notification_ObjectCreated_POST():
if not settings.TEST_DECORATOR_MODE:
        raise SkipTest("Doesn't quite work right with the Proxy or Server")

s3_res = boto3.resource("s3", region_name=REGION_NAME)
s3_client = boto3.client("s3", region_name=REGION_NAME)
events_client = boto3.client("events", region_name=REGION_NAME)
logs_client = boto3.client("logs", region_name=REGION_NAME)

rule_name = "test-rule"
events_client.put_rule(
Name=rule_name, EventPattern=json.dumps({"account": [ACCOUNT_ID]})
)
log_group_name = "/test-group"
logs_client.create_log_group(logGroupName=log_group_name)
events_client.put_targets(
Rule=rule_name,
Targets=[
{
"Id": "test",
"Arn": f"arn:aws:logs:{REGION_NAME}:{ACCOUNT_ID}:log-group:{log_group_name}",
}
],
)

# Create S3 bucket
bucket_name = str(uuid4())
s3_res.create_bucket(Bucket=bucket_name)

# Put bucket notification event bridge
s3_client.put_bucket_notification_configuration(
Bucket=bucket_name,
NotificationConfiguration={"EventBridgeConfiguration": {}},
)
    resource_names = _setup_bucket_notification_eventbridge()
bucket_name = resource_names["bucket_name"]

###
# multipart/formdata POST request (this request is processed in S3Response._bucket_response_post)
@@ -113,10 +107,7 @@ def test_put_object_notification_ObjectCreated_POST():
files={"file": ("tmp.txt", BytesIO(content))},
)

events = sorted(
logs_client.filter_log_events(logGroupName=log_group_name)["events"],
key=lambda item: item["eventId"],
)
events = _get_send_events()
assert len(events) == 1
event_message = json.loads(events[0]["message"])
assert event_message["detail-type"] == "Object Created"
Expand All @@ -126,3 +117,28 @@ def test_put_object_notification_ObjectCreated_POST():
assert event_message["detail"]["bucket"]["name"] == bucket_name
assert event_message["detail"]["object"]["key"] == object_key
assert event_message["detail"]["reason"] == "ObjectCreated"


@mock_aws
def test_copy_object_notification():
    resource_names = _setup_bucket_notification_eventbridge()
bucket_name = resource_names["bucket_name"]
s3_client = boto3.client("s3", region_name=REGION_NAME)

    # Copy object (sends two events: PutObject and CopyObject)
s3_client.put_object(Bucket=bucket_name, Key="keyname", Body="bodyofnewobject")
object_key = "key2"
s3_client.copy_object(
Bucket=bucket_name, CopySource=f"{bucket_name}/keyname", Key="key2"
)

events = _get_send_events()
assert len(events) == 2 # [PutObject event, CopyObject event]
event_message = json.loads(events[-1]["message"])
assert event_message["detail-type"] == "Object Created"
assert event_message["source"] == "aws.s3"
assert event_message["account"] == ACCOUNT_ID
assert event_message["region"] == REGION_NAME
assert event_message["detail"]["bucket"]["name"] == bucket_name
assert event_message["detail"]["object"]["key"] == object_key
assert event_message["detail"]["reason"] == "ObjectCreated"