diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 2ecd4f9307..a53eb86042 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -16,3 +16,5 @@ # Maximum interval(secs) to wait between retries for retry function MAX_WAIT_INTERVAL: str = "60" + +AWS_LAMBDA_FEATURE_SERVER_IMAGE = "feastdev/feature-server" diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 0d4fb929d9..b7f97627d6 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -210,6 +210,14 @@ def __init__( ) +class DockerDaemonNotRunning(Exception): + def __init__(self): + super().__init__( + "The Docker Python sdk cannot connect to the Docker daemon. Please make sure you have" + "the docker daemon installed, and that it is running." + ) + + class RegistryInferenceFailure(Exception): def __init__(self, repo_obj_type: str, specific_issue: str): super().__init__( diff --git a/sdk/python/feast/infra/aws.py b/sdk/python/feast/infra/aws.py index 0f4f2e0738..d2bd330646 100644 --- a/sdk/python/feast/infra/aws.py +++ b/sdk/python/feast/infra/aws.py @@ -5,6 +5,10 @@ from tempfile import TemporaryFile from urllib.parse import urlparse +from colorama import Fore, Style + +import feast +from feast.constants import AWS_LAMBDA_FEATURE_SERVER_IMAGE from feast.errors import S3RegistryBucketForbiddenAccess, S3RegistryBucketNotExist from feast.infra.passthrough_provider import PassthroughProvider from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto @@ -13,11 +17,66 @@ class AwsProvider(PassthroughProvider): - """ - This class only exists for backwards compatibility. - """ + def _upload_docker_image(self) -> None: + import base64 + + try: + import boto3 + except ImportError as e: + from feast.errors import FeastExtrasDependencyImportError + + raise FeastExtrasDependencyImportError("aws", str(e)) + + try: + import docker + from docker.errors import APIError + except ImportError as e: + from feast.errors import FeastExtrasDependencyImportError - pass + raise FeastExtrasDependencyImportError("docker", str(e)) + + try: + docker_client = docker.from_env() + except APIError: + from feast.errors import DockerDaemonNotRunning + + raise DockerDaemonNotRunning() + + print( + f"Pulling remote image {Style.BRIGHT + Fore.GREEN}{AWS_LAMBDA_FEATURE_SERVER_IMAGE}{Style.RESET_ALL}:" + ) + docker_client.images.pull(AWS_LAMBDA_FEATURE_SERVER_IMAGE) + + version = ".".join(feast.__version__.split(".")[:3]) + repository_name = f"feast-python-server-{version}" + ecr_client = boto3.client("ecr") + try: + print( + f"Creating remote ECR repository {Style.BRIGHT + Fore.GREEN}{repository_name}{Style.RESET_ALL}:" + ) + response = ecr_client.create_repository(repositoryName=repository_name) + repository_uri = response["repository"]["repositoryUri"] + except ecr_client.exceptions.RepositoryAlreadyExistsException: + response = ecr_client.describe_repositories( + repositoryNames=[repository_name] + ) + repository_uri = response["repositories"][0]["repositoryUri"] + + auth_token = ecr_client.get_authorization_token()["authorizationData"][0][ + "authorizationToken" + ] + username, password = base64.b64decode(auth_token).decode("utf-8").split(":") + + ecr_address = repository_uri.split("/")[0] + docker_client.login(username=username, password=password, registry=ecr_address) + + image = docker_client.images.get(AWS_LAMBDA_FEATURE_SERVER_IMAGE) + image_remote_name = f"{repository_uri}:{version}" + print( + f"Pushing local image to remote {Style.BRIGHT + Fore.GREEN}{image_remote_name}{Style.RESET_ALL}:" + ) + image.tag(image_remote_name) + docker_client.api.push(repository_uri, tag=version) class S3RegistryStore(RegistryStore): diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 3e4c0a3485..dddb3f79c8 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -52,6 +52,9 @@ def update_infra( partial=partial, ) + if self.repo_config.feature_server and self.repo_config.feature_server.enabled: + self._upload_docker_image() + def teardown_infra( self, project: str, @@ -147,3 +150,7 @@ def get_historical_features( full_feature_names=full_feature_names, ) return job + + def _upload_docker_image(self) -> None: + """Upload the docker image for the feature server to the cloud.""" + pass diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 6147f19b9a..d2e15199f2 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -147,6 +147,11 @@ def online_read( def get_provider(config: RepoConfig, repo_path: Path) -> Provider: if "." not in config.provider: if config.provider in {"gcp", "aws", "local"}: + if config.provider == "aws": + from feast.infra.aws import AwsProvider + + return AwsProvider(config) + from feast.infra.passthrough_provider import PassthroughProvider return PassthroughProvider(config) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index e1c36693fd..08a0c1db00 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -78,6 +78,7 @@ AWS_REQUIRED = [ "boto3==1.17.*", + "docker>=5.0.2", ] CI_REQUIRED = [