From 2268ea8deef5ba9332eeddcf68ff0b1e030a1d41 Mon Sep 17 00:00:00 2001 From: a-rampalli Date: Fri, 1 Sep 2023 15:43:51 +0530 Subject: [PATCH] fix: adds client side filtering to export --- .../source_mixpanel/streams/export.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py index 7baa2126ac7d..5ed243a84dbe 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py @@ -142,7 +142,8 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma } } """ - + stream_state = kwargs["stream_state"] + cursor_date = (stream_state or {}).get(self.cursor_field) # We prefer response.iter_lines() to response.text.split_lines() as the later can missparse text properties embeding linebreaks for record in self.iter_dicts(response.iter_lines(decode_unicode=True)): # transform record into flat dict structure @@ -157,6 +158,8 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma # convert timestamp to datetime string if "time" in item: # time is not always present in the response item["time"] = pendulum.from_timestamp(int(item["time"]), tz="UTC").to_iso8601_string() + if cursor_date and item["time"] < cursor_date: + continue yield item @@ -186,15 +189,6 @@ def get_json_schema(self) -> Mapping[str, Any]: return schema - def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None - ) -> MutableMapping[str, Any]: - mapping = super().request_params(stream_state, stream_slice, next_page_token) - if stream_state and "date" in stream_state: - timestamp = int(pendulum.parse(stream_state["date"]).timestamp()) - mapping["where"] = f'properties["$time"]>=datetime({timestamp})' - return mapping - def request_kwargs( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> Mapping[str, Any]: