Skip to content

Commit

Permalink
Merge pull request #149 from rudderlabs/fix/etl-102-mixpanel-export-d…
Browse files Browse the repository at this point in the history
…ata-duplication

fix: adds client side filtering to export
  • Loading branch information
a-rampalli authored Sep 1, 2023
2 parents fd483ab + 2268ea8 commit f5448be
Showing 1 changed file with 4 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma
}
}
"""

stream_state = kwargs["stream_state"]
cursor_date = (stream_state or {}).get(self.cursor_field)
# We prefer response.iter_lines() to response.text.split_lines() as the later can missparse text properties embeding linebreaks
for record in self.iter_dicts(response.iter_lines(decode_unicode=True)):
# transform record into flat dict structure
Expand All @@ -157,6 +158,8 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma
# convert timestamp to datetime string
if "time" in item: # time is not always present in the response
item["time"] = pendulum.from_timestamp(int(item["time"]), tz="UTC").to_iso8601_string()
if cursor_date and item["time"] < cursor_date:
continue

yield item

Expand Down Expand Up @@ -186,15 +189,6 @@ def get_json_schema(self) -> Mapping[str, Any]:

return schema

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
mapping = super().request_params(stream_state, stream_slice, next_page_token)
if stream_state and "date" in stream_state:
timestamp = int(pendulum.parse(stream_state["date"]).timestamp())
mapping["where"] = f'properties["$time"]>=datetime({timestamp})'
return mapping

def request_kwargs(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
Expand Down

0 comments on commit f5448be

Please sign in to comment.