Skip to content

Commit

Permalink
chore: save last next token for klaviyo stream (#136)
Browse files Browse the repository at this point in the history
* chore: save last next token for klaviyo stream

* chore: review changes
  • Loading branch information
am6010 authored Jul 18, 2023
1 parent 6b3e741 commit f0b5931
Showing 1 changed file with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,21 @@ def request_params(self, stream_state=None, **kwargs):
stream_state = stream_state or {}
params = super().request_params(stream_state=stream_state, **kwargs)

params["sort"] = "asc"
if not params.get("since"): # skip state filter if already have one from pagination
state_ts = int(stream_state.get(self.cursor_field, 0))
last_next_token = stream_state.get("last_next_token", None)
if last_next_token is not None:
token_timestamp = int(str(last_next_token).split(":")[0])
# If the token timestamp is equal to the state timestamp then we will use the next token as the since value.
# This will allow us to recover from extreme cases where there are millions of events for the same timestamp.
if token_timestamp == state_ts:
params["since"] = last_next_token
return params

if state_ts > 0 and self.look_back_window_in_seconds:
state_ts -= self.look_back_window_in_seconds
params["since"] = max(state_ts, self._start_ts)
params["sort"] = "asc"

return params

Expand Down Expand Up @@ -406,6 +415,10 @@ class Events(IncrementalKlaviyoStreamV1):

cursor_field = "timestamp"

def __init__(self, **kwargs):
    """Initialise the stream with no remembered pagination token.

    All keyword arguments are forwarded unchanged to the parent constructor.
    """
    super().__init__(**kwargs)
    # Refreshed by next_page_token() after every response and copied into
    # the stream state by get_updated_state(); None means no page is pending.
    self.last_next_token = None

@property
def look_back_window_in_seconds(self) -> Optional[int]:
    """Return the look-back window in seconds (30 minutes, i.e. 1800).

    request_params() subtracts this value from the saved cursor timestamp
    before using it as the ``since`` parameter.
    """
    window = timedelta(minutes=30)
    return int(window.total_seconds())
Expand All @@ -427,6 +440,30 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp

yield process_record(record)

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
    """Return the parent's updated state plus the latest pagination token.

    :param current_stream_state: the state accumulated so far
    :param latest_record: the record that was just read
    :return: the state computed by the parent class, with ``last_next_token``
        set to the token most recently seen by next_page_token() (or None).
    """
    new_state = super().get_updated_state(current_stream_state, latest_record)
    # request_params() reads this key back from the saved state to decide
    # whether it can resume from the token instead of the cursor timestamp.
    new_state["last_next_token"] = self.last_next_token
    return new_state

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
    """
    Work out the pagination parameters for the request that follows ``response``.

    The token found in the response is also stored on ``self.last_next_token``;
    it is reset to None when the response carries no further page.
    :param response: the most recent response from the API
    :return ``{"since": <next token>}`` when the response body has a truthy ``next`` value.
            None when there are no more pages in the result.
    """
    payload = response.json()
    token = payload.get("next")
    if token:
        self.last_next_token = token
        return {"since": token}

    # Last page: forget the token and log the timestamp of the final record.
    # A missing or empty "data" list falls back to a single empty record.
    self.last_next_token = None
    records = payload.get("data", [{}]) or [{}]
    last_timestamp = records[-1].get("timestamp", "No timestamp")
    self.logger.info("Last timestamp -> " + str(last_timestamp))

    return None

class Flows(ReverseIncrementalKlaviyoStreamV1):
cursor_field = "created"
Expand Down

0 comments on commit f0b5931

Please sign in to comment.