Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ignore_stream_slicer_parameters_on_paginated_requests flag #35462

Merged
merged 7 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1988,6 +1988,10 @@ definitions:
anyOf:
- "$ref": "#/definitions/DefaultPaginator"
- "$ref": "#/definitions/NoPagination"
ignore_stream_slicer_parameters_on_paginated_requests:
description: If true, the stream slicer parameters will be ignored when paginating requests. This is useful when the stream slicer parameters are not needed for pagination.
girarda marked this conversation as resolved.
Show resolved Hide resolved
type: boolean
default: false
partition_router:
title: Partition Router
description: PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,10 @@ class SimpleRetriever(BaseModel):
None,
description="Paginator component that describes how to navigate through the API's pages.",
)
ignore_stream_slicer_parameters_on_paginated_requests: Optional[bool] = Field(
False,
description='If true, the stream slicer parameters will be ignored when paginating requests. This is useful when the stream slicer parameters are not needed for pagination.',
)
partition_router: Optional[
Union[
CustomPartitionRouter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -958,12 +958,17 @@ def create_simple_retriever(
cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
paginator = (
self._create_component_from_model(
model=model.paginator, config=config, url_base=url_base, cursor_used_for_stop_condition=cursor_used_for_stop_condition
model=model.paginator,
config=config,
url_base=url_base,
cursor_used_for_stop_condition=cursor_used_for_stop_condition,
)
if model.paginator
else NoPagination(parameters={})
)

ignore_stream_slicer_parameters_on_paginated_requests = model.ignore_stream_slicer_parameters_on_paginated_requests or False

if self._limit_slices_fetched or self._emit_connector_builder_messages:
return SimpleRetrieverTestReadDecorator(
name=name,
Expand All @@ -975,6 +980,7 @@ def create_simple_retriever(
cursor=cursor,
config=config,
maximum_number_of_slices=self._limit_slices_fetched or 5,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
parameters=model.parameters or {},
)
return SimpleRetriever(
Expand All @@ -986,6 +992,7 @@ def create_simple_retriever(
stream_slicer=stream_slicer,
cursor=cursor,
config=config,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
parameters=model.parameters or {},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class SimpleRetriever(Retriever):
paginator: Optional[Paginator] = None
stream_slicer: StreamSlicer = SinglePartitionRouter(parameters={})
cursor: Optional[Cursor] = None
ignore_stream_slicer_parameters_on_paginated_requests: bool = False

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._paginator = self.paginator or NoPagination(parameters=parameters)
Expand Down Expand Up @@ -105,12 +106,12 @@ def _get_request_options(
Returned merged mapping otherwise
"""
# FIXME we should eventually remove the usage of stream_state as part of the interpolation
return combine_mappings(
[
paginator_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
stream_slicer_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
]
)
mappings = [
paginator_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
]
if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
mappings.append(stream_slicer_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token))
return combine_mappings(mappings)

def _request_headers(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,46 @@ def test_get_request_headers(test_name, paginator_mapping, expected_mapping):
pass


@pytest.mark.parametrize(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my education: assuming this thing makes the test below run each time for each of the parameter tuples in the list, right? Neat.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is the parametrized tests, where each tuple element is another case for the test.

"test_name, paginator_mapping, ignore_stream_slicer_parameters_on_paginated_requests, next_page_token, expected_mapping",
[
("test_do_not_ignore_stream_slicer_params_if_ignore_is_true_but_no_next_page_token", {"key_from_pagination": "1000"}, True, None, {"key_from_pagination": "1000"}),
("test_do_not_ignore_stream_slicer_params_if_ignore_is_false_and_no_next_page_token", {"key_from_pagination": "1000"}, False, None, {"key_from_pagination": "1000", "key_from_slicer": "value"}),
("test_ignore_stream_slicer_params_on_paginated_request", {"key_from_pagination": "1000"}, True, {"page": 2}, {"key_from_pagination": "1000"}),
("test_do_not_ignore_stream_slicer_params_on_paginated_request", {"key_from_pagination": "1000"}, False, {"page": 2}, {"key_from_pagination": "1000", "key_from_slicer": "value"}),
],
)
def test_ignore_stream_slicer_parameters_on_paginated_requests(test_name, paginator_mapping, ignore_stream_slicer_parameters_on_paginated_requests, next_page_token, expected_mapping):
# This test is separate from the other request options because request headers must be strings
paginator = MagicMock()
paginator.get_request_headers.return_value = paginator_mapping
requester = MagicMock(use_cache=False)

stream_slicer = MagicMock()
stream_slicer.get_request_headers.return_value = {"key_from_slicer": "value"}

record_selector = MagicMock()
retriever = SimpleRetriever(
name="stream_name",
primary_key=primary_key,
requester=requester,
record_selector=record_selector,
stream_slicer=stream_slicer,
paginator=paginator,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
parameters={},
config={},
)

request_option_type_to_method = {
RequestOptionType.header: retriever._request_headers,
}

for _, method in request_option_type_to_method.items():
actual_mapping = method(None, None, next_page_token={"next_page_token": "1000"})
assert expected_mapping == actual_mapping


@pytest.mark.parametrize(
"test_name, slicer_body_data, paginator_body_data, expected_body_data",
[
Expand Down
Loading