PROD-39429 Add parameter for connector name in channel name #732

Merged · 6 commits · Oct 23, 2023
@@ -168,6 +168,20 @@ public class SnowflakeSinkConnectorConfig {
"Whether to optimize the streaming client to reduce cost. Note that this may affect"
+ " throughput or latency and can only be set if Streaming Snowpipe is enabled";

public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT =
"snowflake.enable.new.channel.name.format";
public static final boolean SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT = false;

public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DISPLAY =
"Enable Connector Name in Snowpipe Streaming Channel Name";

public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DOC =
"Whether to use connector name in streaming channels. If it is set to false, we will not use"
sfc-gh-japatel marked this conversation as resolved.
Show resolved Hide resolved
+ " connector name in channel name(Which is new version of Channel Name). Note: Please"
+ " use this config cautiously and it is not advised to use this if you are coming from"
+ " old Snowflake Kafka Connector Version where Default Channel Name doesnt contain"
+ " Connector Name, contains Topic Name and Partition # only.";

// MDC logging header
public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging";
public static final String ENABLE_MDC_LOGGING_DISPLAY = "Enable MDC logging";
@@ -591,7 +605,17 @@ static ConfigDef newConfigDef() {
CONNECTOR_CONFIG,
8,
ConfigDef.Width.NONE,
- ENABLE_MDC_LOGGING_DISPLAY);
+ ENABLE_MDC_LOGGING_DISPLAY)
.define(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
Type.BOOLEAN,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT,
Importance.LOW,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DOC,
CONNECTOR_CONFIG,
9,
ConfigDef.Width.NONE,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DISPLAY);
}

public static class TopicToTableValidator implements ConfigDef.Validator {
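The flag is read back with `Boolean.parseBoolean` over `getOrDefault`, as the service code below shows. A minimal, self-contained sketch of that round trip (the two static constants are the ones defined in this file; the map contents are illustrative):

```java
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT;

import java.util.HashMap;
import java.util.Map;

public class ChannelNameFormatConfigSketch {
  public static void main(String[] args) {
    // Illustrative connector config; the key resolves to
    // "snowflake.enable.new.channel.name.format" per the constant above.
    Map<String, String> connectorConfig = new HashMap<>();
    connectorConfig.put(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "true");

    // Mirrors how SnowflakeSinkServiceV2 reads the flag: if the key is absent,
    // the default (false) applies.
    boolean useConnectorNameInChannelName =
        Boolean.parseBoolean(
            connectorConfig.getOrDefault(
                SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
                String.valueOf(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT)));
    System.out.println(useConnectorNameInChannelName); // prints: true
  }
}
```

Note that `Boolean.parseBoolean` silently maps any value other than a case-insensitive "true" (e.g. "yes") to false, which is why the explicit boolean validation added later in this PR is worth having.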
@@ -1,6 +1,8 @@
package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ROLE;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_COUNT_RECORDS_DEFAULT;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_DEFAULT_SEC;
@@ -92,7 +94,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
private boolean enableSchematization;

/**
- * Key is formulated in {@link #partitionChannelKey(String, String, int)} }
+ * Key is formulated in {@link #partitionChannelKey(String, String, int, boolean)}
*
* <p>value is the Streaming Ingest Channel implementation (Wrapped around TopicPartitionChannel)
*/
@@ -101,6 +103,12 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
// Cache for schema evolution
private final Map<String, Boolean> tableName2SchemaEvolutionPermission;

/**
* Whether to prefix streaming channel names with the connector name (the new channel name
* format). This corresponds to the config {@link
* SnowflakeSinkConnectorConfig#SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT}.
*/
private final boolean shouldUseConnectorNameInChannelName;

public SnowflakeSinkServiceV2(
SnowflakeConnectionService conn, Map<String, String> connectorConfig) {
if (conn == null || conn.isClosed()) {
@@ -138,6 +146,11 @@ public SnowflakeSinkServiceV2(
? "default_connector"
: this.conn.getConnectorName();
this.metricsJmxReporter = new MetricsJmxReporter(new MetricRegistry(), connectorName);
this.shouldUseConnectorNameInChannelName =
Boolean.parseBoolean(
connectorConfig.getOrDefault(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT)));
}

@VisibleForTesting
@@ -183,6 +196,11 @@ public SnowflakeSinkServiceV2(
populateSchemaEvolutionPermissions(tableName);
});
}
this.shouldUseConnectorNameInChannelName =
Boolean.parseBoolean(
connectorConfig.getOrDefault(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT)));
}

/**
@@ -236,7 +254,10 @@ private void createStreamingChannelForTopicPartition(
boolean hasSchemaEvolutionPermission) {
final String partitionChannelKey =
partitionChannelKey(
- conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
+ conn.getConnectorName(),
+ topicPartition.topic(),
+ topicPartition.partition(),
+ this.shouldUseConnectorNameInChannelName);
// Create new instance of TopicPartitionChannel which will always open the channel.
partitionsToChannel.put(
partitionChannelKey,
@@ -296,7 +317,11 @@ public void insert(Collection<SinkRecord> records) {
@Override
public void insert(SinkRecord record) {
String partitionChannelKey =
- partitionChannelKey(this.conn.getConnectorName(), record.topic(), record.kafkaPartition());
+ partitionChannelKey(
+ this.conn.getConnectorName(),
+ record.topic(),
+ record.kafkaPartition(),
+ this.shouldUseConnectorNameInChannelName);
// init a new topic partition if it's not present in the cache or if the channel is closed
if (!partitionsToChannel.containsKey(partitionChannelKey)
|| partitionsToChannel.get(partitionChannelKey).isChannelClosed()) {
@@ -317,7 +342,10 @@ public void insert(SinkRecord record) {
public long getOffset(TopicPartition topicPartition) {
String partitionChannelKey =
partitionChannelKey(
- conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
+ conn.getConnectorName(),
+ topicPartition.topic(),
+ topicPartition.partition(),
+ this.shouldUseConnectorNameInChannelName);
if (partitionsToChannel.containsKey(partitionChannelKey)) {
long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
partitionsToChannel.get(partitionChannelKey).setLatestConsumerOffset(offset);
@@ -372,7 +400,10 @@ public void close(Collection<TopicPartition> partitions) {
topicPartition -> {
final String partitionChannelKey =
partitionChannelKey(
- conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
+ conn.getConnectorName(),
+ topicPartition.topic(),
+ topicPartition.partition(),
+ this.shouldUseConnectorNameInChannelName);
TopicPartitionChannel topicPartitionChannel =
partitionsToChannel.get(partitionChannelKey);
// Check for null since it's possible that something goes wrong even before the
@@ -527,11 +558,19 @@ public Optional<MetricRegistry> getMetricRegistry(String partitionChannelKey) {
* or PROD)
* @param topic topic name
* @param partition partition number
* @param shouldUseConnectorNameInChannelName if true, prefix the channel name with the
*     connector name (the new channel name format); if false, use topic and partition only
* @return combination of topic and partition, optionally prefixed with the connector name
*/
@VisibleForTesting
- public static String partitionChannelKey(String connectorName, String topic, int partition) {
-   return connectorName + "_" + topic + "_" + partition;
+ public static String partitionChannelKey(
+     String connectorName,
+     String topic,
+     int partition,
+     final boolean shouldUseConnectorNameInChannelName) {
+   return shouldUseConnectorNameInChannelName
+       ? connectorName + "_" + topic + "_" + partition
+       : topic + "_" + partition;
}

/* Used for testing */
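For the reworked key above, a quick sketch of the two shapes it can take (connector and topic names are illustrative; the method is the one defined in this file):

```java
// New format: the channel key carries the connector name.
String newFormat =
    SnowflakeSinkServiceV2.partitionChannelKey("my_connector", "events", 0, true);
// -> "my_connector_events_0"

// Old/default format: topic and partition only.
String oldFormat =
    SnowflakeSinkServiceV2.partitionChannelKey("my_connector", "events", 0, false);
// -> "events_0"
```

Per the config doc, deployments upgrading from versions whose channels were named `topic_partition` should leave the flag at its false default, so the connector keeps opening the channels it already owns.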
@@ -8,6 +8,7 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ErrorTolerance;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD;

import com.google.common.base.Strings;
@@ -221,6 +222,11 @@ public static ImmutableMap<String, String> validateStreamingSnowpipeConfig(
BOOLEAN_VALIDATOR.ensureValid(
ERRORS_LOG_ENABLE_CONFIG, inputConfig.get(ERRORS_LOG_ENABLE_CONFIG));
}
if (inputConfig.containsKey(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT)) {
BOOLEAN_VALIDATOR.ensureValid(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
inputConfig.get(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT));
}

// Valid schematization for Snowpipe Streaming
invalidParams.putAll(validateSchematizationConfig(inputConfig));
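`BOOLEAN_VALIDATOR` here is the connector's own helper; a rough, hypothetical sketch of the contract it enforces (only the accepted/rejected values are taken from the tests below):

```java
// Hypothetical stand-in for BOOLEAN_VALIDATOR.ensureValid: accept only the strings
// "true"/"false" (case-insensitive) and reject anything else, such as "yes".
static void ensureValidBoolean(String configKey, String value) {
  if (!"true".equalsIgnoreCase(value) && !"false".equalsIgnoreCase(value)) {
    throw new IllegalArgumentException(
        "Invalid value '" + value + "' for config '" + configKey + "': expected true or false");
  }
}
```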
@@ -867,6 +867,34 @@ public void testInvalidEnableOptimizeStreamingClientConfig() {
}
}

@Test
public void testEnableStreamingChannelFormatV2Config() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "true");
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
Utils.validateConfig(config);
}

@Test
public void testInvalidEnableStreamingChannelFormatV2Config() {
try {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "yes");
Utils.validateConfig(config);
} catch (SnowflakeKafkaConnectorException exception) {
assert exception
.getMessage()
.contains(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT);
}
}

@Test
public void testInvalidEmptyConfig() {
try {
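Worth noting the interplay with the parsing sketch earlier: without the validator, a value like "yes" would silently parse to false; with it, `Utils.validateConfig` fails fast with a message naming the offending key, which is exactly what the second test asserts.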
@@ -4,6 +4,7 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;
import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey;

@@ -19,8 +20,10 @@
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
@@ -36,15 +39,29 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/** Unit tests for Snowflake Sink Task behavior with Snowpipe Streaming */
@RunWith(Parameterized.class)
public class SnowflakeSinkTaskStreamingTest {
private String topicName;
private static int partition = 0;
private TopicPartition topicPartition;

private final boolean shouldUseConnectorNameInChannelName;

@Parameterized.Parameters
public static List<Boolean> input() {
return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
}

public SnowflakeSinkTaskStreamingTest(boolean shouldUseConnectorNameInChannelName) {
this.shouldUseConnectorNameInChannelName = shouldUseConnectorNameInChannelName;
}

@Before
public void setup() {
topicName = TestUtils.randomTableName();
@@ -59,6 +76,9 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception {
config.put(INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(ERRORS_TOLERANCE_CONFIG, SnowflakeSinkConnectorConfig.ErrorTolerance.ALL.toString());
config.put(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "test_DLQ");
config.put(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));
InMemoryKafkaRecordErrorReporter errorReporter = new InMemoryKafkaRecordErrorReporter();
SnowflakeConnectionService mockConnectionService =
Mockito.mock(SnowflakeConnectionServiceV1.class);
@@ -88,7 +108,11 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception {
new TopicPartitionChannel(
mockStreamingClient,
topicPartition,
- SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topicName, partition),
+ SnowflakeSinkServiceV2.partitionChannelKey(
+     TEST_CONNECTOR_NAME,
+     topicName,
+     partition,
+     this.shouldUseConnectorNameInChannelName),
topicName,
new StreamingBufferThreshold(10, 10_000, 1),
config,
@@ -98,7 +122,12 @@

Map topicPartitionChannelMap =
Collections.singletonMap(
- partitionChannelKey(TEST_CONNECTOR_NAME, topicName, partition), topicPartitionChannel);
+ partitionChannelKey(
+     TEST_CONNECTOR_NAME,
+     topicName,
+     partition,
+     this.shouldUseConnectorNameInChannelName),
+ topicPartitionChannel);

SnowflakeSinkServiceV2 mockSinkService =
new SnowflakeSinkServiceV2(
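The test class above uses the standard JUnit 4 `Parameterized` runner so that every test runs once per channel-name format. A self-contained sketch of that mechanism (class and method names here are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// The runner instantiates the class once per element returned by params(),
// passing the value to the constructor, so each @Test method executes twice:
// once with true and once with false.
@RunWith(Parameterized.class)
public class ChannelNameFormatParameterizedSketch {
  private final boolean useConnectorNameInChannelName;

  @Parameterized.Parameters
  public static List<Boolean> params() {
    return Arrays.asList(true, false);
  }

  public ChannelNameFormatParameterizedSketch(boolean useConnectorNameInChannelName) {
    this.useConnectorNameInChannelName = useConnectorNameInChannelName;
  }

  @Test
  public void runsOncePerFormat() {
    System.out.println("useConnectorNameInChannelName = " + useConnectorNameInChannelName);
  }
}
```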