diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java index 977e002478..a82d574cac 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.dirtydata; import lombok.Builder; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.text.StringEscapeUtils; @@ -34,6 +35,9 @@ public class DirtyMessageWrapper { private static DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private String delimiter; + @Builder.Default + @Getter + private int retryTimes = 0; private String inlongGroupId; private String inlongStreamId; @@ -71,4 +75,8 @@ public String format() { .add(StringEscapeUtils.escapeXSI(formatData)) .toString(); } + + public void increaseRetry() { + retryTimes++; + } } diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java index 88e2e88a74..80cc596c26 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java @@ -21,13 +21,17 @@ import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; import org.apache.inlong.sdk.dataproxy.common.SendResult; -import org.apache.inlong.sdk.dataproxy.network.ProxysdkException; import com.google.common.base.Preconditions; import lombok.Builder; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.net.InetAddress; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; @Slf4j @Builder @@ -40,9 +44,14 @@ public class InlongSdkDirtySender { private String authId; private String authKey; private boolean ignoreErrors; + private int maxRetryTimes; + private int maxCallbackSize; + @Builder.Default + private boolean closed = false; - private SendMessageCallback callback; + private LinkedBlockingQueue dirtyDataQueue; private DefaultMessageSender sender; + private Executor executor; public void init() throws Exception { Preconditions.checkNotNull(inlongGroupId, "inlongGroupId cannot be null"); @@ -51,45 +60,79 @@ public void init() throws Exception { Preconditions.checkNotNull(authId, "authId cannot be null"); Preconditions.checkNotNull(authKey, "authKey cannot be null"); - this.callback = new LogCallBack(); ProxyClientConfig proxyClientConfig = new ProxyClientConfig(InetAddress.getLocalHost().getHostAddress(), true, inlongManagerAddr, inlongManagerPort, inlongGroupId, authId, authKey); proxyClientConfig.setReadProxyIPFromLocal(false); + proxyClientConfig.setAsyncCallbackSize(maxCallbackSize); this.sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig); this.sender.setMsgtype(7); + + this.dirtyDataQueue = new LinkedBlockingQueue<>(maxCallbackSize); + this.executor = Executors.newSingleThreadExecutor(); + executor.execute(this::doSendDirtyMessage); log.info("init InlongSdkDirtySink successfully, target group={}, stream={}", inlongGroupId, inlongStreamId); } - public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) - throws ProxysdkException { - sender.asyncSendMessage(inlongGroupId, inlongStreamId, messageWrapper.format().getBytes(), callback); + public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) throws InterruptedException { + dirtyDataQueue.offer(messageWrapper, 10, TimeUnit.SECONDS); + } + + private void doSendDirtyMessage() { + while (!closed) { + try { + DirtyMessageWrapper messageWrapper = dirtyDataQueue.poll(); + if (messageWrapper == null) { + Thread.sleep(100L); + continue; + } + messageWrapper.increaseRetry(); + if (messageWrapper.getRetryTimes() > maxRetryTimes) { + log.error("failed to send dirty message after {} times, dirty data ={}", maxRetryTimes, + messageWrapper); + continue; + } + + sender.asyncSendMessage(inlongGroupId, inlongStreamId, + messageWrapper.format().getBytes(), new LogCallBack(messageWrapper)); + + } catch (Throwable t) { + log.error("failed to send inlong dirty message", t); + if (!ignoreErrors) { + throw new RuntimeException("writing dirty message to inlong sdk failed", t); + } + } + + } } public void close() { + closed = true; + dirtyDataQueue.clear(); if (sender != null) { sender.close(); } } + @Getter class LogCallBack implements SendMessageCallback { + private final DirtyMessageWrapper wrapper; + + public LogCallBack(DirtyMessageWrapper wrapper) { + this.wrapper = wrapper; + } + @Override public void onMessageAck(SendResult result) { - if (result == SendResult.OK) { - return; - } - log.error("failed to send inlong dirty message, response={}", result); - - if (!ignoreErrors) { - throw new RuntimeException("writing dirty message to inlong sdk failed, response=" + result); + if (SendResult.OK != result) { + dirtyDataQueue.offer(wrapper); } } @Override public void onException(Throwable e) { log.error("failed to send inlong dirty message", e); - if (!ignoreErrors) { throw new RuntimeException("writing dirty message to inlong sdk failed", e); } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java index 1ac2b60a1d..24c5dddecd 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.base.dirty; +import org.apache.inlong.sort.base.dirty.sink.DirtyServerType; import org.apache.inlong.sort.base.util.PatternReplaceUtils; import org.apache.flink.table.types.logical.LogicalType; @@ -63,6 +64,8 @@ public class DirtyData { * Dirty type */ private final DirtyType dirtyType; + + private final DirtyServerType serverType; /** * Dirty describe message, it is the cause of dirty data */ @@ -85,10 +88,11 @@ public class DirtyData { private final T data; public DirtyData(T data, String identifier, String labels, - String logTag, DirtyType dirtyType, String dirtyMessage, + String logTag, DirtyType dirtyType, DirtyServerType serverType, String dirtyMessage, @Nullable LogicalType rowType, long dataTime, String extParams) { this.data = data; this.dirtyType = dirtyType; + this.serverType = serverType; this.dirtyMessage = dirtyMessage; this.rowType = rowType; Map paramMap = genParamMap(); @@ -127,6 +131,10 @@ public DirtyType getDirtyType() { return dirtyType; } + public DirtyServerType getServerType() { + return serverType; + } + public String getIdentifier() { return identifier; } @@ -154,6 +162,7 @@ public static class Builder { private String labels; private String logTag; private DirtyType dirtyType = DirtyType.UNDEFINED; + private DirtyServerType serverType = DirtyServerType.UNDEFINED; private String dirtyMessage; private LogicalType rowType; private long dataTime; @@ -175,6 +184,11 @@ public Builder setDirtyType(DirtyType dirtyType) { return this; } + public Builder setServerType(DirtyServerType serverType) { + this.serverType = serverType; + return this; + } + public Builder setLabels(String labels) { this.labels = labels; return this; @@ -206,7 +220,7 @@ public Builder setRowType(LogicalType rowType) { } public DirtyData build() { - return new DirtyData<>(data, identifier, labels, logTag, dirtyType, + return new DirtyData<>(data, identifier, labels, logTag, dirtyType, serverType, dirtyMessage, rowType, dataTime, extParams); } } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java new file mode 100644 index 0000000000..63f993c146 --- /dev/null +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.dirty.sink; + +public enum DirtyServerType { + + UNDEFINED("Undefined"), + TUBE_MQ("TubeMQ"), + ICEBERG("Iceberg") + + ; + + private final String format; + + DirtyServerType(String format) { + this.format = format; + } + + public String format() { + return format; + } +} diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java index 5cb03f0f80..84581dd4ce 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java @@ -49,4 +49,6 @@ public class InlongSdkDirtyOptions implements Serializable { private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER; private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER; private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER; + private int retryTimes; + private int maxCallbackSize; } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java index 4441cca830..8513f841bc 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java @@ -57,7 +57,6 @@ public void invoke(DirtyData dirtyData) throws Exception { Map labelMap = LabelUtils.parseLabels(dirtyData.getLabels()); String dataGroupId = Preconditions.checkNotNull(labelMap.get("groupId")); String dataStreamId = Preconditions.checkNotNull(labelMap.get("streamId")); - String serverType = Preconditions.checkNotNull(labelMap.get("serverType")); String dataflowId = Preconditions.checkNotNull(labelMap.get("dataflowId")); String dirtyMessage = formatData(dirtyData, labelMap); @@ -68,7 +67,7 @@ public void invoke(DirtyData dirtyData) throws Exception { .inlongStreamId(dataStreamId) .dataflowId(dataflowId) .dataTime(dirtyData.getDataTime()) - .serverType(serverType) + .serverType(dirtyData.getServerType().format()) .dirtyType(dirtyData.getDirtyType().format()) .dirtyMessage(dirtyData.getDirtyMessage()) .ext(dirtyData.getExtParams()) @@ -99,6 +98,8 @@ public void open(Configuration configuration) throws Exception { .ignoreErrors(options.isIgnoreSideOutputErrors()) .inlongGroupId(options.getSendToGroupId()) .inlongStreamId(options.getSendToStreamId()) + .maxRetryTimes(options.getRetryTimes()) + .maxCallbackSize(options.getMaxCallbackSize()) .build(); dirtySender.init(); } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java index 8d7e399c5c..053aa58364 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java @@ -35,6 +35,7 @@ import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT; import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS; import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE; +import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_RETRIES; @Slf4j public class InlongSdkDirtySinkFactory implements DirtySinkFactory { @@ -77,6 +78,12 @@ public class InlongSdkDirtySinkFactory implements DirtySinkFactory { .noDefaultValue() .withDescription("The inlong stream id of dirty sink"); + private static final ConfigOption DIRTY_SIDE_OUTPUT_MAX_CALLBACK_SIZE = + ConfigOptions.key("dirty.side-output.inlong-sdk.max-callback-size") + .intType() + .defaultValue(100000) + .withDescription("The inlong stream id of dirty sink"); + @Override public DirtySink createDirtySink(DynamicTableFactory.Context context) { ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions()); @@ -95,8 +102,10 @@ private InlongSdkDirtyOptions getOptions(ReadableConfig config) { .csvFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER)) .inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY)) .inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID)) - .ignoreSideOutputErrors(config.getOptional(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS).orElse(true)) - .enableDirtyLog(true) + .ignoreSideOutputErrors(config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS)) + .retryTimes(config.get(DIRTY_SIDE_OUTPUT_RETRIES)) + .maxCallbackSize(config.get(DIRTY_SIDE_OUTPUT_MAX_CALLBACK_SIZE)) + .enableDirtyLog(config.get(DIRTY_SIDE_OUTPUT_LOG_ENABLE)) .build(); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java index a178aa57a8..94631e7cd3 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java @@ -20,6 +20,7 @@ import org.apache.inlong.sort.base.dirty.DirtyData; import org.apache.inlong.sort.base.dirty.DirtyOptions; import org.apache.inlong.sort.base.dirty.DirtyType; +import org.apache.inlong.sort.base.dirty.sink.DirtyServerType; import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricsCollector; @@ -143,6 +144,7 @@ public void deserialize(Message message, Collector out) throws IOExcept builder.setData(message.getData()) .setDirtyType(DirtyType.KEY_DESERIALIZE_ERROR) + .setServerType(DirtyServerType.TUBE_MQ) .setDirtyDataTime(dataTime) .setExtParams(message.getAttribute()) .setLabels(dirtyOptions.getLabels())