diff --git a/.github/workflows/complete.yml b/.github/workflows/complete.yml index a748b33101..ef3715bcd8 100644 --- a/.github/workflows/complete.yml +++ b/.github/workflows/complete.yml @@ -127,21 +127,6 @@ jobs: - name: Run integration tests run: make test-java-integration - load-test: - needs: build-push-docker-images - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 - with: - python-version: '3.7' - - name: Run load test - run: make test-load GIT_SHA=${GITHUB_SHA} - - uses: actions/upload-artifact@v2 - with: - name: load-test-results - path: load-test-output/ - tests-docker-compose: needs: - build-push-docker-images diff --git a/core/src/main/java/feast/core/validators/Matchers.java b/core/src/main/java/feast/core/validators/Matchers.java index 8ba89a8308..5f7ddd26ac 100644 --- a/core/src/main/java/feast/core/validators/Matchers.java +++ b/core/src/main/java/feast/core/validators/Matchers.java @@ -25,7 +25,7 @@ public class Matchers { private static Pattern BIGQUERY_TABLE_REF_REGEX = Pattern.compile("[a-zA-Z0-9-]+[:]+[a-zA-Z0-9_]+[.]+[a-zA-Z0-9_]*"); private static Pattern CLASS_PATH_REGEX = - Pattern.compile("[a-zA-Z_$][a-zA-Z0-9_$]*(\\.[a-zA-Z_$][a-zA-Z0-9_$]*)"); + Pattern.compile("[a-zA-Z_][a-zA-Z0-9_]*(\\.[a-zA-Z_][a-zA-Z0-9_]*)*$"); private static Pattern UPPER_SNAKE_CASE_REGEX = Pattern.compile("^[A-Z0-9]+(_[A-Z0-9]+)*$"); private static Pattern LOWER_SNAKE_CASE_REGEX = Pattern.compile("^[a-z0-9]+(_[a-z0-9]+)*$"); private static Pattern VALID_CHARACTERS_REGEX = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$"); diff --git a/core/src/test/java/feast/core/validators/MatchersTest.java b/core/src/test/java/feast/core/validators/MatchersTest.java index eb9d987cbd..fdf4d0469d 100644 --- a/core/src/test/java/feast/core/validators/MatchersTest.java +++ b/core/src/test/java/feast/core/validators/MatchersTest.java @@ -18,6 +18,7 @@ import static feast.core.validators.Matchers.checkLowerSnakeCase; import static feast.core.validators.Matchers.checkUpperSnakeCase; +import static feast.core.validators.Matchers.checkValidClassPath; import com.google.common.base.Strings; import org.junit.Rule; @@ -70,4 +71,22 @@ public void checkLowerSnakeCaseShouldThrowIllegalArgumentExceptionWithFieldForIn String in = "Invalid_feature name"; checkLowerSnakeCase(in, "feature"); } + + @Test + public void checkValidClassPathSuccess() { + checkValidClassPath("com.example.foo", "FeatureTable"); + checkValidClassPath("com.example", "FeatureTable"); + } + + @Test + public void checkValidClassPathEmpty() { + exception.expect(IllegalArgumentException.class); + checkValidClassPath("", "FeatureTable"); + } + + @Test + public void checkValidClassPathDigits() { + exception.expect(IllegalArgumentException.class); + checkValidClassPath("123", "FeatureTable"); + } } diff --git a/docs/.gitbook/assets/feast-on-aws-1-.png b/docs/.gitbook/assets/feast-on-aws-1-.png new file mode 100644 index 0000000000..93af224ff3 Binary files /dev/null and b/docs/.gitbook/assets/feast-on-aws-1-.png differ diff --git a/docs/.gitbook/assets/feast-on-aws-2-.png b/docs/.gitbook/assets/feast-on-aws-2-.png new file mode 100644 index 0000000000..1dd09092c5 Binary files /dev/null and b/docs/.gitbook/assets/feast-on-aws-2-.png differ diff --git a/docs/.gitbook/assets/feast-on-aws-3-.png b/docs/.gitbook/assets/feast-on-aws-3-.png new file mode 100644 index 0000000000..e6de77dde9 Binary files /dev/null and b/docs/.gitbook/assets/feast-on-aws-3-.png differ diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 
9c77daab85..345badc72c 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -3,9 +3,10 @@ * [Introduction](README.md) * [Why Feast?](why-feast.md) * [Getting Started](getting-started/README.md) + * [Quickstart](getting-started/quickstart.md) * [Deploying Feast](getting-started/deploying-feast/README.md) - * [Docker Compose](getting-started/deploying-feast/docker-compose.md) * [Kubernetes \(GKE\)](getting-started/deploying-feast/kubernetes.md) + * [Kubernetes \(Amazon EKS\) with Terraform](getting-started/deploying-feast/kubernetes-amazon-eks-with-terraform.md) * [Connecting to Feast](getting-started/connecting-to-feast-1/README.md) * [Python SDK](getting-started/connecting-to-feast-1/python-sdk.md) * [Feast CLI](getting-started/connecting-to-feast-1/connecting-to-feast.md) diff --git a/docs/getting-started/deploying-feast/README.md b/docs/getting-started/deploying-feast/README.md index 4a9aac11ba..c74230ba3b 100644 --- a/docs/getting-started/deploying-feast/README.md +++ b/docs/getting-started/deploying-feast/README.md @@ -1,12 +1,5 @@ # Deploying Feast -## Docker Compose - -* Fastest way to get Feast up and running. -* Provides a pre-installed Jupyter Notebook with sample code. - -{% page-ref page="docker-compose.md" %} - ## Kubernetes \(GKE\) * Recommended way to install Feast for production use. diff --git a/docs/getting-started/deploying-feast/docker-compose.md b/docs/getting-started/deploying-feast/docker-compose.md deleted file mode 100644 index ce0e42f517..0000000000 --- a/docs/getting-started/deploying-feast/docker-compose.md +++ /dev/null @@ -1,130 +0,0 @@ -# Docker Compose - -### Overview - -This guide will give a walk-though on deploying Feast using Docker Compose. - -The Docker Compose setup is recommended if you are running Feast locally to try things out. It includes a built in Jupyter Notebook Server that is preloaded with Feast example notebooks to get you started. - -## 0. Requirements - -* [Docker Compose](https://docs.docker.com/compose/install/) should be installed. -* Additional Requirements for using Feast Historical Serving: - * a [GCP service account](https://cloud.google.com/iam/docs/creating-managing-service-account-keys) that has access to [Google Cloud Storage](https://cloud.google.com/storage) and [BigQuery](https://cloud.google.com/bigquery). - * [Google Cloud SDK ](https://cloud.google.com/sdk/install)installed, authenticated, and configured to the GCP project you want to use. - -## 1. Set up environment - -Clone the latest stable version of the [Feast repository](https://github.com/gojek/feast/) and setup before we deploy: - -```text -git clone --depth 1 --branch v0.7.0 https://github.com/feast-dev/feast.git -export FEAST_REPO=$(pwd) -cd feast/infra/docker-compose -cp .env.sample .env -``` - -## 2. Start Feast for Online Serving - -Use Docker Compose deploy Feast for Online Serving only: - -```javascript -docker-compose up -``` - -{% hint style="info" %} -The Docker Compose deployment will take some time fully startup: - -* During this time you may see some connection failures and container restarts which should be automatically corrected a few minutes. 
-* If container restarts do not stop after 10 minutes, try redeploying by - * Terminating the current deployment with `Ctrl-C` - * Deleting any attached volumes with `docker-compose down` -v - * Redeploying with `docker-compose up` -{% endhint %} - -{% hint style="info" %} -You may see `feast_historical_serving` exiting with code 1, this expected and does not affect the functionality of Feast for Online Serving. -{% endhint %} - -Once deployed, you should be able to connect at `localhost:8888` to the bundled Jupyter Notebook Server and follow in the Online Serving sections of the example notebooks: - -{% embed url="http://localhost:8888/tree?" caption="" %} - -## 3. Start Feast for Training and Online Serving - -{% hint style="info" %} -Historical serving currently requires Google Cloud Platform to function, specifically a Service Account with access to Google Cloud Storage \(GCS\) and BigQuery. -{% endhint %} - -### 3.1 Set up Google Cloud Platform - -Create a service account for Feast to use. Make sure to copy the JSON key to `infra/docker-compose/gcp-service-accounts/key.json` under the cloned Feast repository. - -```bash -gcloud iam service-accounts create feast-service-account - -gcloud projects add-iam-policy-binding my-gcp-project \ - --member serviceAccount:feast-service-account@my-gcp-project.iam.gserviceaccount.com \ - --role roles/editor -gcloud iam service-accounts keys create credentials.json --iam-account \ - feast-service-account@my-gcp-project.iam.gserviceaccount.com - -cp credentials.json ${FEAST_REPO}/infra/docker-compose/gcp-service-accounts/key.json -# Required to prevent permissions error in Feast Jupyter: -chown 1000:1000 ${FEAST_REPO}/infra/docker-compose/gcp-service-accounts/key.json -``` - -Create a Google Cloud Storage Bucket that Feast will use to load data into and out of BigQuery - -```bash -gsutil mb gs://my-feast-staging-bucket -``` - -Create a BigQuery Dataset for Feast to store historical data: - -```bash -bq --location=US mk --dataset my_project:feast -``` - -### 3.2 Configure Docker Compose - -Configure the `.env` file under `${FEAST_REPO}/infra/docker-compose/` based on your environment. At the very least you have to modify: - -| Parameter | Description | -| :--- | :--- | -| `FEAST_HISTORICAL_SERVING_ENABLED` | Set this to `true` to enable historical serving \(BigQuery\) | - -### 3.3 Configure Services - -The following configuration has to be set in `serving/historical-serving.yml` - -| Parameter | Description | -| :--- | :--- | -| `feast.stores.config.project_id` | This is your [GCP project Id](https://cloud.google.com/resource-manager/docs/creating-managing-projects). | -| `feast.stores.config.dataset_id` | This is the **dataset name** of the BigQuery dataset to use. | -| `feast.stores.config.staging_location` | This is the staging location on Google Cloud Storage for retrieval of training datasets. Make sure you append a suffix \(ie `gs://mybucket/suffix`\) | - -The following configuration has to be set in `jobcontroller/jobcontroller.yml` - -| Parameter | Description | -| :--- | :--- | -| `feast.jobs.runners.options.tempLocation` | Beam ingestion jobs will persist data here before loading it into BigQuery. Use the same bucket as above and make sure you append a different suffix \(ie `gs://mybucket/anothersuffix`\). | - -### 3.4 Start Feast - -Use Docker Compose deploy Feast: - -```javascript -docker-compose up -``` - -Once deployed, you should be able to connect at `localhost:8888` to the bundled Jupyter Notebook Server with example notebooks. 
-
-{% embed url="http://localhost:8888/tree?" caption="" %}
-
-## 6. Further Reading
-
-* [Feast Concepts](../../concepts/overview.md)
-* [Feast Examples/Tutorials](https://github.com/feast-dev/feast/tree/master/examples)
-* [Configuring Feast Components](../../reference/configuration-reference.md)
-
diff --git a/docs/getting-started/deploying-feast/kubernetes-amazon-eks-with-terraform.md b/docs/getting-started/deploying-feast/kubernetes-amazon-eks-with-terraform.md
new file mode 100644
index 0000000000..a6a8206837
--- /dev/null
+++ b/docs/getting-started/deploying-feast/kubernetes-amazon-eks-with-terraform.md
@@ -0,0 +1,68 @@
+# Kubernetes \(Amazon EKS\) with Terraform
+
+### Overview
+
+This guide will walk you through installing Feast on AWS using our [reference terraform config](https://github.com/feast-dev/feast/tree/master/infra/terraform/aws).
+
+{% hint style="info" %}
+The terraform config used here is a greenfield installation that doesn't assume anything about, and doesn't integrate with, existing resources in your AWS account. This makes it an easy way to get started, but you will likely want to customize this setup before using Feast in production.
+{% endhint %}
+
+This terraform config will create the following resources:
+
+* Kubernetes cluster on Amazon EKS \(3x r3.large nodes\)
+* Kafka managed by Amazon MSK \(2x kafka.t3.small nodes\)
+* Postgres database for Feast metadata, using serverless Aurora \(min capacity: 2\)
+* Redis cluster, using Amazon Elasticache \(1x cache.t2.micro\)
+* Amazon EMR cluster to run Spark \(3x spot m4.xlarge\)
+* Staging S3 bucket to store temporary data
+
+![](../../.gitbook/assets/feast-on-aws-3-.png)
+
+## 0. Requirements
+
+* An AWS account and [credentials configured locally](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html)
+* [terraform](https://www.terraform.io/) >= 0.12 \(tested with 0.13.3\)
+* [helm](https://helm.sh/docs/intro/install/) \(tested with v3.3.4\)
+
+## 1. Configure terraform
+
+Under `feast/infra/terraform/aws`, create a `.tfvars` file. The name does not matter; let's call it `my_feast.tfvars`. You can see the full list of configuration variables in `variables.tf`. At the very least we need to set `name_prefix` and the AWS region:
+
+{% code title="my\_feast.tfvars" %}
+```typescript
+name_prefix = "my-feast"
+region = "us-east-1"
+```
+{% endcode %}
+
+## 2. Apply
+
+Once you're happy with the configuration, you can init terraform and apply:
+
+```bash
+$ cd feast/infra/terraform/aws
+$ terraform init
+$ terraform apply -var-file=my_feast.tfvars
+```
+
+This might take a while, but in the end everything should succeed. You'll also see a kubectl config file created in this directory. Its name will start with `kubeconfig_` and end with a random suffix.
+
+## 3. Connect to Feast using Jupyter
+
+Once the pods are all running, we can connect to the Jupyter notebook server running in the cluster.
+
+To be able to connect to the remote Feast server we just set up, you need to forward a port from the remote k8s cluster to your local machine. Replace `kubeconfig_XXXXXXX` below with the file name of the kubeconfig generated for you by terraform.
+
+```bash
+KUBECONFIG=kubeconfig_XXXXXXX kubectl port-forward \
+$(kubectl get pod -o custom-columns=:metadata.name | grep jupyter) 8888:8888
+```
+
+```text
+Forwarding from 127.0.0.1:8888 -> 8888
+Forwarding from [::1]:8888 -> 8888
+```
+
+You should be able to connect at `localhost:8888` to the bundled Jupyter Notebook Server with example notebooks.
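As a quick orientation (an illustrative aside, not part of the patch itself): the bundled example notebooks talk to Feast through the Python SDK and pick up the Core and Online Serving endpoints from environment variables, the same pattern the removed `basic.ipynb` below uses. A minimal sketch, assuming `FEAST_CORE_URL` and `FEAST_ONLINE_SERVING_URL` are set in the notebook environment (otherwise the local defaults apply):

```python
import os

from feast import Client  # Feast Python SDK, preloaded in the bundled Jupyter image

# Endpoints are read from the environment, falling back to the local defaults.
core_url = os.getenv("FEAST_CORE_URL", "localhost:6565")
serving_url = os.getenv("FEAST_ONLINE_SERVING_URL", "localhost:6566")

# The example notebooks then use this client to register entities and feature
# tables, ingest data, and retrieve features online and historically.
client = Client(core_url=core_url, serving_url=serving_url)
```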
+
diff --git a/docs/getting-started/quickstart.md b/docs/getting-started/quickstart.md
new file mode 100644
index 0000000000..63b875ed2e
--- /dev/null
+++ b/docs/getting-started/quickstart.md
@@ -0,0 +1,55 @@
+# Quickstart
+
+## Overview
+
+This guide will give a walkthrough on deploying Feast using Docker Compose, which allows the user to quickly explore the functionality in Feast with minimal infrastructure setup. It includes a built-in Jupyter Notebook Server that is preloaded with PySpark and the Feast SDK, as well as Feast example notebooks to get you started.
+
+## 0. Requirements
+
+* [Docker Compose](https://docs.docker.com/compose/install/) should be installed.
+* Optional dependencies:
+  * a [GCP service account](https://cloud.google.com/iam/docs/creating-managing-service-account-keys) that has access to [Google Cloud Storage](https://cloud.google.com/storage).
+
+## 1. Set up environment
+
+Clone the latest stable version of the [Feast repository](https://github.com/gojek/feast/) and set up the environment before we deploy:
+
+```text
+git clone https://github.com/feast-dev/feast.git
+cd feast/infra/docker-compose
+cp .env.sample .env
+```
+
+## 2. Start Feast Services
+
+Start the Feast services. Make sure that the following ports are free on the host machine: 6565, 6566, 8888, 9094, 5432. Alternatively, change the port mapping to use a different port on the host.
+
+```javascript
+docker-compose up -d
+```
+
+{% hint style="info" %}
+The Docker Compose deployment will take some time to fully start up:
+
+* During this time the Feast Serving container may restart, which should be automatically corrected once Feast Core is up and ready.
+* If container restarts do not stop after 10 minutes, check the Docker Compose logs to see if there is any error that prevents Feast Core from starting successfully.
+{% endhint %}
+
+Once deployed, you should be able to connect at `localhost:8888` to the bundled Jupyter Notebook Server and follow the example notebooks:
+
+{% embed url="http://localhost:8888/tree?" caption="" %}
+
+## 3. Optional dependencies
+
+### 3.1 Set up Google Cloud Platform
+
+The example Jupyter notebook does not require any GCP dependencies by default. If you would like to modify the example such that a GCP service is required \(e.g. Google Cloud Storage\), you will need to set up a [service account](https://cloud.google.com/iam/docs/creating-managing-service-accounts) that is associated with the notebook. Make sure that the service account has sufficient privileges to access the required GCP services.
+
+Once the service account is created, download the associated JSON key file and copy the file to the path configured in `.env`, under `GCP_SERVICE_ACCOUNT`.
+
+## 4. Further Reading
+
+* [Feast Concepts](../concepts/overview.md)
+* [Feast Examples/Tutorials](https://github.com/feast-dev/feast/tree/master/examples)
+* [Configuring Feast Components](../reference/configuration-reference.md)
+
diff --git a/examples/basic/README.md b/examples/basic/README.md
deleted file mode 100644
index ce8453a324..0000000000
--- a/examples/basic/README.md
+++ /dev/null
@@ -1,10 +0,0 @@
-# Feast Basic Customer Transactions Example
-
-This is a minimal example of using Feast. In this example we will
-1. Create a synthetic customer feature dataset
-2. Register a feature set to represent these features in Feast
-3. Ingest these features into Feast
-4. Create a feature query to retrieve historical feature data
-5.
Create a feature query to retrieve online feature data - -Please ensure that Feast is already installed and running before starting this example. \ No newline at end of file diff --git a/examples/basic/basic.ipynb b/examples/basic/basic.ipynb deleted file mode 100644 index 318ccfd3bd..0000000000 --- a/examples/basic/basic.ipynb +++ /dev/null @@ -1,757 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Feast Basic Customer Transactions Example" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "This is a minimal example of using Feast. The point is to show users how to get data into Feast and how to retrieve features for online serving and model training.\n", - "\n", - "In this example we will\n", - "1. Create a synthetic customer feature dataset\n", - "2. Register a feature set to represent these features in Feast\n", - "3. Ingest these features into Feast\n", - "4. Create a feature query and retrieve online feature data\n", - "5. Create a feature query and retrieve historical feature data" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 0. Configuration" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "\n", - "# Feast Core acts as the central feature registry\n", - "FEAST_CORE_URL = os.getenv('FEAST_CORE_URL', 'localhost:6565')\n", - "\n", - "# Feast Online Serving allows for the retrieval of real-time feature data\n", - "FEAST_ONLINE_SERVING_URL = os.getenv('FEAST_ONLINE_SERVING_URL', 'localhost:6566')\n", - "\n", - "# Feast Batch Serving allows for the retrieval of historical feature data\n", - "FEAST_HISTORICAL_SERVING_URL = os.getenv('FEAST_HISTORICAL_SERVING_URL', 'localhost:6567')" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 1. Install Feast SDK" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Install from PyPi" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "!pip install feast" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 2. Import necessary modules" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd\n", - "import numpy as np\n", - "from pytz import timezone, utc\n", - "from feast import Client, FeatureSet, Entity, ValueType\n", - "from feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest\n", - "from feast.types.Value_pb2 import Value as Value\n", - "from google.protobuf.duration_pb2 import Duration\n", - "from datetime import datetime, timedelta\n", - "from random import randrange\n", - "import random" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 3. Configure Feast services and connect the Feast client\n", - "\n", - "Connect to Feast Core and Feast Online Serving" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [], - "source": [ - "client = Client(core_url=FEAST_CORE_URL, serving_url=FEAST_ONLINE_SERVING_URL)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 4. Create customer features" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Here we will create customer features for 5 customers over a month. Each customer will have a set of features for every day." 
- ] - }, - { - "cell_type": "code", - "execution_count": 4, - "metadata": {}, - "outputs": [], - "source": [ - "days = [datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0).replace(tzinfo=utc) \\\n", - " - timedelta(day) for day in range(10)][::-1]\n", - "\n", - "customers = [1001, 1002, 1003, 1004, 1005]" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - " datetime customer_id daily_transactions \\\n", - "0 2020-06-17 00:00:00+00:00 1001 4.900417 \n", - "1 2020-06-17 00:00:00+00:00 1002 7.440329 \n", - "2 2020-06-17 00:00:00+00:00 1003 4.224760 \n", - "3 2020-06-17 00:00:00+00:00 1004 5.482722 \n", - "4 2020-06-17 00:00:00+00:00 1005 2.200896 \n", - "5 2020-06-18 00:00:00+00:00 1001 8.173628 \n", - "6 2020-06-18 00:00:00+00:00 1002 3.164327 \n", - "7 2020-06-18 00:00:00+00:00 1003 7.248387 \n", - "8 2020-06-18 00:00:00+00:00 1004 9.274397 \n", - "9 2020-06-18 00:00:00+00:00 1005 7.846449 \n", - "10 2020-06-19 00:00:00+00:00 1001 9.028874 \n", - "11 2020-06-19 00:00:00+00:00 1002 5.140390 \n", - "12 2020-06-19 00:00:00+00:00 1003 4.537877 \n", - "13 2020-06-19 00:00:00+00:00 1004 6.797491 \n", - "14 2020-06-19 00:00:00+00:00 1005 8.234574 \n", - "15 2020-06-20 00:00:00+00:00 1001 8.319164 \n", - "16 2020-06-20 00:00:00+00:00 1002 7.158817 \n", - "17 2020-06-20 00:00:00+00:00 1003 4.920308 \n", - "18 2020-06-20 00:00:00+00:00 1004 7.974404 \n", - "19 2020-06-20 00:00:00+00:00 1005 2.298012 \n", - "\n", - " total_transactions \n", - "0 45 \n", - "1 77 \n", - "2 8 \n", - "3 40 \n", - "4 53 \n", - "5 33 \n", - "6 93 \n", - "7 68 \n", - "8 53 \n", - "9 11 \n", - "10 19 \n", - "11 2 \n", - "12 1 \n", - "13 59 \n", - "14 95 \n", - "15 37 \n", - "16 93 \n", - "17 73 \n", - "18 46 \n", - "19 12 \n" - ] - } - ], - "source": [ - "customer_features = pd.DataFrame(\n", - " {\n", - " \"datetime\": [day for day in days for customer in customers], # Datetime is required\n", - " \"customer_id\": [customer for day in days for customer in customers], # Customer is the entity\n", - " \"daily_transactions\": [np.random.rand() * 10 for _ in range(len(days) * len(customers))], # Feature 1\n", - " \"total_transactions\": [np.random.randint(100) for _ in range(len(days) * len(customers))], # Feature 2\n", - " }\n", - ")\n", - "\n", - "print(customer_features.head(20))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 5. Create feature set for customer features" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Now we will create a feature set for these features. Feature sets are essentially a schema that represent\n", - "feature values. Feature sets allow Feast to both identify feature values and their structure. The following feature set contains no features yet." - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "metadata": {}, - "outputs": [], - "source": [ - "customer_fs = FeatureSet(\n", - " \"customer_transactions\",\n", - " entities=[Entity(name='customer_id', dtype=ValueType.INT64)]\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Here we are automatically inferring the schema from the provided dataset. 
The two features from the dataset will be added to the feature set" - ] - }, - { - "cell_type": "code", - "execution_count": 7, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Feature daily_transactions (ValueType.DOUBLE) added from dataframe.\n", - "Feature total_transactions (ValueType.INT64) added from dataframe.\n", - "\n" - ] - } - ], - "source": [ - "customer_fs.infer_fields_from_df(customer_features, replace_existing_features=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 6. Register feature set with Feast Core" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The apply() method will register the provided feature set with Feast Core (the feature registry)." - ] - }, - { - "cell_type": "code", - "execution_count": 8, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Feature set created: \"customer_transactions\"\n" - ] - } - ], - "source": [ - "client.apply(customer_fs)" - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "{\n", - " \"spec\": {\n", - " \"name\": \"customer_transactions\",\n", - " \"entities\": [\n", - " {\n", - " \"name\": \"customer_id\",\n", - " \"valueType\": \"INT64\"\n", - " }\n", - " ],\n", - " \"features\": [\n", - " {\n", - " \"name\": \"daily_transactions\",\n", - " \"valueType\": \"DOUBLE\"\n", - " },\n", - " {\n", - " \"name\": \"total_transactions\",\n", - " \"valueType\": \"INT64\"\n", - " }\n", - " ],\n", - " \"source\": {\n", - " \"type\": \"KAFKA\",\n", - " \"kafkaSourceConfig\": {\n", - " \"bootstrapServers\": \"localhost:9094\",\n", - " \"topic\": \"feast-features\"\n", - " }\n", - " },\n", - " \"project\": \"default\"\n", - " },\n", - " \"meta\": {\n", - " \"createdTimestamp\": \"2020-06-26T12:27:17Z\",\n", - " \"status\": \"STATUS_PENDING\"\n", - " }\n", - "}\n" - ] - } - ], - "source": [ - "customer_fs = client.get_feature_set(\"customer_transactions\")\n", - "print(customer_fs)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 7. Ingest data into Feast for a feature set" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Next we will ingest/load data into Feast. This process populates all registered stores (BigQuery, Redis) with your feature data." - ] - }, - { - "cell_type": "code", - "execution_count": 10, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Waiting for feature set to be ready for ingestion...\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "100%|██████████| 50/50 [00:01<00:00, 47.23rows/s]" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Ingestion complete!\n", - "\n", - "Ingestion statistics:\n", - "Success: 50/50\n", - "Removing temporary file(s)...\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "\n" - ] - }, - { - "data": { - "text/plain": [ - "'5e650050-f41d-39fc-bc56-d602c4a478d2'" - ] - }, - "execution_count": 10, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "client.ingest(\"customer_transactions\", customer_features)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 8. 
Retrieve online features" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The example below retrieves online features for a single customer: \"1001\". Retrieval of features is not limited to a single feature set. Users can provide any features as long as they are present on the provided entities." - ] - }, - { - "cell_type": "code", - "execution_count": 11, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "[fields {\n", - " key: \"customer_id\"\n", - " value {\n", - " int64_val: 1001\n", - " }\n", - "}\n", - "fields {\n", - " key: \"daily_transactions\"\n", - " value {\n", - " double_val: 0.12021977894872915\n", - " }\n", - "}\n", - "fields {\n", - " key: \"total_transactions\"\n", - " value {\n", - " int64_val: 0\n", - " }\n", - "}\n", - "statuses {\n", - " key: \"customer_id\"\n", - " value: PRESENT\n", - "}\n", - "statuses {\n", - " key: \"daily_transactions\"\n", - " value: PRESENT\n", - "}\n", - "statuses {\n", - " key: \"total_transactions\"\n", - " value: PRESENT\n", - "}\n", - "]\n" - ] - } - ], - "source": [ - "online_features = client.get_online_features(\n", - " feature_refs=[\n", - " \"daily_transactions\",\n", - " \"total_transactions\",\n", - " ],\n", - " entity_rows=[\n", - " {\n", - " \"customer_id\": 1001\n", - " }\n", - " ],\n", - ")\n", - "print(online_features.field_values)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "To retrieve the fields in dictionary format, we can utilize `to_dict()` method." - ] - }, - { - "cell_type": "code", - "execution_count": 12, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'daily_transactions': [0.12021977894872915],\n", - " 'total_transactions': [0],\n", - " 'customer_id': [1001]}" - ] - }, - "execution_count": 12, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "online_features.to_dict()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - " " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 9. Retrieve training features" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "(Requires Google Cloud Platform)\n", - "\n", - "In order to retrieve historical feature data, the user must provide an `entity_rows` DataFrame. This DataFrame contains a combination of timestamps and entities (customers, in this case).\n", - "\n", - "The timestamps correlate to the event_time that a prediction needs to be made. At each one of these points in time you need to know the \"current\" feature values.\n", - "\n", - "We will randomly generate timestamps over the last 5 days and assign `customer_ids` to them.\n", - "\n", - "When these entity rows are sent to the Feast Serving API to retrieve feature values, along with a list of feature ids, Feast is then able to attach the correct feature values to each entity row. It will join the correct feature values at each point in time for each entity onto these entity rows." 
- ] - }, - { - "cell_type": "code", - "execution_count": 13, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - " datetime customer_id\n", - "0 2020-06-15 10:35:10.918716+00:00 1001\n", - "1 2020-06-15 14:00:10.918758+00:00 1002\n", - "2 2020-06-17 08:59:10.918767+00:00 1003\n", - "3 2020-06-13 16:51:10.918774+00:00 1004\n", - "4 2020-06-17 06:14:10.918780+00:00 1005\n", - "5 2020-06-17 14:33:10.918786+00:00 1001\n", - "6 2020-06-14 23:15:10.918792+00:00 1002\n", - "7 2020-06-15 11:25:10.918798+00:00 1003\n", - "8 2020-06-18 09:04:10.918804+00:00 1004\n", - "9 2020-06-16 10:27:10.918810+00:00 1005\n" - ] - } - ], - "source": [ - "event_timestamps = [datetime.utcnow().replace(tzinfo=utc) - timedelta(days=randrange(5), hours=randrange(24), minutes=randrange(60)) for day in range(30)]\n", - "\n", - "entity_rows = pd.DataFrame(\n", - " {\n", - " \"datetime\": event_timestamps,\n", - " \"customer_id\": [customers[idx % len(customers)] for idx in range(len(event_timestamps))],\n", - " }\n", - ")\n", - "\n", - "print(entity_rows.head(10))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 10. Retrieve historical/batch features" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Next we will create a new client object, but this time we will configure it to connect to the Feast Batch Serving. This service will allow us to retrieve historical feature data." - ] - }, - { - "cell_type": "code", - "execution_count": 15, - "metadata": {}, - "outputs": [], - "source": [ - "batch_client = Client(core_url=FEAST_CORE_URL, serving_url=FEAST_HISTORICAL_SERVING_URL)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "By calling the `get_historical_features` method we are able to retrieve a `job` object. This object can be used to retrieve the resulting training dataset that is exported by Feast. \n", - "\n", - "The dataset that is returned will contain feature values for each entity and timestamp combination in `entity_rows`." - ] - }, - { - "cell_type": "code", - "execution_count": 16, - "metadata": { - "scrolled": true - }, - "outputs": [], - "source": [ - "job = batch_client.get_historical_features(\n", - " feature_refs=[\n", - " \"daily_transactions\",\n", - " \"total_transactions\",\n", - " ],\n", - " entity_rows=entity_rows\n", - " )" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Once the job is complete, it is possible to retrieve the exported data (from Google Cloud Storage) and load it into memory as a Pandas Dataframe." 
- ] - }, - { - "cell_type": "code", - "execution_count": 17, - "metadata": {}, - "outputs": [], - "source": [ - "df = job.to_dataframe()" - ] - }, - { - "cell_type": "code", - "execution_count": 18, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - " event_timestamp customer_id daily_transactions \\\n", - "0 2020-06-13 23:45:10.918874+00:00 1001 1.879220 \n", - "1 2020-06-18 12:11:10.918845+00:00 1001 5.122846 \n", - "2 2020-06-17 20:46:10.918903+00:00 1001 2.145294 \n", - "3 2020-06-18 02:50:10.918816+00:00 1001 5.122846 \n", - "4 2020-06-15 10:35:10.918716+00:00 1001 5.758472 \n", - "5 2020-06-17 14:33:10.918786+00:00 1001 2.145294 \n", - "6 2020-06-14 23:15:10.918792+00:00 1002 5.467141 \n", - "7 2020-06-14 07:22:10.918851+00:00 1002 5.467141 \n", - "8 2020-06-17 23:40:10.918880+00:00 1002 3.338614 \n", - "9 2020-06-15 14:00:10.918758+00:00 1002 4.921264 \n", - "10 2020-06-15 18:22:10.918909+00:00 1002 4.921264 \n", - "11 2020-06-16 21:10:10.918822+00:00 1002 1.838186 \n", - "12 2020-06-18 05:47:10.918886+00:00 1003 2.702916 \n", - "13 2020-06-17 08:59:10.918767+00:00 1003 0.211125 \n", - "14 2020-06-15 11:25:10.918798+00:00 1003 4.476252 \n", - "15 2020-06-16 09:56:10.918857+00:00 1003 9.123597 \n", - "16 2020-06-14 11:39:10.918915+00:00 1003 6.353373 \n", - "17 2020-06-15 03:21:10.918828+00:00 1003 4.476252 \n", - "18 2020-06-18 09:04:10.918804+00:00 1004 8.756623 \n", - "19 2020-06-14 14:18:10.918834+00:00 1004 8.647374 \n", - "20 2020-06-17 03:10:10.918863+00:00 1004 2.377199 \n", - "21 2020-06-13 16:51:10.918774+00:00 1004 6.362085 \n", - "22 2020-06-15 03:54:10.918892+00:00 1004 8.235070 \n", - "23 2020-06-17 19:01:10.918921+00:00 1004 2.377199 \n", - "24 2020-06-17 06:14:10.918780+00:00 1005 9.958688 \n", - "25 2020-06-16 08:23:10.918839+00:00 1005 0.006388 \n", - "26 2020-06-16 00:30:10.918927+00:00 1005 0.006388 \n", - "27 2020-06-16 10:27:10.918810+00:00 1005 0.006388 \n", - "28 2020-06-17 01:50:10.918869+00:00 1005 9.958688 \n", - "29 2020-06-17 08:42:10.918897+00:00 1005 9.958688 \n", - "\n", - " total_transactions \n", - "0 7 \n", - "1 96 \n", - "2 63 \n", - "3 96 \n", - "4 85 \n", - "5 63 \n", - "6 10 \n", - "7 10 \n", - "8 50 \n", - "9 55 \n", - "10 55 \n", - "11 83 \n", - "12 50 \n", - "13 96 \n", - "14 61 \n", - "15 85 \n", - "16 69 \n", - "17 61 \n", - "18 84 \n", - "19 95 \n", - "20 25 \n", - "21 2 \n", - "22 58 \n", - "23 25 \n", - "24 6 \n", - "25 36 \n", - "26 36 \n", - "27 36 \n", - "28 6 \n", - "29 6 \n" - ] - } - ], - "source": [ - "print(df.head(50))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The training dataset is staged on Google Cloud Storage and can be accessed directly if it is too large to load into memory" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "job.get_avro_files()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "local-feast", - "language": "python", - "name": "local-feast" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.7.6" - } - }, - "nbformat": 4, - "nbformat_minor": 4 -} \ No newline at end of file diff --git a/examples/demo/Feast 101.ipynb b/examples/minimal/Feast 101.ipynb similarity index 100% rename from examples/demo/Feast 101.ipynb rename to examples/minimal/Feast 
101.ipynb diff --git a/examples/minimal/README.md b/examples/minimal/README.md new file mode 100644 index 0000000000..dd8a6f9dc2 --- /dev/null +++ b/examples/minimal/README.md @@ -0,0 +1,11 @@ +# Feast Minimal Ride-Hailing Example + +This is a minimal example of using Feast. In this example, we will walk through the following: + +1. Create and register an Entity, and Feature Table. +2. Retrieve and inspect registered Feature Tables. +3. Generate mock data and ingest into Feature Table's GCS batch source. +4. Perform historical retrieval with data from Feature Table's GCS batch source. +5. Populate online storage with data from Feature Table's GCS batch source. +6. Populate online storage with data from Feature Table's Kafka stream source. +7. Perform online retrieval using Feature References. diff --git a/examples/demo/images/data-flow.png b/examples/minimal/images/data-flow.png similarity index 100% rename from examples/demo/images/data-flow.png rename to examples/minimal/images/data-flow.png diff --git a/examples/demo/images/features-join.png b/examples/minimal/images/features-join.png similarity index 100% rename from examples/demo/images/features-join.png rename to examples/minimal/images/features-join.png diff --git a/examples/demo/images/pit-1.png b/examples/minimal/images/pit-1.png similarity index 100% rename from examples/demo/images/pit-1.png rename to examples/minimal/images/pit-1.png diff --git a/examples/demo/images/pit-2.png b/examples/minimal/images/pit-2.png similarity index 100% rename from examples/demo/images/pit-2.png rename to examples/minimal/images/pit-2.png diff --git a/infra/terraform/aws/helm.tf b/infra/terraform/aws/helm.tf index 34d5e0c521..f38f6d2c45 100644 --- a/infra/terraform/aws/helm.tf +++ b/infra/terraform/aws/helm.tf @@ -77,22 +77,23 @@ locals { redis_port = 6379 } } + } + } - "feast-jupyter" = { - "envOverrides" = { - feast_redis_host = module.redis.endpoint - feast_redis_port = 6379 - feast_redis_ssl = true - feast_emr_cluster_id = (length(aws_emr_cluster.persistent_cluster) > 0) ? aws_emr_cluster.persistent_cluster[0].id : null - feast_emr_region = var.region - spark_staging_location = "s3://${aws_s3_bucket.feast_bucket.id}/artifacts/" - feast_emr_log_location = "s3://${aws_s3_bucket.feast_bucket.id}/emr-logs/" - feast_spark_launcher = "emr" - feast_historical_feature_output_location = "s3://${aws_s3_bucket.feast_bucket.id}/out/" - feast_historical_feature_output_format = "parquet" - kafka_brokers = aws_msk_cluster.msk.bootstrap_brokers - } - } + "feast-jupyter" = { + "envOverrides" = { + feast_redis_host = module.redis.endpoint + feast_redis_port = 6379 + feast_redis_ssl = true + feast_emr_cluster_id = (length(aws_emr_cluster.persistent_cluster) > 0) ? 
aws_emr_cluster.persistent_cluster[0].id : null + feast_emr_region = var.region + feast_spark_staging_location = "s3://${aws_s3_bucket.feast_bucket.id}/artifacts/" + feast_emr_log_location = "s3://${aws_s3_bucket.feast_bucket.id}/emr-logs/" + feast_spark_launcher = "emr" + feast_historical_feature_output_location = "s3://${aws_s3_bucket.feast_bucket.id}/out/" + feast_historical_feature_output_format = "parquet" + demo_kafka_brokers = aws_msk_cluster.msk.bootstrap_brokers + demo_data_location = "s3://${aws_s3_bucket.feast_bucket.id}/test-data/" } } } diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 94703e6af5..ccbab92d7b 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -893,8 +893,9 @@ def get_historical_features( feature_tables = self._get_feature_tables_from_feature_refs( feature_refs, project ) - output_location = self._config.get( - CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION + output_location = os.path.join( + self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION), + str(uuid.uuid4()), ) output_format = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT) diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py index c68fa22b24..37eb20789e 100644 --- a/sdk/python/feast/pyspark/launchers/standalone/local.py +++ b/sdk/python/feast/pyspark/launchers/standalone/local.py @@ -124,6 +124,7 @@ def get_output_file_uri(self, timeout_sec: int = None): with self._process as p: try: p.wait(timeout_sec) + return self._output_file_uri except Exception: p.kill() raise SparkJobFailure("Timeout waiting for subprocess to return") diff --git a/sdk/python/feast/staging/storage_client.py b/sdk/python/feast/staging/storage_client.py index 4747a1f641..1cb250a598 100644 --- a/sdk/python/feast/staging/storage_client.py +++ b/sdk/python/feast/staging/storage_client.py @@ -270,7 +270,7 @@ def list_files(self, bucket: str, path: str) -> List[str]: raise NotImplementedError("list files not implemented for Local file") def upload_file(self, local_path: str, bucket: str, remote_path: str): - dest_fpath = "/" + remote_path + dest_fpath = remote_path if remote_path.startswith("/") else "/" + remote_path os.makedirs(os.path.dirname(dest_fpath), exist_ok=True) shutil.copy(local_path, dest_fpath) diff --git a/sdk/python/tests/test_historical_feature_retrieval.py b/sdk/python/tests/test_historical_feature_retrieval.py index 286661148a..aa833c6dfe 100644 --- a/sdk/python/tests/test_historical_feature_retrieval.py +++ b/sdk/python/tests/test_historical_feature_retrieval.py @@ -6,10 +6,14 @@ from contextlib import closing from datetime import datetime from typing import List, Tuple +from urllib.parse import urlparse import grpc +import numpy as np +import pandas as pd import pytest from google.protobuf.duration_pb2 import Duration +from pandas.util.testing import assert_frame_equal from pyspark.sql import DataFrame, SparkSession from pyspark.sql.types import ( BooleanType, @@ -19,6 +23,7 @@ StructType, TimestampType, ) +from pytz import utc from feast import Client, Entity, Feature, FeatureTable, FileSource, ValueType from feast.core import CoreService_pb2_grpc as Core @@ -82,6 +87,26 @@ def client(server): return Client(core_url=f"localhost:{free_port}") +@pytest.yield_fixture() +def client_with_local_spark(tmpdir): + import pyspark + + spark_staging_location = f"file://{os.path.join(tmpdir, 'staging')}" + historical_feature_output_location = ( + f"file://{os.path.join(tmpdir, 
'historical_feature_retrieval_output')}" + ) + + return Client( + core_url=f"localhost:{free_port}", + spark_launcher="standalone", + spark_standalone_master="local", + spark_home=os.path.dirname(pyspark.__file__), + spark_staging_location=spark_staging_location, + historical_feature_output_location=historical_feature_output_location, + historical_feature_output_format="parquet", + ) + + @pytest.fixture() def driver_entity(client): return client.apply_entity(Entity("driver_id", "description", ValueType.INT32)) @@ -116,36 +141,36 @@ def transactions_feature_table(spark, client): df_data = [ ( 1001, - datetime(year=2020, month=9, day=1), - datetime(year=2020, month=9, day=1), + datetime(year=2020, month=9, day=1, tzinfo=utc), + datetime(year=2020, month=9, day=1, tzinfo=utc), 50.0, True, ), ( 1001, - datetime(year=2020, month=9, day=1), - datetime(year=2020, month=9, day=2), + datetime(year=2020, month=9, day=1, tzinfo=utc), + datetime(year=2020, month=9, day=2, tzinfo=utc), 100.0, True, ), ( 2001, - datetime(year=2020, month=9, day=1), - datetime(year=2020, month=9, day=1), + datetime(year=2020, month=9, day=1, tzinfo=utc), + datetime(year=2020, month=9, day=1, tzinfo=utc), 400.0, False, ), ( 1001, - datetime(year=2020, month=9, day=2), - datetime(year=2020, month=9, day=1), + datetime(year=2020, month=9, day=2, tzinfo=utc), + datetime(year=2020, month=9, day=1, tzinfo=utc), 200.0, False, ), ( 1001, - datetime(year=2020, month=9, day=4), - datetime(year=2020, month=9, day=1), + datetime(year=2020, month=9, day=4, tzinfo=utc), + datetime(year=2020, month=9, day=1, tzinfo=utc), 300.0, False, ), @@ -180,20 +205,20 @@ def bookings_feature_table(spark, client): df_data = [ ( 8001, - datetime(year=2020, month=9, day=1), - datetime(year=2020, month=9, day=1), + datetime(year=2020, month=9, day=1, tzinfo=utc), + datetime(year=2020, month=9, day=1, tzinfo=utc), 100, ), ( 8001, - datetime(year=2020, month=9, day=2), - datetime(year=2020, month=9, day=2), + datetime(year=2020, month=9, day=2, tzinfo=utc), + datetime(year=2020, month=9, day=2, tzinfo=utc), 150, ), ( 8002, - datetime(year=2020, month=9, day=2), - datetime(year=2020, month=9, day=2), + datetime(year=2020, month=9, day=2, tzinfo=utc), + datetime(year=2020, month=9, day=2, tzinfo=utc), 200, ), ] @@ -225,20 +250,20 @@ def bookings_feature_table_with_mapping(spark, client): df_data = [ ( 8001, - datetime(year=2020, month=9, day=1), - datetime(year=2020, month=9, day=1), + datetime(year=2020, month=9, day=1, tzinfo=utc), + datetime(year=2020, month=9, day=1, tzinfo=utc), 100, ), ( 8001, - datetime(year=2020, month=9, day=2), - datetime(year=2020, month=9, day=2), + datetime(year=2020, month=9, day=2, tzinfo=utc), + datetime(year=2020, month=9, day=2, tzinfo=utc), 150, ), ( 8002, - datetime(year=2020, month=9, day=2), - datetime(year=2020, month=9, day=2), + datetime(year=2020, month=9, day=2, tzinfo=utc), + datetime(year=2020, month=9, day=2, tzinfo=utc), 200, ), ] @@ -273,12 +298,12 @@ def test_historical_feature_retrieval_from_local_spark_session( ] ) df_data = [ - (1001, 8001, datetime(year=2020, month=9, day=1),), - (2001, 8001, datetime(year=2020, month=9, day=2),), - (2001, 8002, datetime(year=2020, month=9, day=1),), - (1001, 8001, datetime(year=2020, month=9, day=2),), - (1001, 8001, datetime(year=2020, month=9, day=3),), - (1001, 8001, datetime(year=2020, month=9, day=4),), + (1001, 8001, datetime(year=2020, month=9, day=1, tzinfo=utc)), + (2001, 8001, datetime(year=2020, month=9, day=2, tzinfo=utc)), + (2001, 8002, 
datetime(year=2020, month=9, day=1, tzinfo=utc)), + (1001, 8001, datetime(year=2020, month=9, day=2, tzinfo=utc)), + (1001, 8001, datetime(year=2020, month=9, day=3, tzinfo=utc)), + (1001, 8001, datetime(year=2020, month=9, day=4, tzinfo=utc)), ] temp_dir, file_uri = create_temp_parquet_file( spark, "customer_driver_pair", schema, df_data @@ -300,12 +325,12 @@ def test_historical_feature_retrieval_from_local_spark_session( ] ) expected_joined_df_data = [ - (1001, 8001, datetime(year=2020, month=9, day=1), 100.0, 100), - (2001, 8001, datetime(year=2020, month=9, day=2), 400.0, 150), - (2001, 8002, datetime(year=2020, month=9, day=1), 400.0, None), - (1001, 8001, datetime(year=2020, month=9, day=2), 200.0, 150), - (1001, 8001, datetime(year=2020, month=9, day=3), 200.0, 150), - (1001, 8001, datetime(year=2020, month=9, day=4), 300.0, None), + (1001, 8001, datetime(year=2020, month=9, day=1, tzinfo=utc), 100.0, 100), + (2001, 8001, datetime(year=2020, month=9, day=2, tzinfo=utc), 400.0, 150), + (2001, 8002, datetime(year=2020, month=9, day=1, tzinfo=utc), 400.0, None), + (1001, 8001, datetime(year=2020, month=9, day=2, tzinfo=utc), 200.0, 150), + (1001, 8001, datetime(year=2020, month=9, day=3, tzinfo=utc), 200.0, 150), + (1001, 8001, datetime(year=2020, month=9, day=4, tzinfo=utc), 300.0, None), ] expected_joined_df = spark.createDataFrame( spark.sparkContext.parallelize(expected_joined_df_data), @@ -325,9 +350,9 @@ def test_historical_feature_retrieval_with_field_mappings_from_local_spark_sessi ] ) df_data = [ - (8001, datetime(year=2020, month=9, day=1)), - (8001, datetime(year=2020, month=9, day=2)), - (8002, datetime(year=2020, month=9, day=1)), + (8001, datetime(year=2020, month=9, day=1, tzinfo=utc)), + (8001, datetime(year=2020, month=9, day=2, tzinfo=utc)), + (8002, datetime(year=2020, month=9, day=1, tzinfo=utc)), ] temp_dir, file_uri = create_temp_parquet_file(spark, "drivers", schema, df_data) entity_source = FileSource( @@ -344,9 +369,9 @@ def test_historical_feature_retrieval_with_field_mappings_from_local_spark_sessi ] ) expected_joined_df_data = [ - (8001, datetime(year=2020, month=9, day=1), 100), - (8001, datetime(year=2020, month=9, day=2), 150), - (8002, datetime(year=2020, month=9, day=1), None), + (8001, datetime(year=2020, month=9, day=1, tzinfo=utc), 100), + (8001, datetime(year=2020, month=9, day=2, tzinfo=utc), 150), + (8002, datetime(year=2020, month=9, day=1, tzinfo=utc), None), ] expected_joined_df = spark.createDataFrame( spark.sparkContext.parallelize(expected_joined_df_data), @@ -354,3 +379,76 @@ def test_historical_feature_retrieval_with_field_mappings_from_local_spark_sessi ) assert_dataframe_equal(joined_df, expected_joined_df) shutil.rmtree(temp_dir) + + +@pytest.mark.usefixtures( + "driver_entity", + "customer_entity", + "bookings_feature_table", + "transactions_feature_table", +) +def test_historical_feature_retrieval_with_pandas_dataframe_input( + client_with_local_spark, +): + + customer_driver_pairs_pandas_df = pd.DataFrame( + np.array( + [ + [1001, 8001, datetime(year=2020, month=9, day=1, tzinfo=utc)], + [2001, 8001, datetime(year=2020, month=9, day=2, tzinfo=utc)], + [2001, 8002, datetime(year=2020, month=9, day=1, tzinfo=utc)], + [1001, 8001, datetime(year=2020, month=9, day=2, tzinfo=utc)], + [1001, 8001, datetime(year=2020, month=9, day=3, tzinfo=utc)], + [1001, 8001, datetime(year=2020, month=9, day=4, tzinfo=utc)], + ] + ), + columns=["customer_id", "driver_id", "event_timestamp"], + ) + customer_driver_pairs_pandas_df = 
customer_driver_pairs_pandas_df.astype( + {"customer_id": "int32", "driver_id": "int32"} + ) + + job_output = client_with_local_spark.get_historical_features( + ["transactions:total_transactions", "bookings:total_completed_bookings"], + customer_driver_pairs_pandas_df, + ) + + output_dir = job_output.get_output_file_uri() + joined_df = pd.read_parquet(urlparse(output_dir).path) + + expected_joined_df = pd.DataFrame( + np.array( + [ + [1001, 8001, datetime(year=2020, month=9, day=1), 100.0, 100], + [2001, 8001, datetime(year=2020, month=9, day=2), 400.0, 150], + [2001, 8002, datetime(year=2020, month=9, day=1), 400.0, None], + [1001, 8001, datetime(year=2020, month=9, day=2), 200.0, 150], + [1001, 8001, datetime(year=2020, month=9, day=3), 200.0, 150], + [1001, 8001, datetime(year=2020, month=9, day=4), 300.0, None], + ] + ), + columns=[ + "customer_id", + "driver_id", + "event_timestamp", + "transactions__total_transactions", + "bookings__total_completed_bookings", + ], + ) + expected_joined_df = expected_joined_df.astype( + { + "customer_id": "int32", + "driver_id": "int32", + "transactions__total_transactions": "float64", + "bookings__total_completed_bookings": "float64", + } + ) + + assert_frame_equal( + joined_df.sort_values( + by=["customer_id", "driver_id", "event_timestamp"] + ).reset_index(drop=True), + expected_joined_df.sort_values( + by=["customer_id", "driver_id", "event_timestamp"] + ).reset_index(drop=True), + ) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala index d958c2ea8d..7216541c85 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala @@ -57,6 +57,7 @@ trait BasePipeline { .set("spark.metrics.conf.*.sink.statsd.period", "30") .set("spark.metrics.conf.*.sink.statsd.unit", "seconds") .set("spark.metrics.namespace", jobConfig.mode.toString) + case None => () } SparkSession diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala index 9ca3612180..0a7b48730e 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala @@ -101,6 +101,6 @@ case class IngestionJobConfig( startTime: DateTime = DateTime.now(), endTime: DateTime = DateTime.now(), store: StoreConfig = RedisConfig("localhost", 6379, false), - metrics: Option[MetricConfig] = Some(StatsDConfig("localhost", 9125)), + metrics: Option[MetricConfig] = None, deadLetterPath: Option[String] = None ) diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index fcdbeae3d7..321ba5dcfc 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -1,5 +1,11 @@ +import os +from pathlib import Path + +import pyspark import pytest +from feast import Client + def pytest_addoption(parser): parser.addoption("--core_url", action="store", default="localhost:6565") @@ -34,3 +40,52 @@ def pytest_runtest_setup(item): previousfailed = getattr(item.parent, "_previousfailed", None) if previousfailed is not None: pytest.xfail("previous test failed (%s)" % previousfailed.name) + + +@pytest.fixture(scope="session") +def feast_version(): + return "0.8-SNAPSHOT" + + +@pytest.fixture(scope="session") +def ingestion_job_jar(pytestconfig, feast_version): + default_path = ( + Path(__file__).parent.parent.parent + / "spark" + / "ingestion" + / 
"target" + / f"feast-ingestion-spark-{feast_version}.jar" + ) + + return pytestconfig.getoption("ingestion_jar") or f"file://{default_path}" + + +@pytest.fixture(scope="session") +def feast_client(pytestconfig, ingestion_job_jar): + redis_host, redis_port = pytestconfig.getoption("redis_url").split(":") + + if pytestconfig.getoption("env") == "local": + return Client( + core_url=pytestconfig.getoption("core_url"), + serving_url=pytestconfig.getoption("serving_url"), + spark_launcher="standalone", + spark_standalone_master="local", + spark_home=os.getenv("SPARK_HOME") or os.path.dirname(pyspark.__file__), + spark_ingestion_jar=ingestion_job_jar, + redis_host=redis_host, + redis_port=redis_port, + ) + + if pytestconfig.getoption("env") == "gcloud": + return Client( + core_url=pytestconfig.getoption("core_url"), + serving_url=pytestconfig.getoption("serving_url"), + spark_launcher="dataproc", + dataproc_cluster_name=pytestconfig.getoption("dataproc_cluster_name"), + dataproc_project=pytestconfig.getoption("dataproc_project"), + dataproc_region=pytestconfig.getoption("dataproc_region"), + dataproc_staging_location=os.path.join( + pytestconfig.getoption("staging_path"), "dataproc" + ), + spark_ingestion_jar=ingestion_job_jar, + ) diff --git a/tests/e2e/requirements.txt b/tests/e2e/requirements.txt index c7f802a7f8..80380451c5 100644 --- a/tests/e2e/requirements.txt +++ b/tests/e2e/requirements.txt @@ -2,6 +2,7 @@ mock==2.0.0 numpy==1.16.4 pandas~=1.0.0 pandavro==1.5.* +pyspark==2.4.2 pytest==6.0.0 pytest-benchmark==3.2.2 pytest-mock==1.10.4 diff --git a/tests/e2e/test_historical_features.py b/tests/e2e/test_historical_features.py new file mode 100644 index 0000000000..84c1af139f --- /dev/null +++ b/tests/e2e/test_historical_features.py @@ -0,0 +1,128 @@ +import os +import tempfile +import uuid +from datetime import datetime, timedelta +from urllib.parse import urlparse + +import numpy as np +import pandas as pd +import pytest +from google.protobuf.duration_pb2 import Duration +from pandas._testing import assert_frame_equal + +from feast import Client, Entity, Feature, FeatureTable, FileSource, ValueType +from feast.data_format import ParquetFormat +from feast.staging.storage_client import get_staging_client + +np.random.seed(0) + + +@pytest.fixture(scope="function") +def staging_path(pytestconfig, tmp_path): + if pytestconfig.getoption("env") == "local": + return f"file://{tmp_path}" + + staging_path = pytestconfig.getoption("staging_path") + return os.path.join(staging_path, str(uuid.uuid4())) + + +@pytest.mark.skip +def test_historical_features(feast_client: Client, staging_path: str): + customer_entity = Entity( + name="customer_id", description="Customer", value_type=ValueType.INT64 + ) + feast_client.apply_entity(customer_entity) + + max_age = Duration() + max_age.FromSeconds(2 * 86400) + + transactions_feature_table = FeatureTable( + name="transactions", + entities=["customer_id"], + features=[ + Feature("daily_transactions", ValueType.DOUBLE), + Feature("total_transactions", ValueType.DOUBLE), + ], + batch_source=FileSource( + "event_timestamp", + "created_timestamp", + ParquetFormat(), + os.path.join(staging_path, "transactions"), + ), + max_age=max_age, + ) + + feast_client.apply_feature_table(transactions_feature_table) + + retrieval_date = ( + datetime.utcnow() + .replace(hour=0, minute=0, second=0, microsecond=0) + .replace(tzinfo=None) + ) + retrieval_outside_max_age_date = retrieval_date + timedelta(1) + event_date = retrieval_date - timedelta(2) + creation_date = retrieval_date - 
timedelta(1) + + customers = [1001, 1002, 1003, 1004, 1005] + daily_transactions = [np.random.rand() * 10 for _ in customers] + total_transactions = [np.random.rand() * 100 for _ in customers] + + transactions_df = pd.DataFrame( + { + "event_timestamp": [event_date for _ in customers], + "created_timestamp": [creation_date for _ in customers], + "customer_id": customers, + "daily_transactions": daily_transactions, + "total_transactions": total_transactions, + } + ) + + feast_client.ingest(transactions_feature_table, transactions_df) + + feature_refs = ["transactions:daily_transactions"] + + customer_df = pd.DataFrame( + { + "event_timestamp": [retrieval_date for _ in customers] + + [retrieval_outside_max_age_date for _ in customers], + "customer_id": customers + customers, + } + ) + + with tempfile.TemporaryDirectory() as tempdir: + df_export_path = os.path.join(tempdir, "customers.parquets") + customer_df.to_parquet(df_export_path) + scheme, _, remote_path, _, _, _ = urlparse(staging_path) + staging_client = get_staging_client(scheme) + staging_client.upload_file(df_export_path, None, remote_path) + customer_source = FileSource( + "event_timestamp", + "event_timestamp", + ParquetFormat(), + os.path.join(staging_path, os.path.basename(df_export_path)), + ) + + job = feast_client.get_historical_features(feature_refs, customer_source) + output_dir = job.get_output_file_uri() + + _, _, joined_df_destination_path, _, _, _ = urlparse(output_dir) + joined_df = pd.read_parquet(joined_df_destination_path) + + expected_joined_df = pd.DataFrame( + { + "event_timestamp": [retrieval_date for _ in customers] + + [retrieval_outside_max_age_date for _ in customers], + "customer_id": customers + customers, + "transactions__daily_transactions": daily_transactions + + [None] * len(customers), + } + ) + + assert_frame_equal( + joined_df.sort_values(by=["customer_id", "event_timestamp"]).reset_index( + drop=True + ), + expected_joined_df.sort_values( + by=["customer_id", "event_timestamp"] + ).reset_index(drop=True), + ) diff --git a/tests/e2e/test_online_features.py b/tests/e2e/test_online_features.py index fc255c172b..8d81a9b79a 100644 --- a/tests/e2e/test_online_features.py +++ b/tests/e2e/test_online_features.py @@ -4,12 +4,10 @@ import time import uuid from datetime import datetime, timedelta -from pathlib import Path import avro.schema import numpy as np import pandas as pd -import pyspark import pytest import pytz from avro.io import BinaryEncoder, DatumWriter @@ -41,55 +39,6 @@ def generate_data(): return df -@pytest.fixture(scope="session") -def feast_version(): - return "0.8-SNAPSHOT" - - -@pytest.fixture(scope="session") -def ingestion_job_jar(pytestconfig, feast_version): - default_path = ( - Path(__file__).parent.parent.parent - / "spark" - / "ingestion" - / "target" - / f"feast-ingestion-spark-{feast_version}.jar" - ) - - return pytestconfig.getoption("ingestion_jar") or f"file://{default_path}" - - -@pytest.fixture(scope="session") -def feast_client(pytestconfig, ingestion_job_jar): - redis_host, redis_port = pytestconfig.getoption("redis_url").split(":") - - if pytestconfig.getoption("env") == "local": - return Client( - core_url=pytestconfig.getoption("core_url"), - serving_url=pytestconfig.getoption("serving_url"), - spark_launcher="standalone", - spark_standalone_master="local", - spark_home=os.getenv("SPARK_HOME") or os.path.dirname(pyspark.__file__), - spark_ingestion_jar=ingestion_job_jar, - redis_host=redis_host, - redis_port=redis_port, - ) - - if pytestconfig.getoption("env") == 
"gcloud": - return Client( - core_url=pytestconfig.getoption("core_url"), - serving_url=pytestconfig.getoption("serving_url"), - spark_launcher="dataproc", - dataproc_cluster_name=pytestconfig.getoption("dataproc_cluster_name"), - dataproc_project=pytestconfig.getoption("dataproc_project"), - dataproc_region=pytestconfig.getoption("dataproc_region"), - dataproc_staging_location=os.path.join( - pytestconfig.getoption("staging_path"), "dataproc" - ), - spark_ingestion_jar=ingestion_job_jar, - ) - - @pytest.fixture(scope="function") def staging_path(pytestconfig, tmp_path): if pytestconfig.getoption("env") == "local":