Skip to content

Commit

Permalink
remove sleep logic when the queue is full from CDC (#5600)
Browse files Browse the repository at this point in the history
* dont sleep when queue is full

* bump version
  • Loading branch information
subodh1810 authored Aug 30, 2021
1 parent 314a747 commit a53dd7e
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
- sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
name: Microsoft SQL Server (MSSQL)
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.3.4
dockerImageTag: 0.3.5
documentationUrl: https://docs.airbyte.io/integrations/sources/mssql
icon: mssql.svg
- sourceDefinitionId: d8286229-c680-4063-8c59-23b9b391c700
Expand All @@ -56,7 +56,7 @@
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.3.9
dockerImageTag: 0.3.10
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
- sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003
Expand Down Expand Up @@ -101,7 +101,7 @@
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.4.3
dockerImageTag: 0.4.4
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
- sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String,
private final VoidCallable requestClose;
private boolean receivedFirstRecord;
private boolean hasSnapshotFinished;
private boolean signalledClose;

public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> queue,
CdcTargetPosition targetPosition,
Expand All @@ -74,6 +75,7 @@ public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> q
this.requestClose = requestClose;
this.receivedFirstRecord = false;
this.hasSnapshotFinished = true;
this.signalledClose = false;
}

@Override
Expand Down Expand Up @@ -103,7 +105,7 @@ protected ChangeEvent<String, String> computeNext() {
hasSnapshotFinished = hasSnapshotFinished(eventAsJson);

// if the last record matches the target file position, it is time to tell the producer to shutdown.
if (shouldSignalClose(eventAsJson)) {
if (!signalledClose && shouldSignalClose(eventAsJson)) {
requestClose();
}
receivedFirstRecord = true;
Expand Down Expand Up @@ -135,8 +137,7 @@ private boolean hasSnapshotFinished(JsonNode eventAsJson) {
*/
@Override
public void close() throws Exception {
requestClose.call();
throwExceptionIfSnapshotNotFinished();
requestClose();
}

private boolean shouldSignalClose(JsonNode eventAsJson) {
Expand All @@ -146,6 +147,7 @@ private boolean shouldSignalClose(JsonNode eventAsJson) {
private void requestClose() {
try {
requestClose.call();
signalledClose = true;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,6 @@ public void start(Queue<ChangeEvent<String, String>> queue) {
boolean inserted = false;
while (!inserted) {
inserted = queue.offer(e);
if (!inserted) {
try {
Thread.sleep(10);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(interruptedException);
}
}
}
}
})
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.4
LABEL io.airbyte.version=0.3.5
LABEL io.airbyte.name=airbyte/source-mssql
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.4.3
LABEL io.airbyte.version=0.4.4

LABEL io.airbyte.name=airbyte/source-mysql
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.9
LABEL io.airbyte.version=0.3.10
LABEL io.airbyte.name=airbyte/source-postgres

0 comments on commit a53dd7e

Please sign in to comment.