Skip to content

Commit

Permalink
fix: Resolve tap-dbt incremental replication (#392)
Browse files Browse the repository at this point in the history
Incremental replication from tap-dbt is failing with the following error
on line 120 in the streams.py

```bash
   record_last_received_datetime = datetime.datetime.fromisoformat(
                    self.replication_key,
                )
   ValueError: Invalid isoformat string: 'finished_at'
```

I believe when the re-run occurs, it is reading the current state and
obtaining the `record_last_received_datetime` from the state dictionary.
I believe the code is currently point to the the `self.replication_key`
rather than the `self.replication_key_value` which contains the
timestamp.

To prove the scenario I wrote a simple python program to parse the JSON
from the current state which was saved from the initial ingestion.

```python
import json
import pendulum
from typing import cast
import datetime
my_state = '{"completed": {"singer_state": {"bookmarks": {"connections": {"partitions": [{"context": {"account_id": "5"}}]}, "environments": {"partitions": [{"context": {"account_id": "5"}}]}, "jobs": {"partitions": [{"context": {"account_id": "5"}}]}, "projects": {"partitions": [{"context": {"account_id": "5"}}]}, "repositories": {"partitions": [{"context": {"account_id": "5"}}]}, "runs": {"partitions": [{"context": {"account_id": "5"}, "replication_key": "finished_at", "replication_key_value": "2024-09-09 11:01:05.436229+00:00"}]}, "users": {"partitions": [{"context": {"account_id": "5"}}]}, "accounts": {}}}}, "partial": {}}'

my_state_dict = json.loads(my_state)

replication_key_value = my_state_dict['completed']['singer_state']['bookmarks']['runs']['partitions'][0]['replication_key_value']
replication_key       = my_state_dict['completed']['singer_state']['bookmarks']['runs']['partitions'][0]['replication_key']
print(f'Last replication_key_value as a string = {replication_key_value}')
print(f'Last replication_key       as a string = {replication_key}')

# Use pendulum for replication_key_value timestamp conversion
pendulum_last_received_datetime: pendulum.DateTime = cast(pendulum.DateTime, pendulum.parse(replication_key_value))

print(f'Pendulum = {pendulum_last_received_datetime}')

# Use datetime for replication_key_value timestamp conversion
new_last_received_datetime = datetime.datetime.fromisoformat(replication_key_value)

print(f'Datetime = {new_last_received_datetime}')

# Use Monkey Patch backport for replication_key_value timestamp conversion
if 1==1:
    from backports.datetime_fromisoformat import MonkeyPatch

    MonkeyPatch.patch_fromisoformat()
    
monkeypatch_last_received_datetime = datetime.datetime.fromisoformat(replication_key_value)

print(f'Monkey Patch Datetime = {monkeypatch_last_received_datetime}')

# Use replication_key for timestamp conversion - this should break
incorrect_datetime_key = datetime.datetime.fromisoformat(replication_key)
```

The result emulate the issue.

```bash
(venv)  test_tap_gitlab]$ python test_gitlab_replication.py 
Last replication_key_value as a string = 2024-09-09 11:01:05.436229+00:00
Last replication_key       as a string = finished_at
Pendulum = 2024-09-09 11:01:05.436229+00:00
Datetime = 2024-09-09 11:01:05.436229+00:00
Monkey Patch Datetime = 2024-09-09 11:01:05.436229+00:00
Traceback (most recent call last):
  File "/home/me/test_tap_gitlab/test_gitlab_replication.py", line 35, in <module>
    incorrect_datetime_key = datetime.datetime.fromisoformat(replication_key)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Invalid isoformat string: 'finished_at'
```

---------

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
  • Loading branch information
s7clarke10 and edgarrmondragon authored Sep 26, 2024
1 parent 0d38df6 commit d7bce20
Showing 1 changed file with 1 addition and 1 deletion.
2 changes: 1 addition & 1 deletion tap_dbt/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
and record[self.replication_key] is not None
):
record_last_received_datetime = datetime.datetime.fromisoformat(
self.replication_key,
record[self.replication_key],
)

if record_last_received_datetime < starting_replication_key_value:
Expand Down

0 comments on commit d7bce20

Please sign in to comment.