-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Simple default replication worker refactor #19002
Simple default replication worker refactor #19002
Conversation
@@ -139,7 +140,6 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path | |||
destinationConfig.setCatalog(mapper.mapCatalog(destinationConfig.getCatalog())); | |||
|
|||
final ThreadedTimeTracker timeTracker = new ThreadedTimeTracker(); | |||
final long startTime = System.currentTimeMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the only deletion in the entire file. We can replace this with timeTracker.trackReplicationStartTime()
so this is no longer needed.
PTAL @cgardens @jdpgrailsdev @mfsiega-airbyte @benmoriceau @gosusnp I'm tagging you all since I've spoken to each and everyone of you on how ugly this function is. Should be a relatively quick review! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java
Outdated
Show resolved
Hide resolved
….com:airbytehq/airbyte into davinchia/simple-replication-worker-refactor
…nent * master: 🪟 🎉 Enable frontend validation for <1hr syncs (cloud) #19028 Stream returns AirbyteMessages (#18572) 🎉 New Source - Recruitee [low-code SDK] (#18671) 🎉 New source: Breezometer [low-code cdk] (#18650) Check disabled connections after protocol update (#18990) Simple default replication worker refactor (#19002) 🎉 New Source: Visma e-conomic (#18595) 🎉 New Source: Fastbill (#18593) Bmoric/extract state api (#18980) 🎉 New Source: Zapier Supported Storage (#18442) 🎉 New source: Klarna (#18385) `AirbyteEstimateTraceMessage` (#18875) Extract source definition api (#18977) [low-code cdk] Allow for spec file to be defined in the yaml manifest instead of an external file (#18411) 🐛 Source HubSpot: fix property scopes (#18624) Ensure that only 6-character hex values are passed to monaco editor (#18943)
What
Logic in this class is going to have to change as part of two big upcoming projects:
To prepare for this, I've gone ahead and refactored the
run
method for readability. This is a monster function. The current function is too long and contains several operational abstractions, increasing unnecessary complexity. This is the core of what we do, so it's important to ensure this code is extremely understandable.Ultimately we want to probably want to break the run method up into two or more separate classes - one that deals with replication and one that deals with outputs - for better testing, readability and isolation. This sets the stage for that.
I have intentionally NOT removed or touched any logic, nor have I put thought into consolidating the function signatures to preserve as much of the pre-existing logic and keep the changeset small and reviewable.
This changeset only renames and moves code around.
How
run
intoreplicate
andgetReplicationOutput
.replicate
rename the two runnables toreadFromDstThread
andreadSrcAndWriteDstThread
to further clarify what replicate does.getReplicationOutput
, introducegetTotalStats
,getPerStreamStats
,getFailureReasons
andsaveState
to clarify the stages of generating the output.The
readSrcAndWriteDstThread
function can probably be broken down further. However that is for a separate PR.Recommended reading order
🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Checklist
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampledocs/integrations/README.md
airbyte-integrations/builds.md
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereUpdating a connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampleAirbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereConnector Generator
-scaffold
in their name) have been updated with the latest scaffold by running./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates
then checking in your changesTests
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.