Reporting metrics from validation UDF #1256

Merged: 10 commits, Jan 11, 2021
11 changes: 10 additions & 1 deletion infra/docker-compose/docker-compose.yml
@@ -37,6 +37,9 @@ services:
FEAST_HISTORICAL_FEATURE_OUTPUT_FORMAT: parquet
FEAST_REDIS_HOST: redis
FEAST_SPARK_INGESTION_JAR: ${INGESTION_JAR_PATH}
FEAST_STATSD_ENABLED: "true"
FEAST_STATSD_HOST: prometheus_statsd
FEAST_STATSD_PORT: 9125

jupyter:
image: gcr.io/kf-feast/feast-jupyter:${FEAST_VERSION}
@@ -106,4 +109,10 @@ services:
redis:
image: redis:5-alpine
ports:
- "6379:6379"
- "6379:6379"

prometheus_statsd:
image: prom/statsd-exporter:v0.12.1
ports:
- "9125:9125"
- "9102:9102"
3 changes: 2 additions & 1 deletion infra/scripts/test-docker-compose.sh
@@ -13,6 +13,7 @@ clean_up () {
ARG=$?

# Shut down docker-compose images

docker-compose down

exit $ARG
@@ -69,4 +70,4 @@ docker exec \
-e DISABLE_FEAST_SERVICE_FIXTURES=true \
--user root \
feast_jupyter_1 bash \
-c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ --ingestion-jar https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --job-service-url jobservice:6568 --staging-path file:///shared/staging/ --kafka-brokers kafka:9092 --feast-version develop'
-c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ --ingestion-jar https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --job-service-url jobservice:6568 --staging-path file:///shared/staging/ --kafka-brokers kafka:9092 --statsd-url prometheus_statsd:9125 --prometheus-url prometheus_statsd:9102 --feast-version develop'
69 changes: 66 additions & 3 deletions sdk/python/feast/contrib/validation/ge.py
@@ -1,5 +1,6 @@
import io
import json
import os
from typing import TYPE_CHECKING
from urllib.parse import urlparse

@@ -10,7 +11,7 @@
from feast.staging.storage_client import get_staging_client

try:
from great_expectations.core import ExpectationSuite
from great_expectations.core import ExpectationConfiguration, ExpectationSuite
from great_expectations.dataset import PandasDataset
except ImportError:
raise ImportError(
@@ -41,7 +42,28 @@ def __init__(self, name: str, pickled_code: bytes):
self.pickled_code = pickled_code


def create_validation_udf(name: str, expectations: ExpectationSuite) -> ValidationUDF:
def drop_feature_table_prefix(
expectation_configuration: ExpectationConfiguration, prefix
):
kwargs = expectation_configuration.kwargs
for arg_name in ("column", "column_A", "column_B"):
if arg_name not in kwargs:
continue

if kwargs[arg_name].startswith(prefix):
kwargs[arg_name] = kwargs[arg_name][len(prefix) :]


def prepare_expectations(suite: ExpectationSuite, feature_table: "FeatureTable"):
for expectation in suite.expectations:
drop_feature_table_prefix(expectation, f"{feature_table.name}__")

return suite


def create_validation_udf(
name: str, expectations: ExpectationSuite, feature_table: "FeatureTable",
) -> ValidationUDF:
"""
Wraps your expectations into Spark UDF.

@@ -60,10 +82,25 @@ def create_validation_udf(name: str, expectations: ExpectationSuite) -> ValidationUDF:

:param name
:param expectations: collection of expectation gathered on training dataset
:param feature_table
:return: ValidationUDF with serialized code
"""

expectations = prepare_expectations(expectations, feature_table)

def udf(df: pd.DataFrame) -> pd.Series:
from datadog.dogstatsd import DogStatsd

reporter = (
DogStatsd(
host=os.environ["STATSD_HOST"],
port=int(os.environ["STATSD_PORT"]),
telemetry_min_flush_interval=0,
)
if os.getenv("STATSD_HOST") and os.getenv("STATSD_PORT")
else DogStatsd()
)

ds = PandasDataset.from_dataset(df)
result = ds.validate(expectations, result_format="COMPLETE")
valid_rows = pd.Series([True] * df.shape[0])
@@ -72,6 +109,32 @@ def udf(df: pd.DataFrame) -> pd.Series:
if check.success:
continue

unexpected_count = (
check.result["unexpected_count"]
if "unexpected_count" in check.result
else df.shape[0]
)

check_kwargs = check.expectation_config.kwargs
check_kwargs.pop("result_format", None)
check_name = "_".join(
[check.expectation_config.expectation_type]
+ [
str(v)
for v in check_kwargs.values()
if isinstance(v, (str, int, float))
]
)

reporter.increment(
"feast_feature_validation_check_failed",
value=unexpected_count,
tags=[
f"feature_table:{os.getenv('FEAST_INGESTION_FEATURE_TABLE', 'unknown')}",
f"check:{check_name}",
],
)

if check.exception_info["raised_exception"]:
# ToDo: probably we should mark all rows as invalid
continue
@@ -106,7 +169,7 @@ def apply_validation(
staging_client = get_staging_client(staging_scheme, client._config)

pickled_code_fp = io.BytesIO(udf.pickled_code)
remote_path = f"{staging_location}/udfs/{udf.name}.pickle"
remote_path = f"{staging_location}/udfs/{feature_table.name}/{udf.name}.pickle"
staging_client.upload_fileobj(
pickled_code_fp, f"{udf.name}.pickle", remote_uri=urlparse(remote_path)
)
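
Taken together, the new helpers rewrite each expectation so it matches the column names the UDF actually sees (ingestion prefixes columns with "<feature_table>__"), and every failed check is reported as a counter tagged with the feature table and a name derived from the expectation type plus its scalar kwargs. A small illustration, with a hypothetical suite and table name (Great Expectations API as of this PR's era):

from great_expectations.core import ExpectationConfiguration, ExpectationSuite

suite = ExpectationSuite(expectation_suite_name="demo")
suite.append_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "transactions__amount", "min_value": 0, "max_value": 100},
    )
)

# With feature_table.name == "transactions", prepare_expectations strips the
# prefix, so the expectation now targets the plain "amount" column:
#   "transactions__amount" -> "amount"

# If that check later fails inside the UDF, the counter's check tag is the
# expectation type joined with its scalar kwargs (result_format excluded):
#   check:expect_column_values_to_be_between_amount_0_100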
3 changes: 3 additions & 0 deletions sdk/python/feast/feature_table.py
@@ -406,3 +406,6 @@ def _update_from_feature_table(self, feature_table):
self.stream_source = feature_table.stream_source
self._created_timestamp = feature_table.created_timestamp
self._last_updated_timestamp = feature_table.last_updated_timestamp

def __repr__(self):
return f"FeatureTable <{self.name}>"
@@ -171,9 +171,19 @@ object StreamingPipeline extends BasePipeline with Serializable {
val fileName = validationConfig.pickledCodePath.split("/").last
val pickledCode = FileUtils.readFileToByteArray(new File(SparkFiles.get(fileName)))

val env = config.metrics match {
case Some(c: StatsDConfig) =>
Map(
"STATSD_HOST" -> c.host,
"STATSD_PORT" -> c.port.toString,
"FEAST_INGESTION_FEATURE_TABLE" -> config.featureTable.name
)
case _ => Map.empty[String, String]
}

UserDefinedPythonFunction(
validationConfig.name,
DynamicPythonFunction.create(pickledCode),
DynamicPythonFunction.create(pickledCode, env),
BooleanType,
pythonEvalType = 200, // SQL_SCALAR_PANDAS_UDF (original constant is in private object)
udfDeterministic = true
Expand Down
@@ -61,8 +61,12 @@ object DynamicPythonFunction {
)
}

def create(pickledCode: Array[Byte], includePath: String = "libs/"): PythonFunction = {
val envVars = new JHashMap[String, String]()
def create(
pickledCode: Array[Byte],
env: Map[String, String] = Map.empty,
includePath: String = "libs/"
): PythonFunction = {
val envVars = new JHashMap[String, String](env.asJava)
val broadcasts = new JArrayList[Broadcast[PythonBroadcast]]()

if (!sys.env.contains("SPARK_HOME")) {
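
On the Scala side, the metrics config is flattened into an env map and handed to DynamicPythonFunction.create, which seeds the Python worker's environment. Inside the pickled UDF those entries surface as ordinary environment variables, which is exactly what the reporter construction in ge.py relies on; a worker's-eye sketch of that same logic:

import os

from datadog.dogstatsd import DogStatsd

# Set by StreamingPipeline when a StatsDConfig is present; absent otherwise.
if os.getenv("STATSD_HOST") and os.getenv("STATSD_PORT"):
    reporter = DogStatsd(
        host=os.environ["STATSD_HOST"],
        port=int(os.environ["STATSD_PORT"]),
        telemetry_min_flush_interval=0,  # flush immediately; UDF calls are short-lived
    )
else:
    reporter = DogStatsd()  # client defaults (localhost:8125)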
9 changes: 8 additions & 1 deletion tests/e2e/conftest.py
@@ -28,6 +28,8 @@ def pytest_addoption(parser):
parser.addoption("--feast-version", action="store")
parser.addoption("--bq-project", action="store")
parser.addoption("--feast-project", action="store", default="default")
parser.addoption("--statsd-url", action="store", default="localhost:8125")
parser.addoption("--prometheus-url", action="store", default="localhost:9102")


def pytest_runtest_setup(item):
@@ -51,10 +53,15 @@ def pytest_runtest_setup(item):
kafka_port,
kafka_server,
redis_server,
statsd_server,
zookeeper_server,
)
else:
from .fixtures.external_services import kafka_server, redis_server # noqa
from .fixtures.external_services import ( # type: ignore # noqa
kafka_server,
redis_server,
statsd_server,
)

if not os.environ.get("DISABLE_FEAST_SERVICE_FIXTURES"):
from .fixtures.feast_services import * # type: ignore # noqa
5 changes: 5 additions & 0 deletions tests/e2e/fixtures/client.py
@@ -7,13 +7,15 @@
from pytest_redis.executor import RedisExecutor

from feast import Client
from tests.e2e.fixtures.statsd_stub import StatsDServer


@pytest.fixture
def feast_client(
pytestconfig,
ingestion_job_jar,
redis_server: RedisExecutor,
statsd_server: StatsDServer,
feast_core: Tuple[str, int],
feast_serving: Tuple[str, int],
local_staging_path,
@@ -44,6 +46,9 @@ def feast_client(
local_staging_path, "historical_output"
),
ingestion_drop_invalid_rows=True,
statsd_enabled=True,
statsd_host=statsd_server.host,
statsd_port=statsd_server.port,
**job_service_env,
)

12 changes: 12 additions & 0 deletions tests/e2e/fixtures/external_services.py
@@ -1,13 +1,16 @@
import pytest
from pytest_redis.executor import NoopRedis

from tests.e2e.fixtures.statsd_stub import PrometheusStatsDServer

__all__ = (
"feast_core",
"feast_serving",
"redis_server",
"kafka_server",
"enable_auth",
"feast_jobservice",
"statsd_server",
)


@@ -44,3 +47,12 @@ def enable_auth():
def feast_jobservice(pytestconfig):
host, port = pytestconfig.getoption("job_service_url").split(":")
return host, port


@pytest.fixture(scope="session")
def statsd_server(pytestconfig):
host, port = pytestconfig.getoption("statsd_url").split(":")
prometheus_host, prometheus_port = pytestconfig.getoption("prometheus_url").split(
":"
)
return PrometheusStatsDServer(host, port, prometheus_host, prometheus_port)
29 changes: 23 additions & 6 deletions tests/e2e/fixtures/services.py
@@ -1,7 +1,8 @@
import os
import pathlib
import shutil
import tempfile

import port_for
import pytest
import requests
from pytest_kafka import make_kafka_server, make_zookeeper_process
@@ -14,16 +15,23 @@
"zookeeper_server",
"postgres_server",
"redis_server",
"statsd_server",
)

from tests.e2e.fixtures.statsd_stub import StatsDStub


def download_kafka(version="2.12-2.6.0"):
r = requests.get(f"https://downloads.apache.org/kafka/2.6.0/kafka_{version}.tgz")
temp_dir = pathlib.Path(tempfile.mkdtemp())
local_path = temp_dir / "kafka.tgz"
temp_dir = pathlib.Path("/tmp")
local_path = temp_dir / f"kafka_{version}.tgz"

if not os.path.isfile(local_path):
r = requests.get(
f"https://downloads.apache.org/kafka/2.6.0/kafka_{version}.tgz"
)

with open(local_path, "wb") as f:
f.write(r.content)
with open(local_path, "wb") as f:
f.write(r.content)

shutil.unpack_archive(str(local_path), str(temp_dir))
return temp_dir / f"kafka_{version}" / "bin"
@@ -35,6 +43,15 @@ def kafka_server(kafka_port):
return "localhost", port


@pytest.fixture
def statsd_server():
port = port_for.select_random(None)
server = StatsDStub(port=port)
server.start()
yield server
server.stop()


postgres_server = pg_factories.postgresql_proc(password="password")
redis_server = redis_factories.redis_proc(
executable=shutil.which("redis-server"), timeout=3600
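
The StatsDServer, StatsDStub, and PrometheusStatsDServer classes come from tests/e2e/fixtures/statsd_stub.py, which is not included in this diff view. As a rough reconstruction of the contract the statsd_server fixture depends on (start(), stop(), and host/port attributes), a UDP stub might look like the following; this is a hypothetical sketch, not the PR's actual implementation:

import socket
import threading


class StatsDStub:
    """Collects raw StatsD datagrams so tests can assert on reported metrics."""

    def __init__(self, port: int, host: str = "localhost"):
        self.host = host
        self.port = port
        self.messages = []
        self._sock = None
        self._thread = None

    def start(self):
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._sock.bind((self.host, self.port))
        self._sock.settimeout(0.1)  # lets the loop notice stop() promptly
        self._thread = threading.Thread(target=self._listen, daemon=True)
        self._thread.start()

    def _listen(self):
        while True:
            sock = self._sock
            if sock is None:
                break
            try:
                data, _ = sock.recvfrom(4096)  # e.g. b"metric:1|c|#tag:value"
                self.messages.append(data)
            except socket.timeout:
                continue
            except OSError:  # socket closed by stop()
                break

    def stop(self):
        sock, self._sock = self._sock, None
        if sock is not None:
            sock.close()
        if self._thread is not None:
            self._thread.join(timeout=1)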