From 06e88849f0d8a05d651a3b5a47390a0babb5e888 Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Tue, 24 Jan 2023 09:13:05 +0200 Subject: [PATCH] Source Gitlab: fix missing data issue (#21713) * #21076 source gitlab: fix missing data issue * #21076 source gitlab: upd changelog * auto-bump connector version Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-gitlab/Dockerfile | 2 +- .../source-gitlab/acceptance-test-config.yml | 6 -- .../source-gitlab/source_gitlab/streams.py | 68 +++++++++++++++---- .../source-gitlab/unit_tests/test_streams.py | 9 ++- docs/integrations/sources/gitlab.md | 1 + 7 files changed, 63 insertions(+), 27 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 44dd3a5f2957..867c293e56fe 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -605,7 +605,7 @@ - name: Gitlab sourceDefinitionId: 5e6175e5-68e1-4c17-bff9-56103bbb0d80 dockerRepository: airbyte/source-gitlab - dockerImageTag: 1.0.0 + dockerImageTag: 1.0.1 documentationUrl: https://docs.airbyte.com/integrations/sources/gitlab icon: gitlab.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 63f14ef901a5..4a7a7cb364af 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -4732,7 +4732,7 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-gitlab:1.0.0" +- dockerImage: "airbyte/source-gitlab:1.0.1" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/gitlab" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-gitlab/Dockerfile b/airbyte-integrations/connectors/source-gitlab/Dockerfile index 5466ec492148..f3d921dc247b 100644 --- a/airbyte-integrations/connectors/source-gitlab/Dockerfile +++ b/airbyte-integrations/connectors/source-gitlab/Dockerfile @@ -13,5 +13,5 @@ COPY main.py ./ ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=1.0.0 +LABEL io.airbyte.version=1.0.1 LABEL io.airbyte.name=airbyte/source-gitlab diff --git a/airbyte-integrations/connectors/source-gitlab/acceptance-test-config.yml b/airbyte-integrations/connectors/source-gitlab/acceptance-test-config.yml index 2fcb99e43b6d..fe27c4586a77 100644 --- a/airbyte-integrations/connectors/source-gitlab/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-gitlab/acceptance-test-config.yml @@ -4,8 +4,6 @@ acceptance_tests: spec: tests: - spec_path: "source_gitlab/spec.json" - backward_compatibility_tests_config: - disable_for_version: "0.1.12" connection: tests: - config_path: "secrets/config.json" @@ -17,11 +15,7 @@ acceptance_tests: discovery: tests: - config_path: "secrets/config.json" - backward_compatibility_tests_config: - disable_for_version: "0.1.12" - config_path: "secrets/config_oauth.json" - backward_compatibility_tests_config: - disable_for_version: "0.1.12" basic_read: tests: - config_path: "secrets/config.json" diff --git a/airbyte-integrations/connectors/source-gitlab/source_gitlab/streams.py b/airbyte-integrations/connectors/source-gitlab/source_gitlab/streams.py index 5d550b7caeb2..ff7e6ab3b1dc 100644 --- a/airbyte-integrations/connectors/source-gitlab/source_gitlab/streams.py +++ b/airbyte-integrations/connectors/source-gitlab/source_gitlab/streams.py @@ -2,13 +2,14 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # - +import datetime from abc import ABC -from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional +from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple import pendulum import requests from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.streams.http import HttpStream @@ -18,12 +19,22 @@ class GitlabStream(HttpStream, ABC): stream_base_params = {} flatten_id_keys = [] flatten_list_keys = [] - page = 1 per_page = 50 def __init__(self, api_url: str, **kwargs): super().__init__(**kwargs) self.api_url = api_url + self.page = 1 + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[StreamData]: + self.page = 1 + yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state) def request_params( self, @@ -107,7 +118,9 @@ def path_template(self) -> str: template.append("repository") return "/".join(template + [self.name]) - def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: + def stream_slices( + self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[Mapping[str, any]]]: for slice in self.parent_stream.stream_slices(sync_mode=SyncMode.full_refresh): for record in self.parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice): yield {path_key: record[path_key] for path_key in self.path_list} @@ -125,7 +138,8 @@ def transform(self, record: Dict[str, Any], stream_slice: Mapping[str, Any] = No class IncrementalGitlabChildStream(GitlabChildStream): state_checkpoint_interval = 100 cursor_field = "updated_at" - filter_field = "updated_after" + lower_bound_filter = "updated_after" + upper_bound_filter = "updated_before" def __init__(self, start_date, **kwargs): super().__init__(**kwargs) @@ -146,17 +160,40 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late current_stream_state[str(project_id)] = {self.cursor_field: str(max_value)} return current_stream_state - def request_params(self, stream_state=None, stream_slice: Mapping[str, Any] = None, **kwargs): + @staticmethod + def _chunk_date_range(start_point: datetime.datetime) -> Iterable[Tuple[str, str]]: + end_point = datetime.datetime.now(datetime.timezone.utc) + if start_point > end_point: + return [] + current_start, current_end = start_point, start_point + while current_end < end_point: + current_end = current_start + datetime.timedelta(days=180) + current_end = min(current_end, end_point) + yield str(current_start), str(current_end) + current_start = current_end + datetime.timedelta(seconds=1) + + def stream_slices( + self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: stream_state = stream_state or {} - params = super().request_params(stream_state, stream_slice, **kwargs) + super_slices = super().stream_slices(sync_mode, cursor_field, stream_state) + for super_slice in super_slices: + start_point = self._start_date + state_project_value = stream_state.get(str(super_slice["id"])) + if state_project_value: + state_value = state_project_value.get(self.cursor_field) + if state_value: + start_point = max(start_point, state_value) + for start_dt, end_dt in self._chunk_date_range(pendulum.parse(start_point)): + stream_slice = {key: value for key, value in super_slice.items()} + stream_slice[self.lower_bound_filter] = start_dt + stream_slice[self.upper_bound_filter] = end_dt + yield stream_slice - start_point = self._start_date - state_project_value = stream_state.get(str(stream_slice["id"])) - if state_project_value: - state_value = state_project_value.get(self.cursor_field) - if state_value: - start_point = max(start_point, state_value) - params[self.filter_field] = start_point + def request_params(self, stream_state=None, stream_slice: Mapping[str, Any] = None, **kwargs): + params = super().request_params(stream_state, stream_slice, **kwargs) + params[self.lower_bound_filter] = stream_slice[self.lower_bound_filter] + params[self.upper_bound_filter] = stream_slice[self.upper_bound_filter] return params @@ -265,7 +302,8 @@ class Branches(GitlabChildStream): class Commits(IncrementalGitlabChildStream): cursor_field = "created_at" - filter_field = "since" + lower_bound_filter = "since" + upper_bound_filter = "until" flatten_parent_id = True stream_base_params = {"with_stats": True} diff --git a/airbyte-integrations/connectors/source-gitlab/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-gitlab/unit_tests/test_streams.py index 7f2b795ce973..6e0c63098c29 100644 --- a/airbyte-integrations/connectors/source-gitlab/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-gitlab/unit_tests/test_streams.py @@ -2,6 +2,8 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import datetime + import pytest from airbyte_cdk.sources.streams.http.auth import NoAuth from source_gitlab.streams import Commits, Jobs, MergeRequestCommits, MergeRequests, Pipelines, Projects, Releases, Tags @@ -9,14 +11,15 @@ auth_params = {"authenticator": NoAuth(), "api_url": "gitlab.com"} +start_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=14) projects = Projects(project_ids=["p_1"], **auth_params) -pipelines = Pipelines(parent_stream=projects, start_date="2021-01-01T00:00:00Z", **auth_params) -merge_requests = MergeRequests(parent_stream=projects, start_date="2021-01-01T00:00:00Z", **auth_params) +pipelines = Pipelines(parent_stream=projects, start_date=str(start_date), **auth_params) +merge_requests = MergeRequests(parent_stream=projects, start_date=str(start_date), **auth_params) tags = Tags(parent_stream=projects, repository_part=True, **auth_params) releases = Releases(parent_stream=projects, **auth_params) jobs = Jobs(parent_stream=pipelines, **auth_params) merge_request_commits = MergeRequestCommits(parent_stream=merge_requests, **auth_params) -commits = Commits(parent_stream=projects, repository_part=True, start_date="2021-01-01T00:00:00Z", **auth_params) +commits = Commits(parent_stream=projects, repository_part=True, start_date=str(start_date), **auth_params) def test_should_retry(mocker, requests_mock): diff --git a/docs/integrations/sources/gitlab.md b/docs/integrations/sources/gitlab.md index 0fa5220f5121..fabfc75c1f27 100644 --- a/docs/integrations/sources/gitlab.md +++ b/docs/integrations/sources/gitlab.md @@ -105,6 +105,7 @@ Gitlab has the [rate limits](https://docs.gitlab.com/ee/user/gitlab_com/index.ht | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------| +| 1.0.1 | 2022-01-23 | [21713](https://github.com/airbytehq/airbyte/pull/21713) | Fix missing data issue | | 1.0.0 | 2022-12-05 | [7506](https://github.com/airbytehq/airbyte/pull/7506) | Add `OAuth2.0` authentication option | | 0.1.12 | 2022-12-15 | [20542](https://github.com/airbytehq/airbyte/pull/20542) | Revert HttpAvailability changes, run on cdk 0.15.0 | | 0.1.11 | 2022-12-14 | [20479](https://github.com/airbytehq/airbyte/pull/20479) | Use HttpAvailabilityStrategy + add unit tests |