-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PROD-39429 Add parameter for connector name in channel name: #732
PROD-39429 Add parameter for connector name in channel name: #732
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments, PTAL, and thanks for the quick change!
@@ -168,6 +168,18 @@ public class SnowflakeSinkConnectorConfig { | |||
"Whether to optimize the streaming client to reduce cost. Note that this may affect" | |||
+ " throughput or latency and can only be set if Streaming Snowpipe is enabled"; | |||
|
|||
public static final String ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME = | |||
"enable.connector_name.in.streaming_channel_name"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a Snowflake config, similar to most of the Snowflake specific configs, let's prefix it with snowflake
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And I suggest we make it a more general config, something like snowflake.enable.new.channel.format
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack, like that suggestion, let me do that!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like it too. We should have done the same for one client optimization but too late now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have used snowflake.enable.streaming.channel.format.v2
Please let me know if you all like this instead!
(Chances are we will not change it ever and hence I went with v2)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, this proves again that naming is the toughest task in SWE, so living upto that.
We have agreed on snowflake.enable.new.channel.name.format
Will make changes in description.
src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
Show resolved
Hide resolved
120233d
to
1684a0d
Compare
@@ -168,6 +168,20 @@ public class SnowflakeSinkConnectorConfig { | |||
"Whether to optimize the streaming client to reduce cost. Note that this may affect" | |||
+ " throughput or latency and can only be set if Streaming Snowpipe is enabled"; | |||
|
|||
public static final String SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2 = | |||
"snowflake.enable.streaming.channel.format.v2"; | |||
public static final boolean SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DEFAULT = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we doing default false or default true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
default false ensures 2.0.1 or below upgrade is safe.
default true ensures 2.1.0 upgrade is safe.
default false makes sense to me but we need to make sure the scenario for 2.1.0 that upgrade are well tested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
default should be false, and we will include the communication on how 2.1.0 customers will do the upgrade (they have the choice of making it true, or using false with their own risk (stop the ingestion and waiting for everything to be committed)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default should be false.
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall lgtm, lets wait on toby and xin's approvals too.
Have you gotten a repro of the upgrade issue and confirmed that this change fixes it for them?
@@ -527,11 +556,20 @@ public Optional<MetricRegistry> getMetricRegistry(String partitionChannelKey) { | |||
* or PROD) | |||
* @param topic topic name | |||
* @param partition partition number | |||
* @param shouldUseConnectorNameInChannelName If true, use connectorName, else not |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add jira number or pr number in case we plan to revert this change in the future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We dont have plans to revert it for now!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, thanks!
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; | ||
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD; | ||
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD; | ||
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: No star import
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack!
Repro is not done yet since it is non-trivial and also this fix doesn't confirm it will fix issue for existing 2.1.0 customers. Existing 2.1.0 customers will have to use the configuration and set it to true. This revert and parameter is to reduce the blast radius and going into an unwanted territory. (I will modify description) Repro'ing this along with blast radius analysis is going to be my next priority. |
if (shouldUseConnectorNameInChannelName) { | ||
return connectorName + "_" + topic + "_" + partition; | ||
} else { | ||
return topic + "_" + partition; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, this can be reduced to:
return shouldUseConnectorNameInChannelName ?
connectorName + "_" + topic + "_" + partition :
topic + "_" + partition;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM barring the outstanding comments others have made.
PROD-39429
Description
This Commit is adding a parameter to 3bf9106 and is also reverting the behavior.
i.e If this commit is pushed and is in release branch, the behavior of determining the
channelName
is reverted. (Contains only topic name and partition number)Users will have to enable the parameter
snowflake.enable.new.channel.name.format
(set to true) to have connector name in their channel name.Scenarios for users reading this commit:
Why
TLDR:
Commit 3bf9106 which was release in v2.1.0 is not compatible with old versions upgrading to 2.1.0 because it changes the channel name and we lose offset information in new channels.
Longer Description:
Plan
Tests
Added tests using ParameterizedTest feature.