From 7d4184518aeb636391f793fa167a10f8ff782fa1 Mon Sep 17 00:00:00 2001
From: Harry
Date: Fri, 12 Apr 2024 09:26:21 +0700
Subject: [PATCH] fix: Prune bytewax dependencies

Signed-off-by: Harry
---
 .../contrib/bytewax/Dockerfile                |  29 -
 .../contrib/bytewax/__init__.py               |  15 -
 .../bytewax_materialization_dataflow.py       |  83 ---
 .../bytewax/bytewax_materialization_engine.py | 495 ------------------
 .../bytewax/bytewax_materialization_job.py    |  63 ---
 .../bytewax/bytewax_materialization_task.py   |  10 -
 .../contrib/bytewax/dataflow.py               |  28 -
 .../contrib/bytewax/entrypoint.sh             |   4 -
 .../contrib/bytewax/test_bytewax.py           |  67 ---
 setup.py                                      |   4 -
 10 files changed, 798 deletions(-)
 delete mode 100644 sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile
 delete mode 100644 sdk/python/feast/infra/materialization/contrib/bytewax/__init__.py
 delete mode 100644 sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py
 delete mode 100644 sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py
 delete mode 100644 sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py
 delete mode 100644 sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_task.py
 delete mode 100644 sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py
 delete mode 100644 sdk/python/feast/infra/materialization/contrib/bytewax/entrypoint.sh
 delete mode 100644 sdk/python/tests/integration/materialization/contrib/bytewax/test_bytewax.py

diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile b/sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile
deleted file mode 100644
index a7d0af9b416..00000000000
--- a/sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile
+++ /dev/null
@@ -1,29 +0,0 @@
-FROM python:3.9-slim-bullseye AS build
-
-RUN apt-get update && \
-    apt-get install --no-install-suggests --no-install-recommends --yes git
-
-WORKDIR /bytewax
-
-# Copy dataflow code
-COPY sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py /bytewax
-COPY sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py /bytewax
-
-# Copy entrypoint
-COPY sdk/python/feast/infra/materialization/contrib/bytewax/entrypoint.sh /bytewax
-
-# Copy necessary parts of the Feast codebase
-COPY sdk/python sdk/python
-COPY protos protos
-COPY go go
-COPY setup.py setup.py
-COPY pyproject.toml pyproject.toml
-COPY README.md README.md
-
-# Install Feast for AWS with Bytewax dependencies
-# We need this mount thingy because setuptools_scm needs access to the
-# git dir to infer the version of feast we're installing.
-# https://github.com/pypa/setuptools_scm#usage-from-docker
-# I think it also assumes that this dockerfile is being built from the root of the directory.
-RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir '.[aws,gcp,bytewax,snowflake,postgres]'
-
diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/__init__.py b/sdk/python/feast/infra/materialization/contrib/bytewax/__init__.py
deleted file mode 100644
index 0838a4c0d59..00000000000
--- a/sdk/python/feast/infra/materialization/contrib/bytewax/__init__.py
+++ /dev/null
@@ -1,15 +0,0 @@
-from .bytewax_materialization_dataflow import BytewaxMaterializationDataflow
-from .bytewax_materialization_engine import (
-    BytewaxMaterializationEngine,
-    BytewaxMaterializationEngineConfig,
-)
-from .bytewax_materialization_job import BytewaxMaterializationJob
-from .bytewax_materialization_task import BytewaxMaterializationTask
-
-__all__ = [
-    "BytewaxMaterializationTask",
-    "BytewaxMaterializationJob",
-    "BytewaxMaterializationDataflow",
-    "BytewaxMaterializationEngine",
-    "BytewaxMaterializationEngineConfig",
-]
diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py
deleted file mode 100644
index 6fc53b67f2f..00000000000
--- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py
+++ /dev/null
@@ -1,83 +0,0 @@
-import logging
-import os
-from typing import List
-
-import pyarrow as pa
-import pyarrow.parquet as pq
-from bytewax.dataflow import Dataflow  # type: ignore
-from bytewax.execution import cluster_main
-from bytewax.inputs import ManualInputConfig
-from bytewax.outputs import ManualOutputConfig
-
-from feast import FeatureStore, FeatureView, RepoConfig
-from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping
-
-logger = logging.getLogger(__name__)
-DEFAULT_BATCH_SIZE = 1000
-
-
-class BytewaxMaterializationDataflow:
-    def __init__(
-        self,
-        config: RepoConfig,
-        feature_view: FeatureView,
-        paths: List[str],
-        worker_index: int,
-    ):
-        self.config = config
-        self.feature_store = FeatureStore(config=config)
-
-        self.feature_view = feature_view
-        self.worker_index = worker_index
-        self.paths = paths
-        self.mini_batch_size = int(
-            os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE)
-        )
-
-        self._run_dataflow()
-
-    def process_path(self, path):
-        logger.info(f"Processing path {path}")
-        dataset = pq.ParquetDataset(path, use_legacy_dataset=False)
-        batches = []
-        for fragment in dataset.fragments:
-            for batch in fragment.to_table().to_batches(
-                max_chunksize=self.mini_batch_size
-            ):
-                batches.append(batch)
-
-        return batches
-
-    def input_builder(self, worker_index, worker_count, _state):
-        return [(None, self.paths[self.worker_index])]
-
-    def output_builder(self, worker_index, worker_count):
-        def output_fn(mini_batch):
-            table: pa.Table = pa.Table.from_batches([mini_batch])
-
-            if self.feature_view.batch_source.field_mapping is not None:
-                table = _run_pyarrow_field_mapping(
-                    table, self.feature_view.batch_source.field_mapping
-                )
-            join_key_to_value_type = {
-                entity.name: entity.dtype.to_value_type()
-                for entity in self.feature_view.entity_columns
-            }
-            rows_to_write = _convert_arrow_to_proto(
-                table, self.feature_view, join_key_to_value_type
-            )
-            self.feature_store._get_provider().online_write_batch(
-                config=self.config,
-                table=self.feature_view,
-                data=rows_to_write,
-                progress=None,
-            )
-
-        return output_fn
-
-    def _run_dataflow(self):
-        flow = Dataflow()
-        flow.input("inp", ManualInputConfig(self.input_builder))
-        flow.flat_map(self.process_path)
-        flow.capture(ManualOutputConfig(self.output_builder))
-        cluster_main(flow, [], 0)
diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py
deleted file mode 100644
index 3ad6fe4b559..00000000000
--- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py
+++ /dev/null
@@ -1,495 +0,0 @@
-import logging
-import uuid
-from datetime import datetime
-from time import sleep
-from typing import Callable, List, Literal, Sequence, Union
-
-import yaml
-from kubernetes import client, utils
-from kubernetes import config as k8s_config
-from kubernetes.client.exceptions import ApiException
-from kubernetes.utils import FailToCreateError
-from pydantic import StrictStr
-from tqdm import tqdm
-
-from feast import FeatureView, RepoConfig
-from feast.batch_feature_view import BatchFeatureView
-from feast.entity import Entity
-from feast.infra.materialization.batch_materialization_engine import (
-    BatchMaterializationEngine,
-    MaterializationJob,
-    MaterializationJobStatus,
-    MaterializationTask,
-)
-from feast.infra.offline_stores.offline_store import OfflineStore
-from feast.infra.online_stores.online_store import OnlineStore
-from feast.infra.registry.base_registry import BaseRegistry
-from feast.repo_config import FeastConfigBaseModel
-from feast.stream_feature_view import StreamFeatureView
-from feast.utils import _get_column_names
-
-from .bytewax_materialization_job import BytewaxMaterializationJob
-
-logger = logging.getLogger(__name__)
-
-
-class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
-    """Batch Materialization Engine config for Bytewax"""
-
-    type: Literal["bytewax"] = "bytewax"
-    """ Materialization type selector"""
-
-    namespace: StrictStr = "default"
-    """ (optional) The namespace in Kubernetes to use when creating services, configuration maps and jobs.
-    """
-
-    image: StrictStr = "bytewax/bytewax-feast:latest"
-    """ (optional) The container image to use when running the materialization job."""
-
-    env: List[dict] = []
-    """ (optional) A list of environment variables to set in the created Kubernetes pods.
-    These environment variables can be used to reference Kubernetes secrets.
-    """
-
-    image_pull_secrets: List[dict] = []
-    """ (optional) The secrets to use when pulling the image to run for the materialization job """
-
-    resources: dict = {}
-    """ (optional) The resource requests and limits for the materialization containers """
-
-    service_account_name: StrictStr = ""
-    """ (optional) The service account name to use when running the job """
-
-    annotations: dict = {}
-    """ (optional) Annotations to apply to the job container.
-    Useful for linking the service account to IAM roles, operational metadata, etc """
-
-    include_security_context_capabilities: bool = True
-    """ (optional) Include security context capabilities in the init and job container spec """
-
-    labels: dict = {}
-    """ (optional) additional labels to append to kubernetes objects """
-
-    max_parallelism: int = 10
-    """ (optional) Maximum number of pods allowed to run in parallel"""
-
-    synchronous: bool = False
-    """ (optional) If true, wait for materialization for one feature to complete before moving to the next """
-
-    retry_limit: int = 2
-    """ (optional) Maximum number of times to retry a materialization worker pod"""
-
-    mini_batch_size: int = 1000
-    """ (optional) Number of rows to process per write operation (default 1000)"""
-
-    active_deadline_seconds: int = 86400
-    """ (optional) Maximum amount of time a materialization job is allowed to run"""
-
-    job_batch_size: int = 100
-    """ (optional) Maximum number of pods to process per job. Only applies to synchronous materialization"""
-
-    print_pod_logs_on_failure: bool = True
-    """(optional) Print pod logs on job failure. Only applies to synchronous materialization"""
-
-
-class BytewaxMaterializationEngine(BatchMaterializationEngine):
-    def __init__(
-        self,
-        *,
-        repo_config: RepoConfig,
-        offline_store: OfflineStore,
-        online_store: OnlineStore,
-        **kwargs,
-    ):
-        super().__init__(
-            repo_config=repo_config,
-            offline_store=offline_store,
-            online_store=online_store,
-            **kwargs,
-        )
-        self.repo_config = repo_config
-        self.offline_store = offline_store
-        self.online_store = online_store
-
-        k8s_config.load_config()
-
-        self.k8s_client = client.api_client.ApiClient()
-        self.v1 = client.CoreV1Api(self.k8s_client)
-        self.batch_v1 = client.BatchV1Api(self.k8s_client)
-        self.batch_engine_config = repo_config.batch_engine
-        self.namespace = self.batch_engine_config.namespace
-
-    def update(
-        self,
-        project: str,
-        views_to_delete: Sequence[
-            Union[BatchFeatureView, StreamFeatureView, FeatureView]
-        ],
-        views_to_keep: Sequence[
-            Union[BatchFeatureView, StreamFeatureView, FeatureView]
-        ],
-        entities_to_delete: Sequence[Entity],
-        entities_to_keep: Sequence[Entity],
-    ):
-        """This method ensures that any necessary infrastructure or resources needed by the
-        engine are set up ahead of materialization."""
-        pass
-
-    def teardown_infra(
-        self,
-        project: str,
-        fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
-        entities: Sequence[Entity],
-    ):
-        """This method ensures that any infrastructure or resources set up by ``update()``are torn down."""
-        pass
-
-    def materialize(
-        self,
-        registry: BaseRegistry,
-        tasks: List[MaterializationTask],
-    ) -> List[MaterializationJob]:
-        return [
-            self._materialize_one(
-                registry,
-                task.feature_view,
-                task.start_time,
-                task.end_time,
-                task.project,
-                task.tqdm_builder,
-            )
-            for task in tasks
-        ]
-
-    def _materialize_one(
-        self,
-        registry: BaseRegistry,
-        feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
-        start_date: datetime,
-        end_date: datetime,
-        project: str,
-        tqdm_builder: Callable[[int], tqdm],
-    ):
-        entities = []
-        for entity_name in feature_view.entities:
-            entities.append(registry.get_entity(entity_name, project))
-
-        (
-            join_key_columns,
-            feature_name_columns,
-            timestamp_field,
-            created_timestamp_column,
-        ) = _get_column_names(feature_view, entities)
-
-        offline_job = self.offline_store.pull_latest_from_table_or_query(
-            config=self.repo_config,
-            data_source=feature_view.batch_source,
-            join_key_columns=join_key_columns,
-            feature_name_columns=feature_name_columns,
-            timestamp_field=timestamp_field,
-            created_timestamp_column=created_timestamp_column,
-            start_date=start_date,
-            end_date=end_date,
-        )
-
-        paths = offline_job.to_remote_storage()
-        if self.batch_engine_config.synchronous:
-            offset = 0
-            total_pods = len(paths)
-            batch_size = self.batch_engine_config.job_batch_size
-            if batch_size < 1:
-                raise ValueError("job_batch_size must be a value greater than 0")
-            if batch_size < self.batch_engine_config.max_parallelism:
-                logger.warning(
-                    "job_batch_size is less than max_parallelism. Setting job_batch_size = max_parallelism"
-                )
-                batch_size = self.batch_engine_config.max_parallelism
-
-            while True:
-                next_offset = min(offset + batch_size, total_pods)
-                job = self._await_path_materialization(
-                    paths[offset:next_offset],
-                    feature_view,
-                    offset,
-                    next_offset,
-                    total_pods,
-                )
-                offset += batch_size
-                if (
-                    offset >= total_pods
-                    or job.status() == MaterializationJobStatus.ERROR
-                ):
-                    break
-        else:
-            job_id = str(uuid.uuid4())
-            job = self._create_kubernetes_job(job_id, paths, feature_view)
-
-        return job
-
-    def _await_path_materialization(
-        self, paths, feature_view, batch_start, batch_end, total_pods
-    ):
-        job_id = str(uuid.uuid4())
-        job = self._create_kubernetes_job(job_id, paths, feature_view)
-
-        try:
-            while job.status() in (
-                MaterializationJobStatus.WAITING,
-                MaterializationJobStatus.RUNNING,
-            ):
-                logger.info(
-                    f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
-                    f"(of {total_pods}) running..."
-                )
-                sleep(30)
-            logger.info(
-                f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
-                f"(of {total_pods}) complete with status {job.status()}"
-            )
-        except BaseException as e:
-            logger.info(f"Deleting job {job.job_id()}")
-            try:
-                self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace)
-            except ApiException as ae:
-                logger.warning(f"Could not delete job due to API Error: {ae.body}")
-            raise e
-        finally:
-            logger.info(f"Deleting configmap {self._configmap_name(job_id)}")
-            try:
-                self.v1.delete_namespaced_config_map(
-                    self._configmap_name(job_id), self.namespace
-                )
-            except ApiException as ae:
-                logger.warning(
-                    f"Could not delete configmap due to API Error: {ae.body}"
-                )
-
-        if (
-            job.status() == MaterializationJobStatus.ERROR
-            and self.batch_engine_config.print_pod_logs_on_failure
-        ):
-            self._print_pod_logs(job.job_id(), feature_view, batch_start)
-
-        return job
-
-    def _print_pod_logs(self, job_id, feature_view, offset=0):
-        pods_list = self.v1.list_namespaced_pod(
-            namespace=self.namespace,
-            label_selector=f"job-name={job_id}",
-        ).items
-        for i, pod in enumerate(pods_list):
-            logger.info(f"Logging output for {feature_view.name} pod {offset+i}")
-            try:
-                logger.info(
-                    self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace)
-                )
-            except ApiException as e:
-                logger.warning(f"Could not retrieve pod logs due to: {e.body}")
-
-    def _create_kubernetes_job(self, job_id, paths, feature_view):
-        try:
-            # Create a k8s configmap with information needed by bytewax
-            self._create_configuration_map(job_id, paths, feature_view, self.namespace)
-
-            # Create the k8s job definition
-            self._create_job_definition(
-                job_id,
-                self.namespace,
-                len(paths),  # Create a pod for each parquet file
-                self.batch_engine_config.env,
-            )
-            logger.info(
-                f"Created job `dataflow-{job_id}` on namespace `{self.namespace}`"
-            )
-        except FailToCreateError as failures:
-            return BytewaxMaterializationJob(job_id, self.namespace, error=failures)
-
-        return BytewaxMaterializationJob(job_id, self.namespace)
-
-    def _create_configuration_map(self, job_id, paths, feature_view, namespace):
-        """Create a Kubernetes configmap for this job"""
-
-        feature_store_configuration = yaml.dump(self.repo_config.dict(by_alias=True))
-
-        materialization_config = yaml.dump(
-            {"paths": paths, "feature_view": feature_view.name}
-        )
-
-        labels = {"feast-bytewax-materializer": "configmap"}
-        configmap_manifest = {
-            "kind": "ConfigMap",
-            "apiVersion": "v1",
-            "metadata": {
-                "name": self._configmap_name(job_id),
-                "labels": {**labels, **self.batch_engine_config.labels},
-            },
-            "data": {
-                "feature_store.yaml": feature_store_configuration,
-                "bytewax_materialization_config.yaml": materialization_config,
-            },
-        }
-        self.v1.create_namespaced_config_map(
-            namespace=namespace,
-            body=configmap_manifest,
-        )
-
-    def _configmap_name(self, job_id):
-        return f"feast-{job_id}"
-
-    def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
-        """Create a kubernetes job definition."""
-        job_env = [
-            {"name": "RUST_BACKTRACE", "value": "full"},
-            {
-                "name": "BYTEWAX_PYTHON_FILE_PATH",
-                "value": "/bytewax/dataflow.py",
-            },
-            {"name": "BYTEWAX_WORKDIR", "value": "/bytewax"},
-            {
-                "name": "BYTEWAX_WORKERS_PER_PROCESS",
-                "value": "1",
-            },
-            {
-                "name": "BYTEWAX_POD_NAME",
-                "valueFrom": {
-                    "fieldRef": {
-                        "apiVersion": "v1",
-                        "fieldPath": "metadata.annotations['batch.kubernetes.io/job-completion-index']",
-                    }
-                },
-            },
-            {
-                "name": "BYTEWAX_REPLICAS",
-                "value": "1",
-            },
-            {
-                "name": "BYTEWAX_KEEP_CONTAINER_ALIVE",
-                "value": "false",
-            },
-            {
-                "name": "BYTEWAX_STATEFULSET_NAME",
-                "value": f"dataflow-{job_id}",
-            },
-            {
-                "name": "BYTEWAX_MINI_BATCH_SIZE",
-                "value": str(self.batch_engine_config.mini_batch_size),
-            },
-        ]
-        # Add any Feast configured environment variables
-        job_env.extend(env)
-
-        securityContextCapabilities = None
-        if self.batch_engine_config.include_security_context_capabilities:
-            securityContextCapabilities = {
-                "add": ["NET_BIND_SERVICE"],
-                "drop": ["ALL"],
-            }
-
-        job_labels = {"feast-bytewax-materializer": "job"}
-        pod_labels = {"feast-bytewax-materializer": "pod"}
-        job_definition = {
-            "apiVersion": "batch/v1",
-            "kind": "Job",
-            "metadata": {
-                "name": f"dataflow-{job_id}",
-                "namespace": namespace,
-                "labels": {**job_labels, **self.batch_engine_config.labels},
-            },
-            "spec": {
-                "ttlSecondsAfterFinished": 3600,
-                "backoffLimit": self.batch_engine_config.retry_limit,
-                "completions": pods,
-                "parallelism": min(pods, self.batch_engine_config.max_parallelism),
-                "activeDeadlineSeconds": self.batch_engine_config.active_deadline_seconds,
-                "completionMode": "Indexed",
-                "template": {
-                    "metadata": {
-                        "annotations": self.batch_engine_config.annotations,
-                        "labels": {**pod_labels, **self.batch_engine_config.labels},
-                    },
-                    "spec": {
-                        "restartPolicy": "Never",
-                        "subdomain": f"dataflow-{job_id}",
-                        "imagePullSecrets": self.batch_engine_config.image_pull_secrets,
-                        "serviceAccountName": self.batch_engine_config.service_account_name,
-                        "initContainers": [
-                            {
-                                "env": [
-                                    {
-                                        "name": "BYTEWAX_REPLICAS",
-                                        "value": f"{pods}",
-                                    }
-                                ],
-                                "image": "busybox",
-                                "imagePullPolicy": "IfNotPresent",
-                                "name": "init-hostfile",
-                                "resources": {},
-                                "securityContext": {
-                                    "allowPrivilegeEscalation": False,
-                                    "capabilities": securityContextCapabilities,
-                                    "readOnlyRootFilesystem": True,
-                                },
-                                "terminationMessagePath": "/dev/termination-log",
-                                "terminationMessagePolicy": "File",
"volumeMounts": [ - {"mountPath": "/etc/bytewax", "name": "hostfile"}, - { - "mountPath": "/tmp/bytewax/", - "name": "python-files", - }, - { - "mountPath": "/var/feast/", - "name": self._configmap_name(job_id), - }, - ], - } - ], - "containers": [ - { - "command": ["sh", "-c", "sh ./entrypoint.sh"], - "env": job_env, - "image": self.batch_engine_config.image, - "imagePullPolicy": "Always", - "name": "process", - "ports": [ - { - "containerPort": 9999, - "name": "process", - "protocol": "TCP", - } - ], - "resources": self.batch_engine_config.resources, - "securityContext": { - "allowPrivilegeEscalation": False, - "capabilities": securityContextCapabilities, - "readOnlyRootFilesystem": False, - }, - "terminationMessagePath": "/dev/termination-log", - "terminationMessagePolicy": "File", - "volumeMounts": [ - {"mountPath": "/etc/bytewax", "name": "hostfile"}, - { - "mountPath": "/var/feast/", - "name": self._configmap_name(job_id), - }, - ], - } - ], - "volumes": [ - {"emptyDir": {}, "name": "hostfile"}, - { - "configMap": { - "defaultMode": 420, - "name": self._configmap_name(job_id), - }, - "name": "python-files", - }, - { - "configMap": {"name": self._configmap_name(job_id)}, - "name": self._configmap_name(job_id), - }, - ], - }, - }, - }, - } - utils.create_from_dict(self.k8s_client, job_definition) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py deleted file mode 100644 index da969d5a880..00000000000 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py +++ /dev/null @@ -1,63 +0,0 @@ -from typing import Optional - -from kubernetes import client - -from feast.infra.materialization.batch_materialization_engine import ( - MaterializationJob, - MaterializationJobStatus, -) - - -class BytewaxMaterializationJob(MaterializationJob): - def __init__( - self, - job_id, - namespace, - error: Optional[BaseException] = None, - ): - super().__init__() - self._job_id = job_id - self.namespace = namespace - self._error: Optional[BaseException] = error - self.batch_v1 = client.BatchV1Api() - - def error(self): - return self._error - - def status(self): - if self._error is not None: - return MaterializationJobStatus.ERROR - else: - # TODO: Find a better way to parse status? 
-            job_status = self.batch_v1.read_namespaced_job_status(
-                self.job_id(), self.namespace
-            ).status
-            if job_status.active is not None:
-                if job_status.completion_time is None:
-                    return MaterializationJobStatus.RUNNING
-            else:
-                if (
-                    job_status.completion_time is not None
-                    and job_status.conditions[0].type == "Complete"
-                ):
-                    return MaterializationJobStatus.SUCCEEDED
-
-                if (
-                    job_status.conditions is not None
-                    and job_status.conditions[0].type == "Failed"
-                ):
-                    self._error = Exception(
-                        f"Job {self.job_id()} failed with reason: "
-                        f"{job_status.conditions[0].message}"
-                    )
-                    return MaterializationJobStatus.ERROR
-            return MaterializationJobStatus.WAITING
-
-    def should_be_retried(self):
-        return False
-
-    def job_id(self):
-        return f"dataflow-{self._job_id}"
-
-    def url(self):
-        return None
diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_task.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_task.py
deleted file mode 100644
index 8bb8da741aa..00000000000
--- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_task.py
+++ /dev/null
@@ -1,10 +0,0 @@
-from feast.infra.materialization.batch_materialization_engine import MaterializationTask
-
-
-class BytewaxMaterializationTask(MaterializationTask):
-    def __init__(self, project, feature_view, start_date, end_date, tqdm):
-        self.project = project
-        self.feature_view = feature_view
-        self.start_date = start_date
-        self.end_date = end_date
-        self.tqdm = tqdm
diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py
deleted file mode 100644
index bbc32cc1651..00000000000
--- a/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py
+++ /dev/null
@@ -1,28 +0,0 @@
-import logging
-import os
-
-import yaml
-
-from feast import FeatureStore, RepoConfig
-from feast.infra.materialization.contrib.bytewax.bytewax_materialization_dataflow import (
-    BytewaxMaterializationDataflow,
-)
-
-if __name__ == "__main__":
-    logging.basicConfig(level=logging.INFO)
-
-    with open("/var/feast/feature_store.yaml") as f:
-        feast_config = yaml.load(f, Loader=yaml.Loader)
-
-    with open("/var/feast/bytewax_materialization_config.yaml") as b:
-        bytewax_config = yaml.load(b, Loader=yaml.Loader)
-
-    config = RepoConfig(**feast_config)
-    store = FeatureStore(config=config)
-
-    job = BytewaxMaterializationDataflow(
-        config,
-        store.get_feature_view(bytewax_config["feature_view"]),
-        bytewax_config["paths"],
-        int(os.environ["JOB_COMPLETION_INDEX"]),
-    )
diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/entrypoint.sh b/sdk/python/feast/infra/materialization/contrib/bytewax/entrypoint.sh
deleted file mode 100644
index 0179e5481fa..00000000000
--- a/sdk/python/feast/infra/materialization/contrib/bytewax/entrypoint.sh
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/sh
-
-cd /bytewax
-python dataflow.py
diff --git a/sdk/python/tests/integration/materialization/contrib/bytewax/test_bytewax.py b/sdk/python/tests/integration/materialization/contrib/bytewax/test_bytewax.py
deleted file mode 100644
index 0d2cecb2f14..00000000000
--- a/sdk/python/tests/integration/materialization/contrib/bytewax/test_bytewax.py
+++ /dev/null
@@ -1,67 +0,0 @@
-from datetime import timedelta
-
-import pytest
-
-from feast import Entity, Feature, FeatureView, ValueType
-from tests.data.data_creator import create_basic_driver_dataset
-from tests.integration.feature_repos.integration_test_repo_config import (
-    IntegrationTestRepoConfig,
-    RegistryLocation,
-)
-from tests.integration.feature_repos.repo_configuration import (
-    construct_test_environment,
-)
-from tests.integration.feature_repos.universal.data_sources.redshift import (
-    RedshiftDataSourceCreator,
-)
-from tests.utils.e2e_test_validation import validate_offline_online_store_consistency
-
-
-@pytest.mark.integration
-@pytest.mark.skip(reason="Run this test manually after creating an EKS cluster.")
-def test_bytewax_materialization():
-    bytewax_config = IntegrationTestRepoConfig(
-        provider="aws",
-        online_store={"type": "dynamodb", "region": "us-west-2"},
-        offline_store_creator=RedshiftDataSourceCreator,
-        batch_engine={
-            "type": "bytewax",
-        },
-        registry_location=RegistryLocation.S3,
-    )
-    bytewax_environment = construct_test_environment(bytewax_config, None)
-
-    df = create_basic_driver_dataset()
-    ds = bytewax_environment.data_source_creator.create_data_source(
-        df,
-        bytewax_environment.feature_store.project,
-        field_mapping={"ts_1": "ts"},
-    )
-
-    fs = bytewax_environment.feature_store
-    driver = Entity(
-        name="driver_id",
-        join_key="driver_id",
-        value_type=ValueType.INT64,
-    )
-
-    driver_stats_fv = FeatureView(
-        name="driver_hourly_stats",
-        entities=["driver_id"],
-        ttl=timedelta(weeks=52),
-        features=[Feature(name="value", dtype=ValueType.FLOAT)],
-        batch_source=ds,
-    )
-
-    try:
-        fs.apply([driver, driver_stats_fv])
-
-        # materialization is run in two steps and
-        # we use timestamp from generated dataframe as a split point
-        split_dt = df["ts_1"][4].to_pydatetime() - timedelta(seconds=1)
-
-        print(f"Split datetime: {split_dt}")
-
-        validate_offline_online_store_consistency(fs, driver_stats_fv, split_dt)
-    finally:
-        fs.teardown()
diff --git a/setup.py b/setup.py
index 9f1676524f0..ddaf237ae17 100644
--- a/setup.py
+++ b/setup.py
@@ -91,8 +91,6 @@
 
 AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2", "fsspec<=2024.1.0"]
 
-BYTEWAX_REQUIRED = ["bytewax==0.15.1", "docker>=5.0.2", "kubernetes<=20.13.0"]
-
 KUBERNETES_REQUIRED = ["kubernetes<=20.13.0"]
 
 SNOWFLAKE_REQUIRED = [
@@ -196,7 +194,6 @@
     + GCP_REQUIRED
     + REDIS_REQUIRED
     + AWS_REQUIRED
-    + BYTEWAX_REQUIRED
    + KUBERNETES_REQUIRED
     + SNOWFLAKE_REQUIRED
     + SPARK_REQUIRED
@@ -359,7 +356,6 @@ def run(self):
         "ci": CI_REQUIRED,
         "gcp": GCP_REQUIRED,
         "aws": AWS_REQUIRED,
-        "bytewax": BYTEWAX_REQUIRED,
         "k8s": KUBERNETES_REQUIRED,
         "redis": REDIS_REQUIRED,
         "snowflake": SNOWFLAKE_REQUIRED,