From a3fa78cd30d3d91845b64499dce0fda1a70e031c Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 31 Oct 2024 10:38:20 +0800 Subject: [PATCH] update --- .../prometheus/sink/PrometheusSink.java | 17 ++++++++++++----- .../prometheus/sink/PrometheusSinkFactory.java | 2 +- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java index 93d4e931e17..35ec257fc93 100644 --- a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java +++ b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java @@ -19,8 +19,8 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig; import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; @@ -28,15 +28,16 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; public class PrometheusSink extends AbstractSimpleSink implements SupportMultiTableSink { protected final HttpParameter httpParameter = new HttpParameter(); - protected SeaTunnelRowType seaTunnelRowType; + protected CatalogTable catalogTable; protected ReadonlyConfig pluginConfig; - public PrometheusSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) { + public PrometheusSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) { this.pluginConfig = pluginConfig; httpParameter.setUrl(pluginConfig.get(HttpConfig.URL)); if (pluginConfig.getOptional(HttpConfig.HEADERS).isPresent()) { @@ -45,7 +46,7 @@ public PrometheusSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) { if (pluginConfig.getOptional(HttpConfig.PARAMS).isPresent()) { httpParameter.setHeaders(pluginConfig.get(HttpConfig.PARAMS)); } - this.seaTunnelRowType = rowType; + this.catalogTable = catalogTable; if (Objects.isNull(httpParameter.getHeaders())) { Map headers = new HashMap<>(); @@ -67,6 +68,12 @@ public String getPluginName() { @Override public PrometheusWriter createWriter(SinkWriter.Context context) { - return new PrometheusWriter(seaTunnelRowType, httpParameter, pluginConfig); + return new PrometheusWriter( + catalogTable.getSeaTunnelRowType(), httpParameter, pluginConfig); + } + + @Override + public Optional getWriteCatalogTable() { + return Optional.ofNullable(catalogTable); } } diff --git a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSinkFactory.java b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSinkFactory.java index dcd8e72c1a5..544f17c9a6f 100644 --- a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSinkFactory.java +++ b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSinkFactory.java @@ -39,7 +39,7 @@ public TableSink createSink(TableSinkFactoryContext context) { ReadonlyConfig readonlyConfig = context.getOptions(); CatalogTable catalogTable = context.getCatalogTable(); - return () -> new PrometheusSink(readonlyConfig, catalogTable.getSeaTunnelRowType()); + return () -> new PrometheusSink(readonlyConfig, catalogTable); } @Override