-
Notifications
You must be signed in to change notification settings - Fork 26
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(api): add chain pipeline stages to load images from S3 or URL (#184
, #185)
- Loading branch information
Showing
5 changed files
with
85 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
from io import BytesIO | ||
from logging import getLogger | ||
from typing import Optional | ||
|
||
from boto3 import Session | ||
from PIL import Image | ||
|
||
from ..params import ImageParams, StageParams | ||
from ..server import ServerContext | ||
from ..worker import WorkerContext | ||
|
||
logger = getLogger(__name__) | ||
|
||
|
||
def source_s3( | ||
_job: WorkerContext, | ||
server: ServerContext, | ||
_stage: StageParams, | ||
_params: ImageParams, | ||
source: Image.Image, | ||
*, | ||
source_key: str, | ||
bucket: str, | ||
endpoint_url: Optional[str] = None, | ||
profile_name: Optional[str] = None, | ||
stage_source: Optional[Image.Image] = None, | ||
**kwargs, | ||
) -> Image.Image: | ||
source = stage_source or source | ||
|
||
session = Session(profile_name=profile_name) | ||
s3 = session.client("s3", endpoint_url=endpoint_url) | ||
|
||
try: | ||
logger.info("loading image from s3://%s/%s", bucket, source_key) | ||
data = BytesIO() | ||
s3.download_fileobj(bucket, source_key, data) | ||
|
||
data.seek(0) | ||
return Image.open(data) | ||
except Exception: | ||
logger.exception("error loading image from S3") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
from io import BytesIO | ||
from logging import getLogger | ||
|
||
import requests | ||
from PIL import Image | ||
|
||
from ..params import ImageParams, StageParams | ||
from ..server import ServerContext | ||
from ..worker import WorkerContext | ||
|
||
logger = getLogger(__name__) | ||
|
||
|
||
def source_url( | ||
_job: WorkerContext, | ||
_server: ServerContext, | ||
_stage: StageParams, | ||
_params: ImageParams, | ||
source: Image.Image, | ||
*, | ||
source_url: str, | ||
stage_source: Image.Image, | ||
**kwargs, | ||
) -> Image.Image: | ||
source = stage_source or source | ||
logger.info("loading image from URL source") | ||
|
||
if source is not None: | ||
logger.warn( | ||
"a source image was passed to a source stage, and will be discarded" | ||
) | ||
|
||
response = requests.get(source_url) | ||
output = Image.open(BytesIO(response.content)) | ||
|
||
logger.info("final output image size: %sx%s", output.width, output.height) | ||
return output |