diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index d94ed6118d..b96da71e59 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -153,9 +153,9 @@ private ImportOptions getPipelineOptions(String jobName, List fe pipelineOptions.setJobName(jobName); if (metrics.isEnabled()) { pipelineOptions.setMetricsExporterType(metrics.getType()); - if (metrics.getType().equals("prometheus")) { - pipelineOptions.setPrometheusExporterAddress( - String.format("%s:%s", metrics.getHost(), metrics.getPort())); + if (metrics.getType().equals("statsd")) { + pipelineOptions.setStatsdHost(metrics.getHost()); + pipelineOptions.setStatsdPort(metrics.getPort()); } } return pipelineOptions; diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index e7d4b70a90..a9b968bcb9 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -37,7 +37,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.PipelineResult; @@ -103,9 +102,9 @@ private ImportOptions getPipelineOptions(List featureSetSpecs, pipelineOptions.setProject(""); // set to default value to satisfy validation if (metrics.isEnabled()) { pipelineOptions.setMetricsExporterType(metrics.getType()); - if (metrics.getType().equals("prometheus")) { - pipelineOptions.setPrometheusExporterAddress( - String.format("%s:%s", metrics.getHost(), metrics.getPort())); + if (metrics.getType().equals("statsd")) { + pipelineOptions.setStatsdHost(metrics.getHost()); + pipelineOptions.setStatsdPort(metrics.getPort()); } } pipelineOptions.setBlockOnRun(false); diff --git a/infra/charts/feast/charts/prometheus-statsd-exporter/.helmignore b/infra/charts/feast/charts/prometheus-statsd-exporter/.helmignore new file mode 100644 index 0000000000..c13e3c8fbb --- /dev/null +++ b/infra/charts/feast/charts/prometheus-statsd-exporter/.helmignore @@ -0,0 +1,21 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*~ +# Various IDEs +.project +.idea/ +*.tmproj \ No newline at end of file diff --git a/infra/charts/feast/charts/prometheus-statsd-exporter/Chart.yaml b/infra/charts/feast/charts/prometheus-statsd-exporter/Chart.yaml new file mode 100644 index 0000000000..98a9356dcd --- /dev/null +++ b/infra/charts/feast/charts/prometheus-statsd-exporter/Chart.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +appVersion: 0.8.0 +description: A Helm chart for prometheus statsd-exporter Scrape metrics stored statsd +home: https://github.com/prometheus/statsd_exporter +keywords: + - prometheus + - statsd +maintainers: + - name: enflo + email: toniflorithomar@gmail.com +name: prometheus-statsd-exporter +version: 0.1.2 \ No newline at end of file diff --git a/infra/charts/feast/charts/prometheus-statsd-exporter/README.md b/infra/charts/feast/charts/prometheus-statsd-exporter/README.md new file mode 100644 index 0000000000..69eb33039b --- /dev/null +++ b/infra/charts/feast/charts/prometheus-statsd-exporter/README.md @@ -0,0 +1,56 @@ +# Prometheus statsd-exporter + + ## TL;DR; + + ```console +$ helm install incubator/prometheus-statsd-exporter +``` + + ## Introduction + + This chart bootstraps a prometheus-statsd-exporter deployment on a [Kubernetes](http://kubernetes.io) cluster using the [Helm](https://helm.sh) package manager. + + ## Installing the Chart + + To install the chart with the release name `my-release`: + + ```console +$ helm install incubator/prometheus-statsd-exporter --name my-release +``` + + + The command deploys prometheus-statsd-exporter on the Kubernetes cluster in the default configuration. The [configuration](#configuration) section lists the parameters that can be configured during installation. + + ## Uninstalling the Chart + + To uninstall/delete the `my-release` deployment: + + ```console +$ helm delete my-release +``` + + The command removes all the Kubernetes components associated with the chart and deletes the release. + + ## Configuration + + |Parameter | Description | Default | +|`extraArgs` | key:value list of extra arguments to give the binary | `{}` | +|`image.pullPolicy` | Image pull policy | `IfNotPresent` | +|`image.repository` | Image repository | `prom/statsd-exporter` | +|`image.tag` | Image tag | `v0.8.0` | +|`ingress.enabled` | enable ingress | `false` | +|`ingress.path` | ingress base path | `/` | +|`ingress.host` | Ingress accepted hostnames | `nil` | +|`ingress.tls` | Ingress TLS configuration | `[]` | +|`ingress.annotations` | Ingress annotations | `{}` | +|`service.type` | type of service | `ClusterIP` | +|`tolerations` | List of node taints to tolerate | `[]` | +|`resources` | pod resource requests & limits | `{}` | +| `persistence.enabled` | Create a volume to store data | true | + + Alternatively, a YAML file that specifies the values for the above parameters can be provided while installing the chart. For example, + + ```console +$ helm install incubator/prometheus-statsd-exporter --name my-release -f values.yaml +``` +> **Tip**: You can use the default [values.yaml](values.yaml) \ No newline at end of file diff --git a/infra/charts/feast/charts/prometheus-statsd-exporter/templates/NOTES.txt b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/NOTES.txt new file mode 100644 index 0000000000..bbd06f118a --- /dev/null +++ b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/NOTES.txt @@ -0,0 +1,17 @@ + +To verify that prometheus-statsd-exporter has started, run: + +{{- if contains "NodePort" .Values.service.type }} + export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ template "prometheus-statsd-exporter.fullname" . }}) + export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}") + echo http://$NODE_IP:$NODE_PORT +{{- else if contains "LoadBalancer" .Values.service.type }} + NOTE: It may take a few minutes for the LoadBalancer IP to be available. + You can watch the status of by running 'kubectl get svc --namespace {{ .Release.Namespace }} -w {{ template "prometheus-statsd-exporter.fullname" . }}' + + export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ template "prometheus-statsd-exporter.fullname" . }} -o jsonpath='{.status.loadBalancer.ingress[0].ip}') + echo http://$SERVICE_IP:{{ .Values.service.servicePort }} +{{- else if contains "ClusterIP" .Values.service.type }} + export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app={{ template "prometheus-statsd-exporter.name" . }},component={{ .Chart.Name }}" -o jsonpath="{.items[0].metadata.name}") + kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 9090 +{{- end }} \ No newline at end of file diff --git a/infra/charts/feast/charts/prometheus-statsd-exporter/templates/_helpers.tpl b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/_helpers.tpl new file mode 100644 index 0000000000..1c5f01342b --- /dev/null +++ b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/_helpers.tpl @@ -0,0 +1,44 @@ +{{/* vim: set filetype=mustache: */}} +{{/* +Expand the name of the chart. +*/}} +{{- define "prometheus-statsd-exporter.name" -}} +{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" -}} +{{- end -}} + +{{/* +Create a default fully qualified app name. +We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). +If release name contains chart name it will be used as a full name. +*/}} +{{- define "prometheus-statsd-exporter.fullname" -}} +{{- if .Values.fullnameOverride -}} +{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" -}} +{{- else -}} +{{- $name := default .Chart.Name .Values.nameOverride -}} +{{- if contains $name .Release.Name -}} +{{- .Release.Name | trunc 63 | trimSuffix "-" -}} +{{- else -}} +{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" -}} +{{- end -}} +{{- end -}} +{{- end -}} + +{{/* +Create chart name and version as used by the chart label. +*/}} +{{- define "prometheus-statsd-exporter.chart" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" -}} +{{- end -}} + + +{{/* +Create the name of the service account to use +*/}} +{{- define "prometheus-statsd-exporter.serviceAccountName" -}} +{{- if .Values.serviceAccount.enable -}} + {{ default (include "prometheus-statsd-expoter.fullname" .) .Values.serviceAccount.name }} +{{- else -}} + {{ default "default" .Values.serviceAccount.name }} +{{- end -}} +{{- end -}} \ No newline at end of file diff --git a/infra/charts/feast/charts/prometheus-statsd-exporter/templates/config.yaml b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/config.yaml new file mode 100644 index 0000000000..0f9de1e953 --- /dev/null +++ b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/config.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ template "prometheus-statsd-exporter.fullname" . }}-config + labels: + app: {{ template "prometheus-statsd-exporter.name" . }} + chart: {{ .Chart.Name }}-{{ .Chart.Version }} + release: {{ .Release.Name }} + heritage: {{ .Release.Service }} +data: + statsd_mappings.yaml: | +# +# defaults: +# ttl: "45s" \ No newline at end of file diff --git a/infra/charts/feast/charts/prometheus-statsd-exporter/templates/deployment.yaml b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/deployment.yaml new file mode 100644 index 0000000000..47308ef89b --- /dev/null +++ b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/deployment.yaml @@ -0,0 +1,80 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: {{ template "prometheus-statsd-exporter.fullname" . }} + labels: + app: {{ template "prometheus-statsd-exporter.name" . }} + chart: {{ .Chart.Name }}-{{ .Chart.Version }} + release: {{ .Release.Name }} + heritage: {{ .Release.Service }} +spec: + replicas: {{ .Values.statsdexporter.replicaCount }} + selector: + matchLabels: + app: {{ template "prometheus-statsd-exporter.name" . }} + release: {{ .Release.Name }} + template: + metadata: + annotations: +{{ toYaml .Values.statsdexporter.annotations | indent 8 }} + labels: + app: {{ template "prometheus-statsd-exporter.name" . }} + release: {{ .Release.Name }} + spec: + serviceAccountName: {{ template "prometheus-statsd-exporter.serviceAccountName" . }} + containers: + - name: {{ .Chart.Name }} + image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + args: + - --statsd.mapping-config=/etc/statsd_conf/statsd_mappings.yaml + {{- range $key, $value := .Values.statsdexporter.extraArgs }} + - --{{ $key }}={{ $value }} + {{- end }} + volumeMounts: + - mountPath: /data + name: {{ .Values.persistentVolume.name }} + - name: statsd-config + mountPath: /etc/statsd_conf + env: + - name: HOME + value: /data + ports: + - name: metrics + containerPort: 9102 + protocol: TCP + - name: statsd-tcp + containerPort: 9125 + protocol: TCP + - name: statsd-udp + containerPort: 9125 + protocol: UDP + livenessProbe: + httpGet: + path: /#/status + port: 9102 + initialDelaySeconds: 10 + timeoutSeconds: 10 + readinessProbe: + httpGet: + path: /#/status + port: 9102 + initialDelaySeconds: 10 + timeoutSeconds: 10 + resources: +{{ toYaml .Values.statsdexporter.resources | indent 12 }} +{{- if .Values.statsdexporter.nodeSelector }} + nodeSelector: +{{ toYaml .Values.statsdexporter.nodeSelector | indent 8 }} + {{- end }} + volumes: + - name: statsd-config + configMap: + name: {{ template "prometheus-statsd-exporter.fullname" . }}-config + - name: {{ .Values.persistentVolume.name }} + {{- if .Values.persistentVolume.enabled }} + persistentVolumeClaim: + claimName: {{ if .Values.persistentVolume.claimName }}{{- else }}{{ template "prometheus-statsd-exporter.fullname" . }}{{- end }} + {{- else }} + emptyDir: {} + {{- end -}} diff --git a/infra/charts/feast/charts/prometheus-statsd-exporter/templates/pvc.yaml b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/pvc.yaml new file mode 100644 index 0000000000..e314920059 --- /dev/null +++ b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/pvc.yaml @@ -0,0 +1,23 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + labels: + app: {{ template "prometheus-statsd-exporter.fullname" . }} + chart: {{ .Chart.Name }}-{{ .Chart.Version }} + component: "{{ .Chart.Name }}" + heritage: {{ .Release.Service }} + release: {{ .Release.Name }} + name: {{ template "prometheus-statsd-exporter.fullname" . }} +spec: + accessModes: +{{ toYaml .Values.persistentVolume.accessModes | indent 4 }} +{{- if .Values.persistentVolume.storageClass }} +{{- if (eq "-" .Values.persistentVolume.storageClass) }} + storageClassName: "" +{{- else }} + storageClassName: "{{ .Values.persistentVolume.storageClass }}" +{{- end }} +{{- end }} + resources: + requests: + storage: "{{ .Values.persistentVolume.size }}" \ No newline at end of file diff --git a/infra/charts/feast/charts/prometheus-statsd-exporter/templates/service.yaml b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/service.yaml new file mode 100644 index 0000000000..88d01b24a6 --- /dev/null +++ b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/service.yaml @@ -0,0 +1,51 @@ +apiVersion: v1 +kind: Service +metadata: +{{- if .Values.service.annotations }} + annotations: +{{ toYaml .Values.service.annotations | indent 4 }} +{{- end }} + labels: + app: {{ template "prometheus-statsd-exporter.fullname" . }} + chart: {{ .Chart.Name }}-{{ .Chart.Version }} + component: "{{ .Chart.Name }}" + heritage: {{ .Release.Service }} + release: {{ .Release.Name }} +{{- if .Values.service.labels }} +{{ toYaml .Values.service.labels | indent 4 }} +{{- end }} + name: {{ template "prometheus-statsd-exporter.fullname" . }} +spec: +{{- if .Values.service.clusterIP }} + clusterIP: {{ .Values.service.clusterIP }} +{{- end }} +{{- if .Values.service.externalIPs }} + externalIPs: +{{ toYaml .Values.service.externalIPs | indent 4 }} +{{- end }} +{{- if .Values.service.loadBalancerIP }} + loadBalancerIP: {{ .Values.service.loadBalancerIP }} +{{- end }} +{{- if .Values.service.loadBalancerSourceRanges }} + loadBalancerSourceRanges: + {{- range $cidr := .Values.service.loadBalancerSourceRanges }} + - {{ $cidr }} + {{- end }} +{{- end }} + ports: + - name: metrics + port: {{ .Values.service.metricsPort }} + protocol: TCP + targetPort: 9102 + - name: statsd-tcp + port: {{ .Values.service.statsdPort }} + protocol: TCP + targetPort: 9125 + - name: statsd-udp + port: {{ .Values.service.statsdPort }} + protocol: UDP + targetPort: 9125 + selector: + app: {{ template "prometheus-statsd-exporter.name" . }} + release: {{ .Release.Name }} + type: "{{ .Values.service.type }}" \ No newline at end of file diff --git a/infra/charts/feast/charts/prometheus-statsd-exporter/templates/serviceaccount.yaml b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/serviceaccount.yaml new file mode 100644 index 0000000000..8e80777835 --- /dev/null +++ b/infra/charts/feast/charts/prometheus-statsd-exporter/templates/serviceaccount.yaml @@ -0,0 +1,12 @@ +{{- if .Values.serviceAccount.enable -}} +apiVersion: v1 +kind: ServiceAccount +metadata: + labels: + app: {{ template "prometheus-statsd-exporter.fullname" . }} + chart: {{ .Chart.Name }}-{{ .Chart.Version }} + component: "{{ .Values.serviceaccount.componentName }}" + heritage: {{ .Release.Service }} + release: {{ .Release.Name }} + name: {{ template "prometheus-statsd-exporter.fullname" . }} +{{- end -}} \ No newline at end of file diff --git a/infra/charts/feast/charts/prometheus-statsd-exporter/values.yaml b/infra/charts/feast/charts/prometheus-statsd-exporter/values.yaml new file mode 100644 index 0000000000..f2d523771e --- /dev/null +++ b/infra/charts/feast/charts/prometheus-statsd-exporter/values.yaml @@ -0,0 +1,113 @@ +image: + repository: prom/statsd-exporter + tag: v0.12.1 + pullPolicy: IfNotPresent + +service: + annotations: {} + labels: {} + clusterIP: "" + ## List of IP addresses at which the alertmanager service is available + ## Ref: https://kubernetes.io/docs/user-guide/services/#external-ips + ## + externalIPs: [] + loadBalancerIP: "" + loadBalancerSourceRanges: [] + servicePort: 80 + type: ClusterIP + metricsPort: 9102 + statsdPort: 9125 + +statsdexporter: + podAnnotations: + + extraArgs: {} + # - --persistence.file=data-perst + + resources: {} + # We usually recommend not to specify default resources and to leave this as a conscious + # choice for the user. This also increases chances charts run on environments with little + # resources, such as Minikube. If you do want to specify resources, uncomment the following + # lines, adjust them as necessary, and remove the curly braces after 'resources:'. + + # limits: + # cpu: 500m + # memory: 1Gi + # requests: + # cpu: 500m + # memory: 512Mi + + ingress: + ## Enable Ingress. + ## + enabled: false + + ## Annotations. + ## + # annotations: + # kubernetes.io/ingress.class: nginx + # kubernetes.io/tls-acme: 'true' + + ## Hostnames. + ## Must be provided if Ingress is enabled. + ## + # hosts: + # - prometheusstatsdexoirter.domain.com + + ## TLS configuration. + ## Secrets must be manually created in the namespace. + ## + # tls: + # - secretName: prometheusstatsdexoirter-tls + # hosts: + # - prometheusstatsdexoirter.domain.com + + tolerations: {} + # - effect: NoSchedule + # operator: Exists + + + replicaCount: 1 + + ## Affinity for pod assignment + ## Ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity + affinity: {} + nodeSelector: {} + +serviceAccount: + ## If false, serviceaccount will not be installed + ## + enable: false + componentName: prometheus-statsd-exporter + +persistentVolume: + name: storage-volume + claimName: prometheus-statsd-exporter + + ## If true, statsd-export will create/use a Persistent Volume Claim + ## If false, use emptyDir + ## + enabled: true + + ## statsd-exporter data Persistent Volume access modes + ## Must match those of existing PV or dynamic provisioner + ## Ref: http://kubernetes.io/docs/user-guide/persistent-volumes/ + ## + accessModes: + - ReadWriteOnce + + ## statsd-exporter data Persistent Volume Claim annotations + ## + annotations: {} + + ## statsd-exporter data Persistent Volume existing claim name + ## Requires statsd-exporter.persistentVolume.enabled: true + ## If defined, PVC must be created manually before volume will be bound + existingClaim: "" + + ## statsd-exporter data Persistent Volume mount root path + ## + mountPath: /data + size: 20Gi + subPath: "" + storageClass: {} \ No newline at end of file diff --git a/infra/charts/feast/requirements.lock b/infra/charts/feast/requirements.lock index a8e1434dd6..f2c77d4477 100644 --- a/infra/charts/feast/requirements.lock +++ b/infra/charts/feast/requirements.lock @@ -14,9 +14,6 @@ dependencies: - name: kafka repository: https://kubernetes-charts-incubator.storage.googleapis.com/ version: 0.17.0 -- name: prometheus-pushgateway - repository: https://kubernetes-charts.storage.googleapis.com - version: 1.0.1 - name: nginx-ingress repository: https://kubernetes-charts.storage.googleapis.com version: 1.24.2 diff --git a/infra/charts/feast/requirements.yaml b/infra/charts/feast/requirements.yaml index ab9def9cfe..6c6b728e21 100644 --- a/infra/charts/feast/requirements.yaml +++ b/infra/charts/feast/requirements.yaml @@ -22,10 +22,10 @@ dependencies: version: 0.17.0 repository: "@incubator" condition: kafka.provision -- name: prometheus-pushgateway - version: 1.0.1 - repository: "@stable" - condition: prometheus-pushgateway.provision +- name: prometheus-statsd-exporter + version: 0.1.2 + repository: "@local" + condition: prometheus-statsd-exporter.provision - name: nginx-ingress version: 1.24.2 repository: "@stable" diff --git a/infra/charts/feast/values.yaml b/infra/charts/feast/values.yaml index be3b413829..9ea235fcfa 100644 --- a/infra/charts/feast/values.yaml +++ b/infra/charts/feast/values.yaml @@ -20,14 +20,12 @@ feast-core: # subnetwork: default metrics: enabled: true - ## Type of metrics sink. Only prometheus is currently supported. - type: prometheus - ## Host of the metrics sink. In the case of prometheus, this is the host of the prometheus - ## pushGateway to sink metrics to. - host: feast-prometheus-pushgateway - ## Port of the metrics sink. In the case of prometheus, this is the port of the prometheus - ## pushGateway to sink metrics to. - port: 9091 + ## Type of metrics sink. Only statsd is currently supported. + type: statsd + ## Host of the metrics sink. In the case of statsd, this is the host of the statsd exporter + host: feast-prometheus-statsd-exporter + ## Port of the metrics sink. + port: 9125 stream: type: kafka @@ -295,7 +293,7 @@ kafka: cpu: 200m memory: 128Mi -prometheus-pushgateway: +prometheus-statsd-exporter: provision: true redis: diff --git a/ingestion/pom.xml b/ingestion/pom.xml index 03477f7a23..eb89233518 100644 --- a/ingestion/pom.xml +++ b/ingestion/pom.xml @@ -218,12 +218,6 @@ jedis - - io.prometheus - simpleclient_pushgateway - 0.7.0 - - org.slf4j slf4j-api @@ -255,6 +249,11 @@ com.google.guava guava + + com.datadoghq + java-dogstatsd-client + 2.8.1 + diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java index 20929eb3da..1417d22a4d 100644 --- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java +++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java @@ -21,7 +21,6 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.direct.DirectOptions; import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Default.Boolean; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.Validation.Required; @@ -77,11 +76,18 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, void setMetricsExporterType(String metricsExporterType); @Description( - "Address to write the metrics to. Required if the metrics exporter is set to prometheus." + "Host to write the metrics to. Required if the metrics exporter is set to StatsD." ) - @Default.String("localhost:9091") - String getPrometheusExporterAddress(); + @Default.String("localhost") + String getStatsdHost(); - void setPrometheusExporterAddress(String prometheusExporterAddress); + void setStatsdHost(String StatsdHost); + @Description( + "Port on StatsD server to write metrics to. Required if the metrics exporter is set to StatsD." + ) + @Default.Integer(8125) + int getStatsdPort(); + + void setStatsdPort(int StatsdPort); } diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteDeadletterRowMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteDeadletterRowMetricsDoFn.java index 407dc705d2..a3c814158f 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteDeadletterRowMetricsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteDeadletterRowMetricsDoFn.java @@ -1,12 +1,11 @@ package feast.ingestion.transform.metrics; import com.google.auto.value.AutoValue; +import com.timgroup.statsd.NonBlockingStatsDClient; +import com.timgroup.statsd.StatsDClient; +import com.timgroup.statsd.StatsDClientException; import feast.core.FeatureSetProto.FeatureSetSpec; import feast.ingestion.values.FailedElement; -import io.prometheus.client.CollectorRegistry; -import io.prometheus.client.Gauge; -import io.prometheus.client.exporter.PushGateway; -import java.io.IOException; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.slf4j.Logger; @@ -17,6 +16,9 @@ public abstract class WriteDeadletterRowMetricsDoFn extends private static final Logger log = org.slf4j.LoggerFactory .getLogger(WriteDeadletterRowMetricsDoFn.class); + + private final String INGESTION_JOB_NAME_KEY = "ingestion_job_name"; + private final String METRIC_PREFIX = "feast_ingestion"; private final String STORE_TAG_KEY = "feast_store"; private final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name"; private final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version"; @@ -25,7 +27,11 @@ public abstract class WriteDeadletterRowMetricsDoFn extends public abstract FeatureSetSpec getFeatureSetSpec(); - public abstract String getPgAddress(); + public abstract String getStatsdHost(); + + public abstract int getStatsdPort(); + + public StatsDClient statsd; public static WriteDeadletterRowMetricsDoFn.Builder newBuilder() { return new AutoValue_WriteDeadletterRowMetricsDoFn.Builder(); @@ -38,33 +44,39 @@ public abstract static class Builder { public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec); - public abstract Builder setPgAddress(String pgAddress); + public abstract Builder setStatsdHost(String statsdHost); + + public abstract Builder setStatsdPort(int statsdPort); public abstract WriteDeadletterRowMetricsDoFn build(); + + } + + @Setup + public void setup() { + statsd = new NonBlockingStatsDClient( + METRIC_PREFIX, + getStatsdHost(), + getStatsdPort() + ); } @ProcessElement public void processElement(ProcessContext c) { - CollectorRegistry registry = new CollectorRegistry(); - FeatureSetSpec featureSetSpec = getFeatureSetSpec(); - Gauge rowCount = Gauge.build().name("deadletter_count") - .help("number of rows that were failed to be processed") - .labelNames(STORE_TAG_KEY, FEATURE_SET_NAME_TAG_KEY, FEATURE_SET_VERSION_TAG_KEY) - .register(registry); - - rowCount - .labels(getStoreName(), featureSetSpec.getName(), - String.valueOf(featureSetSpec.getVersion())); + long rowCount = 0; for (FailedElement ignored : c.element().getValue()) { - rowCount.inc(); + rowCount++; } try { - PushGateway pg = new PushGateway(getPgAddress()); - pg.pushAdd(registry, c.getPipelineOptions().getJobName()); - } catch (IOException e) { + statsd.count("deadletter_row_count", rowCount, + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetSpec.getName(), + FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetSpec.getVersion(), + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + } catch (StatsDClientException e) { log.warn("Unable to push metrics to server", e); } } diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java index d0f31bbf66..b37947d936 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java @@ -51,7 +51,7 @@ public PDone expand(PCollectionTuple input) { ImportOptions options = input.getPipeline().getOptions() .as(ImportOptions.class); switch (options.getMetricsExporterType()) { - case "prometheus": + case "statsd": input.get(getFailureTag()) .apply("Window records", @@ -59,7 +59,8 @@ public PDone expand(PCollectionTuple input) { .apply("Write deadletter metrics", ParDo.of( WriteDeadletterRowMetricsDoFn.newBuilder() .setFeatureSetSpec(getFeatureSetSpec()) - .setPgAddress(options.getPrometheusExporterAddress()) + .setStatsdHost(options.getStatsdHost()) + .setStatsdPort(options.getStatsdPort()) .setStoreName(getStoreName()) .build())); @@ -69,7 +70,8 @@ public PDone expand(PCollectionTuple input) { .apply("Write row metrics", ParDo .of(WriteRowMetricsDoFn.newBuilder() .setFeatureSetSpec(getFeatureSetSpec()) - .setPgAddress(options.getPrometheusExporterAddress()) + .setStatsdHost(options.getStatsdHost()) + .setStatsdPort(options.getStatsdPort()) .setStoreName(getStoreName()) .build())); diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java index 6a713cf9da..d3401291b3 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java @@ -1,14 +1,13 @@ package feast.ingestion.transform.metrics; import com.google.auto.value.AutoValue; +import com.timgroup.statsd.NonBlockingStatsDClient; +import com.timgroup.statsd.StatsDClient; +import com.timgroup.statsd.StatsDClientException; import feast.core.FeatureSetProto.FeatureSetSpec; import feast.types.FeatureRowProto.FeatureRow; import feast.types.FieldProto.Field; import feast.types.ValueProto.Value.ValCase; -import io.prometheus.client.CollectorRegistry; -import io.prometheus.client.Summary; -import io.prometheus.client.exporter.PushGateway; -import java.io.IOException; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.slf4j.Logger; @@ -17,16 +16,23 @@ public abstract class WriteRowMetricsDoFn extends DoFn>, Void> { private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteRowMetricsDoFn.class); + + private final String METRIC_PREFIX = "feast_ingestion"; private final String STORE_TAG_KEY = "feast_store"; private final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name"; private final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version"; private final String FEATURE_TAG_KEY = "feast_feature_name"; + private final String INGESTION_JOB_NAME_KEY = "ingestion_job_name"; public abstract String getStoreName(); public abstract FeatureSetSpec getFeatureSetSpec(); - public abstract String getPgAddress(); + public abstract String getStatsdHost(); + + public abstract int getStatsdPort(); + + public StatsDClient statsd; public static Builder newBuilder() { return new AutoValue_WriteRowMetricsDoFn.Builder(); @@ -39,55 +45,73 @@ public abstract static class Builder { public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec); - public abstract Builder setPgAddress(String pgAddress); + public abstract Builder setStatsdHost(String statsdHost); + + public abstract Builder setStatsdPort(int statsdPort); public abstract WriteRowMetricsDoFn build(); } + @Setup + public void setup() { + statsd = new NonBlockingStatsDClient( + METRIC_PREFIX, + getStatsdHost(), + getStatsdPort() + ); + } @ProcessElement public void processElement(ProcessContext c) { - CollectorRegistry registry = new CollectorRegistry(); - - Summary rowLag = Summary.build().name("row_lag_millis") - .help("delta between processing and event timestamps in millis") - .quantile(0.5, 0.05) - .quantile(0.9, 0.01) - .quantile(0.99, 0.001) - .labelNames(STORE_TAG_KEY, FEATURE_SET_NAME_TAG_KEY, FEATURE_SET_VERSION_TAG_KEY) - .register(registry); - Summary featureLag = Summary.build().name("feature_lag_millis") - .help("delta between processing and event timestamps in millis") - .quantile(0.5, 0.05) - .quantile(0.9, 0.01) - .quantile(0.99, 0.001) - .labelNames(STORE_TAG_KEY, FEATURE_SET_NAME_TAG_KEY, FEATURE_SET_VERSION_TAG_KEY, - FEATURE_TAG_KEY) - .register(registry); - - Long currentTimestamp = System.currentTimeMillis(); FeatureSetSpec featureSetSpec = getFeatureSetSpec(); - for (FeatureRow row : c.element().getValue()) { - long eventTimestamp = row.getEventTimestamp().getSeconds() * 1000; - long lag = currentTimestamp - eventTimestamp; - rowLag - .labels(getStoreName(), featureSetSpec.getName(), String.valueOf(featureSetSpec.getVersion())) - .observe(lag); - for (Field field : row.getFieldsList()) { - if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) { - featureLag - .labels(getStoreName(), featureSetSpec.getName(), - String.valueOf(featureSetSpec.getVersion()), field.getName()) - .observe(lag); + long rowCount = 0; + long missingValueCount = 0; + + try { + for (FeatureRow row : c.element().getValue()) { + rowCount++; + long eventTimestamp = com.google.protobuf.util.Timestamps.toMillis(row.getEventTimestamp()); + + statsd.gauge("feature_row_lag_ms", System.currentTimeMillis() - eventTimestamp, + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetSpec.getName(), + FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetSpec.getVersion(), + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + + statsd.gauge("feature_row_event_time_epoch_ms", eventTimestamp, + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetSpec.getName(), + FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetSpec.getVersion(), + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + + for (Field field : row.getFieldsList()) { + if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) { + statsd.gauge("feature_value_lag_ms", System.currentTimeMillis() - eventTimestamp, + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetSpec.getName(), + FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetSpec.getVersion(), + FEATURE_TAG_KEY + ":" + field.getName(), + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + } else { + missingValueCount++; + } } } - } - try { - PushGateway pg = new PushGateway(getPgAddress()); - pg.pushAdd(registry, c.getPipelineOptions().getJobName()); - } catch (IOException e) { + statsd.count("feature_row_ingested_count", rowCount, + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetSpec.getName(), + FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetSpec.getVersion(), + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + + statsd.count("feature_row_missing_value_count", missingValueCount, + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetSpec.getName(), + FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetSpec.getVersion(), + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + + } catch (StatsDClientException e) { log.warn("Unable to push metrics to server", e); } } diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index d08e569518..c1fb0e63a9 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -46,7 +46,7 @@ public class ImportJobTest { private static final Logger LOGGER = LoggerFactory.getLogger(ImportJobTest.class.getName()); private static final String KAFKA_HOST = "localhost"; - private static final int KAFKA_PORT = 9093; + private static final int KAFKA_PORT = 19092; private static final String KAFKA_BOOTSTRAP_SERVERS = KAFKA_HOST + ":" + KAFKA_PORT; private static final short KAFKA_REPLICATION_FACTOR = 1; private static final String KAFKA_TOPIC = "topic_1"; @@ -119,7 +119,7 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() options.setProject(""); options.setBlockOnRun(false); - int inputSize = 4096; + int inputSize = 128; List input = new ArrayList<>(); Map expected = new HashMap<>(); diff --git a/ingestion/src/test/java/feast/test/TestUtil.java b/ingestion/src/test/java/feast/test/TestUtil.java index 8c03d6da17..b0fa4cbd10 100644 --- a/ingestion/src/test/java/feast/test/TestUtil.java +++ b/ingestion/src/test/java/feast/test/TestUtil.java @@ -137,8 +137,8 @@ public static void publishFeatureRowsToKafka(String bootstrapServers, String top */ public static FeatureRow createRandomFeatureRow(FeatureSetSpec spec) { ThreadLocalRandom random = ThreadLocalRandom.current(); - int randomStringSizeMaxSize = 16; - return createRandomFeatureRow(spec, random.nextInt(0, randomStringSizeMaxSize)); + int randomStringSizeMaxSize = 12; + return createRandomFeatureRow(spec, random.nextInt(0, randomStringSizeMaxSize) + 4); } /**