diff --git a/sdk/python/feast/infra/materialization/kubernetes/Dockerfile b/sdk/python/feast/infra/materialization/kubernetes/Dockerfile
new file mode 100644
index 00000000000..9a4e68b396a
--- /dev/null
+++ b/sdk/python/feast/infra/materialization/kubernetes/Dockerfile
@@ -0,0 +1,22 @@
+FROM python:3.9-slim-bullseye AS build
+
+RUN apt-get update && \
+    apt-get install --no-install-suggests --no-install-recommends --yes git
+
+WORKDIR /app
+
+COPY sdk/python/feast/infra/materialization/kubernetes/main.py /app
+
+# Copy necessary parts of the Feast codebase
+COPY sdk/python sdk/python
+COPY protos protos
+COPY go go
+COPY setup.py setup.py
+COPY pyproject.toml pyproject.toml
+COPY README.md README.md
+
+# Bind-mount the .git directory because setuptools_scm needs access to it
+# to infer the version of Feast being installed.
+# https://github.com/pypa/setuptools_scm#usage-from-docker
+# Note: this also assumes the image is built from the root of the repository.
+RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir '.[aws,gcp,k8s,snowflake,postgres]'
diff --git a/sdk/python/feast/infra/materialization/kubernetes/__init__.py b/sdk/python/feast/infra/materialization/kubernetes/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/sdk/python/feast/infra/materialization/kubernetes/k8s_materialization_engine.py b/sdk/python/feast/infra/materialization/kubernetes/k8s_materialization_engine.py
new file mode 100644
index 00000000000..2e7129b0376
--- /dev/null
+++ b/sdk/python/feast/infra/materialization/kubernetes/k8s_materialization_engine.py
@@ -0,0 +1,421 @@
+import logging
+import uuid
+from datetime import datetime
+from time import sleep
+from typing import Callable, List, Literal, Sequence, Union
+
+import yaml
+from kubernetes import client, utils
+from kubernetes import config as k8s_config
+from kubernetes.client.exceptions import ApiException
+from kubernetes.utils import FailToCreateError
+from pydantic import StrictStr
+from tqdm import tqdm
+
+from feast import FeatureView, RepoConfig
+from feast.batch_feature_view import BatchFeatureView
+from feast.entity import Entity
+from feast.infra.materialization.batch_materialization_engine import (
+    BatchMaterializationEngine,
+    MaterializationJob,
+    MaterializationJobStatus,
+    MaterializationTask,
+)
+from feast.infra.offline_stores.offline_store import OfflineStore
+from feast.infra.online_stores.online_store import OnlineStore
+from feast.infra.registry.base_registry import BaseRegistry
+from feast.repo_config import FeastConfigBaseModel
+from feast.stream_feature_view import StreamFeatureView
+from feast.utils import _get_column_names
+
+from .k8s_materialization_job import KubernetesMaterializationJob
+
+logger = logging.getLogger(__name__)
+
+
+class KubernetesMaterializationEngineConfig(FeastConfigBaseModel):
+    """Batch Materialization Engine config for Kubernetes"""
+
+    type: Literal["k8s"] = "k8s"
+    """ Materialization type selector"""
+
+    namespace: StrictStr = "default"
+    """ (optional) The namespace in Kubernetes to use when creating services, configuration maps and jobs.
+    """
+
+    image: StrictStr = "feast/feast-k8s-materialization:latest"
+    """ (optional) The container image to use when running the materialization job."""
+
+    env: List[dict] = []
+    """ (optional) A list of environment variables to set in the created Kubernetes pods.
+    These environment variables can be used to reference Kubernetes secrets.
+    """
+
+    image_pull_secrets: List[dict] = []
+    """ (optional) The secrets to use when pulling the image to run for the materialization job """
+
+    resources: dict = {}
+    """ (optional) The resource requests and limits for the materialization containers """
+
+    service_account_name: StrictStr = ""
+    """ (optional) The service account name to use when running the job """
+
+    annotations: dict = {}
+    """ (optional) Annotations to apply to the job container. Useful for linking the service account to IAM roles, operational metadata, etc. """
+
+    include_security_context_capabilities: bool = True
+    """ (optional) Include security context capabilities in the init and job container spec """
+
+    labels: dict = {}
+    """ (optional) Additional labels to append to Kubernetes objects """
+
+    max_parallelism: int = 10
+    """ (optional) Maximum number of pods allowed to run in parallel within a single job"""
+
+    synchronous: bool = False
+    """ (optional) If true, wait for materialization of one feature view to complete before moving to the next """
+
+    retry_limit: int = 2
+    """ (optional) Maximum number of times to retry a materialization worker pod"""
+
+    mini_batch_size: int = 1000
+    """ (optional) Number of rows to process per write operation (default 1000)"""
+
+    active_deadline_seconds: int = 86400
+    """ (optional) Maximum amount of time a materialization job is allowed to run"""
+
+    job_batch_size: int = 100
+    """ (optional) Maximum number of pods to process per job. Only applies to synchronous materialization"""
+
+    print_pod_logs_on_failure: bool = True
+    """(optional) Print pod logs on job failure. Only applies to synchronous materialization"""
+
+
+class KubernetesMaterializationEngine(BatchMaterializationEngine):
+    def __init__(
+        self,
+        *,
+        repo_config: RepoConfig,
+        offline_store: OfflineStore,
+        online_store: OnlineStore,
+        **kwargs,
+    ):
+        super().__init__(
+            repo_config=repo_config,
+            offline_store=offline_store,
+            online_store=online_store,
+            **kwargs,
+        )
+        self.repo_config = repo_config
+        self.offline_store = offline_store
+        self.online_store = online_store
+
+        k8s_config.load_config()
+
+        self.k8s_client = client.api_client.ApiClient()
+        self.v1 = client.CoreV1Api(self.k8s_client)
+        self.batch_v1 = client.BatchV1Api(self.k8s_client)
+        self.batch_engine_config = repo_config.batch_engine
+        self.namespace = self.batch_engine_config.namespace
+
+    def update(
+        self,
+        project: str,
+        views_to_delete: Sequence[
+            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+        ],
+        views_to_keep: Sequence[
+            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+        ],
+        entities_to_delete: Sequence[Entity],
+        entities_to_keep: Sequence[Entity],
+    ):
+        """This method ensures that any necessary infrastructure or resources needed by the
+        engine are set up ahead of materialization."""
+        pass
+
+    def teardown_infra(
+        self,
+        project: str,
+        fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
+        entities: Sequence[Entity],
+    ):
+        """This method ensures that any infrastructure or resources set up by ``update()`` are torn down."""
+        pass
+
+    def materialize(
+        self,
+        registry: BaseRegistry,
+        tasks: List[MaterializationTask],
+    ) -> List[MaterializationJob]:
+        return [
+            self._materialize_one(
+                registry,
+                task.feature_view,
+                task.start_time,
+                task.end_time,
+                task.project,
+                task.tqdm_builder,
+            )
+            for task in tasks
+        ]
+
+    def _materialize_one(
+        self,
+        registry: BaseRegistry,
+        feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
+        start_date: datetime,
+        end_date: datetime,
+        project: str,
+        tqdm_builder: Callable[[int], tqdm],
+    ):
+        entities = []
+        for entity_name in feature_view.entities:
+            entities.append(registry.get_entity(entity_name, project))
+
+        (
+            join_key_columns,
+            feature_name_columns,
+            timestamp_field,
+            created_timestamp_column,
+        ) = _get_column_names(feature_view, entities)
+
+        offline_job = self.offline_store.pull_latest_from_table_or_query(
+            config=self.repo_config,
+            data_source=feature_view.batch_source,
+            join_key_columns=join_key_columns,
+            feature_name_columns=feature_name_columns,
+            timestamp_field=timestamp_field,
+            created_timestamp_column=created_timestamp_column,
+            start_date=start_date,
+            end_date=end_date,
+        )
+
+        paths = offline_job.to_remote_storage()
+        if self.batch_engine_config.synchronous:
+            offset = 0
+            total_pods = len(paths)
+            batch_size = self.batch_engine_config.job_batch_size
+            if batch_size < 1:
+                raise ValueError("job_batch_size must be a value greater than 0")
+            if batch_size < self.batch_engine_config.max_parallelism:
+                logger.warning(
+                    "job_batch_size is less than max_parallelism. Setting job_batch_size = max_parallelism"
+                )
+                batch_size = self.batch_engine_config.max_parallelism
+
+            while True:
+                next_offset = min(offset + batch_size, total_pods)
+                job = self._await_path_materialization(
+                    paths[offset:next_offset],
+                    feature_view,
+                    offset,
+                    next_offset,
+                    total_pods,
+                )
+                offset += batch_size
+                if (
+                    offset >= total_pods
+                    or job.status() == MaterializationJobStatus.ERROR
+                ):
+                    break
+        else:
+            job_id = str(uuid.uuid4())
+            job = self._create_kubernetes_job(job_id, paths, feature_view)
+
+        return job
+
+    def _await_path_materialization(
+        self, paths, feature_view, batch_start, batch_end, total_pods
+    ):
+        job_id = str(uuid.uuid4())
+        job = self._create_kubernetes_job(job_id, paths, feature_view)
+
+        try:
+            while job.status() in (
+                MaterializationJobStatus.WAITING,
+                MaterializationJobStatus.RUNNING,
+            ):
+                logger.info(
+                    f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
+                    f"(of {total_pods}) running..."
+                )
+                sleep(30)
+            logger.info(
+                f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
+                f"(of {total_pods}) complete with status {job.status()}"
+            )
+        except BaseException as e:
+            logger.info(f"Deleting job {job.job_id()}")
+            try:
+                self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace)
+            except ApiException as ae:
+                logger.warning(f"Could not delete job due to API Error: {ae.body}")
+            raise e
+        finally:
+            logger.info(f"Deleting configmap {self._configmap_name(job_id)}")
+            try:
+                self.v1.delete_namespaced_config_map(
+                    self._configmap_name(job_id), self.namespace
+                )
+            except ApiException as ae:
+                logger.warning(
+                    f"Could not delete configmap due to API Error: {ae.body}"
+                )
+
+        if (
+            job.status() == MaterializationJobStatus.ERROR
+            and self.batch_engine_config.print_pod_logs_on_failure
+        ):
+            self._print_pod_logs(job.job_id(), feature_view, batch_start)
+
+        return job
+
+    def _print_pod_logs(self, job_id, feature_view, offset=0):
+        pods_list = self.v1.list_namespaced_pod(
+            namespace=self.namespace,
+            label_selector=f"job-name={job_id}",
+        ).items
+        for i, pod in enumerate(pods_list):
+            logger.info(f"Logging output for {feature_view.name} pod {offset+i}")
+            try:
+                logger.info(
+                    self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace)
+                )
+            except ApiException as e:
+                logger.warning(f"Could not retrieve pod logs due to: {e.body}")
+
+    def _create_kubernetes_job(self, job_id, paths, feature_view):
+        try:
+            # Create a k8s configmap with information needed by pods
+            self._create_configuration_map(job_id, paths, feature_view, self.namespace)
+
+            # Create the k8s job definition
+            self._create_job_definition(
+                job_id=job_id,
+                namespace=self.namespace,
+                pods=len(paths),  # Create a pod for each parquet file
+                env=self.batch_engine_config.env,
+            )
+            job = KubernetesMaterializationJob(job_id, self.namespace)
+            logger.info(f"Created job `{job.job_id()}` on namespace `{self.namespace}`")
+            return job
+        except FailToCreateError as failures:
+            return KubernetesMaterializationJob(job_id, self.namespace, error=failures)
+
+    def _create_configuration_map(self, job_id, paths, feature_view, namespace):
+        """Create a Kubernetes configmap for this job"""
+
+        feature_store_configuration = yaml.dump(self.repo_config.dict(by_alias=True))
+
+        materialization_config = yaml.dump(
+            {"paths": paths, "feature_view": feature_view.name}
+        )
+
+        labels = {"feast-materializer": "configmap"}
+        configmap_manifest = {
+            "kind": "ConfigMap",
+            "apiVersion": "v1",
+            "metadata": {
+                "name": self._configmap_name(job_id),
+                "labels": {**labels, **self.batch_engine_config.labels},
+            },
+            "data": {
+                "feature_store.yaml": feature_store_configuration,
+                "materialization_config.yaml": materialization_config,
+            },
+        }
+        self.v1.create_namespaced_config_map(
+            namespace=namespace,
+            body=configmap_manifest,
+        )
+
+    def _configmap_name(self, job_id):
+        return f"feast-{job_id}"
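To illustrate the wiring: `_create_configuration_map` produces roughly the ConfigMap below, which the job's pods mount at `/var/feast/`. The job id, feature view name, and paths are invented placeholders:

```yaml
# Rough sketch of the generated ConfigMap; all concrete values are placeholders.
kind: ConfigMap
apiVersion: v1
metadata:
  name: feast-1f3c9dc4-5a6b-4b8e-9c1d-2e3f4a5b6c7d  # feast-{job_id}
  labels:
    feast-materializer: configmap
data:
  feature_store.yaml: |
    # yaml.dump of RepoConfig.dict(by_alias=True) goes here
  materialization_config.yaml: |
    feature_view: driver_hourly_stats
    paths:
    - s3://example-bucket/materialization/part-0000.parquet
    - s3://example-bucket/materialization/part-0001.parquet
```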
+ "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "name": f"feast-materialization-{job_id}", + "namespace": namespace, + "labels": {**job_labels, **self.batch_engine_config.labels}, + }, + "spec": { + "ttlSecondsAfterFinished": 3600, + "backoffLimit": self.batch_engine_config.retry_limit, + "completions": pods, + "parallelism": min(pods, self.batch_engine_config.max_parallelism), + "activeDeadlineSeconds": self.batch_engine_config.active_deadline_seconds, + "completionMode": "Indexed", + "template": { + "metadata": { + "annotations": self.batch_engine_config.annotations, + "labels": {**pod_labels, **self.batch_engine_config.labels}, + }, + "spec": { + "restartPolicy": "Never", + "subdomain": f"feast-materialization-{job_id}", + "imagePullSecrets": self.batch_engine_config.image_pull_secrets, + "serviceAccountName": self.batch_engine_config.service_account_name, + "containers": [ + { + "command": ["python", "main.py"], + "env": job_env, + "image": self.batch_engine_config.image, + "imagePullPolicy": "Always", + "name": "feast", + "resources": self.batch_engine_config.resources, + "securityContext": { + "allowPrivilegeEscalation": False, + "capabilities": securityContextCapabilities, + "readOnlyRootFilesystem": False, + }, + "terminationMessagePath": "/dev/termination-log", + "terminationMessagePolicy": "File", + "volumeMounts": [ + { + "mountPath": "/var/feast/", + "name": self._configmap_name(job_id), + }, + ], + } + ], + "volumes": [ + { + "configMap": { + "defaultMode": 420, + "name": self._configmap_name(job_id), + }, + "name": "python-files", + }, + { + "configMap": {"name": self._configmap_name(job_id)}, + "name": self._configmap_name(job_id), + }, + ], + }, + }, + }, + } + utils.create_from_dict(self.k8s_client, job_definition) diff --git a/sdk/python/feast/infra/materialization/kubernetes/k8s_materialization_job.py b/sdk/python/feast/infra/materialization/kubernetes/k8s_materialization_job.py new file mode 100644 index 00000000000..612b20155d4 --- /dev/null +++ b/sdk/python/feast/infra/materialization/kubernetes/k8s_materialization_job.py @@ -0,0 +1,62 @@ +from typing import Optional + +from kubernetes import client + +from feast.infra.materialization.batch_materialization_engine import ( + MaterializationJob, + MaterializationJobStatus, +) + + +class KubernetesMaterializationJob(MaterializationJob): + def __init__( + self, + job_id: str, + namespace: str, + error: Optional[BaseException] = None, + ): + super().__init__() + self._job_id = job_id + self.namespace = namespace + self._error: Optional[BaseException] = error + self.batch_v1 = client.BatchV1Api() + + def error(self): + return self._error + + def status(self): + if self._error is not None: + return MaterializationJobStatus.ERROR + else: + job_status = self.batch_v1.read_namespaced_job_status( + self.job_id(), self.namespace + ).status + if job_status.active is not None: + if job_status.completion_time is None: + return MaterializationJobStatus.RUNNING + else: + if ( + job_status.completion_time is not None + and job_status.conditions[0].type == "Complete" + ): + return MaterializationJobStatus.SUCCEEDED + + if ( + job_status.conditions is not None + and job_status.conditions[0].type == "Failed" + ): + self._error = Exception( + f"Job {self.job_id()} failed with reason: " + f"{job_status.conditions[0].message}" + ) + return MaterializationJobStatus.ERROR + return MaterializationJobStatus.WAITING + + def should_be_retried(self): + return False + + def job_id(self): + return 
f"feast-materialization-{self._job_id}" + + def url(self): + return None diff --git a/sdk/python/feast/infra/materialization/kubernetes/k8s_materialization_task.py b/sdk/python/feast/infra/materialization/kubernetes/k8s_materialization_task.py new file mode 100644 index 00000000000..607dcb5b260 --- /dev/null +++ b/sdk/python/feast/infra/materialization/kubernetes/k8s_materialization_task.py @@ -0,0 +1,10 @@ +from feast.infra.materialization.batch_materialization_engine import MaterializationTask + + +class KubernetesMaterializationTask(MaterializationTask): + def __init__(self, project, feature_view, start_date, end_date, tqdm): + self.project = project + self.feature_view = feature_view + self.start_date = start_date + self.end_date = end_date + self.tqdm = tqdm diff --git a/sdk/python/feast/infra/materialization/kubernetes/main.py b/sdk/python/feast/infra/materialization/kubernetes/main.py new file mode 100644 index 00000000000..d80cad3edb3 --- /dev/null +++ b/sdk/python/feast/infra/materialization/kubernetes/main.py @@ -0,0 +1,85 @@ +import logging +import os +from typing import List + +import pyarrow as pa +import pyarrow.parquet as pq +import yaml + +from feast import FeatureStore, FeatureView, RepoConfig +from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping + +logger = logging.getLogger(__name__) +DEFAULT_BATCH_SIZE = 1000 + + +class KubernetesMaterializer: + def __init__( + self, + config: RepoConfig, + feature_view: FeatureView, + paths: List[str], + worker_index: int, + ): + self.config = config + self.feature_store = FeatureStore(config=config) + + self.feature_view = feature_view + self.worker_index = worker_index + self.paths = paths + self.mini_batch_size = int(os.getenv("MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE)) + + def process_path(self, path): + logger.info(f"Processing path {path}") + dataset = pq.ParquetDataset(path, use_legacy_dataset=False) + batches = [] + for fragment in dataset.fragments: + for batch in fragment.to_table().to_batches( + max_chunksize=self.mini_batch_size + ): + batches.append(batch) + return batches + + def run(self): + for mini_batch in self.process_path(self.paths[self.worker_index]): + table: pa.Table = pa.Table.from_batches([mini_batch]) + + if self.feature_view.batch_source.field_mapping is not None: + table = _run_pyarrow_field_mapping( + table, self.feature_view.batch_source.field_mapping + ) + join_key_to_value_type = { + entity.name: entity.dtype.to_value_type() + for entity in self.feature_view.entity_columns + } + rows_to_write = _convert_arrow_to_proto( + table, self.feature_view, join_key_to_value_type + ) + self.feature_store._get_provider().online_write_batch( + config=self.config, + table=self.feature_view, + data=rows_to_write, + progress=None, + ) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + + with open("/var/feast/feature_store.yaml") as f: + feast_config = yaml.load(f, Loader=yaml.Loader) + + with open("/var/feast/materialization_config.yaml") as b: + materialization_cfg = yaml.load(b, Loader=yaml.Loader) + + config = RepoConfig(**feast_config) + store = FeatureStore(config=config) + + KubernetesMaterializer( + config=config, + feature_view=store.get_feature_view( + materialization_cfg["feature_view"] + ), + paths=materialization_cfg["paths"], + worker_index=int(os.environ["JOB_COMPLETION_INDEX"]), + ).run() diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index fe3491c6fe6..d818eb013d5 100644 --- a/sdk/python/feast/repo_config.py +++ 
diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py
index fe3491c6fe6..d818eb013d5 100644
--- a/sdk/python/feast/repo_config.py
+++ b/sdk/python/feast/repo_config.py
@@ -47,6 +47,7 @@
     "snowflake.engine": "feast.infra.materialization.snowflake_engine.SnowflakeMaterializationEngine",
     "lambda": "feast.infra.materialization.aws_lambda.lambda_engine.LambdaMaterializationEngine",
     "bytewax": "feast.infra.materialization.contrib.bytewax.bytewax_materialization_engine.BytewaxMaterializationEngine",
+    "k8s": "feast.infra.materialization.kubernetes.k8s_materialization_engine.KubernetesMaterializationEngine",
     "spark.engine": "feast.infra.materialization.contrib.spark.spark_materialization_engine.SparkMaterializationEngine",
 }
diff --git a/sdk/python/tests/integration/materialization/kubernetes/README.md b/sdk/python/tests/integration/materialization/kubernetes/README.md
new file mode 100644
index 00000000000..4ed5d49a680
--- /dev/null
+++ b/sdk/python/tests/integration/materialization/kubernetes/README.md
@@ -0,0 +1,22 @@
+# Running Kubernetes materialization integration tests
+
+To run the Kubernetes materialization integration tests, you'll need to provision a cluster using [eksctl](https://docs.aws.amazon.com/eks/latest/userguide/eksctl.html).
+
+## Creating an EKS cluster
+
+In this directory is a configuration file for a single-node EKS cluster.
+
+To create the EKS cluster needed for testing, issue the following command:
+
+``` shell
+> eksctl create cluster -f ./eks-config.yaml
+```
+
+When the tests are complete, delete the created cluster with:
+
+``` shell
+> eksctl delete cluster bytewax-feast-cluster
+```
+
+
+
diff --git a/sdk/python/tests/integration/materialization/kubernetes/eks-config.yaml b/sdk/python/tests/integration/materialization/kubernetes/eks-config.yaml
new file mode 100644
index 00000000000..5f8d0655aac
--- /dev/null
+++ b/sdk/python/tests/integration/materialization/kubernetes/eks-config.yaml
@@ -0,0 +1,13 @@
+apiVersion: eksctl.io/v1alpha5
+kind: ClusterConfig
+
+metadata:
+  name: bytewax-feast-cluster
+  version: "1.22"
+  region: us-west-2
+
+managedNodeGroups:
+- name: ng-1
+  instanceType: c6a.large
+  desiredCapacity: 1
+  privateNetworking: true
diff --git a/sdk/python/tests/integration/materialization/kubernetes/test_k8s.py b/sdk/python/tests/integration/materialization/kubernetes/test_k8s.py
new file mode 100644
index 00000000000..8a4a38915cb
--- /dev/null
+++ b/sdk/python/tests/integration/materialization/kubernetes/test_k8s.py
@@ -0,0 +1,65 @@
+from datetime import timedelta
+
+import pytest
+
+from feast import Entity, Feature, FeatureView, ValueType
+from tests.data.data_creator import create_basic_driver_dataset
+from tests.integration.feature_repos.integration_test_repo_config import (
+    IntegrationTestRepoConfig,
+    RegistryLocation,
+)
+from tests.integration.feature_repos.repo_configuration import (
+    construct_test_environment,
+)
+from tests.integration.feature_repos.universal.data_sources.redshift import (
+    RedshiftDataSourceCreator,
+)
+from tests.utils.e2e_test_validation import validate_offline_online_store_consistency
+
+
+@pytest.mark.integration
+@pytest.mark.skip(reason="Run this test manually after creating an EKS cluster.")
+def test_kubernetes_materialization():
+    config = IntegrationTestRepoConfig(
+        provider="aws",
+        online_store={"type": "dynamodb", "region": "us-west-2"},
+        offline_store_creator=RedshiftDataSourceCreator,
+        batch_engine={"type": "k8s"},
+        registry_location=RegistryLocation.S3,
+    )
+    env = construct_test_environment(config, None)
+
+    df = create_basic_driver_dataset()
+    ds = env.data_source_creator.create_data_source(
+        df,
+        env.feature_store.project,
+        field_mapping={"ts_1": "ts"},
+    )
+
+    fs = env.feature_store
+    driver = Entity(
+        name="driver_id",
+        join_key="driver_id",
+        value_type=ValueType.INT64,
+    )
+
+    driver_stats_fv = FeatureView(
+        name="driver_hourly_stats",
+        entities=["driver_id"],
+        ttl=timedelta(weeks=52),
+        features=[Feature(name="value", dtype=ValueType.FLOAT)],
+        batch_source=ds,
+    )
+
+    try:
+        fs.apply([driver, driver_stats_fv])
+
+        # Materialization is run in two steps, and we use a timestamp from the
+        # generated dataframe as the split point.
+        split_dt = df["ts_1"][4].to_pydatetime() - timedelta(seconds=1)
+
+        print(f"Split datetime: {split_dt}")
+
+        validate_offline_online_store_consistency(fs, driver_stats_fv, split_dt)
+    finally:
+        fs.teardown()
diff --git a/setup.py b/setup.py
index f94fb25bb55..9f1676524f0 100644
--- a/setup.py
+++ b/setup.py
@@ -93,6 +93,8 @@
 BYTEWAX_REQUIRED = ["bytewax==0.15.1", "docker>=5.0.2", "kubernetes<=20.13.0"]
 
+KUBERNETES_REQUIRED = ["kubernetes<=20.13.0"]
+
 SNOWFLAKE_REQUIRED = [
     "snowflake-connector-python[pandas]>=3.7,<4",
 ]
@@ -147,9 +149,7 @@
     "grpcio-health-checking>=1.56.2,<2",
 ]
 
-DUCKDB_REQUIRED = [
-    "ibis-framework[duckdb]"
-]
+DUCKDB_REQUIRED = ["ibis-framework[duckdb]"]
 
 CI_REQUIRED = (
     [
@@ -197,6 +197,7 @@
     + REDIS_REQUIRED
     + AWS_REQUIRED
     + BYTEWAX_REQUIRED
+    + KUBERNETES_REQUIRED
     + SNOWFLAKE_REQUIRED
     + SPARK_REQUIRED
     + POSTGRES_REQUIRED
@@ -359,6 +360,7 @@ def run(self):
         "gcp": GCP_REQUIRED,
         "aws": AWS_REQUIRED,
         "bytewax": BYTEWAX_REQUIRED,
+        "k8s": KUBERNETES_REQUIRED,
         "redis": REDIS_REQUIRED,
         "snowflake": SNOWFLAKE_REQUIRED,
         "spark": SPARK_REQUIRED,
@@ -374,7 +376,7 @@ def run(self):
         "grpcio": GRPCIO_REQUIRED,
         "rockset": ROCKSET_REQUIRED,
         "ibis": IBIS_REQUIRED,
-        "duckdb": DUCKDB_REQUIRED
+        "duckdb": DUCKDB_REQUIRED,
     },
     include_package_data=True,
     license="Apache",