Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve aws lambda deployment (logging, idempotency, etc) #1985

Merged
merged 2 commits into from
Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ def cli(ctx: click.Context, chdir: Optional[str], log_level: str):
datefmt="%m/%d/%Y %I:%M:%S %p",
level=level,
)
# Override the logging level for already created loggers (due to loggers being created at the import time)
# Note, that format & datefmt does not need to be set, because by default child loggers don't override them

# Also note, that mypy complains that logging.root doesn't have "manager" because of the way it's written.
# So we have to put a type ignore hint for mypy.
for logger_name in logging.root.manager.loggerDict: # type: ignore
if "feast" in logger_name:
logger = logging.getLogger(logger_name)
logger.setLevel(level)

except Exception as e:
raise e
pass
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
MAX_WAIT_INTERVAL: str = "60"

AWS_LAMBDA_FEATURE_SERVER_IMAGE = "feastdev/feature-server:aws"
AWS_LAMBDA_FEATURE_SERVER_REPOSITORY = "feast-python-server"

# feature_store.yaml environment variable name for remote feature server
FEATURE_STORE_YAML_ENV_NAME: str = "FEATURE_STORE_YAML_BASE64"
Expand Down
265 changes: 148 additions & 117 deletions sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from feast.constants import (
AWS_LAMBDA_FEATURE_SERVER_IMAGE,
AWS_LAMBDA_FEATURE_SERVER_REPOSITORY,
FEAST_USAGE,
FEATURE_STORE_YAML_ENV_NAME,
)
Expand All @@ -29,6 +30,7 @@
from feast.feature_view import FeatureView
from feast.flags import FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME
from feast.flags_helper import enable_aws_lambda_feature_server
from feast.infra.feature_servers.aws_lambda.config import AwsLambdaFeatureServerConfig
from feast.infra.passthrough_provider import PassthroughProvider
from feast.infra.utils import aws_utils
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
Expand Down Expand Up @@ -83,89 +85,108 @@ def update_infra(
registry_store_class.__name__, S3RegistryStore.__name__
)

image_uri = self._upload_docker_image(project)
_logger.info("Deploying feature server...")
ecr_client = boto3.client("ecr")
repository_uri = self._create_or_get_repository_uri(ecr_client)
version = _get_version_for_aws()
# Only download & upload the docker image if it doesn't already exist in ECR
if not ecr_client.batch_get_image(
repositoryName=AWS_LAMBDA_FEATURE_SERVER_REPOSITORY,
imageIds=[{"imageTag": version}],
).get("images"):
image_uri = self._upload_docker_image(
ecr_client, repository_uri, version
)
else:
image_uri = f"{repository_uri}:{version}"

if not self.repo_config.repo_path:
raise RepoConfigPathDoesNotExist()
with open(self.repo_config.repo_path / "feature_store.yaml", "rb") as f:
config_bytes = f.read()
config_base64 = base64.b64encode(config_bytes).decode()
self._deploy_feature_server(project, image_uri)

resource_name = self._get_lambda_name(project)
lambda_client = boto3.client("lambda")
api_gateway_client = boto3.client("apigatewayv2")
function = aws_utils.get_lambda_function(lambda_client, resource_name)
def _deploy_feature_server(self, project: str, image_uri: str):
_logger.info("Deploying feature server...")

if not self.repo_config.repo_path:
raise RepoConfigPathDoesNotExist()
with open(self.repo_config.repo_path / "feature_store.yaml", "rb") as f:
config_bytes = f.read()
config_base64 = base64.b64encode(config_bytes).decode()

if function is None:
# If the Lambda function does not exist, create it.
_logger.info(" Creating AWS Lambda...")
lambda_client.create_function(
resource_name = _get_lambda_name(project)
lambda_client = boto3.client("lambda")
api_gateway_client = boto3.client("apigatewayv2")
function = aws_utils.get_lambda_function(lambda_client, resource_name)

if function is None:
# If the Lambda function does not exist, create it.
_logger.info(" Creating AWS Lambda...")
assert isinstance(
self.repo_config.feature_server, AwsLambdaFeatureServerConfig
)
lambda_client.create_function(
FunctionName=resource_name,
Role=self.repo_config.feature_server.execution_role_name,
Code={"ImageUri": image_uri},
PackageType="Image",
MemorySize=1769,
Environment={
"Variables": {
FEATURE_STORE_YAML_ENV_NAME: config_base64,
FEAST_USAGE: "False",
}
},
Tags={
"feast-owned": "True",
"project": project,
"feast-sdk-version": get_version(),
},
)
function = aws_utils.get_lambda_function(lambda_client, resource_name)
if not function:
raise AwsLambdaDoesNotExist(resource_name)
else:
# If the feature_store.yaml has changed, need to update the environment variable.
env = function.get("Environment", {}).get("Variables", {})
if env.get(FEATURE_STORE_YAML_ENV_NAME) != config_base64:
# Note, that this does not update Lambda gracefully (e.g. no rolling deployment).
# It's expected that feature_store.yaml is not regularly updated while the lambda
# is serving production traffic. However, the update in registry (e.g. modifying
# feature views, feature services, and other definitions does not update lambda).
_logger.info(" Updating AWS Lambda...")

lambda_client.update_function_configuration(
FunctionName=resource_name,
Role=self.repo_config.feature_server.execution_role_name,
Code={"ImageUri": image_uri},
PackageType="Image",
MemorySize=1769,
Environment={
"Variables": {
FEATURE_STORE_YAML_ENV_NAME: config_base64,
FEAST_USAGE: "False",
}
},
Tags={
"feast-owned": "True",
"project": project,
"feast-sdk-version": get_version(),
"Variables": {FEATURE_STORE_YAML_ENV_NAME: config_base64}
},
)
function = aws_utils.get_lambda_function(lambda_client, resource_name)
if not function:
raise AwsLambdaDoesNotExist(resource_name)
else:
# If the feature_store.yaml has changed, need to update the environment variable.
env = function.get("Environment", {}).get("Variables", {})
if env.get(FEATURE_STORE_YAML_ENV_NAME) != config_base64:
# Note, that this does not update Lambda gracefully (e.g. no rolling deployment).
# It's expected that feature_store.yaml is not regularly updated while the lambda
# is serving production traffic. However, the update in registry (e.g. modifying
# feature views, feature services, and other definitions does not update lambda).
_logger.info(" Updating AWS Lambda...")

lambda_client.update_function_configuration(
FunctionName=resource_name,
Environment={
"Variables": {FEATURE_STORE_YAML_ENV_NAME: config_base64}
},
)

api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)
api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)
if not api:
# If the API Gateway doesn't exist, create it
_logger.info(" Creating AWS API Gateway...")
api = api_gateway_client.create_api(
Name=resource_name,
ProtocolType="HTTP",
Target=function["FunctionArn"],
RouteKey="POST /get-online-features",
Tags={
"feast-owned": "True",
"project": project,
"feast-sdk-version": get_version(),
},
)
if not api:
# If the API Gateway doesn't exist, create it
_logger.info(" Creating AWS API Gateway...")
api = api_gateway_client.create_api(
Name=resource_name,
ProtocolType="HTTP",
Target=function["FunctionArn"],
RouteKey="POST /get-online-features",
Tags={
"feast-owned": "True",
"project": project,
"feast-sdk-version": get_version(),
},
)
if not api:
raise AwsAPIGatewayDoesNotExist(resource_name)
# Make sure to give AWS Lambda a permission to be invoked by the newly created API Gateway
api_id = api["ApiId"]
region = lambda_client.meta.region_name
account_id = aws_utils.get_account_id()
lambda_client.add_permission(
FunctionName=function["FunctionArn"],
StatementId=str(uuid.uuid4()),
Action="lambda:InvokeFunction",
Principal="apigateway.amazonaws.com",
SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/*/get-online-features",
)
raise AwsAPIGatewayDoesNotExist(resource_name)
# Make sure to give AWS Lambda a permission to be invoked by the newly created API Gateway
api_id = api["ApiId"]
region = lambda_client.meta.region_name
account_id = aws_utils.get_account_id()
lambda_client.add_permission(
FunctionName=function["FunctionArn"],
StatementId=str(uuid.uuid4()),
Action="lambda:InvokeFunction",
Principal="apigateway.amazonaws.com",
SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/*/get-online-features",
)

def teardown_infra(
self,
Expand All @@ -180,7 +201,7 @@ def teardown_infra(
and self.repo_config.feature_server.enabled
):
_logger.info("Tearing down feature server...")
resource_name = self._get_lambda_name(project)
resource_name = _get_lambda_name(project)
lambda_client = boto3.client("lambda")
api_gateway_client = boto3.client("apigatewayv2")

Expand All @@ -197,7 +218,7 @@ def teardown_infra(

def get_feature_server_endpoint(self) -> Optional[str]:
project = self.repo_config.project
resource_name = self._get_lambda_name(project)
resource_name = _get_lambda_name(project)
api_gateway_client = boto3.client("apigatewayv2")
api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)

Expand All @@ -209,25 +230,15 @@ def get_feature_server_endpoint(self) -> Optional[str]:
region = lambda_client.meta.region_name
return f"https://{api_id}.execute-api.{region}.amazonaws.com"

def _upload_docker_image(self, project: str) -> str:
def _upload_docker_image(
self, ecr_client, repository_uri: str, version: str
) -> str:
"""
Pulls the AWS Lambda docker image from Dockerhub and uploads it to AWS ECR.

Args:
project: Feast project name

Returns:
The URI of the uploaded docker image.
"""
import base64

try:
import boto3
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError

raise FeastExtrasDependencyImportError("aws", str(e))

try:
import docker
from docker.errors import APIError
Expand All @@ -244,52 +255,72 @@ def _upload_docker_image(self, project: str) -> str:
raise DockerDaemonNotRunning()

_logger.info(
f"Pulling remote image {Style.BRIGHT + Fore.GREEN}{AWS_LAMBDA_FEATURE_SERVER_IMAGE}{Style.RESET_ALL}:"
f"Pulling remote image {Style.BRIGHT + Fore.GREEN}{AWS_LAMBDA_FEATURE_SERVER_IMAGE}{Style.RESET_ALL}"
)
docker_client.images.pull(AWS_LAMBDA_FEATURE_SERVER_IMAGE)

version = self._get_version_for_aws()
repository_name = f"feast-python-server-{project}-{version}"
ecr_client = boto3.client("ecr")
try:
_logger.info(
f"Creating remote ECR repository {Style.BRIGHT + Fore.GREEN}{repository_name}{Style.RESET_ALL}:"
)
response = ecr_client.create_repository(repositoryName=repository_name)
repository_uri = response["repository"]["repositoryUri"]
except ecr_client.exceptions.RepositoryAlreadyExistsException:
response = ecr_client.describe_repositories(
repositoryNames=[repository_name]
)
repository_uri = response["repositories"][0]["repositoryUri"]
for line in docker_client.api.pull(
AWS_LAMBDA_FEATURE_SERVER_IMAGE, stream=True, decode=True
):
_logger.debug(f" {line}")

auth_token = ecr_client.get_authorization_token()["authorizationData"][0][
"authorizationToken"
]
username, password = base64.b64decode(auth_token).decode("utf-8").split(":")

ecr_address = repository_uri.split("/")[0]
docker_client.login(username=username, password=password, registry=ecr_address)
_logger.info(
f"Logging in Docker client to {Style.BRIGHT + Fore.GREEN}{ecr_address}{Style.RESET_ALL}"
)
login_status = docker_client.login(
username=username, password=password, registry=ecr_address
)
_logger.debug(f" {login_status}")

image = docker_client.images.get(AWS_LAMBDA_FEATURE_SERVER_IMAGE)
image_remote_name = f"{repository_uri}:{version}"
_logger.info(
f"Pushing local image to remote {Style.BRIGHT + Fore.GREEN}{image_remote_name}{Style.RESET_ALL}:"
f"Pushing local image to remote {Style.BRIGHT + Fore.GREEN}{image_remote_name}{Style.RESET_ALL}"
)
image.tag(image_remote_name)
docker_client.api.push(repository_uri, tag=version)
for line in docker_client.api.push(
repository_uri, tag=version, stream=True, decode=True
):
_logger.debug(f" {line}")

return image_remote_name

def _get_lambda_name(self, project: str):
return f"feast-python-server-{project}-{self._get_version_for_aws()}"
def _create_or_get_repository_uri(self, ecr_client):
try:
return ecr_client.describe_repositories(
repositoryNames=[AWS_LAMBDA_FEATURE_SERVER_REPOSITORY]
)["repositories"][0]["repositoryUri"]
except ecr_client.exceptions.RepositoryNotFoundException:
_logger.info(
f"Creating remote ECR repository {Style.BRIGHT + Fore.GREEN}{AWS_LAMBDA_FEATURE_SERVER_REPOSITORY}{Style.RESET_ALL}"
)
response = ecr_client.create_repository(
repositoryName=AWS_LAMBDA_FEATURE_SERVER_REPOSITORY
)
return response["repository"]["repositoryUri"]


@staticmethod
def _get_version_for_aws():
"""Returns Feast version with certain characters replaced.
def _get_lambda_name(project: str):
lambda_prefix = AWS_LAMBDA_FEATURE_SERVER_REPOSITORY
lambda_suffix = f"{project}-{_get_version_for_aws()}"
# AWS Lambda name can't have the length greater than 64 bytes.
# This usually occurs during integration tests or when feast is
# installed in editable mode (pip install -e), where feast version is long
if len(lambda_prefix) + len(lambda_suffix) >= 63:
lambda_suffix = base64.b64encode(lambda_suffix.encode()).decode()[:40]
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved
return f"{lambda_prefix}-{lambda_suffix}"

This allows the version to be included in names for AWS resources.
"""
return get_version().replace(".", "_").replace("+", "_")

def _get_version_for_aws():
"""Returns Feast version with certain characters replaced.

This allows the version to be included in names for AWS resources.
"""
return get_version().replace(".", "_").replace("+", "_")


class S3RegistryStore(RegistryStore):
Expand Down