Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix(source-s3): Optimize File-Based performance in Python CDK and Source-S3 #45721

Draft
wants to merge 105 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 101 commits
Commits
Show all changes
105 commits
Select commit Hold shift + click to select a range
726a722
initial code scaffold for dataframe processing
aaronsteers Aug 16, 2024
faa8517
drive-by-fix: typo in type hint
aaronsteers Aug 20, 2024
9fc9cf7
Merge remote-tracking branch 'origin/master' into aj/source-s3/datafr…
aaronsteers Aug 20, 2024
50207aa
drive-by-fix: missing .gitignore for test artifact
aaronsteers Aug 20, 2024
392c43b
mark method as abstract
aaronsteers Aug 20, 2024
f8a7b7c
`poetry add polars` (TODO: Move to extra)
aaronsteers Aug 20, 2024
0a00d1e
implement `parse_records_to_dataframes()` for jsonl file type
aaronsteers Aug 20, 2024
cec132a
add config option `FileBasedStreamConfig.bulk_mode`
aaronsteers Aug 20, 2024
09e8bf7
apply new enum class
aaronsteers Aug 20, 2024
c78a576
checkpoint: basic plumbing in place
aaronsteers Aug 22, 2024
1dab0c4
resolve version conflicts in `source-s3`
aaronsteers Aug 22, 2024
63a8dc2
minor fixes
aaronsteers Aug 22, 2024
2d3031d
ability to step-debug "full refresh" acceptance tests
aaronsteers Aug 22, 2024
a7b1989
make polars part of the file-based extra
aaronsteers Aug 22, 2024
e8b4a2d
fix lock check in airbyte-ci
aaronsteers Aug 22, 2024
92733e9
script to download secret config
aaronsteers Aug 22, 2024
cbb8777
fix extra args
aaronsteers Aug 22, 2024
07f7929
cleanup secret fetch script
aaronsteers Aug 22, 2024
d0da02a
checkpoint: jsonl sync running successfully
aaronsteers Aug 23, 2024
87ea175
tidy secrets install script using latest pyairbyte features
aaronsteers Sep 2, 2024
82568e0
tidy some more
aaronsteers Sep 2, 2024
f8093ce
make helper script slightly more reusable
aaronsteers Sep 6, 2024
a487c13
use local CDK in poetry
aaronsteers Sep 6, 2024
3a6305d
add read_to_buffer stub
aaronsteers Sep 6, 2024
77120ab
improve handling
aaronsteers Sep 10, 2024
1ca2ead
add perftest
aaronsteers Sep 10, 2024
a92fa34
chore: perf tests
aaronsteers Sep 10, 2024
feac74a
add code to stream partition class
aaronsteers Sep 10, 2024
ed0032b
lint fixes
aaronsteers Sep 10, 2024
d1abb84
default to lazy
aaronsteers Sep 10, 2024
1e2e657
add `out` override arg
aaronsteers Sep 10, 2024
b8ff8cd
Merge remote-tracking branch 'origin/master' into aj/source-s3/datafr…
aaronsteers Sep 10, 2024
b8c0b11
git: hide python venvs
aaronsteers Sep 12, 2024
2b0e986
update perf test script
aaronsteers Sep 12, 2024
9eece3f
move perf test script to new poetry project
aaronsteers Sep 17, 2024
42ed3ea
Merge remote-tracking branch 'origin/master' into aj/source-s3/datafr…
aaronsteers Sep 17, 2024
6744216
re-lock poetry in cdk and connector
aaronsteers Sep 17, 2024
b982e50
working sync and perf tests
aaronsteers Sep 17, 2024
1589768
chore: misc pr clean up
aaronsteers Sep 18, 2024
55e1f46
update perf test script
aaronsteers Sep 18, 2024
e00857f
fix: add missing file-based columns
aaronsteers Sep 18, 2024
1c5b7ea
chore: clean up pr
aaronsteers Sep 18, 2024
78a2a99
chore: clean up defaults
aaronsteers Sep 18, 2024
faa9068
clean up perf test script
aaronsteers Sep 18, 2024
952d90d
improve default bulk mode handling
aaronsteers Sep 18, 2024
209caf7
add bulk mode logging
aaronsteers Sep 18, 2024
7112041
improve bulk mode resolve
aaronsteers Sep 18, 2024
644861f
tidy
aaronsteers Sep 18, 2024
0901763
tidy
aaronsteers Sep 18, 2024
486fdab
tidy jsonl parser comments
aaronsteers Sep 18, 2024
cd5a7dd
rename variable
aaronsteers Sep 18, 2024
d7c7af7
fix type hint
aaronsteers Sep 18, 2024
0d80e81
update perf-test script
aaronsteers Sep 18, 2024
a94187d
chore: update comment
aaronsteers Sep 18, 2024
015fd90
delete unused
aaronsteers Sep 18, 2024
4cec7a9
multiple fixes, refactoring, including change to concurrent cursor fo…
aaronsteers Sep 20, 2024
60f843c
chore: add comment
aaronsteers Sep 20, 2024
6a20761
chore: add CLI entrypoint
aaronsteers Sep 20, 2024
5ebf102
update tests
aaronsteers Sep 20, 2024
87e43c7
update poetry projects and lock files
aaronsteers Sep 20, 2024
c930f69
remove very slow tests from acceptance tests
aaronsteers Sep 20, 2024
b2cd2c5
minor format stuff
aaronsteers Sep 20, 2024
51d511c
clean up files
aaronsteers Sep 20, 2024
9526835
update comment
aaronsteers Sep 20, 2024
da92440
update poetry
aaronsteers Sep 21, 2024
b946e43
update perf test
aaronsteers Sep 21, 2024
1919b8d
update cursor logic
aaronsteers Sep 21, 2024
2bcbc98
update concurrency
aaronsteers Sep 21, 2024
6aba77e
update poetry
aaronsteers Sep 21, 2024
5478be1
buffered reads
aaronsteers Sep 21, 2024
b8514f4
Merge remote-tracking branch 'origin/master' into source-s3/cdk5-refa…
aaronsteers Sep 21, 2024
0bb5fa3
revert out-of-scope changes
aaronsteers Sep 21, 2024
e8ceecb
tidy pr updates
aaronsteers Sep 21, 2024
f135a97
re-lock poetry
aaronsteers Sep 23, 2024
84bbb4e
Merge remote-tracking branch 'origin/master' into source-s3/cdk5-refa…
aaronsteers Sep 23, 2024
e4be4e1
improve perf test script
aaronsteers Oct 1, 2024
3b7b530
fixing tests (wip)
aaronsteers Oct 1, 2024
c3c0465
enable unit tests
aaronsteers Oct 1, 2024
8ccfe34
Merge remote-tracking branch 'origin/master' into source-s3/cdk5-refa…
aaronsteers Oct 14, 2024
a8cd27f
reduce formatting noise in pr
aaronsteers Oct 14, 2024
ee56011
chore: `poetry lock` x2
aaronsteers Oct 14, 2024
c488f7d
deletion: remove `run_perf_tests.py` (in favor of `pyab benchmark` CLI)
aaronsteers Oct 14, 2024
3266618
add cloud benchmark test script
aaronsteers Oct 15, 2024
0ffe85a
map a local concurrency constant
aaronsteers Oct 17, 2024
9ce886e
chore: version bump to 5.0.0-rc.1
aaronsteers Oct 18, 2024
1ed9fef
add cast for readability
aaronsteers Oct 18, 2024
dc9fa75
update some tests
aaronsteers Oct 18, 2024
57fca09
Merge remote-tracking branch 'origin/master' into source-s3/cdk5-refa…
aaronsteers Oct 18, 2024
7c5be88
remove 'v4' references
aaronsteers Oct 18, 2024
5293c57
add back source.py (from v4 directory)
aaronsteers Oct 18, 2024
69963bf
poetry lock (local ref)
aaronsteers Oct 18, 2024
fd6d0dd
remove perf test from acceptance tests
aaronsteers Oct 18, 2024
3b61159
fix perf test constructor
aaronsteers Oct 18, 2024
ae8831f
remove legacy migration code
aaronsteers Oct 18, 2024
a892d80
re-lock without local cdk ref
aaronsteers Oct 18, 2024
815b6de
delete perf test file
aaronsteers Oct 18, 2024
1aaba60
add breaking change notice and release candidate flag
aaronsteers Oct 18, 2024
e65fb7c
Merge remote-tracking branch 'origin/master' into source-s3/cdk5-refa…
aaronsteers Oct 18, 2024
b620396
restrict numpy in CDK to <2.0 for now
aaronsteers Oct 18, 2024
a134e40
restrict numpy in CDK to <2.0 for now
aaronsteers Oct 18, 2024
795385a
delete bulk perf test catalog
aaronsteers Oct 18, 2024
18af05c
remove legacy state file test, rename for readability
aaronsteers Oct 18, 2024
3cad55d
try adding incremental tests to acceptance test suite
aaronsteers Oct 21, 2024
c62c5b9
Merge remote-tracking branch 'origin/master' into source-s3/cdk5-refa…
aaronsteers Oct 21, 2024
6b34856
Merge branch 'master' into source-s3/cdk5-refactor-and-fixes
aaronsteers Oct 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
venv
.venv
.venv-*
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
.gradle
.idea
*.iml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,18 @@
import traceback
from abc import ABC
from collections import Counter
from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Type, Union
from typing import (
Any,
Iterator,
List,
Mapping,
MutableMapping,
Optional,
Tuple,
Type,
Union,
cast,
)

from airbyte_cdk.logger import AirbyteLogFormatter, init_logger
from airbyte_cdk.models import (
Expand Down Expand Up @@ -187,42 +198,60 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
)
self._validate_input_schema(stream_config)

sync_mode = self._get_sync_mode_from_catalog(stream_config.name)
sync_mode: SyncMode | None = self._get_sync_mode_from_catalog(stream_config.name)
# Note: sync_mode may be `None` in `check` and `discover` modes.

if sync_mode == SyncMode.full_refresh and hasattr(self, "_concurrency_level") and self._concurrency_level is not None:
cursor = FileBasedFinalStateCursor(
stream_config=stream_config, stream_namespace=None, message_repository=self.message_repository
if (
hasattr(self, "_concurrency_level") and self._concurrency_level is not None
and not issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor)
):
self.logger.warning(
"An internal error occurred. The cursor class must be a concurrent "
"cursor if concurrency level is set. "
"Falling back to non-concurrent execution, which may be slower."
)
stream = FileBasedStreamFacade.create_from_stream(
self._make_default_stream(stream_config, cursor), self, self.logger, stream_state, cursor
self._concurrency_level = None

if not hasattr(self, "_concurrency_level") or self._concurrency_level is None:
# Concurrency not supported for this source.
# Expect a non-concurrent cursor class.
cursor = cast(
AbstractFileBasedCursor, self.cursor_cls(stream_config)
)
stream = self._make_default_stream(stream_config, cursor)
streams.append(stream)
continue

elif (
sync_mode == SyncMode.incremental
and issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor)
and hasattr(self, "_concurrency_level")
and self._concurrency_level is not None
):
assert (
state_manager is not None
), "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support."

cursor = self.cursor_cls(
stream_config,
stream_config.name,
None,
stream_state,
self.message_repository,
state_manager,
CursorField(DefaultFileBasedStream.ab_last_mod_col),
# Else, we have a concurrency level set and a valid concurrent cursor class.

if sync_mode == SyncMode.full_refresh or sync_mode is None:
cursor = FileBasedFinalStateCursor(
stream_config=stream_config, stream_namespace=None, message_repository=self.message_repository
)
stream = FileBasedStreamFacade.create_from_stream(
self._make_default_stream(stream_config, cursor), self, self.logger, stream_state, cursor
)
else:
cursor = self.cursor_cls(stream_config)
stream = self._make_default_stream(stream_config, cursor)

streams.append(stream)
continue

# Else, incremental sync with concurrent cursor:

assert (
state_manager is not None
), "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support."

cursor = self.cursor_cls(
stream_config,
stream_config.name,
None,
stream_state,
self.message_repository,
state_manager,
CursorField(DefaultFileBasedStream.ab_last_mod_col),
)
stream = FileBasedStreamFacade.create_from_stream(
self._make_default_stream(stream_config, cursor), self, self.logger, stream_state, cursor
)
streams.append(stream)
return streams

Expand Down
Loading
Loading