diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriterFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriterFactory.java index 0cb2f7fb920a..3b2a2c8f0a56 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriterFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteMessageBufferedWriterFactory.java @@ -8,9 +8,13 @@ import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; import io.airbyte.commons.version.Version; import java.io.BufferedWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class VersionedAirbyteMessageBufferedWriterFactory implements AirbyteMessageBufferedWriterFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(VersionedAirbyteMessageBufferedWriterFactory.class); + private final AirbyteMessageSerDeProvider serDeProvider; private final AirbyteMessageVersionedMigratorFactory migratorFactory; private final Version protocolVersion; @@ -25,6 +29,11 @@ public VersionedAirbyteMessageBufferedWriterFactory(final AirbyteMessageSerDePro @Override public AirbyteMessageBufferedWriter createWriter(BufferedWriter bufferedWriter) { + final boolean needMigration = !protocolVersion.getMajorVersion().equals(migratorFactory.getMostRecentVersion().getMajorVersion()); + LOGGER.info( + "Writing messages to protocol version {}{}", + protocolVersion.serialize(), + needMigration ? ", messages will be downgraded from protocol version " + migratorFactory.getMostRecentVersion().serialize() : ""); return new VersionedAirbyteMessageBufferedWriter<>( bufferedWriter, serDeProvider.getSerializer(protocolVersion).orElseThrow(), diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java index 65a2cd461fe1..fe4a88d56605 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java @@ -92,6 +92,12 @@ public Stream create(final BufferedReader bufferedReader) { initializeForProtocolVersion(fallbackVersion); } } + + final boolean needMigration = !protocolVersion.getMajorVersion().equals(migratorFactory.getMostRecentVersion().getMajorVersion()); + logger.info( + "Reading messages from protocol version {}{}", + protocolVersion.serialize(), + needMigration ? ", messages will be upgraded to protocol version " + migratorFactory.getMostRecentVersion().serialize() : ""); return super.create(bufferedReader); }