Apply a fixed window before writing row metrics #590

Merged
@@ -121,11 +121,14 @@ public void processElement(
ProcessContext context,
@Element KV<String, Iterable<FeatureRow>> featureSetRefToFeatureRows) {
if (statsDClient == null) {
log.error("StatsD client is null, likely because it encountered an error during setup");
return;
}

String featureSetRef = featureSetRefToFeatureRows.getKey();
if (featureSetRef == null) {
log.error(
"Feature set reference in the feature row is null. Please check the input feature rows from previous steps");
return;
}
String[] colonSplits = featureSetRef.split(":");
@@ -27,6 +27,7 @@
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
@@ -73,52 +74,47 @@ public PDone expand(PCollectionTuple input) {
.setStoreName(getStoreName())
.build()));

input
.get(getSuccessTag())
.apply(
"WriteRowMetrics",
ParDo.of(
WriteRowMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.build()));
// A fixed window is applied so that the metric collector is not overwhelmed with metric
// data. For validation, summaries of the values are usually sufficient rather than the
// individual values.
PCollection<KV<String, Iterable<FeatureRow>>> validRowsGroupedByRef =
input
.get(getSuccessTag())
.apply(
"FixedWindow",
Window.into(
FixedWindows.of(
Duration.standardSeconds(
options.getWindowSizeInSecForFeatureValueMetric()))))
.apply(
"ConvertToKV_FeatureSetRefToFeatureRow",
ParDo.of(
new DoFn<FeatureRow, KV<String, FeatureRow>>() {
@ProcessElement
public void processElement(
ProcessContext c, @Element FeatureRow featureRow) {
c.output(KV.of(featureRow.getFeatureSet(), featureRow));
}
}))
.apply("GroupByFeatureSetRef", GroupByKey.create());
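
For readers less familiar with this Beam pattern, the sequence above — fixed windowing, re-keying by feature set reference, then GroupByKey — can be sketched in isolation as follows. The `Row` element type, the 60-second window, and the class name are hypothetical stand-ins for illustration, not part of this change.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class FixedWindowGroupingSketch {

  // Hypothetical element type standing in for FeatureRow.
  public static class Row implements java.io.Serializable {
    final String featureSetRef;

    Row(String featureSetRef) {
      this.featureSetRef = featureSetRef;
    }
  }

  public static PCollection<KV<String, Iterable<Row>>> windowAndGroup(PCollection<Row> rows) {
    return rows
        // Slice the unbounded stream into non-overlapping 60-second windows so the downstream
        // aggregation emits one summary per window instead of one metric per element.
        .apply("FixedWindow", Window.into(FixedWindows.of(Duration.standardSeconds(60))))
        // Key every element by its feature set reference.
        .apply(
            "KeyByFeatureSetRef",
            ParDo.of(
                new DoFn<Row, KV<String, Row>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(KV.of(c.element().featureSetRef, c.element()));
                  }
                }))
        // Collect all rows that share a reference within the same window into one group.
        .apply("GroupByFeatureSetRef", GroupByKey.create());
  }
}
```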

// 1. Apply a fixed window
// 2. Group feature row by feature set reference
// 3. Calculate min, max, mean, percentiles of numerical values of features in the window
// and
// 4. Send the aggregate value to StatsD metric collector.
//
// NOTE: window is applied here so the metric collector will not be overwhelmed with
// metrics data. And for metric data, only statistic of the values are usually required
// vs the actual values.
input
.get(getSuccessTag())
.apply(
"FixedWindow",
Window.into(
FixedWindows.of(
Duration.standardSeconds(
options.getWindowSizeInSecForFeatureValueMetric()))))
.apply(
"ConvertTo_FeatureSetRefToFeatureRow",
ParDo.of(
new DoFn<FeatureRow, KV<String, FeatureRow>>() {
@ProcessElement
public void processElement(ProcessContext c, @Element FeatureRow featureRow) {
c.output(KV.of(featureRow.getFeatureSet(), featureRow));
}
}))
.apply("GroupByFeatureSetRef", GroupByKey.create())
.apply(
"WriteFeatureValueMetrics",
ParDo.of(
WriteFeatureValueMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.build()));
validRowsGroupedByRef.apply(
"WriteRowMetrics",
ParDo.of(
WriteRowMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.build()));

validRowsGroupedByRef.apply(
"WriteFeatureValueMetrics",
ParDo.of(
WriteFeatureValueMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.build()));

return PDone.in(input.getPipeline());
case "none":
@@ -17,17 +17,26 @@
package feast.ingestion.transform.metrics;

import com.google.auto.value.AutoValue;
import com.google.protobuf.util.Timestamps;
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;
import com.timgroup.statsd.StatsDClientException;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FieldProto.Field;
import feast.types.ValueProto.Value;
import feast.types.ValueProto.Value.ValCase;
import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.slf4j.Logger;

@AutoValue
public abstract class WriteRowMetricsDoFn extends DoFn<FeatureRow, Void> {
public abstract class WriteRowMetricsDoFn extends DoFn<KV<String, Iterable<FeatureRow>>, Void> {

private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteRowMetricsDoFn.class);

@@ -39,12 +48,38 @@ public abstract class WriteRowMetricsDoFn extends DoFn<FeatureRow, Void> {
public static final String FEATURE_TAG_KEY = "feast_feature_name";
public static final String INGESTION_JOB_NAME_KEY = "ingestion_job_name";

public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_MIN = "feature_row_lag_ms_min";
public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_MAX = "feature_row_lag_ms_max";
public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_MEAN = "feature_row_lag_ms_mean";
public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_90 =
"feature_row_lag_ms_percentile_90";
public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_95 =
"feature_row_lag_ms_percentile_95";
public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_99 =
"feature_row_lag_ms_percentile_99";

public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_MIN = "feature_value_lag_ms_min";
public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_MAX = "feature_value_lag_ms_max";
public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_MEAN = "feature_value_lag_ms_mean";
public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_90 =
"feature_value_lag_ms_percentile_90";
public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_95 =
"feature_value_lag_ms_percentile_95";
public static final String GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_99 =
"feature_value_lag_ms_percentile_99";

public static final String COUNT_NAME_FEATURE_ROW_INGESTED = "feature_row_ingested_count";
public static final String COUNT_NAME_FEATURE_VALUE_MISSING = "feature_value_missing_count";

public abstract String getStoreName();

public abstract String getStatsdHost();

public abstract int getStatsdPort();

@Nullable
public abstract Clock getClock();

public static WriteRowMetricsDoFn create(
String newStoreName, String newStatsdHost, int newStatsdPort) {
return newBuilder()
@@ -69,79 +104,147 @@ public abstract static class Builder {

public abstract Builder setStatsdPort(int statsdPort);

/**
* setClock will override the default system clock used to calculate feature row lag.
*
* @param clock Clock instance
*/
public abstract Builder setClock(Clock clock);

public abstract WriteRowMetricsDoFn build();
}
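
Because the builder now accepts an optional Clock, lag calculations can be made deterministic in a test by pinning the clock. Below is a minimal construction sketch; the store name, host, port, and timestamp values are arbitrary assumptions, and the project's actual test harness is not part of this diff.

```java
import feast.ingestion.transform.metrics.WriteRowMetricsDoFn;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

public class WriteRowMetricsDoFnClockSketch {

  public static WriteRowMetricsDoFn buildWithFixedClock() {
    // Pin "now" so that currentTime - eventTimestamp lag values are reproducible across runs.
    Clock fixedClock = Clock.fixed(Instant.ofEpochMilli(1_500_000_000_000L), ZoneOffset.UTC);

    return WriteRowMetricsDoFn.newBuilder()
        .setStoreName("test_store") // hypothetical store name
        .setStatsdHost("localhost") // hypothetical StatsD host
        .setStatsdPort(8125) // conventional StatsD UDP port, assumed
        .setClock(fixedClock) // overrides the default system clock used in processElement
        .build();
  }
}
```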

@Setup
public void setup() {
statsd = new NonBlockingStatsDClient(METRIC_PREFIX, getStatsdHost(), getStatsdPort());
// Note that an exception may be thrown during StatsD client instantiation, but no exception
// will be thrown when sending metrics (mimicking the fire-and-forget UDP protocol behaviour).
// https://jar-download.com/artifacts/com.datadoghq/java-dogstatsd-client/2.1.1/documentation
// https://github.com/DataDog/java-dogstatsd-client#unix-domain-socket-support
try {
statsd = new NonBlockingStatsDClient(METRIC_PREFIX, getStatsdHost(), getStatsdPort());
} catch (Exception e) {
log.error("StatsD client cannot be started: " + e.getMessage());
}
}
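
For context on the guard above: the dogstatsd client's constructor can throw (for example on hostname resolution), while sends after a successful setup are fire-and-forget UDP and do not throw. A standalone sketch of the same pattern follows; the "example_prefix" value and the tag value "example-job" are placeholders (the real METRIC_PREFIX constant is defined elsewhere in this class and is not shown in the diff).

```java
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;

public class GuardedStatsDSketch {

  private StatsDClient statsd;

  public void setup(String statsdHost, int statsdPort) {
    try {
      // Construction may throw (e.g. unresolved hostname); "example_prefix" stands in for the
      // real metric prefix.
      statsd = new NonBlockingStatsDClient("example_prefix", statsdHost, statsdPort);
    } catch (Exception e) {
      statsd = null; // metrics are skipped rather than failing the pipeline bundle
    }
  }

  public void emit(double meanLagMillis, long rowCount) {
    if (statsd == null) {
      return; // setup failed, so skip sending
    }
    // Tags follow the DogStatsD "key:value" convention used throughout the DoFns above;
    // the tag value here is illustrative only.
    statsd.gauge("feature_row_lag_ms_mean", meanLagMillis, "ingestion_job_name:example-job");
    statsd.count("feature_row_ingested_count", rowCount, "ingestion_job_name:example-job");
  }
}
```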

@SuppressWarnings("DuplicatedCode")
@ProcessElement
public void processElement(ProcessContext c) {
public void processElement(
ProcessContext c, @Element KV<String, Iterable<FeatureRow>> featureSetRefToFeatureRows) {
if (statsd == null) {
log.error("StatsD client is null, likely because it encountered an error during setup");
return;
}

try {
FeatureRow row = c.element();
long eventTimestamp = com.google.protobuf.util.Timestamps.toMillis(row.getEventTimestamp());

String[] split = row.getFeatureSet().split(":");
String featureSetProject = split[0].split("/")[0];
String featureSetName = split[0].split("/")[1];
String featureSetVersion = split[1];

statsd.histogram(
"feature_row_lag_ms",
System.currentTimeMillis() - eventTimestamp,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());

statsd.histogram(
"feature_row_event_time_epoch_ms",
eventTimestamp,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());

for (Field field : row.getFieldsList()) {
if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
statsd.histogram(
"feature_value_lag_ms",
System.currentTimeMillis() - eventTimestamp,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
FEATURE_TAG_KEY + ":" + field.getName(),
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
String featureSetRef = featureSetRefToFeatureRows.getKey();
if (featureSetRef == null) {
log.error(
"Feature set reference in the feature row is null. Please check the input feature rows from previous steps");
return;
}
String[] colonSplits = featureSetRef.split(":");
if (colonSplits.length != 2) {
log.error(
"Skip writing feature row metrics because the feature set reference '{}' does not "
+ "follow the required format <project>/<feature_set_name>:<version>",
featureSetRef);
return;
}
String[] slashSplits = colonSplits[0].split("/");
if (slashSplits.length != 2) {
log.error(
"Skip writing feature row metrics because the feature set reference '{}' does not "
+ "follow the required format <project>/<feature_set_name>:<version>",
featureSetRef);
return;
}

String featureSetProject = slashSplits[0];
String featureSetName = slashSplits[1];
String featureSetVersion = colonSplits[1];

// featureRowLagStats collects feature row lag statistics for feature set "featureSetName"
DescriptiveStatistics featureRowLagStats = new DescriptiveStatistics();
// featureNameToLagStats collects per-feature value lag statistics for all features in feature
// set "featureSetName"
Map<String, DescriptiveStatistics> featureNameToLagStats = new HashMap<>();
// featureNameToMissingCount counts "value_not_set" occurrences for each feature in feature
// set "featureSetName"
Map<String, Long> featureNameToMissingCount = new HashMap<>();

for (FeatureRow featureRow : featureSetRefToFeatureRows.getValue()) {
long currentTime = getClock() == null ? System.currentTimeMillis() : getClock().millis();
long featureRowLag = currentTime - Timestamps.toMillis(featureRow.getEventTimestamp());
featureRowLagStats.addValue(featureRowLag);

for (Field field : featureRow.getFieldsList()) {
String featureName = field.getName();
Value featureValue = field.getValue();
if (!featureNameToLagStats.containsKey(featureName)) {
// Ensure map contains the "featureName" key
featureNameToLagStats.put(featureName, new DescriptiveStatistics());
}
if (!featureNameToMissingCount.containsKey(featureName)) {
// Ensure map contains the "featureName" key
featureNameToMissingCount.put(featureName, 0L);
}
if (featureValue.getValCase().equals(ValCase.VAL_NOT_SET)) {
featureNameToMissingCount.put(
featureName, featureNameToMissingCount.get(featureName) + 1);
} else {
statsd.count(
"feature_value_missing_count",
1,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
FEATURE_TAG_KEY + ":" + field.getName(),
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());
featureNameToLagStats.get(featureName).addValue(featureRowLag);
}
}
}

String[] tags = {
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName(),
};

statsd.count(COUNT_NAME_FEATURE_ROW_INGESTED, featureRowLagStats.getN(), tags);
// DescriptiveStatistics returns NaN for getMin(), getMax(), getMean() and getPercentile()
// when no values have been added, so skip the gauges for an empty window.
if (featureRowLagStats.getN() > 0) {
statsd.gauge(GAUGE_NAME_FEATURE_ROW_LAG_MS_MIN, featureRowLagStats.getMin(), tags);
statsd.gauge(GAUGE_NAME_FEATURE_ROW_LAG_MS_MAX, featureRowLagStats.getMax(), tags);
statsd.gauge(GAUGE_NAME_FEATURE_ROW_LAG_MS_MEAN, featureRowLagStats.getMean(), tags);
statsd.gauge(
GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_90, featureRowLagStats.getPercentile(90), tags);
statsd.gauge(
GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_95, featureRowLagStats.getPercentile(95), tags);
statsd.gauge(
GAUGE_NAME_FEATURE_ROW_LAG_MS_PERCENTILE_99, featureRowLagStats.getPercentile(99), tags);
}
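
The getN() guard above exists because DescriptiveStatistics reports NaN from its summary accessors when no values have been added, and a NaN gauge would be meaningless. A small standalone illustration with arbitrary sample values:

```java
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;

public class LagStatsSketch {

  public static void main(String[] args) {
    DescriptiveStatistics empty = new DescriptiveStatistics();
    System.out.println(empty.getMin()); // NaN -> gauges must be skipped for an empty window

    DescriptiveStatistics lag = new DescriptiveStatistics();
    for (long lagMillis : new long[] {120, 250, 80, 400, 95}) {
      lag.addValue(lagMillis); // same addValue call used once per feature row above
    }
    System.out.println(lag.getN()); // 5
    System.out.println(lag.getMin()); // 80.0
    System.out.println(lag.getMax()); // 400.0
    System.out.println(lag.getMean()); // 189.0
    System.out.println(lag.getPercentile(90)); // 90th-percentile lag within the window
  }
}
```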

for (Entry<String, DescriptiveStatistics> entry : featureNameToLagStats.entrySet()) {
String featureName = entry.getKey();
String[] tagsWithFeatureName = ArrayUtils.add(tags, FEATURE_TAG_KEY + ":" + featureName);
DescriptiveStatistics stats = entry.getValue();
if (stats.getN() > 0) {
statsd.gauge(GAUGE_NAME_FEATURE_VALUE_LAG_MS_MIN, stats.getMin(), tagsWithFeatureName);
statsd.gauge(GAUGE_NAME_FEATURE_VALUE_LAG_MS_MAX, stats.getMax(), tagsWithFeatureName);
statsd.gauge(GAUGE_NAME_FEATURE_VALUE_LAG_MS_MEAN, stats.getMean(), tagsWithFeatureName);
statsd.gauge(
GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_90,
stats.getPercentile(90),
tagsWithFeatureName);
statsd.gauge(
GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_95,
stats.getPercentile(95),
tagsWithFeatureName);
statsd.gauge(
GAUGE_NAME_FEATURE_VALUE_LAG_MS_PERCENTILE_99,
stats.getPercentile(99),
tagsWithFeatureName);
}
statsd.count(
"feature_row_ingested_count",
1,
STORE_TAG_KEY + ":" + getStoreName(),
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName());

} catch (StatsDClientException e) {
log.warn("Unable to push metrics to server", e);
COUNT_NAME_FEATURE_VALUE_MISSING,
featureNameToMissingCount.get(featureName),
tagsWithFeatureName);
}
}
}