Reset on connectors do not reset the SCD tables #5417

Closed
danieldiamond opened this issue Aug 16, 2021 · 30 comments · Fixed by #18015
Labels: cdc, normalization, priority/high, team/destinations, type/bug, zendesk

Comments

@danieldiamond
Contributor

danieldiamond commented Aug 16, 2021

Environment

  • Is this your first time deploying Airbyte: no
  • OS Version / Instance: Linux EC2 m5.xlarge
  • Deployment: Docker
  • Airbyte Version: 0.29.7-alpha
  • Source name: MySQL 0.4.2
  • Destination: Snowflake 0.3.12
  • Severity: High
  • Step where error happened: Sync Job

Current Behavior

A reset job on a CDC connection does not reset the SCD table. After the reset, mytable is empty and _airbyte_raw_mytable is empty, but mytable_scd is not.

Expected Behavior

All tables associated with the connector should be reset.

Logs


2021-08-16 00:27:32 INFO () WorkerRun(call):62 - Executing worker wrapper. Airbyte version: 0.29.7-alpha
2021-08-16 00:27:32 INFO () TemporalAttemptExecution(get):110 - Executing worker wrapper. Airbyte version: 0.29.7-alpha
2021-08-16 00:27:32 INFO () DefaultReplicationWorker(run):102 - start sync worker. job id: 6265 attempt id: 0
2021-08-16 00:27:32 INFO () DefaultReplicationWorker(run):111 - configured sync modes: {projects.MY_TABLE=full_refresh - overwrite}
2021-08-16 00:27:32 INFO () DefaultAirbyteDestination(start):78 - Running destination...
2021-08-16 00:27:32 INFO () LineGobbler(voidCall):85 - Checking if airbyte/destination-snowflake:0.3.12 exists...
2021-08-16 00:27:32 INFO () LineGobbler(voidCall):85 - airbyte/destination-snowflake:0.3.12 was found locally.
2021-08-16 00:27:32 INFO () DockerProcessFactory(create):146 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/6265/0 --network host --log-driver none airbyte/destination-snowflake:0.3.12 write --config destination_config.json --catalog destination_catalog.json
2021-08-16 00:27:32 INFO () DefaultReplicationWorker(run):139 - Waiting for source thread to join.
2021-08-16 00:27:32 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):246 - Destination output thread started.
2021-08-16 00:27:32 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):210 - Replication thread started.
2021-08-16 00:27:32 INFO () DefaultReplicationWorker(run):141 - Source thread complete.
2021-08-16 00:27:32 INFO () DefaultReplicationWorker(run):142 - Waiting for destination thread to join.
2021-08-16 00:27:37 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:37 INFO i.a.i.d.s.SnowflakeDestination(main):81 - {} - starting destination: class io.airbyte.integrations.destination.snowflake.SnowflakeDestination
2021-08-16 00:27:37 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:37 INFO i.a.i.b.IntegrationRunner(run):96 - {} - Running integration: io.airbyte.integrations.destination.snowflake.SnowflakeDestination
2021-08-16 00:27:37 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:37 INFO i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2021-08-16 00:27:37 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:37 INFO i.a.i.b.IntegrationRunner(run):100 - {} - Command: WRITE
2021-08-16 00:27:37 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:37 INFO i.a.i.b.IntegrationRunner(run):101 - {} - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2021-08-16 00:27:37 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:37 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-16 00:27:37 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:37 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-16 00:27:37 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:37 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-16 00:27:37 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:37 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword multiline - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-16 00:27:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:38 INFO i.a.i.d.j.c.SwitchingDestination(getConsumer):83 - {} - Using destination type: INSERT
2021-08-16 00:27:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:38 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):116 - {} - Write config: WriteConfig{streamName=MY_TABLE, namespace=MY_SCHEMA, outputSchemaName=MY_SCHEMA, tmpTableName=_airbyte_tmp_xin_MY_TABLE, outputTableName=_airbyte_raw_MY_TABLE, syncMode=overwrite}
2021-08-16 00:27:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:38 INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):142 - {} - class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
2021-08-16 00:27:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:38 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):139 - {} - Preparing tmp tables in destination started for 1 streams
2021-08-16 00:27:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:38 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):143 - {} - Preparing tmp table in destination started for stream MY_TABLE. schema: MY_SCHEMA, tmp table name: _airbyte_tmp_xin_MY_TABLE
2021-08-16 00:27:40 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:40 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):149 - {} - Preparing tables in destination completed.
2021-08-16 00:27:40 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:40 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):80 - {} - Airbyte message consumer: succeeded.
2021-08-16 00:27:40 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:40 INFO i.a.i.d.b.BufferedStreamConsumer(close):212 - {} - executing on success close procedure.
2021-08-16 00:27:40 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:40 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):176 - {} - Finalizing tables in destination started for 1 streams
2021-08-16 00:27:40 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:40 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):181 - {} - Finalizing stream MY_TABLE. schema MY_SCHEMA, tmp table _airbyte_tmp_xin_MY_TABLE, final table _airbyte_raw_MY_TABLE
2021-08-16 00:27:41 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:41 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):194 - {} - Executing finalization of tables.
2021-08-16 00:27:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:43 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):196 - {} - Finalizing tables in destination completed.
2021-08-16 00:27:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:43 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):199 - {} - Cleaning tmp tables in destination started for 1 streams
2021-08-16 00:27:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:43 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):203 - {} - Cleaning tmp table in destination started for stream MY_TABLE. schema MY_SCHEMA, tmp table name: _airbyte_tmp_xin_MY_TABLE
2021-08-16 00:27:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:43 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):208 - {} - Cleaning tmp tables in destination completed.
2021-08-16 00:27:43 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):251 - state in DefaultReplicationWorker from Destination: io.airbyte.protocol.models.AirbyteMessage@1e166db8[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@4823999a[data={},additionalProperties={}],additionalProperties={}]
2021-08-16 00:27:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:43 INFO i.a.i.b.IntegrationRunner(run):153 - {} - Completed integration: io.airbyte.integrations.destination.snowflake.SnowflakeDestination
2021-08-16 00:27:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-16 00:27:43 INFO i.a.i.d.s.SnowflakeDestination(main):83 - {} - completed destination: class io.airbyte.integrations.destination.snowflake.SnowflakeDestination
2021-08-16 00:27:46 INFO () DefaultReplicationWorker(run):144 - Destination thread complete.
2021-08-16 00:27:46 INFO () DefaultReplicationWorker(run):172 - sync summary: io.airbyte.config.ReplicationAttemptSummary@1fd48e8e[status=completed,recordsSynced=0,bytesSynced=0,startTime=1629073652511,endTime=1629073666238]
2021-08-16 00:27:46 INFO () DefaultReplicationWorker(run):179 - Source output at least one state message
2021-08-16 00:27:46 INFO () DefaultReplicationWorker(run):185 - State capture: Updated state to: Optional[io.airbyte.config.State@23319705[state={}]]
2021-08-16 00:27:46 INFO () TemporalAttemptExecution(get):133 - Stopping cancellation check scheduling...
2021-08-16 00:27:46 INFO () RetryingTemporalAttemptExecution(get):118 - Last output present: true. Should attempt again: false
...
2021-08-16 00:27:46 INFO () TemporalAttemptExecution(get):110 - Executing worker wrapper. Airbyte version: 0.29.7-alpha
2021-08-16 00:27:46 INFO () DefaultNormalizationWorker(run):61 - Running normalization.
2021-08-16 00:27:46 INFO () LineGobbler(voidCall):85 - Checking if airbyte/normalization:0.1.39 exists...
2021-08-16 00:27:46 INFO () LineGobbler(voidCall):85 - airbyte/normalization:0.1.39 was found locally.
2021-08-16 00:27:46 INFO () DockerProcessFactory(create):146 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/6265/0/normalize --network host --log-driver none airbyte/normalization:0.1.39 run --integration-type snowflake --config destination_config.json --catalog destination_catalog.json
2021-08-16 00:27:48 INFO () LineGobbler(voidCall):85 - Running: transform-config --config destination_config.json --integration-type snowflake --out /data/6265/0/normalize
2021-08-16 00:27:48 INFO () LineGobbler(voidCall):85 - Namespace(config='destination_config.json', integration_type=<DestinationType.snowflake: 'snowflake'>, out='/data/6265/0/normalize')
2021-08-16 00:27:48 INFO () LineGobbler(voidCall):85 - transform_snowflake
2021-08-16 00:27:48 INFO () LineGobbler(voidCall):85 - Running: transform-catalog --integration-type snowflake --profile-config-dir /data/6265/0/normalize --catalog destination_catalog.json --out /data/6265/0/normalize/models/generated/ --json-column _airbyte_data
2021-08-16 00:27:49 INFO () LineGobbler(voidCall):85 - Processing destination_catalog.json...
2021-08-16 00:27:49 INFO () LineGobbler(voidCall):85 -   Generating airbyte_ctes/MY_SCHEMA/MY_TABLE_AB1.sql from MY_TABLE
2021-08-16 00:27:49 INFO () LineGobbler(voidCall):85 -   Generating airbyte_ctes/MY_SCHEMA/MY_TABLE_AB2.sql from MY_TABLE
2021-08-16 00:27:49 INFO () LineGobbler(voidCall):85 -   Generating airbyte_ctes/MY_SCHEMA/MY_TABLE_AB3.sql from MY_TABLE
2021-08-16 00:27:49 INFO () LineGobbler(voidCall):85 -   Generating airbyte_tables/MY_SCHEMA/MY_TABLE.sql from MY_TABLE
2021-08-16 00:27:49 INFO () LineGobbler(voidCall):85 - Running with dbt=0.19.0
2021-08-16 00:27:51 INFO () LineGobbler(voidCall):85 - [WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources.
2021-08-16 00:27:51 INFO () LineGobbler(voidCall):85 - There are 1 unused configuration paths:
2021-08-16 00:27:51 INFO () LineGobbler(voidCall):85 - - models.airbyte_utils.generated.airbyte_views
2021-08-16 00:27:51 INFO () LineGobbler(voidCall):85 - 
2021-08-16 00:27:51 INFO () LineGobbler(voidCall):85 - Found 4 models, 0 tests, 0 snapshots, 0 analyses, 392 macros, 0 operations, 0 seed files, 1 source, 0 exposures
2021-08-16 00:27:51 INFO () LineGobbler(voidCall):85 - 
2021-08-16 00:27:55 INFO () LineGobbler(voidCall):85 - 00:27:55 | Concurrency: 32 threads (target='prod')
2021-08-16 00:27:55 INFO () LineGobbler(voidCall):85 - 00:27:55 | 
2021-08-16 00:27:55 INFO () LineGobbler(voidCall):85 - 00:27:55 | 1 of 1 START table model MY_SCHEMA.MY_TABLE................................................... [RUN]
2021-08-16 00:27:57 INFO () LineGobbler(voidCall):85 - 00:27:57 | 1 of 1 OK created table model MY_SCHEMA.MY_TABLE.............................................. [SUCCESS 1 in 2.00s]
2021-08-16 00:27:59 INFO () LineGobbler(voidCall):85 - 00:27:58 | 
2021-08-16 00:27:59 INFO () LineGobbler(voidCall):85 - 00:27:58 | Finished running 1 table model in 7.39s.
2021-08-16 00:27:59 INFO () LineGobbler(voidCall):85 - 
2021-08-16 00:27:59 INFO () LineGobbler(voidCall):85 - Completed successfully
2021-08-16 00:27:59 INFO () LineGobbler(voidCall):85 - 
2021-08-16 00:27:59 INFO () LineGobbler(voidCall):85 - Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
2021-08-16 00:28:01 INFO () DefaultNormalizationWorker(run):77 - Normalization executed in 0.
2021-08-16 00:28:01 INFO () TemporalAttemptExecution(get):133 - Stopping cancellation check scheduling...

Steps to Reproduce

  1. Create Regular or CDC connection with incremental dedupe
  2. Sync
  3. Reset

Are you willing to submit a PR?

Unfortunately not at this time


@danieldiamond danieldiamond added the type/bug Something isn't working label Aug 16, 2021
@sherifnada sherifnada added area/platform issues related to the platform normalization labels Aug 16, 2021
@sherifnada sherifnada added this to the Core 2021-08-18 milestone Aug 16, 2021
@danieldiamond danieldiamond changed the title from "Reset on CDC connectors do not reset the SCD tables" to "Reset on connectors do not reset the SCD tables" on Aug 18, 2021
@danieldiamond danieldiamond changed the title from "Reset on connectors do not reset the SCD tables" to "Reset on CDC connectors do not reset the SCD tables" on Aug 18, 2021
@danieldiamond
Contributor Author

Thinking this might occur in regular connectors as well but not sure.

@danieldiamond
Contributor Author

Additionally, I'm seeing all the _airbyte_tmp_... tables still persisting after sync jobs.

@davinchia
Contributor

@danieldiamond this is happening because of how we are resetting jobs; the mechanism was never updated to look for the SCD tables. What is the impact of this? This is pretty low on our end - definitely open to moving it up if this is causing you pain.

@davinchia davinchia added the priority/low Low priority label Aug 23, 2021
@danieldiamond
Contributor Author

This is not critical at all. I'm also happy to help try to solve this one if you can point me to the right file(s).

@danieldiamond
Contributor Author

danieldiamond commented Sep 24, 2021

@davinchia I'm actually running into issues that would be a massive concern to users, which makes me think this should be reclassified as critical. When resetting a connector, the SCD tables don't get deleted. For incremental dedupe syncs this makes the normalization process incorrect, because it mixes the existing SCD data with the new data. This results in NULL values in your final tables.

Note: Suppose you have an extremely large connector and need to go through the pain of resetting it, e.g. when adding new tables or modifying the underlying schema. Going through a reset and a full sync only to find that the SCD table now contains both previous and current data (duplicated) would be particularly unfortunate.

@sherifnada sherifnada added cdc priority/high High priority and removed priority/low Low priority labels Sep 24, 2021
@danieldiamond
Contributor Author

@sherifnada @davinchia if either of you can point me to the related code (the mechanism where this is missed), I'm happy to take a look and try to submit a PR. I imagine this isn't a very heavy task.

@sherifnada
Contributor

@ChristopheDuong is this something you can point to?

@davinchia
Contributor

I believe reset uses the same code path regardless of CDC, although I believe SCD is a result of the incremental dedup function. Are you using that Daniel?

This would be here by setting an empty source.

We could always check for these tables and try to overwrite them but that isn't too clear. Since these are generated by DBT, maybe we add a DBT operation in reset to do so? WDYT Chris?

@danieldiamond
Contributor Author

Yeah, I'm using incremental dedup.

I also wonder if there are any implications with the normalization revamp going on now.

@sherifnada sherifnada added the area/connectors Connector related issues label Nov 19, 2021
@ChristopheDuong ChristopheDuong changed the title from "Reset on CDC connectors do not reset the SCD tables" to "Reset on connectors do not reset the SCD tables" on Nov 19, 2021
@ChristopheDuong
Contributor

ChristopheDuong commented Nov 19, 2021

Yes, additional logic should be added in normalization for when resets are triggered.

The current implementation strategy, as described in the comments from:

configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);

  1. Set all streams to full refresh - overwrite.
  2. Create a job where the source emits no records.
  3. Run a sync from the empty source to the destination. This will overwrite all data for each stream in the destination.
  4. The Empty source emits no state message, so state will start at null (i.e. start from the beginning on the next sync).

Do we want normalization to always delete _scd tables, if they exist, when it runs in a sync mode that is not APPEND_DEDUP?
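
The rule proposed in that question can be sketched as follows. This is only an illustration of the idea, not Airbyte's actual normalization code: the `Stream` class and `scd_cleanup_statements` function are hypothetical names, the `_scd` suffix is taken from the table names mentioned in this thread, and the generated SQL is generic rather than tuned to any one destination.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Stream:
    """Hypothetical, minimal stand-in for a configured stream."""
    schema: str
    name: str
    destination_sync_mode: str  # e.g. "OVERWRITE", "APPEND", "APPEND_DEDUP"


def scd_cleanup_statements(streams: List[Stream]) -> List[str]:
    """Return DROP statements for SCD tables that the current run will not rebuild.

    Assumption: only APPEND_DEDUP maintains an <stream>_scd table, so in any
    other mode a leftover SCD table is stale and can be dropped.
    """
    statements = []
    for stream in streams:
        if stream.destination_sync_mode != "APPEND_DEDUP":
            statements.append(
                f"DROP TABLE IF EXISTS {stream.schema}.{stream.name}_scd"
            )
    return statements


# A reset switches every stream to OVERWRITE, so its SCD table is selected.
reset_catalog = [Stream("MY_SCHEMA", "MY_TABLE", "OVERWRITE")]
dedup_catalog = [Stream("MY_SCHEMA", "MY_TABLE", "APPEND_DEDUP")]

print(scd_cleanup_statements(reset_catalog))
print(scd_cleanup_statements(dedup_catalog))
```

Because a reset forces every stream to OVERWRITE, this rule would clean up the SCD table on reset without needing a separate reset code path. It would also drop the SCD table when a user deliberately switches a stream away from APPEND_DEDUP.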

@misteryeo
Contributor

@oustynova This looks like a pretty pressing issue that was reported again recently. Can we have someone from the team investigate this and determine the root cause? Thanks!

@danieldiamond
Contributor Author

FYI @ChristopheDuong and team, this is a serious issue for database replication syncs using CDC. When Airbyte loses its position in the binlog (for example, when Airbyte is down for a while), it needs to start again and sync the entire table. However, the SCD table then labels the wrong row as active. This is a bug, and it means the DB replication selects the wrong record as the active record.

I'm 95% sure that any Airbyte users using CDC on DB replication are experiencing this issue and are not aware of it.

@toandm
Contributor

toandm commented Jul 22, 2022

Was this issue really reported this long ago? I recently ran into it with the Snowflake destination on the latest version, 0.39.37-alpha, with normalization Docker image airbyte/normalization-snowflake:0.2.8.
Because the SCD table is not wiped, the final denormalized nested tables still contain data from previous syncs. I think this is a really serious issue: it causes massive data inaccuracy and we don't even know about it. From now on I have to be careful and drop my entire schema before resetting data.

@danieldiamond
Contributor Author

Yeah, TBH I'm not sure why this isn't resolved yet and can only assume it's because not enough Airbyte users actually use incremental + dedupe.
This is a massive flaw in the normalisation process.
cc'ing @misteryeo (I forgot to mention this painful manual task as well, particularly when a DB source schema is updated and we reset the connector)

@toandm
Contributor

toandm commented Jul 22, 2022

Okay, I would like to clarify a bit.
I think people are not discovering this because the final tables are still correct (no data from previous syncs, though that data is still in the _SCD tables).
The tables that are incorrect are the unnested tables created from JSON columns. Here's an example:
My source: Facebook Marketing
My data: AdsInsights table with the field "actions" included, using the Incremental + Dedup method with cursor_field = "date_start", primary_key = ["account_id","ad_id","date_start"]
The final table after normalization looks like this:
[image: screenshot of the final table after normalization]

The "actions" column is a JSON column, so normalization went ahead and created an unnested table for it. In this table, I discovered that data from previous syncs is still there.

So the real consequence shows up when the data contains nested JSON. Top-level tables are still reset correctly.

@sherifnada
Contributor

cc @grishick in case the team wasn't aware of this issue

@marcosmarxm
Member

Zendesk ticket #1527 has been linked to this issue.

@marcosmarxm
Member

Comment made from Zendesk by Marcos Marx on 2022-08-01 at 17:37:

Isaac, the first issue is #5417. I raised the issue with the connector team.

@grishick
Contributor

Notes from grooming:
We can add callbacks to models in DBT. In this case, we could add a callback that drops the table if it is empty.
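
A rough sketch of the "drop if empty" check from the grooming note follows. The note does not say which table is checked and which is dropped, so the hypothetical `drop_if_empty` helper takes both as parameters. It uses an in-memory SQLite database purely so that the example runs on its own; a real version would live in a DBT callback and run against the destination warehouse.

```python
import sqlite3


def _check_identifier(name: str) -> str:
    # Table names cannot be bound as SQL parameters, so validate them instead.
    if not name.replace("_", "").isalnum():
        raise ValueError(f"unexpected table name: {name!r}")
    return name


def drop_if_empty(conn: sqlite3.Connection, check_table: str, drop_table: str) -> bool:
    """Drop `drop_table` when `check_table` has no rows. Returns True if dropped."""
    check_table = _check_identifier(check_table)
    drop_table = _check_identifier(drop_table)
    (row_count,) = conn.execute(f'SELECT COUNT(*) FROM "{check_table}"').fetchone()
    if row_count == 0:
        conn.execute(f'DROP TABLE IF EXISTS "{drop_table}"')
        return True
    return False


# State reported in this issue after a reset: raw table empty, SCD table not.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE _airbyte_raw_mytable (id INTEGER)")
conn.execute("CREATE TABLE mytable_scd (id INTEGER)")
conn.execute("INSERT INTO mytable_scd VALUES (1)")

dropped = drop_if_empty(conn, "_airbyte_raw_mytable", "mytable_scd")
remaining = [row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
)]
```

One thing the sketch makes visible: checking the SCD table itself for emptiness would not help here, because after a reset the SCD table is exactly the one that still has rows. The check has to look at the raw or final table.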

@grishick
Contributor

@rodireich might you be able to look into this in the current sprint?

@rodireich rodireich self-assigned this Sep 14, 2022
@rodireich
Contributor

Definitely. I assigned it to myself to take a look.

@grishick grishick added the team/destinations Destinations team's backlog label Sep 27, 2022
@danieldiamond
Contributor Author

@rodireich how did you go with this?

@danieldiamond
Contributor Author

Quoting @toandm: "This issue was this long ago? I recently discovered this for Snowflake destination, for the latest version of 0.39.37-alpha, normalization docker: airbyte/normalization-snowflake:0.2.8. Because the SCD table is not wiped, final denormalized nested tables still contains data from previous syncs. I think this is a really serious issue, causing massive data inaccuracy and we don't even know about it. From now on I gotta be careful and drop my entire schema before resetting data"

I honestly can't understand why more users aren't sharing this exact concern. Either it's a deterrent for anyone wishing to use incremental + dedupe, or users are just not aware of it.

@rodireich
Contributor

Something that came out of data analytics office hours:
Another byproduct of normalization: for nested complex objects (JSON) in some connectors, such as GitHub or Intercom, normalization creates auxiliary tables that explode these objects into columns.
We should make sure these tables are also taken care of during a connection reset.

(@ChristopheDuong, @alex-gron)

@danieldiamond
Contributor Author

This issue is over a year old. Can I ask what complexity is involved with working on a solution here? Is it not simply a matter of automatically deleting the SCD table in addition to the _AIRBYTE_RAW and final tables?

@sherifnada @marcosmarxm @ChristopheDuong can I also confirm that the impact of this issue is clearly understood? Reviewing the comments from users above - IMO this should be top priority given it's quite literally bad data & incorrect final tables - a clear malfunctioning of any incremental + dedupe DB connectors where reset is involved (particularly as they're starting to be classified as GA).

@davinchia
Contributor

davinchia commented Oct 13, 2022

@danieldiamond yes, the impact of this is understood. I believe this is in the current sprint for the Destinations team.

The complexity here is that Airbyte doesn't actually have a direct way to reset tables. We currently do so by simulating an empty source and relying on downstream behaviour to reset the tables, i.e. empty source -> empty raw tables -> empty final tables. Unfortunately, it's not as simple as adding the SCD tables to the wipe list.
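
A toy model of that chain shows why the SCD table is left behind. It is a sketch under simplifying assumptions, not the real worker code: tables are plain Python lists, `reset` is a hypothetical function, and the table names follow the ones in the issue description.

```python
from typing import Dict, List

Tables = Dict[str, List[dict]]


def reset(tables: Tables, stream: str) -> None:
    """Simulate a reset: an empty source synced in full refresh - overwrite mode."""
    raw = f"_airbyte_raw_{stream}"
    # Step 1: the destination overwrites the raw table with the (empty) source output.
    tables[raw] = []
    # Step 2: normalization rebuilds the final table from the raw table.
    tables[stream] = list(tables[raw])
    # Nothing in this path refers to f"{stream}_scd", so it keeps its old rows.


tables: Tables = {
    "_airbyte_raw_mytable": [{"id": 1}],
    "mytable_scd": [{"id": 1}],
    "mytable": [{"id": 1}],
}

reset(tables, "mytable")
row_counts = {name: len(rows) for name, rows in tables.items()}
print(row_counts)  # {'_airbyte_raw_mytable': 0, 'mytable_scd': 1, 'mytable': 0}
```

The resulting row counts match the state reported at the top of this issue: the raw and final tables are empty, while `mytable_scd` is not.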

We discussed internally and the long term solution is to add a new 'reset' operation to Destinations (see https://github.com/airbytehq/airbyte-internal-issues/issues/999).

The short-term solution is to implement this as part of normalisation. However, it's not terribly straightforward, since the append+dedup implementations aren't that simple to begin with.

All in all, it's on the team's backlog and work should start in the next few weeks. @grishick is the manager and can provide more context!

@danieldiamond
Contributor Author

@davinchia thanks so much for the reply and for that context. Really appreciated.

@grishick grishick self-assigned this Oct 17, 2022
@grishick grishick linked a pull request Oct 17, 2022 that will close this issue