Skip to content

Commit

Permalink
#21076 source gitlab: fix missing data issue
Browse files Browse the repository at this point in the history
  • Loading branch information
davydov-d committed Jan 23, 2023
1 parent e35dc23 commit eec8de9
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 25 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-gitlab/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ COPY main.py ./

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.version=1.0.1
LABEL io.airbyte.name=airbyte/source-gitlab
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ acceptance_tests:
spec:
tests:
- spec_path: "source_gitlab/spec.json"
backward_compatibility_tests_config:
disable_for_version: "0.1.12"
connection:
tests:
- config_path: "secrets/config.json"
Expand All @@ -17,11 +15,7 @@ acceptance_tests:
discovery:
tests:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
disable_for_version: "0.1.12"
- config_path: "secrets/config_oauth.json"
backward_compatibility_tests_config:
disable_for_version: "0.1.12"
basic_read:
tests:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import datetime
from abc import ABC
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.http import HttpStream


Expand All @@ -18,12 +19,22 @@ class GitlabStream(HttpStream, ABC):
stream_base_params = {}
flatten_id_keys = []
flatten_list_keys = []
page = 1
per_page = 50

def __init__(self, api_url: str, **kwargs):
super().__init__(**kwargs)
self.api_url = api_url
self.page = 1

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[StreamData]:
self.page = 1
yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)

def request_params(
self,
Expand Down Expand Up @@ -107,7 +118,9 @@ def path_template(self) -> str:
template.append("repository")
return "/".join(template + [self.name])

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, any]]]:
for slice in self.parent_stream.stream_slices(sync_mode=SyncMode.full_refresh):
for record in self.parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice):
yield {path_key: record[path_key] for path_key in self.path_list}
Expand All @@ -125,7 +138,8 @@ def transform(self, record: Dict[str, Any], stream_slice: Mapping[str, Any] = No
class IncrementalGitlabChildStream(GitlabChildStream):
state_checkpoint_interval = 100
cursor_field = "updated_at"
filter_field = "updated_after"
lower_bound_filter = "updated_after"
upper_bound_filter = "updated_before"

def __init__(self, start_date, **kwargs):
super().__init__(**kwargs)
Expand All @@ -146,17 +160,40 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
current_stream_state[str(project_id)] = {self.cursor_field: str(max_value)}
return current_stream_state

def request_params(self, stream_state=None, stream_slice: Mapping[str, Any] = None, **kwargs):
@staticmethod
def _chunk_date_range(start_point: datetime.datetime) -> Iterable[Tuple[str, str]]:
end_point = datetime.datetime.now(datetime.timezone.utc)
if start_point > end_point:
return []
current_start, current_end = start_point, start_point
while current_end < end_point:
current_end = current_start + datetime.timedelta(days=180)
current_end = min(current_end, end_point)
yield str(current_start), str(current_end)
current_start = current_end + datetime.timedelta(seconds=1)

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
stream_state = stream_state or {}
params = super().request_params(stream_state, stream_slice, **kwargs)
super_slices = super().stream_slices(sync_mode, cursor_field, stream_state)
for super_slice in super_slices:
start_point = self._start_date
state_project_value = stream_state.get(str(super_slice["id"]))
if state_project_value:
state_value = state_project_value.get(self.cursor_field)
if state_value:
start_point = max(start_point, state_value)
for start_dt, end_dt in self._chunk_date_range(pendulum.parse(start_point)):
stream_slice = {key: value for key, value in super_slice.items()}
stream_slice[self.lower_bound_filter] = start_dt
stream_slice[self.upper_bound_filter] = end_dt
yield stream_slice

start_point = self._start_date
state_project_value = stream_state.get(str(stream_slice["id"]))
if state_project_value:
state_value = state_project_value.get(self.cursor_field)
if state_value:
start_point = max(start_point, state_value)
params[self.filter_field] = start_point
def request_params(self, stream_state=None, stream_slice: Mapping[str, Any] = None, **kwargs):
params = super().request_params(stream_state, stream_slice, **kwargs)
params[self.lower_bound_filter] = stream_slice[self.lower_bound_filter]
params[self.upper_bound_filter] = stream_slice[self.upper_bound_filter]
return params


Expand Down Expand Up @@ -265,7 +302,8 @@ class Branches(GitlabChildStream):

class Commits(IncrementalGitlabChildStream):
cursor_field = "created_at"
filter_field = "since"
lower_bound_filter = "since"
upper_bound_filter = "until"
flatten_parent_id = True
stream_base_params = {"with_stats": True}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import datetime

import pytest
from airbyte_cdk.sources.streams.http.auth import NoAuth
from source_gitlab.streams import Commits, Jobs, MergeRequestCommits, MergeRequests, Pipelines, Projects, Releases, Tags

auth_params = {"authenticator": NoAuth(), "api_url": "gitlab.com"}


start_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=14)
projects = Projects(project_ids=["p_1"], **auth_params)
pipelines = Pipelines(parent_stream=projects, start_date="2021-01-01T00:00:00Z", **auth_params)
merge_requests = MergeRequests(parent_stream=projects, start_date="2021-01-01T00:00:00Z", **auth_params)
pipelines = Pipelines(parent_stream=projects, start_date=str(start_date), **auth_params)
merge_requests = MergeRequests(parent_stream=projects, start_date=str(start_date), **auth_params)
tags = Tags(parent_stream=projects, repository_part=True, **auth_params)
releases = Releases(parent_stream=projects, **auth_params)
jobs = Jobs(parent_stream=pipelines, **auth_params)
merge_request_commits = MergeRequestCommits(parent_stream=merge_requests, **auth_params)
commits = Commits(parent_stream=projects, repository_part=True, start_date="2021-01-01T00:00:00Z", **auth_params)
commits = Commits(parent_stream=projects, repository_part=True, start_date=str(start_date), **auth_params)


def test_should_retry(mocker, requests_mock):
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/gitlab.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ Gitlab has the [rate limits](https://docs.gitlab.com/ee/user/gitlab_com/index.ht

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------|
| 1.0.1 | 2022-01-23 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Fix missing data issue |
| 1.0.0 | 2022-12-05 | [7506](https://github.com/airbytehq/airbyte/pull/7506) | Add `OAuth2.0` authentication option |
| 0.1.12 | 2022-12-15 | [20542](https://github.com/airbytehq/airbyte/pull/20542) | Revert HttpAvailability changes, run on cdk 0.15.0 |
| 0.1.11 | 2022-12-14 | [20479](https://github.com/airbytehq/airbyte/pull/20479) | Use HttpAvailabilityStrategy + add unit tests |
Expand Down

0 comments on commit eec8de9

Please sign in to comment.