Skip to content

Commit

Permalink
🐛 Source Github: bugfix WorkflowJobs stream (airbytehq#17287)
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Chvalyuk <[email protected]>
  • Loading branch information
grubberr authored and jhammarstedt committed Oct 31, 2022
1 parent 3e2e459 commit 167f09c
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 391 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@
- name: GitHub
sourceDefinitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerRepository: airbyte/source-github
dockerImageTag: 0.3.2
dockerImageTag: 0.3.3
documentationUrl: https://docs.airbyte.io/integrations/sources/github
icon: github.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3503,7 +3503,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-github:0.3.2"
- dockerImage: "airbyte/source-github:0.3.3"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/github"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.3.2
LABEL io.airbyte.version=0.3.3
LABEL io.airbyte.name=airbyte/source-github
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
team_members_stream,
Users(**organization_args),
Workflows(**repository_args_with_start_date),
WorkflowRuns(**repository_args_with_start_date),
workflow_runs_stream,
WorkflowJobs(parent=workflow_runs_stream, **repository_args_with_start_date),
TeamMemberships(parent=team_members_stream, **repository_args),
]
Original file line number Diff line number Diff line change
Expand Up @@ -1353,18 +1353,26 @@ def __init__(self, parent: WorkflowRuns, **kwargs):
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"repos/{stream_slice['repository']}/actions/runs/{stream_slice['run_id']}/jobs"

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
parent_stream_slices = self.parent.stream_slices(
sync_mode=SyncMode.full_refresh, cursor_field=cursor_field, stream_state=stream_state
)
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
parent_stream_state = None
if stream_state is not None:
parent_stream_state = {repository: {self.parent.cursor_field: v[self.cursor_field]} for repository, v in stream_state.items()}
parent_stream_slices = self.parent.stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=parent_stream_state)
for stream_slice in parent_stream_slices:
parent_records = self.parent.read_records(
sync_mode=SyncMode.full_refresh, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=parent_stream_state
)
for record in parent_records:
yield {"repository": record["repository"]["full_name"], "run_id": record["id"]}
stream_slice["run_id"] = record["id"]
yield from super().read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
)

def parse_response(
self,
Expand All @@ -1373,14 +1381,16 @@ def parse_response(
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
for record in response.json().get("jobs"): # GitHub puts records in an array.
yield self.transform(record=record, stream_slice=stream_slice)
for record in response.json()["jobs"]:
if record.get(self.cursor_field):
yield self.transform(record=record, stream_slice=stream_slice)

def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, Any]) -> MutableMapping[str, Any]:
record = super().transform(record=record, stream_slice=stream_slice)
record["run_id"] = stream_slice["run_id"]
record["repository"] = stream_slice["repository"]
return record
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
params["filter"] = "all"
return params


class TeamMembers(GithubStream):
Expand Down
Loading

0 comments on commit 167f09c

Please sign in to comment.