diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index fcff721257..538dd20c02 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -263,22 +263,24 @@ def apply( ValueError: The 'objects' parameter could not be parsed properly. Examples: - Register a single Entity and FeatureView. + Register an Entity and a FeatureView. - >>> from feast.feature_store import FeatureStore - >>> from feast import Entity, FeatureView, Feature, ValueType, FileSource + >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig >>> from datetime import timedelta - >>> - >>> fs = FeatureStore() - >>> customer_entity = Entity(name="customer", value_type=ValueType.INT64, description="customer entity") - >>> customer_feature_view = FeatureView( - >>> name="customer_fv", - >>> entities=["customer"], - >>> features=[Feature(name="age", dtype=ValueType.INT64)], - >>> batch_source=FileSource(path="file.parquet", event_timestamp_column="timestamp"), - >>> ttl=timedelta(days=1) - >>> ) - >>> fs.apply([customer_entity, customer_feature_view]) + >>> fs = FeatureStore(config=RepoConfig(registry="feature_repo/data/registry.db", project="feature_repo", provider="local")) + >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") + >>> driver_hourly_stats = FileSource( + ... path="feature_repo/data/driver_stats.parquet", + ... event_timestamp_column="event_timestamp", + ... created_timestamp_column="created", + ... ) + >>> driver_hourly_stats_view = FeatureView( + ... name="driver_hourly_stats", + ... entities=["driver_id"], + ... ttl=timedelta(seconds=86400 * 1), + ... batch_source=driver_hourly_stats, + ... ) + >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view """ # TODO: Add locking @@ -381,17 +383,49 @@ def get_historical_features( ValueError: Both or neither of features and feature_refs are specified. 
Examples: - Retrieve historical features using a BigQuery SQL entity dataframe + Retrieve historical features from a local offline store. - >>> from feast.feature_store import FeatureStore - >>> - >>> fs = FeatureStore(config=RepoConfig(provider="gcp")) + >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from datetime import datetime, timedelta + >>> import pandas as pd + >>> fs = FeatureStore(config=RepoConfig(registry="feature_repo/data/registry.db", project="feature_repo", provider="local")) + >>> # Before retrieving historical features, we must register the appropriate entity and feature view. + >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") + >>> driver_hourly_stats = FileSource( + ... path="feature_repo/data/driver_stats.parquet", + ... event_timestamp_column="event_timestamp", + ... created_timestamp_column="created", + ... ) + >>> driver_hourly_stats_view = FeatureView( + ... name="driver_hourly_stats", + ... entities=["driver_id"], + ... ttl=timedelta(seconds=86400 * 1), + ... features=[ + ... Feature(name="conv_rate", dtype=ValueType.FLOAT), + ... Feature(name="acc_rate", dtype=ValueType.FLOAT), + ... Feature(name="avg_daily_trips", dtype=ValueType.INT64), + ... ], + ... batch_source=driver_hourly_stats, + ... ) + >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view + >>> entity_df = pd.DataFrame.from_dict( + ... { + ... "driver_id": [1001, 1002], + ... "event_timestamp": [ + ... datetime(2021, 4, 12, 10, 59, 42), + ... datetime(2021, 4, 12, 8, 12, 10), + ... ], + ... } + ... ) >>> retrieval_job = fs.get_historical_features( - >>> entity_df="SELECT event_timestamp, order_id, customer_id from gcp_project.my_ds.customer_orders", - >>> features=["customer:age", "customer:avg_orders_1d", "customer:avg_orders_7d"] - >>> ) + ... entity_df=entity_df, + ... features=[ + ... "driver_hourly_stats:conv_rate", + ... 
"driver_hourly_stats:acc_rate", + ... "driver_hourly_stats:avg_daily_trips", + ... ], + ... ) >>> feature_data = retrieval_job.to_df() - >>> model.fit(feature_data) # insert your modeling framework here. """ if (features is not None and feature_refs is not None) or ( features is None and feature_refs is None @@ -456,11 +490,32 @@ def materialize_incremental( Examples: Materialize all features into the online store up to 5 minutes ago. - >>> from datetime import datetime, timedelta - >>> from feast.feature_store import FeatureStore - >>> - >>> fs = FeatureStore(config=RepoConfig(provider="gcp", registry="gs://my-fs/", project="my_fs_proj")) + >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from datetime import datetime, timedelta + >>> fs = FeatureStore(config=RepoConfig(registry="feature_repo/data/registry.db", project="feature_repo", provider="local")) + >>> # Before materializing, we must register the appropriate entity and feature view. + >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) + >>> driver_hourly_stats = FileSource( + ... path="feature_repo/data/driver_stats.parquet", + ... event_timestamp_column="event_timestamp", + ... created_timestamp_column="created", + ... ) + >>> driver_hourly_stats_view = FeatureView( + ... name="driver_hourly_stats", + ... entities=["driver_id"], + ... ttl=timedelta(seconds=86400 * 1), + ... features=[ + ... Feature(name="conv_rate", dtype=ValueType.FLOAT), + ... Feature(name="acc_rate", dtype=ValueType.FLOAT), + ... Feature(name="avg_daily_trips", dtype=ValueType.INT64), + ... ], + ... batch_source=driver_hourly_stats, + ... ) + >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view >>> fs.materialize_incremental(end_date=datetime.utcnow() - timedelta(minutes=5)) + Materializing... + <BLANKLINE> + ... 
""" feature_views_to_materialize = [] if feature_views is None: @@ -539,13 +594,34 @@ def materialize( Materialize all features into the online store over the interval from 3 hours ago to 10 minutes ago. - >>> from datetime import datetime, timedelta - >>> from feast.feature_store import FeatureStore - >>> - >>> fs = FeatureStore(config=RepoConfig(provider="gcp")) + >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from datetime import datetime, timedelta + >>> fs = FeatureStore(config=RepoConfig(registry="feature_repo/data/registry.db", project="feature_repo", provider="local")) + >>> # Before materializing, we must register the appropriate entity and feature view. + >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) + >>> driver_hourly_stats = FileSource( + ... path="feature_repo/data/driver_stats.parquet", + ... event_timestamp_column="event_timestamp", + ... created_timestamp_column="created", + ... ) + >>> driver_hourly_stats_view = FeatureView( + ... name="driver_hourly_stats", + ... entities=["driver_id"], + ... ttl=timedelta(seconds=86400 * 1), + ... features=[ + ... Feature(name="conv_rate", dtype=ValueType.FLOAT), + ... Feature(name="acc_rate", dtype=ValueType.FLOAT), + ... Feature(name="avg_daily_trips", dtype=ValueType.INT64), + ... ], + ... batch_source=driver_hourly_stats, + ... ) + >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view >>> fs.materialize( - >>> start_date=datetime.utcnow() - timedelta(hours=3), end_date=datetime.utcnow() - timedelta(minutes=10) - >>> ) + ... start_date=datetime.utcnow() - timedelta(hours=3), end_date=datetime.utcnow() - timedelta(minutes=10) + ... ) + Materializing... + <BLANKLINE> + ... """ if utils.make_tzaware(start_date) > utils.make_tzaware(end_date): raise ValueError( @@ -627,17 +703,47 @@ def get_online_features( Exception: No entity with the specified name exists. 
Examples: - >>> from feast import FeatureStore - >>> - >>> store = FeatureStore(repo_path="...") - >>> feature_refs = ["sales:daily_transactions"] - >>> entity_rows = [{"customer_id": 0},{"customer_id": 1}] - >>> - >>> online_response = store.get_online_features( - >>> feature_refs, entity_rows) + Materialize all features into the online store over the interval + from 3 hours ago to 10 minutes ago, and then retrieve these online features. + + >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from datetime import datetime, timedelta + >>> import pandas as pd + >>> fs = FeatureStore(config=RepoConfig(registry="feature_repo/data/registry.db", project="feature_repo", provider="local")) + >>> # Before getting online features, we must register the appropriate entity and feature view and then materialize the features. + >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) + >>> driver_hourly_stats = FileSource( + ... path="feature_repo/data/driver_stats.parquet", + ... event_timestamp_column="event_timestamp", + ... created_timestamp_column="created", + ... ) + >>> driver_hourly_stats_view = FeatureView( + ... name="driver_hourly_stats", + ... entities=["driver_id"], + ... ttl=timedelta(seconds=86400 * 1), + ... features=[ + ... Feature(name="conv_rate", dtype=ValueType.FLOAT), + ... Feature(name="acc_rate", dtype=ValueType.FLOAT), + ... Feature(name="avg_daily_trips", dtype=ValueType.INT64), + ... ], + ... batch_source=driver_hourly_stats, + ... ) + >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view + >>> fs.materialize( + ... start_date=datetime.utcnow() - timedelta(hours=3), end_date=datetime.utcnow() - timedelta(minutes=10) + ... ) + Materializing... + <BLANKLINE> + ... + >>> online_response = fs.get_online_features( + ... features=[ + ... "driver_hourly_stats:conv_rate", + ... "driver_hourly_stats:acc_rate", + ... "driver_hourly_stats:avg_daily_trips", + ... 
], + ... entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}, {"driver_id": 1003}, {"driver_id": 1004}], + ... ) >>> online_response_dict = online_response.to_dict() - >>> print(online_response_dict) - {'sales:daily_transactions': [1.1,1.2], 'sales:customer_id': [0,1]} """ _feature_refs = self._get_features(features, feature_refs) diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index 5d5d0f4560..624c812b23 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -35,7 +35,8 @@ def __init__( or view. Only used for feature columns, not entities or timestamp columns. Examples: - >>> FileSource(path="/data/my_features.parquet", event_timestamp_column="event_timestamp") + >>> from feast import FileSource + >>> file_source = FileSource(path="my_features.parquet", event_timestamp_column="event_timestamp") """ if path is None and file_url is None: raise ValueError( diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index fcd84d4b6f..aea460cfb8 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -233,7 +233,7 @@ def temporarily_upload_df_to_redshift( This is essentially the same as upload_df_to_redshift (check out its docstring for full details), but unlike it this method is a generator and should be used with `with` block. 
For example: - >>> with temporarily_upload_df_to_redshift(...): + >>> with temporarily_upload_df_to_redshift(...): # doctest: +SKIP >>> # Use `table_name` table in Redshift here >>> # `table_name` will not exist at this point, since it's cleaned up by the `with` block
diff --git a/sdk/python/tests/doctest/test_all.py b/sdk/python/tests/doctest/test_all.py
new file mode 100644
index 0000000000..cf93f5f6ef
--- /dev/null
+++ b/sdk/python/tests/doctest/test_all.py
@@ -0,0 +1,85 @@
+import doctest
+import importlib
+import pkgutil
+import sys
+import unittest
+
+import feast
+
+
+def setup_feature_store(docstring_tests):
+    """Prepares the local environment for a FeatureStore docstring test.
+
+    Calls init_repo to initialize a repository named "feature_repo" from the "local" template.
+    """
+    from feast.repo_operations import init_repo
+
+    init_repo("feature_repo", "local")
+
+
+def teardown_feature_store(docstring_tests):
+    """Cleans up the local environment after a FeatureStore docstring test.
+
+    Removes the "feature_repo" and "data" directories, ignoring any errors.
+    """
+    import shutil
+
+    shutil.rmtree("feature_repo", ignore_errors=True)
+    shutil.rmtree("data", ignore_errors=True)
+
+
+def test_docstrings():
+    """Runs all docstring tests.
+
+    Imports all submodules of the feast package. Checks the submodules for docstring
+    tests and runs them. Setup functions for a submodule named "feast.x.y.z" should be
+    defined in this module as a function named "setup_x_y_z". Teardown functions can be
+    defined similarly. Setup and teardown functions are per-submodule.
+
+    Raises:
+        Exception: At least one docstring test failed.
+    """
+    successful = True
+    current_packages = [feast]
+
+    # Walk the package tree one level at a time.
+    while current_packages:
+        next_packages = []
+
+        for package in current_packages:
+            # iter_modules lists only the direct children of the package;
+            # walk_packages without a prefix would additionally try to import
+            # every subpackage by its bare, unqualified name.
+            for _, name, is_pkg in pkgutil.iter_modules(package.__path__):
+                full_name = package.__name__ + "." + name
+
+                # Guard only the import: a submodule that cannot be imported is
+                # skipped, and nothing raised further down is silently swallowed.
+                try:
+                    temp_module = importlib.import_module(full_name)
+                except ModuleNotFoundError:
+                    continue
+
+                if is_pkg:
+                    next_packages.append(temp_module)
+
+                relative_path_from_feast = full_name.split(".", 1)[1]
+                function_suffix = relative_path_from_feast.replace(".", "_")
+                setup_function = globals().get("setup_" + function_suffix)
+                teardown_function = globals().get("teardown_" + function_suffix)
+
+                test_suite = doctest.DocTestSuite(
+                    temp_module,
+                    setUp=setup_function,
+                    tearDown=teardown_function,
+                    optionflags=doctest.ELLIPSIS,
+                )
+                if test_suite.countTestCases() > 0:
+                    result = unittest.TextTestRunner(sys.stdout).run(test_suite)
+                    if not result.wasSuccessful():
+                        successful = False
+
+        current_packages = next_packages
+
+    if not successful:
+        raise Exception("Docstring tests failed.")