def copy_s3_object(src_s3_uri: str, dst_s3_uri: str, *args, s3_client: BaseClient = None, **kwargs) -> bool:
    """
    Copy s3 URI

    :param src_s3_uri: a fully qualified S3 URI for the s3 object to read
    :param dst_s3_uri: a fully qualified S3 URI for the s3 object to write
    :param s3_client: an optional botocore.client.BaseClient for s3
    :return: boolean (True on success, False on failure)
    """
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/copy_object.html
    # CopySource={'Bucket': 'bucket', 'Key': 'key', 'VersionId': 'id'}
    # Note that the VersionId key is optional and may be omitted.
    if s3_client is None:
        s3_client = s3_io_client(*args, **kwargs)
    try:
        src_s3_uri = S3URI(src_s3_uri)
        dst_s3_uri = S3URI(dst_s3_uri)
        logger.info(f"Copy: {src_s3_uri} to {dst_s3_uri}")
        response = s3_client.copy_object(
            ACL='bucket-owner-full-control',
            Bucket=dst_s3_uri.bucket,
            Key=dst_s3_uri.key,
            CopySource={'Bucket': src_s3_uri.bucket, 'Key': src_s3_uri.key},
        )
        if response:
            return True
    except botocore.exceptions.ClientError as err:
        logger.error(f"Failed S3 Copy: {src_s3_uri} to {dst_s3_uri}")
        logger.error(err)
        return False
For example, a synchronous S3 copy can use the `copy_s3_object` function above.
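To make the request itself concrete, here is a minimal sketch of the keyword arguments that such a copy passes to `copy_object`. The helpers `split_s3_uri` and `copy_object_kwargs` are hypothetical names used only for illustration; `split_s3_uri` is a simplified stand-in for `S3URI` built on `urllib.parse`, and it assumes plain keys without `?` or `#` characters. The bucket and key names are made up.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri: str) -> tuple:
    """Hypothetical stand-in for S3URI: split 's3://bucket/key' into (bucket, key)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not a valid S3 URI: {s3_uri}")
    # The URL path carries a leading slash that is not part of the object key
    return parsed.netloc, parsed.path.lstrip("/")


def copy_object_kwargs(src_s3_uri: str, dst_s3_uri: str) -> dict:
    """Build the keyword arguments for an S3 client's copy_object call."""
    src_bucket, src_key = split_s3_uri(src_s3_uri)
    dst_bucket, dst_key = split_s3_uri(dst_s3_uri)
    return {
        "ACL": "bucket-owner-full-control",
        "Bucket": dst_bucket,
        "Key": dst_key,
        # VersionId is optional in CopySource and is omitted here
        "CopySource": {"Bucket": src_bucket, "Key": src_key},
    }


# Example URIs are illustrative only
kwargs = copy_object_kwargs(
    "s3://src-bucket/data/in.txt",
    "s3://dst-bucket/data/out.txt",
)
# With a real boto3 S3 client (not created in this sketch):
#     s3_client.copy_object(**kwargs)
```

Separating argument construction from the network call keeps the URI handling easy to check without AWS credentials; the actual copy still needs a configured S3 client.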