Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add suport for custom S3 object tagging #690

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {

public static final String S3_OBJECT_TAGGING_CONFIG = "s3.object.tagging";
public static final boolean S3_OBJECT_TAGGING_DEFAULT = false;
public static final String S3_OBJECT_TAGGING_EXTRA_KV = "s3.object.tagging.key.value.pairs";
public static final String S3_OBJECT_TAGGING_EXTRA_KV_DEFAULT = "";

public static final String S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG =
"s3.object.behavior.on.tagging.error";
Expand Down Expand Up @@ -318,6 +320,18 @@ public static ConfigDef newConfigDef() {
"S3 Object Tagging"
);

configDef.define(
S3_OBJECT_TAGGING_EXTRA_KV,
Type.LIST,
S3_OBJECT_TAGGING_EXTRA_KV_DEFAULT,
Importance.LOW,
"Additional S3 tag key value pairs",
group,
++orderInGroup,
Width.LONG,
"S3 Object Tagging Extra Key Value pairs"
);

configDef.define(
S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
Expand Down Expand Up @@ -76,6 +77,8 @@ public class TopicPartitionWriter {
private final Queue<SinkRecord> buffer;
private final SinkTaskContext context;
private final boolean isTaggingEnabled;
private final List<String> extraTagKeyValuePair;
private HashMap<String, String> hashMapTag;
private final boolean ignoreTaggingErrors;
private int recordCount;
private final int flushSize;
Expand Down Expand Up @@ -144,6 +147,8 @@ public TopicPartitionWriter(TopicPartition tp,
}

isTaggingEnabled = connectorConfig.getBoolean(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG);
extraTagKeyValuePair = connectorConfig.getList(S3SinkConnectorConfig.S3_OBJECT_TAGGING_EXTRA_KV);
getS3Tag();
ignoreTaggingErrors = connectorConfig.getString(
S3SinkConnectorConfig.S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG)
.equalsIgnoreCase(S3SinkConnectorConfig.IgnoreOrFailBehavior.IGNORE.toString());
Expand Down Expand Up @@ -190,6 +195,16 @@ public TopicPartitionWriter(TopicPartition tp,
setNextScheduledRotation();
}

private void getS3Tag() {
hashMapTag = new HashMap<>();
if (extraTagKeyValuePair.size() != 0) {
for (int i = 0; i < extraTagKeyValuePair.size(); i++) {
String[] singleKv = extraTagKeyValuePair.get(i).split(":");
hashMapTag.put(singleKv[0], singleKv[1]);
}
}
}

private enum State {
WRITE_STARTED,
WRITE_PARTITION_PAUSED,
Expand Down Expand Up @@ -637,7 +652,11 @@ private void commitFiles() {
String encodedPartition = entry.getKey();
commitFile(encodedPartition);
if (isTaggingEnabled) {
RetryUtil.exponentialBackoffRetry(() -> tagFile(encodedPartition, entry.getValue()),
RetryUtil.exponentialBackoffRetry(() -> tagFile(
encodedPartition,
entry.getValue(),
hashMapTag
),
ConnectException.class,
connectorConfig.getInt(S3_PART_RETRIES_CONFIG),
connectorConfig.getLong(S3_RETRY_BACKOFF_CONFIG)
Expand Down Expand Up @@ -672,7 +691,10 @@ private void commitFile(String encodedPartition) {
}
}

private void tagFile(String encodedPartition, String s3ObjectPath) {
private void tagFile(
String encodedPartition,
String s3ObjectPath,
Map<String,String> extraHashMapTag) {
Long startOffset = startOffsets.get(encodedPartition);
Long endOffset = endOffsets.get(encodedPartition);
Long recordCount = recordCounts.get(encodedPartition);
Expand All @@ -695,7 +717,9 @@ private void tagFile(String encodedPartition, String s3ObjectPath) {
tags.put("startOffset", Long.toString(startOffset));
tags.put("endOffset", Long.toString(endOffset));
tags.put("recordCount", Long.toString(recordCount));

if (extraHashMapTag != null) {
tags.putAll(extraHashMapTag);
}
try {
storage.addTags(s3ObjectPath, tags);
log.info("Tagged S3 object {} with starting offset {}, ending offset {}, record count {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -337,6 +338,15 @@ public void testConfigurableS3ObjectTaggingConfigs() {
properties.put(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG, "true");
connectorConfig = new S3SinkConnectorConfig(properties);
assertEquals(true, connectorConfig.get(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG));
assertEquals(new ArrayList<String>(), connectorConfig.get(S3SinkConnectorConfig.S3_OBJECT_TAGGING_EXTRA_KV));

properties.put(S3SinkConnectorConfig.S3_OBJECT_TAGGING_EXTRA_KV, "key1:value1,key2:value2");
List<String> expectedConfigKeyValuePair = new ArrayList<String>() {{
add("key1:value1");
add("key2:value2");
}};
connectorConfig = new S3SinkConnectorConfig(properties);
assertEquals(expectedConfigKeyValuePair, connectorConfig.get(S3SinkConnectorConfig.S3_OBJECT_TAGGING_EXTRA_KV));

properties.put(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG, "false");
connectorConfig = new S3SinkConnectorConfig(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,44 @@ public void testAddingS3ObjectTags() throws Exception{
verifyTags(expectedTaggedFiles);
}

@Test
public void testAddingAdditionalS3ObjectTags() throws Exception{
// Setting size-based rollup to 10 but will produce fewer records. Commit should not happen.
localProps.put(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG, "true");
localProps.put(S3SinkConnectorConfig.S3_OBJECT_TAGGING_EXTRA_KV, "key1:value1,key2:value2");
setUp();

// Define the partitioner
Partitioner<?> partitioner = new DefaultPartitioner<>();
partitioner.configure(parsedConfig);
TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
TOPIC_PARTITION, storage, writerProvider, partitioner, connectorConfig, context, null);

String key = "key";
Schema schema = createSchema();
List<Struct> records = createRecordBatches(schema, 3, 3);

Collection<SinkRecord> sinkRecords = createSinkRecords(records, key, schema);

for (SinkRecord record : sinkRecords) {
topicPartitionWriter.buffer(record);
}

// Test actual write
topicPartitionWriter.write();
topicPartitionWriter.close();

// Check expected s3 object tags
String dirPrefix = partitioner.generatePartitionedPath(TOPIC, "partition=" + PARTITION);
Map<String, List<Tag>> expectedTaggedFiles = new HashMap<>();
expectedTaggedFiles.put(FileUtils.fileKeyToCommit(topicsDir, dirPrefix, TOPIC_PARTITION, 0, extension, ZERO_PAD_FMT),
Arrays.asList(new Tag("startOffset", "0"), new Tag("endOffset", "2"), new Tag("recordCount", "3"), new Tag("key1", "value1"), new Tag("key2", "value2")));
expectedTaggedFiles.put(FileUtils.fileKeyToCommit(topicsDir, dirPrefix, TOPIC_PARTITION, 3, extension, ZERO_PAD_FMT),
Arrays.asList(new Tag("startOffset", "3"), new Tag("endOffset", "5"), new Tag("recordCount", "3"), new Tag("key1", "value1"), new Tag("key2", "value2")));
expectedTaggedFiles.put(FileUtils.fileKeyToCommit(topicsDir, dirPrefix, TOPIC_PARTITION, 6, extension, ZERO_PAD_FMT),
Arrays.asList(new Tag("startOffset", "6"), new Tag("endOffset", "8"), new Tag("recordCount", "3")));
verifyTags(expectedTaggedFiles);
}
@Test
public void testIgnoreS3ObjectTaggingSdkClientException() throws Exception {
// Tagging error occurred (SdkClientException) but getting ignored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void testBasicRecordsWrittenAvro() throws Throwable {
props.put(FORMAT_CLASS_CONFIG, AvroFormat.class.getName());
final String topicNameWithExt = "other." + AVRO_EXTENSION + ".topic." + AVRO_EXTENSION;

// Add an extra topic with this extension inside of the name
// Add an extra topic with this extension inside the name
// Use a TreeSet for test determinism
Set<String> topicNames = new TreeSet<>(Collections.singletonList(topicName));

Expand Down