SNOW-850959: Fix a wrong result issue where offsets are skipped when schematization is enabled #658
Conversation
Codecov Report
@@ Coverage Diff @@
## master #658 +/- ##
==========================================
- Coverage 87.88% 87.85% -0.03%
==========================================
Files 50 50
Lines 4144 4143 -1
Branches 449 451 +2
==========================================
- Hits 3642 3640 -2
- Misses 332 333 +1
Partials 170 170
@@ -345,7 +359,8 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) {
    */
   private boolean shouldIgnoreAddingRecordToBuffer(
       SinkRecord kafkaSinkRecord, long currentProcessedOffset) {
-    if (!isOffsetResetInKafka) {
+    if (!isOffsetResetInKafka
+        || currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
I am confused about why this is needed. Mind adding a comment here?
We don't want to skip any rows when the offset token for a channel is NULL. Consider the case where the data has expired, so Kafka sends a row with a higher offset upon restarting.
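The guard above can be illustrated with a small sketch. This is a hypothetical simplification, not the connector's actual method: the class name, the `recordOffset` parameter, and the comparison against `currentProcessedOffset` are assumptions made for illustration; only the NULL-token sentinel and the condition shape come from the diff.

```java
// Hypothetical simplification of the skip logic discussed above.
// NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE marks a channel whose
// offset token is NULL, i.e. nothing has been committed yet.
public class SkipLogicSketch {
  static final long NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE = -1L;

  // Before the fix: records could be skipped whenever an offset reset
  // was in flight, even if the channel had no committed offset at all.
  static boolean shouldIgnoreBeforeFix(
      boolean isOffsetResetInKafka, long currentProcessedOffset, long recordOffset) {
    if (!isOffsetResetInKafka) {
      return false;
    }
    return recordOffset <= currentProcessedOffset;
  }

  // After the fix: a NULL offset token means every row is accepted,
  // because Kafka may legitimately resend a higher offset (e.g. after
  // data expiry) while nothing is persisted in Snowflake yet.
  static boolean shouldIgnoreAfterFix(
      boolean isOffsetResetInKafka, long currentProcessedOffset, long recordOffset) {
    if (!isOffsetResetInKafka
        || currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
      return false;
    }
    return recordOffset <= currentProcessedOffset;
  }
}
```

With this shape, a channel whose token is NULL keeps every row during a reset, while a channel with a committed offset still drops already-processed rows.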
test/test_suites.py (outdated)
from test_suit.test_schema_evolution_avro_sr import TestSchemaEvolutionAvroSR
from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable
from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson
# res tests
nit: what is res?
I added that for 'resilience tests'; we can spell out the full word there. Can we move the comment back above the rest of the resilience tests?
All the formatting is done automatically, so I have no control over it. I simply removed the comment.
@@ -46,7 +46,7 @@ def send(self):
         self.driver.sendBytesData(self.topic, value, key)

     # Sleep for some time and then verify the rows are ingested
-    sleep(60)
+    sleep(120)
Curious why it's taking more time. I see you have tried multiple times.
^ as well as why we're removing the 30 sec sleep below
Good question. This solution is not perfect: if we don't wait for more than 60 seconds, the in-memory offset is not updated and we will restart from the beginning. I put 120 seconds to make sure it's not flaky.
lgtm! Thanks a lot!
Some minor comments to clarify.
lgtm, I just want more clarification on the different offsets.
          this.channel.getFullyQualifiedName());
      return latestCommittedOffsetInSnowflake;
    }
    LOGGER.info(
Why the change here? The only diff I see is that the warn log is no longer there. Why not edit the content of the log instead of removing it?
- I don't think warn makes sense, because there are valid cases where an offset token can be NULL.
- We want to unify the format for both cases so it's easier to search and debug.
@@ -266,7 +266,9 @@ public long getOffset(TopicPartition topicPartition) {
     String partitionChannelKey =
         partitionChannelKey(topicPartition.topic(), topicPartition.partition());
     if (partitionsToChannel.containsKey(partitionChannelKey)) {
-      return partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
+      long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
nit: can we call partitionsToChannel.get() once and save the value in a local variable for perf?
I think this will be optimized by the compiler, so it's usually a matter of style?
oh good to know!
@@ -94,6 +94,10 @@ public class TopicPartitionChannel {
   private final AtomicLong processedOffset =
       new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);

+  // The in-memory consumer offset managed by the connector, we need this to tell Kafka which
+  // offset to resend when the channel offset token is NULL
+  private long latestConsumerOffset = NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
The above offsetPersistedInSnowflake and processedOffset are AtomicLongs; should this also be an AtomicLong?
I believe everything should be a plain long, given that there is no concurrent logic?
@@ -94,6 +94,10 @@ public class TopicPartitionChannel {
   private final AtomicLong processedOffset =
       new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);

+  // The in-memory consumer offset managed by the connector, we need this to tell Kafka which
We now have three offsets; can we unify the names or clarify their different uses in the comments? From what I can tell, latestConsumerOffset is either the first KC offset (the initial processedOffset) or the Snowflake offset (offsetPersistedInSnowflake). If so, can we comment something like:

- offsetPersistedInSnowflake: This offset represents the data persisted in Snowflake. More specifically, it is the Snowflake offset determined from the insertRows API call. It is set after calling the fetchOffsetToken API for this channel.
- processedOffset: This offset represents the data buffered in KC. More specifically, it is the KC offset used to ensure exactly-once functionality. On creation it is set to the latest committed token in Snowflake (see offsetPersistedInSnowflake) and updated on each new row from KC.
- latestConsumerOffset: This offset is a fallback to represent the data buffered in KC. It is similar to processedOffset; however, it is only used to resend the offset when the channel offset token is NULL. It is updated to the first offset sent by KC (see processedOffset) or the offset persisted in Snowflake (see offsetPersistedInSnowflake).
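The three offsets described above can be sketched as fields on the channel. This is a hypothetical, heavily simplified mock-up following the names in the review comment, not the connector's actual class; the `onRecord` helper is invented for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the three offset fields discussed above.
public class OffsetFieldsSketch {
  static final long NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE = -1L;

  // Data persisted in Snowflake, set from the fetchOffsetToken /
  // insertRows APIs.
  final AtomicLong offsetPersistedInSnowflake =
      new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);

  // Data buffered in KC: the last Kafka offset seen, for exactly-once.
  final AtomicLong processedOffset =
      new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);

  // Fallback: the offset to ask Kafka to resend when the channel
  // offset token is NULL.
  long latestConsumerOffset = NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;

  // Invented helper: remember the first offset Kafka sends us; later
  // rows advance processedOffset but leave the fallback untouched.
  void onRecord(long kafkaOffset) {
    if (latestConsumerOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
      latestConsumerOffset = kafkaOffset;
    }
    processedOffset.set(kafkaOffset);
  }
}
```

The point of the split is that `processedOffset` keeps moving as rows are buffered, while `latestConsumerOffset` pins the earliest offset the connector has seen but not yet committed, which is exactly what a reopened NULL-token channel must replay from.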
Thanks for the suggestion, updated.
@@ -112,91 +104,114 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
         test_instance=TestAvrosrAvrosr(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False
     )),
     ("TestNativeStringAvrosr", EndToEndTestSuite(
-        test_instance=TestNativeStringAvrosr(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False
+        test_instance=TestNativeStringAvrosr(driver, nameSalt), clean=True, run_in_confluent=True,
nit: why are all these 'run_in_apache' lines pushed down a line? I don't see any other differences; is there a formatting change?
All the formatting is done automatically, so I have no control over it.
Looks like we have a gap in KC that may skip ingesting some offsets. Consider the case where two topics with different schemas ingest into the same table: internally KC will create two channels (channel A and channel B) with offset_token=NULL, and both channels start to buffer data and flush files. Channel A then fails to commit its first batch because the file schema doesn't match the latest table schema due to schema evolution. Channel A is invalidated and reopened, but we don't reset the consumer offset because the offset_token for channel A is still NULL; we currently rely on Kafka to send us the correct data when the offset_token is NULL. In this case Kafka will simply send us the next batch and we will accept it, which means the first batch for channel A is skipped forever. We need to rethink what to do when the offset_token for a channel is NULL, and I don't think we can rely purely on Kafka to resend us the correct offset.

The fix is to manage the Kafka consumer offset in the connector as well, and use it to reset Kafka when the offset token for a channel is NULL, instead of relying on Kafka to send us the correct offset.
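The reset decision described in the fix can be sketched as a single function. This is a hypothetical illustration under assumed names (`offsetToResetTo`, the `+ 1` restart convention), not the connector's actual code.

```java
// Hypothetical sketch of the fix: when the channel's offset token is
// NULL, fall back to the connector-managed consumer offset instead of
// relying on Kafka to resend the right data.
public class OffsetResetSketch {
  static final long NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE = -1L;

  // Decide where Kafka should restart a reopened channel.
  static long offsetToResetTo(long channelOffsetToken, long latestConsumerOffset) {
    if (channelOffsetToken != NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
      // Something is committed in Snowflake: restart right after it
      // (assumed convention: token holds the last committed offset).
      return channelOffsetToken + 1;
    }
    // Token is NULL (e.g. channel A's first batch failed on a schema
    // mismatch): restart from the first offset the connector saw, so
    // that batch is not skipped forever.
    return latestConsumerOffset;
  }
}
```

Under this sketch, a reopened channel with a NULL token replays from the connector's own bookkeeping rather than accepting whatever batch Kafka sends next, which closes the gap described above.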