Skip to content

Commit

Permalink
fixed bug which crashes okta log incremental sync (airbytehq#7584)
Browse files Browse the repository at this point in the history
* fixed bug which crashes okta log incremental sync

* bump connector version

* revert to pendulum
  • Loading branch information
colleen-love authored and schlattk committed Jan 4, 2022
1 parent c1f781b commit 297929a
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "1d4fdb25-64fc-4569-92da-fcdca79a8372",
"name": "Okta",
"dockerRepository": "airbyte/source-okta",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/okta"
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@
- name: Okta
sourceDefinitionId: 1d4fdb25-64fc-4569-92da-fcdca79a8372
dockerRepository: airbyte/source-okta
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/okta
sourceType: api
- name: OneSignal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@
"destination_sync_mode": "overwrite",
"cursor_field": ["lastUpdated"],
"primary_key": [["id"]]
},
{
"stream": {
"name": "logs",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["published"],
"primary_key": [["uuid"]]
}
]
}
26 changes: 24 additions & 2 deletions airbyte-integrations/connectors/source-okta/source_okta/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from urllib import parse

Expand Down Expand Up @@ -90,9 +91,14 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
)
}

def request_params(self, stream_state=None, **kwargs):
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
stream_state = stream_state or {}
params = super().request_params(stream_state=stream_state, **kwargs)
params = super().request_params(stream_state, stream_slice, next_page_token)
latest_entry = stream_state.get(self.cursor_field)
if latest_entry:
params["filter"] = f'{self.cursor_field} gt "{latest_entry}"'
Expand All @@ -114,6 +120,22 @@ class Logs(IncrementalOktaStream):
def path(self, **kwargs) -> str:
return "logs"

def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
stream_state = stream_state or {}
params = {
"limit": self.page_size,
**(next_page_token or {}),
}
latest_entry = stream_state.get(self.cursor_field)
if latest_entry:
params["since"] = latest_entry
return params


class Users(IncrementalOktaStream):
cursor_field = "lastUpdated"
Expand Down

0 comments on commit 297929a

Please sign in to comment.