Skip to content

Commit

Permalink
Improve aws lambda deployment (logging, idempotency, etc) (#1985)
Browse files Browse the repository at this point in the history
* Improve aws lambda deployment (logging, idempotency, etc)

Signed-off-by: Tsotne Tabidze <[email protected]>

* Fix linter error

Signed-off-by: Tsotne Tabidze <[email protected]>
  • Loading branch information
Tsotne Tabidze authored Nov 2, 2021
1 parent 600d38e commit bc4ffa5
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 117 deletions.
10 changes: 10 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ def cli(ctx: click.Context, chdir: Optional[str], log_level: str):
datefmt="%m/%d/%Y %I:%M:%S %p",
level=level,
)
# Override the logging level for already created loggers (due to loggers being created at the import time)
# Note, that format & datefmt does not need to be set, because by default child loggers don't override them

# Also note, that mypy complains that logging.root doesn't have "manager" because of the way it's written.
# So we have to put a type ignore hint for mypy.
for logger_name in logging.root.manager.loggerDict: # type: ignore
if "feast" in logger_name:
logger = logging.getLogger(logger_name)
logger.setLevel(level)

except Exception as e:
raise e
pass
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
MAX_WAIT_INTERVAL: str = "60"

AWS_LAMBDA_FEATURE_SERVER_IMAGE = "feastdev/feature-server:aws"
AWS_LAMBDA_FEATURE_SERVER_REPOSITORY = "feast-python-server"

# feature_store.yaml environment variable name for remote feature server
FEATURE_STORE_YAML_ENV_NAME: str = "FEATURE_STORE_YAML_BASE64"
Expand Down
265 changes: 148 additions & 117 deletions sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from feast.constants import (
AWS_LAMBDA_FEATURE_SERVER_IMAGE,
AWS_LAMBDA_FEATURE_SERVER_REPOSITORY,
FEAST_USAGE,
FEATURE_STORE_YAML_ENV_NAME,
)
Expand All @@ -29,6 +30,7 @@
from feast.feature_view import FeatureView
from feast.flags import FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME
from feast.flags_helper import enable_aws_lambda_feature_server
from feast.infra.feature_servers.aws_lambda.config import AwsLambdaFeatureServerConfig
from feast.infra.passthrough_provider import PassthroughProvider
from feast.infra.utils import aws_utils
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
Expand Down Expand Up @@ -83,89 +85,108 @@ def update_infra(
registry_store_class.__name__, S3RegistryStore.__name__
)

image_uri = self._upload_docker_image(project)
_logger.info("Deploying feature server...")
ecr_client = boto3.client("ecr")
repository_uri = self._create_or_get_repository_uri(ecr_client)
version = _get_version_for_aws()
# Only download & upload the docker image if it doesn't already exist in ECR
if not ecr_client.batch_get_image(
repositoryName=AWS_LAMBDA_FEATURE_SERVER_REPOSITORY,
imageIds=[{"imageTag": version}],
).get("images"):
image_uri = self._upload_docker_image(
ecr_client, repository_uri, version
)
else:
image_uri = f"{repository_uri}:{version}"

if not self.repo_config.repo_path:
raise RepoConfigPathDoesNotExist()
with open(self.repo_config.repo_path / "feature_store.yaml", "rb") as f:
config_bytes = f.read()
config_base64 = base64.b64encode(config_bytes).decode()
self._deploy_feature_server(project, image_uri)

resource_name = self._get_lambda_name(project)
lambda_client = boto3.client("lambda")
api_gateway_client = boto3.client("apigatewayv2")
function = aws_utils.get_lambda_function(lambda_client, resource_name)
def _deploy_feature_server(self, project: str, image_uri: str):
_logger.info("Deploying feature server...")

if not self.repo_config.repo_path:
raise RepoConfigPathDoesNotExist()
with open(self.repo_config.repo_path / "feature_store.yaml", "rb") as f:
config_bytes = f.read()
config_base64 = base64.b64encode(config_bytes).decode()

if function is None:
# If the Lambda function does not exist, create it.
_logger.info(" Creating AWS Lambda...")
lambda_client.create_function(
resource_name = _get_lambda_name(project)
lambda_client = boto3.client("lambda")
api_gateway_client = boto3.client("apigatewayv2")
function = aws_utils.get_lambda_function(lambda_client, resource_name)

if function is None:
# If the Lambda function does not exist, create it.
_logger.info(" Creating AWS Lambda...")
assert isinstance(
self.repo_config.feature_server, AwsLambdaFeatureServerConfig
)
lambda_client.create_function(
FunctionName=resource_name,
Role=self.repo_config.feature_server.execution_role_name,
Code={"ImageUri": image_uri},
PackageType="Image",
MemorySize=1769,
Environment={
"Variables": {
FEATURE_STORE_YAML_ENV_NAME: config_base64,
FEAST_USAGE: "False",
}
},
Tags={
"feast-owned": "True",
"project": project,
"feast-sdk-version": get_version(),
},
)
function = aws_utils.get_lambda_function(lambda_client, resource_name)
if not function:
raise AwsLambdaDoesNotExist(resource_name)
else:
# If the feature_store.yaml has changed, need to update the environment variable.
env = function.get("Environment", {}).get("Variables", {})
if env.get(FEATURE_STORE_YAML_ENV_NAME) != config_base64:
# Note, that this does not update Lambda gracefully (e.g. no rolling deployment).
# It's expected that feature_store.yaml is not regularly updated while the lambda
# is serving production traffic. However, the update in registry (e.g. modifying
# feature views, feature services, and other definitions does not update lambda).
_logger.info(" Updating AWS Lambda...")

lambda_client.update_function_configuration(
FunctionName=resource_name,
Role=self.repo_config.feature_server.execution_role_name,
Code={"ImageUri": image_uri},
PackageType="Image",
MemorySize=1769,
Environment={
"Variables": {
FEATURE_STORE_YAML_ENV_NAME: config_base64,
FEAST_USAGE: "False",
}
},
Tags={
"feast-owned": "True",
"project": project,
"feast-sdk-version": get_version(),
"Variables": {FEATURE_STORE_YAML_ENV_NAME: config_base64}
},
)
function = aws_utils.get_lambda_function(lambda_client, resource_name)
if not function:
raise AwsLambdaDoesNotExist(resource_name)
else:
# If the feature_store.yaml has changed, need to update the environment variable.
env = function.get("Environment", {}).get("Variables", {})
if env.get(FEATURE_STORE_YAML_ENV_NAME) != config_base64:
# Note, that this does not update Lambda gracefully (e.g. no rolling deployment).
# It's expected that feature_store.yaml is not regularly updated while the lambda
# is serving production traffic. However, the update in registry (e.g. modifying
# feature views, feature services, and other definitions does not update lambda).
_logger.info(" Updating AWS Lambda...")

lambda_client.update_function_configuration(
FunctionName=resource_name,
Environment={
"Variables": {FEATURE_STORE_YAML_ENV_NAME: config_base64}
},
)

api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)
api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)
if not api:
# If the API Gateway doesn't exist, create it
_logger.info(" Creating AWS API Gateway...")
api = api_gateway_client.create_api(
Name=resource_name,
ProtocolType="HTTP",
Target=function["FunctionArn"],
RouteKey="POST /get-online-features",
Tags={
"feast-owned": "True",
"project": project,
"feast-sdk-version": get_version(),
},
)
if not api:
# If the API Gateway doesn't exist, create it
_logger.info(" Creating AWS API Gateway...")
api = api_gateway_client.create_api(
Name=resource_name,
ProtocolType="HTTP",
Target=function["FunctionArn"],
RouteKey="POST /get-online-features",
Tags={
"feast-owned": "True",
"project": project,
"feast-sdk-version": get_version(),
},
)
if not api:
raise AwsAPIGatewayDoesNotExist(resource_name)
# Make sure to give AWS Lambda a permission to be invoked by the newly created API Gateway
api_id = api["ApiId"]
region = lambda_client.meta.region_name
account_id = aws_utils.get_account_id()
lambda_client.add_permission(
FunctionName=function["FunctionArn"],
StatementId=str(uuid.uuid4()),
Action="lambda:InvokeFunction",
Principal="apigateway.amazonaws.com",
SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/*/get-online-features",
)
raise AwsAPIGatewayDoesNotExist(resource_name)
# Make sure to give AWS Lambda a permission to be invoked by the newly created API Gateway
api_id = api["ApiId"]
region = lambda_client.meta.region_name
account_id = aws_utils.get_account_id()
lambda_client.add_permission(
FunctionName=function["FunctionArn"],
StatementId=str(uuid.uuid4()),
Action="lambda:InvokeFunction",
Principal="apigateway.amazonaws.com",
SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/*/get-online-features",
)

def teardown_infra(
self,
Expand All @@ -180,7 +201,7 @@ def teardown_infra(
and self.repo_config.feature_server.enabled
):
_logger.info("Tearing down feature server...")
resource_name = self._get_lambda_name(project)
resource_name = _get_lambda_name(project)
lambda_client = boto3.client("lambda")
api_gateway_client = boto3.client("apigatewayv2")

Expand All @@ -197,7 +218,7 @@ def teardown_infra(

def get_feature_server_endpoint(self) -> Optional[str]:
project = self.repo_config.project
resource_name = self._get_lambda_name(project)
resource_name = _get_lambda_name(project)
api_gateway_client = boto3.client("apigatewayv2")
api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)

Expand All @@ -209,25 +230,15 @@ def get_feature_server_endpoint(self) -> Optional[str]:
region = lambda_client.meta.region_name
return f"https://{api_id}.execute-api.{region}.amazonaws.com"

def _upload_docker_image(self, project: str) -> str:
def _upload_docker_image(
self, ecr_client, repository_uri: str, version: str
) -> str:
"""
Pulls the AWS Lambda docker image from Dockerhub and uploads it to AWS ECR.
Args:
project: Feast project name
Returns:
The URI of the uploaded docker image.
"""
import base64

try:
import boto3
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError

raise FeastExtrasDependencyImportError("aws", str(e))

try:
import docker
from docker.errors import APIError
Expand All @@ -244,52 +255,72 @@ def _upload_docker_image(self, project: str) -> str:
raise DockerDaemonNotRunning()

_logger.info(
f"Pulling remote image {Style.BRIGHT + Fore.GREEN}{AWS_LAMBDA_FEATURE_SERVER_IMAGE}{Style.RESET_ALL}:"
f"Pulling remote image {Style.BRIGHT + Fore.GREEN}{AWS_LAMBDA_FEATURE_SERVER_IMAGE}{Style.RESET_ALL}"
)
docker_client.images.pull(AWS_LAMBDA_FEATURE_SERVER_IMAGE)

version = self._get_version_for_aws()
repository_name = f"feast-python-server-{project}-{version}"
ecr_client = boto3.client("ecr")
try:
_logger.info(
f"Creating remote ECR repository {Style.BRIGHT + Fore.GREEN}{repository_name}{Style.RESET_ALL}:"
)
response = ecr_client.create_repository(repositoryName=repository_name)
repository_uri = response["repository"]["repositoryUri"]
except ecr_client.exceptions.RepositoryAlreadyExistsException:
response = ecr_client.describe_repositories(
repositoryNames=[repository_name]
)
repository_uri = response["repositories"][0]["repositoryUri"]
for line in docker_client.api.pull(
AWS_LAMBDA_FEATURE_SERVER_IMAGE, stream=True, decode=True
):
_logger.debug(f" {line}")

auth_token = ecr_client.get_authorization_token()["authorizationData"][0][
"authorizationToken"
]
username, password = base64.b64decode(auth_token).decode("utf-8").split(":")

ecr_address = repository_uri.split("/")[0]
docker_client.login(username=username, password=password, registry=ecr_address)
_logger.info(
f"Logging in Docker client to {Style.BRIGHT + Fore.GREEN}{ecr_address}{Style.RESET_ALL}"
)
login_status = docker_client.login(
username=username, password=password, registry=ecr_address
)
_logger.debug(f" {login_status}")

image = docker_client.images.get(AWS_LAMBDA_FEATURE_SERVER_IMAGE)
image_remote_name = f"{repository_uri}:{version}"
_logger.info(
f"Pushing local image to remote {Style.BRIGHT + Fore.GREEN}{image_remote_name}{Style.RESET_ALL}:"
f"Pushing local image to remote {Style.BRIGHT + Fore.GREEN}{image_remote_name}{Style.RESET_ALL}"
)
image.tag(image_remote_name)
docker_client.api.push(repository_uri, tag=version)
for line in docker_client.api.push(
repository_uri, tag=version, stream=True, decode=True
):
_logger.debug(f" {line}")

return image_remote_name

def _get_lambda_name(self, project: str):
return f"feast-python-server-{project}-{self._get_version_for_aws()}"
def _create_or_get_repository_uri(self, ecr_client):
try:
return ecr_client.describe_repositories(
repositoryNames=[AWS_LAMBDA_FEATURE_SERVER_REPOSITORY]
)["repositories"][0]["repositoryUri"]
except ecr_client.exceptions.RepositoryNotFoundException:
_logger.info(
f"Creating remote ECR repository {Style.BRIGHT + Fore.GREEN}{AWS_LAMBDA_FEATURE_SERVER_REPOSITORY}{Style.RESET_ALL}"
)
response = ecr_client.create_repository(
repositoryName=AWS_LAMBDA_FEATURE_SERVER_REPOSITORY
)
return response["repository"]["repositoryUri"]


@staticmethod
def _get_version_for_aws():
"""Returns Feast version with certain characters replaced.
def _get_lambda_name(project: str):
lambda_prefix = AWS_LAMBDA_FEATURE_SERVER_REPOSITORY
lambda_suffix = f"{project}-{_get_version_for_aws()}"
# AWS Lambda name can't have the length greater than 64 bytes.
# This usually occurs during integration tests or when feast is
# installed in editable mode (pip install -e), where feast version is long
if len(lambda_prefix) + len(lambda_suffix) >= 63:
lambda_suffix = base64.b64encode(lambda_suffix.encode()).decode()[:40]
return f"{lambda_prefix}-{lambda_suffix}"

This allows the version to be included in names for AWS resources.
"""
return get_version().replace(".", "_").replace("+", "_")

def _get_version_for_aws():
"""Returns Feast version with certain characters replaced.
This allows the version to be included in names for AWS resources.
"""
return get_version().replace(".", "_").replace("+", "_")


class S3RegistryStore(RegistryStore):
Expand Down

0 comments on commit bc4ffa5

Please sign in to comment.