diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 4ffc8152afec..8befd38c588b 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -24,7 +24,10 @@ package io.airbyte.integrations.source.postgres; +import static java.lang.Thread.sleep; + import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.AbstractIterator; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; @@ -47,6 +50,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -55,7 +59,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Stream; +import java.util.function.Predicate; import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,8 +160,53 @@ public List> getIncrementalIterators(JsonN // Run the engine asynchronously ... executor.execute(engine); - final Stream jsonStream = Queues.toStream(queue); - final AutoCloseableIterator jsonIterator = AutoCloseableIterators.fromStream(jsonStream); + // todo - this obviously needs to actually do something. + final Predicate hasReachedLsnPredicate = (r) -> false; + final Iterator queueIterator = Queues.toStream(queue).iterator(); + final AbstractIterator iterator = new AbstractIterator<>() { + + private boolean hasReachedLsn = false; + + @Override + protected JsonNode computeNext() { + // if we have reached the lsn we stop, otherwise we have the potential to wait indefinitely for the + // next value. + if (!hasReachedLsn) { + while (!queueIterator.hasNext()) { + LOGGER.info("sleeping."); + try { + sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + final JsonNode next = queueIterator.next(); + LOGGER.info("next {}", next); + // todo fix this cast. the record passed to this iterator has to include the lsn somewhere. it can + // either be the full change event or some smaller object that just includes the lsn. + // we guarantee that this will always eventually return true, because we pick an LSN that already + // exists when we start the sync. + if (hasReachedLsnPredicate.test(next)) { + hasReachedLsn = true; + } + return next; + } else { + return endOfData(); + } + } + + }; + + final AutoCloseableIterator jsonIterator = AutoCloseableIterators.fromIterator(iterator, () -> { + engine.close(); + executor.shutdown(); + + if (thrownError.get() != null) { + throw new RuntimeException(thrownError.get()); + } + }); + final AutoCloseableIterator messageIterator = AutoCloseableIterators.transform(jsonIterator, r -> new AirbyteMessage() .withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage()