From 325695275da610cecf2b9e820fd71f7f04179ccf Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 8 Jun 2022 12:41:04 -0700 Subject: [PATCH] feat: Update stream fcos to have watermark and sliding interval (#2765) * Add sliding window to aggregations Signed-off-by: Kevin Zhang * Fix Signed-off-by: Kevin Zhang * update apis Signed-off-by: Kevin Zhang * Lint Signed-off-by: Kevin Zhang * Fix lint Signed-off-by: Kevin Zhang * Fix Signed-off-by: Kevin Zhang * Fix Signed-off-by: Kevin Zhang --- protos/feast/core/Aggregation.proto | 1 + protos/feast/core/DataSource.proto | 2 + sdk/python/feast/aggregation.py | 24 +++++++- sdk/python/feast/data_source.py | 59 ++++++++++++++++++- .../integration/registration/test_registry.py | 1 + .../test_stream_feature_view_apply.py | 1 + 6 files changed, 85 insertions(+), 3 deletions(-) diff --git a/protos/feast/core/Aggregation.proto b/protos/feast/core/Aggregation.proto index d848ce6972..d2d6cab702 100644 --- a/protos/feast/core/Aggregation.proto +++ b/protos/feast/core/Aggregation.proto @@ -11,4 +11,5 @@ message Aggregation { string column = 1; string function = 2; google.protobuf.Duration time_window = 3; + google.protobuf.Duration slide_interval = 4; } \ No newline at end of file diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index 9e6028ccfa..e71066ee70 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -22,6 +22,7 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/core"; option java_outer_classname = "DataSourceProto"; option java_package = "feast.proto.core"; +import "google/protobuf/duration.proto"; import "feast/core/DataFormat.proto"; import "feast/types/Value.proto"; import "feast/core/Feature.proto"; @@ -135,6 +136,7 @@ message DataSource { // Defines the stream data format encoding feature/entity data in Kafka messages. StreamFormat message_format = 3; + google.protobuf.Duration watermark = 4; } // Defines options for DataSource that sources features from Kinesis records. diff --git a/sdk/python/feast/aggregation.py b/sdk/python/feast/aggregation.py index 0a5fe84565..d0af49b425 100644 --- a/sdk/python/feast/aggregation.py +++ b/sdk/python/feast/aggregation.py @@ -14,21 +14,28 @@ class Aggregation: column: str # Column name of the feature we are aggregating. function: str # Provided built in aggregations sum, max, min, count mean time_window: timedelta # The time window for this aggregation. + slide_interval: timedelta # The sliding window for these aggregations """ column: str function: str time_window: Optional[timedelta] + slide_interval: Optional[timedelta] def __init__( self, column: Optional[str] = "", function: Optional[str] = "", time_window: Optional[timedelta] = None, + slide_interval: Optional[timedelta] = None, ): self.column = column or "" self.function = function or "" self.time_window = time_window + if not slide_interval: + self.slide_interval = self.time_window + else: + self.slide_interval = slide_interval def to_proto(self) -> AggregationProto: window_duration = None @@ -36,8 +43,16 @@ def to_proto(self) -> AggregationProto: window_duration = Duration() window_duration.FromTimedelta(self.time_window) + slide_interval_duration = None + if self.slide_interval is not None: + slide_interval_duration = Duration() + slide_interval_duration.FromTimedelta(self.slide_interval) + return AggregationProto( - column=self.column, function=self.function, time_window=window_duration + column=self.column, + function=self.function, + time_window=window_duration, + slide_interval=slide_interval_duration, ) @classmethod @@ -48,10 +63,16 @@ def from_proto(cls, agg_proto: AggregationProto): else agg_proto.time_window.ToTimedelta() ) + slide_interval = ( + timedelta(days=0) + if agg_proto.slide_interval.ToNanoseconds() == 0 + else agg_proto.slide_interval.ToTimedelta() + ) aggregation = cls( column=agg_proto.column, function=agg_proto.function, time_window=time_window, + slide_interval=slide_interval, ) return aggregation @@ -63,6 +84,7 @@ def __eq__(self, other): self.column != other.column or self.function != other.function or self.time_window != other.time_window + or self.slide_interval != other.slide_interval ): return False diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 6f416e70d3..1211edd54a 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -15,8 +15,10 @@ import enum import warnings from abc import ABC, abstractmethod +from datetime import timedelta from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union +from google.protobuf.duration_pb2 import Duration from google.protobuf.json_format import MessageToJson from feast import type_map @@ -47,11 +49,16 @@ class KafkaOptions: """ def __init__( - self, bootstrap_servers: str, message_format: StreamFormat, topic: str, + self, + bootstrap_servers: str, + message_format: StreamFormat, + topic: str, + watermark: Optional[timedelta] = None, ): self.bootstrap_servers = bootstrap_servers self.message_format = message_format self.topic = topic + self.watermark = watermark or None @classmethod def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions): @@ -64,11 +71,18 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions): Returns: Returns a BigQueryOptions object based on the kafka_options protobuf """ - + watermark = None + if kafka_options_proto.HasField("watermark"): + watermark = ( + timedelta(days=0) + if kafka_options_proto.watermark.ToNanoseconds() == 0 + else kafka_options_proto.watermark.ToTimedelta() + ) kafka_options = cls( bootstrap_servers=kafka_options_proto.bootstrap_servers, message_format=StreamFormat.from_proto(kafka_options_proto.message_format), topic=kafka_options_proto.topic, + watermark=watermark, ) return kafka_options @@ -80,11 +94,16 @@ def to_proto(self) -> DataSourceProto.KafkaOptions: Returns: KafkaOptionsProto protobuf """ + watermark_duration = None + if self.watermark is not None: + watermark_duration = Duration() + watermark_duration.FromTimedelta(self.watermark) kafka_options_proto = DataSourceProto.KafkaOptions( bootstrap_servers=self.bootstrap_servers, message_format=self.message_format.to_proto(), topic=self.topic, + watermark=watermark_duration, ) return kafka_options_proto @@ -369,7 +388,32 @@ def __init__( owner: Optional[str] = "", timestamp_field: Optional[str] = "", batch_source: Optional[DataSource] = None, + watermark: Optional[timedelta] = None, ): + """ + Creates a KafkaSource stream source object. + Args: + name: str. Name of data source, which should be unique within a project + event_timestamp_column (optional): str. (Deprecated) Event timestamp column used for point in time + joins of feature values. + bootstrap_servers: str. The servers of the kafka broker in the form "localhost:9092". + message_format: StreamFormat. StreamFormat of serialized messages. + topic: str. The name of the topic to read from in the kafka source. + created_timestamp_column (optional): str. Timestamp column indicating when the row + was created, used for deduplicating rows. + field_mapping (optional): dict(str, str). A dictionary mapping of column names in this data + source to feature names in a feature table or view. Only used for feature + columns, not entity or timestamp columns. + date_partition_column (optional): str. Timestamp column used for partitioning. + description (optional): str. A human-readable description. + tags (optional): dict(str, str). A dictionary of key-value pairs to store arbitrary metadata. + owner (optional): str. The owner of the data source, typically the email of the primary + maintainer. + timestamp_field (optional): str. Event timestamp field used for point + in time joins of feature values. + batch_source: DataSource. The datasource that acts as a batch source. + watermark: timedelta. The watermark for stream data. Specifically how late stream data can arrive without being discarded. + """ positional_attributes = [ "name", "event_timestamp_column", @@ -425,10 +469,12 @@ def __init__( timestamp_field=timestamp_field, ) self.batch_source = batch_source + self.kafka_options = KafkaOptions( bootstrap_servers=_bootstrap_servers, message_format=_message_format, topic=_topic, + watermark=watermark, ) def __eq__(self, other): @@ -445,6 +491,7 @@ def __eq__(self, other): != other.kafka_options.bootstrap_servers or self.kafka_options.message_format != other.kafka_options.message_format or self.kafka_options.topic != other.kafka_options.topic + or self.kafka_options.watermark != other.kafka_options.watermark ): return False @@ -455,6 +502,13 @@ def __hash__(self): @staticmethod def from_proto(data_source: DataSourceProto): + watermark = None + if data_source.kafka_options.HasField("watermark"): + watermark = ( + timedelta(days=0) + if data_source.kafka_options.watermark.ToNanoseconds() == 0 + else data_source.kafka_options.watermark.ToTimedelta() + ) return KafkaSource( name=data_source.name, event_timestamp_column=data_source.timestamp_field, @@ -463,6 +517,7 @@ def from_proto(data_source: DataSourceProto): message_format=StreamFormat.from_proto( data_source.kafka_options.message_format ), + watermark=watermark, topic=data_source.kafka_options.topic, created_timestamp_column=data_source.created_timestamp_column, timestamp_field=data_source.timestamp_field, diff --git a/sdk/python/tests/integration/registration/test_registry.py b/sdk/python/tests/integration/registration/test_registry.py index 222eb116d2..fcf65570a0 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -319,6 +319,7 @@ def simple_udf(x: int): message_format=AvroFormat(""), topic="topic", batch_source=FileSource(path="some path"), + watermark=timedelta(days=1), ) sfv = StreamFeatureView( diff --git a/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py b/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py index d24618b270..e19641f291 100644 --- a/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py +++ b/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py @@ -27,6 +27,7 @@ def test_apply_stream_feature_view(environment) -> None: message_format=AvroFormat(""), topic="topic", batch_source=FileSource(path="test_path", timestamp_field="event_timestamp"), + watermark=timedelta(days=1), ) @stream_feature_view(