Iterable adjustable stream slices (#8091)
Dmytro authored Nov 22, 2021
1 parent c0cb8f3 commit cf3be4a
Showing 13 changed files with 542 additions and 130 deletions.

@@ -274,7 +274,7 @@
 - name: Iterable
   sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799
   dockerRepository: airbyte/source-iterable
-  dockerImageTag: 0.1.12
+  dockerImageTag: 0.1.13
   documentationUrl: https://docs.airbyte.io/integrations/sources/iterable
   sourceType: api
 - name: Jira

@@ -2759,7 +2759,7 @@
       oauthFlowInitParameters: []
       oauthFlowOutputParameters:
       - - "access_token"
-- dockerImage: "airbyte/source-iterable:0.1.12"
+- dockerImage: "airbyte/source-iterable:0.1.13"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/sources/iterable"
     connectionSpecification:

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-iterable/Dockerfile
@@ -12,5 +12,5 @@ RUN pip install .
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.12
+LABEL io.airbyte.version=0.1.13
 LABEL io.airbyte.name=airbyte/source-iterable

@@ -14,10 +14,12 @@ tests:
   basic_read:
     - config_path: "secrets/config.json"
       configured_catalog_path: "integration_tests/catalog.json"
-      empty_streams: ['email_send_skip', 'email_complaint']
+      empty_streams: ["email_send_skip", "email_complaint"]
+      timeout_seconds: 3600
   full_refresh:
     - config_path: "secrets/config.json"
       configured_catalog_path: "integration_tests/catalog.json"
+      timeout_seconds: 3600
   incremental:
     - config_path: "secrets/config.json"
       configured_catalog_path: "integration_tests/configured_catalog.json"

@@ -9,7 +9,7 @@

 import requests
 from airbyte_cdk.models import SyncMode
-from source_iterable.iterable_streams import IterableExportStream, IterableExportStreamRanged, IterableStream
+from source_iterable.iterable_streams import IterableExportStreamAdjustableRange, IterableExportStreamRanged, IterableStream
 
 EVENT_ROWS_LIMIT = 200
 CAMPAIGNS_PER_REQUEST = 20
@@ -141,42 +141,42 @@ def path(self, **kwargs) -> str:
return "channels"


class EmailBounce(IterableExportStream):
class EmailBounce(IterableExportStreamAdjustableRange):
name = "email_bounce"
data_field = "emailBounce"


class EmailClick(IterableExportStreamRanged):
class EmailClick(IterableExportStreamAdjustableRange):
name = "email_click"
data_field = "emailClick"


class EmailComplaint(IterableExportStream):
class EmailComplaint(IterableExportStreamAdjustableRange):
name = "email_complaint"
data_field = "emailComplaint"


class EmailOpen(IterableExportStreamRanged):
class EmailOpen(IterableExportStreamAdjustableRange):
name = "email_open"
data_field = "emailOpen"


class EmailSend(IterableExportStreamRanged):
class EmailSend(IterableExportStreamAdjustableRange):
name = "email_send"
data_field = "emailSend"


class EmailSendSkip(IterableExportStreamRanged):
class EmailSendSkip(IterableExportStreamAdjustableRange):
name = "email_send_skip"
data_field = "emailSendSkip"


class EmailSubscribe(IterableExportStream):
class EmailSubscribe(IterableExportStreamAdjustableRange):
name = "email_subscribe"
data_field = "emailSubscribe"


class EmailUnsubscribe(IterableExportStream):
class EmailUnsubscribe(IterableExportStreamAdjustableRange):
name = "email_unsubscribe"
data_field = "emailUnsubscribe"

@@ -228,7 +228,7 @@ def path(self, **kwargs) -> str:
return "metadata"


class Templates(IterableExportStream):
class Templates(IterableExportStreamRanged):
data_field = "templates"
template_types = ["Base", "Blast", "Triggered", "Workflow"]
message_types = ["Email", "Push", "InApp", "SMS"]
@@ -251,6 +251,6 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
             yield record
 
 
-class Users(IterableExportStream):
+class Users(IterableExportStreamRanged):
     data_field = "user"
     cursor_field = "profileUpdatedAt"
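
Each stream above simply picks a slicing strategy through its base class and names the export data type it reads. A new stream in this style would need nothing more than the following sketch (the "exampleEvent" data type is invented for illustration, not one added by this commit):

# Hypothetical example of the subclass pattern above; "exampleEvent" is an
# invented data type, not part of this commit.
class ExampleEvent(IterableExportStreamAdjustableRange):
    name = "example_event"
    data_field = "exampleEvent"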

@@ -4,20 +4,15 @@

 import json
 from abc import ABC, abstractmethod
-from dataclasses import dataclass
-from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
+from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
 
 import pendulum
 import requests
 from airbyte_cdk.models import SyncMode
 from airbyte_cdk.sources.streams.http import HttpStream
 from pendulum.datetime import DateTime
-
-
-@dataclass
-class StreamSlice:
-    start_date: DateTime
-    end_date: DateTime
+from requests.exceptions import ChunkedEncodingError
+from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice
 
 
 class IterableStream(HttpStream, ABC):
@@ -61,6 +56,16 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp


 class IterableExportStream(IterableStream, ABC):
+    """
+    This stream uses Iterable's "export" API to fetch large amounts of data.
+    The endpoint returns data as newline-separated strings, each representing
+    a JSON object.
+    Data can be windowed by date range via the startDateTime and endDateTime
+    parameters. A single request can return a large volume of data, and the
+    request rate is limited to 4 requests per minute.
+    Details: https://api.iterable.com/api/docs#export_exportDataJson
+    """
+
     cursor_field = "createdAt"
     primary_key = None
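
For context, the export endpoint the docstring references takes a data type plus start/end datetimes and streams back one JSON object per line. Here is a hypothetical sketch of such a windowed export call; the exact path, parameter names, datetime format, and auth mechanism are assumptions drawn from the linked docs, not code from this commit:

# Hypothetical sketch of a windowed call to Iterable's export API
# (https://api.iterable.com/api/docs#export_exportDataJson); all endpoint
# details below are assumptions, not code from this commit.
import json

import pendulum
import requests


def fetch_export_window(api_key, data_type, start, end):
    """Yield one parsed JSON object per newline-separated response row."""
    response = requests.get(
        "https://api.iterable.com/api/export/data.json",
        params={
            "dataTypeName": data_type,  # e.g. "emailSend"
            "startDateTime": start.strftime("%Y-%m-%d %H:%M:%S"),
            "endDateTime": end.strftime("%Y-%m-%d %H:%M:%S"),
            "api_key": api_key,
        },
        stream=True,  # responses can be very large, so stream the body
    )
    response.raise_for_status()
    for line in response.iter_lines():
        if line:
            yield json.loads(line)


# Example: fetch the last 90 days of emailSend events.
# records = fetch_export_window(key, "emailSend",
#                               pendulum.now("UTC").subtract(days=90),
#                               pendulum.now("UTC"))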
@@ -101,14 +106,25 @@ def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime:
             raise ValueError(f"Unsupported type of datetime field {type(value)}")
         return value
 
-    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
+    def get_updated_state(
+        self,
+        current_stream_state: MutableMapping[str, Any],
+        latest_record: Mapping[str, Any],
+    ) -> Mapping[str, Any]:
         """
         Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object
         and returning an updated state object.
         """
         latest_benchmark = latest_record[self.cursor_field]
         if current_stream_state.get(self.cursor_field):
-            return {self.cursor_field: str(max(latest_benchmark, self._field_to_datetime(current_stream_state[self.cursor_field])))}
+            return {
+                self.cursor_field: str(
+                    max(
+                        latest_benchmark,
+                        self._field_to_datetime(current_stream_state[self.cursor_field]),
+                    )
+                )
+            }
         return {self.cursor_field: str(latest_benchmark)}
 
     def request_params(
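
As a concrete trace of that merge (values invented): the stored cursor and the latest record's cursor are compared as datetimes, and the later one is written back as the new state string.

# Hypothetical walk-through of the max-cursor merge; values are invented.
import pendulum

stored = pendulum.parse("2021-10-01T00:00:00+00:00")  # from stream state
latest = pendulum.parse("2021-11-15T12:30:00+00:00")  # from the latest record

new_state = {"createdAt": str(max(latest, stored))}  # the newer timestamp wins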
@@ -157,49 +173,96 @@ def get_start_date(self, stream_state: Mapping[str, Any]) -> DateTime:
         return start_datetime
 
     def stream_slices(
-        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
+        self,
+        sync_mode: SyncMode,
+        cursor_field: List[str] = None,
+        stream_state: Mapping[str, Any] = None,
     ) -> Iterable[Optional[StreamSlice]]:
 
         start_datetime = self.get_start_date(stream_state)
         return [StreamSlice(start_datetime, pendulum.now("UTC"))]


 class IterableExportStreamRanged(IterableExportStream):
+    """
+    This class uses the RangeSliceGenerator class to break a single request
+    into ranges with the same (or smaller, for the final range) number of
+    days. By default each range is 90 days.
+    """
-    RANGE_LENGTH_DAYS = 90
-
-    @staticmethod
-    def make_datetime_ranges(start: DateTime, end: DateTime, range_days: int) -> Iterable[Tuple[DateTime, DateTime]]:
-        """
-        Generates list of ranges starting from start up to end date with duration of range_days.
-        Args:
-            start (DateTime): start of the range
-            end (DateTime): end of the range
-            range_days (int): Number in days to split subranges into.
-
-        Returns:
-            List[Tuple[DateTime, DateTime]]: list of tuples with ranges.
-
-            Each tuple contains two datetime variables: first is period start
-            and second is period end.
-        """
-        if start > end:
-            return []
-
-        next_start = start
-        period = pendulum.Duration(days=range_days)
-        while next_start < end:
-            next_end = min(next_start + period, end)
-            yield next_start, next_end
-            next_start = next_end
 
     def stream_slices(
-        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
+        self,
+        sync_mode: SyncMode,
+        cursor_field: List[str] = None,
+        stream_state: Mapping[str, Any] = None,
     ) -> Iterable[Optional[StreamSlice]]:
 
         start_datetime = self.get_start_date(stream_state)
-        return (
-            StreamSlice(start_date=start, end_date=end)
-            for start, end in self.make_datetime_ranges(start_datetime, pendulum.now("UTC"), self.RANGE_LENGTH_DAYS)
-        )
+        return RangeSliceGenerator(start_datetime)
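
RangeSliceGenerator lives in the new slice_generators.py module, which is not shown in this diff. Below is a minimal sketch of what it and StreamSlice plausibly look like, inferred from the docstring above and the removed make_datetime_ranges helper; the names match the imports in this commit, but the implementation details are assumptions:

# Minimal sketch of slice_generators.StreamSlice and RangeSliceGenerator,
# inferred from the docstrings in this commit; the real module may differ.
from dataclasses import dataclass
from typing import Iterator

import pendulum
from pendulum.datetime import DateTime


@dataclass
class StreamSlice:
    start_date: DateTime
    end_date: DateTime


class RangeSliceGenerator:
    """Yields fixed-length date slices from start_date up to now (UTC)."""

    RANGE_LENGTH_DAYS = 90

    def __init__(self, start_date: DateTime):
        self._start_date = start_date

    def __iter__(self) -> Iterator[StreamSlice]:
        end = pendulum.now("UTC")
        period = pendulum.duration(days=self.RANGE_LENGTH_DAYS)
        next_start = self._start_date
        while next_start < end:
            next_end = min(next_start + period, end)
            yield StreamSlice(start_date=next_start, end_date=next_end)
            next_start = next_end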
 
 
+class IterableExportStreamAdjustableRange(IterableExportStream):
+    """
+    For streams that can produce a large amount of data in a single request,
+    we cannot simply use IterableExportStreamRanged to split requests into
+    even ranges. If processing a request takes too long, the API server may
+    close the connection and the connector code will fail with a
+    ChunkedEncodingError.
+    To solve this problem we use AdjustableSliceGenerator, which can adjust
+    the next slice range based on two factors:
+    1. The previous slice range / time-to-process ratio.
+    2. Whether the previous request failed with a ChunkedEncodingError.
+    If a slice request fails with a ChunkedEncodingError (meaning the API
+    server closed the connection because the request took too long), make up
+    to CHUNKED_ENCODING_ERROR_RETRIES (3) retries, each time reducing the
+    slice length.
+    See the AdjustableSliceGenerator description for more details on the
+    next-slice-length adjustment algorithm.
+    """
+
+    _adjustable_generator: AdjustableSliceGenerator = None
+    CHUNKED_ENCODING_ERROR_RETRIES = 3
+
+    def stream_slices(
+        self,
+        sync_mode: SyncMode,
+        cursor_field: List[str] = None,
+        stream_state: Mapping[str, Any] = None,
+    ) -> Iterable[Optional[StreamSlice]]:
+
+        start_datetime = self.get_start_date(stream_state)
+        self._adjustable_generator = AdjustableSliceGenerator(start_datetime)
+        return self._adjustable_generator
+
+    def read_records(
+        self,
+        sync_mode: SyncMode,
+        cursor_field: List[str],
+        stream_slice: StreamSlice,
+        stream_state: Mapping[str, Any] = None,
+    ) -> Iterable[Mapping[str, Any]]:
+        start_time = pendulum.now()
+        for _ in range(self.CHUNKED_ENCODING_ERROR_RETRIES):
+            try:
+                self.logger.info(
+                    f"Processing slice of {(stream_slice.end_date - stream_slice.start_date).total_days()} days for stream {self.name}"
+                )
+                for record in super().read_records(
+                    sync_mode=sync_mode,
+                    cursor_field=cursor_field,
+                    stream_slice=stream_slice,
+                    stream_state=stream_state,
+                ):
+                    now = pendulum.now()
+                    self._adjustable_generator.adjust_range(now - start_time)
+                    yield record
+                    start_time = now
+                break
+            except ChunkedEncodingError:
+                self.logger.warn("ChunkedEncodingError occurred, decreasing the days range and trying again")
+                stream_slice = self._adjustable_generator.reduce_range()
+        else:
+            raise Exception(f"ChunkedEncodingError: Reached maximum number of retries: {self.CHUNKED_ENCODING_ERROR_RETRIES}")
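
AdjustableSliceGenerator is likewise defined in the unshown slice_generators.py. Below is a rough sketch of the feedback loop the class docstring describes — scale the next range by a target-time/processing-time ratio, and halve it on ChunkedEncodingError via reduce_range — with all constants and internals invented for illustration:

# Rough sketch of the AdjustableSliceGenerator feedback loop described in
# the docstring above. Constants and internals here are invented; the real
# slice_generators.py in this commit may differ.
from dataclasses import dataclass
from typing import Iterator, Optional

import pendulum
from pendulum.datetime import DateTime


@dataclass
class StreamSlice:  # mirrors the sketch above
    start_date: DateTime
    end_date: DateTime


class AdjustableSliceGenerator:
    INITIAL_RANGE_DAYS = 30  # assumed starting window
    MIN_RANGE_DAYS = 1
    MAX_RANGE_DAYS = 180
    TARGET_SECONDS = 600.0  # assumed target processing time per slice

    def __init__(self, start_date: DateTime):
        self._next_start = start_date
        self._range_days = self.INITIAL_RANGE_DAYS
        self._current: Optional[StreamSlice] = None

    def adjust_range(self, processing_time: pendulum.Duration) -> None:
        # Factor 1: scale the next window by the target-time / observed-time
        # ratio, clamped to sane bounds.
        ratio = self.TARGET_SECONDS / max(processing_time.total_seconds(), 1.0)
        days = int(self._range_days * ratio)
        self._range_days = min(max(days, self.MIN_RANGE_DAYS), self.MAX_RANGE_DAYS)

    def reduce_range(self) -> StreamSlice:
        # Factor 2: after a ChunkedEncodingError, halve the window and retry
        # from the same start date (assumes a slice was already issued).
        self._range_days = max(self._range_days // 2, self.MIN_RANGE_DAYS)
        end = self._current.start_date.add(days=self._range_days)
        self._current = StreamSlice(start_date=self._current.start_date, end_date=end)
        return self._current

    def __iter__(self) -> Iterator[StreamSlice]:
        while self._next_start < pendulum.now("UTC"):
            end = min(self._next_start + pendulum.duration(days=self._range_days), pendulum.now("UTC"))
            self._current = StreamSlice(start_date=self._next_start, end_date=end)
            yield self._current
            # _current may have been replaced by reduce_range(); continue
            # from wherever the last successful slice ended.
            self._next_start = self._current.end_date

Note that read_records above calls adjust_range with the time elapsed since the previous record, so slow record processing shrinks subsequent windows just as a slow server response would.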