Skip to content

Commit

Permalink
Add versioning logging (#18618)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp authored and nataly committed Nov 3, 2022
1 parent 7e5259a commit 46ec5d5
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory;
import io.airbyte.commons.version.Version;
import java.io.BufferedWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VersionedAirbyteMessageBufferedWriterFactory implements AirbyteMessageBufferedWriterFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(VersionedAirbyteMessageBufferedWriterFactory.class);

private final AirbyteMessageSerDeProvider serDeProvider;
private final AirbyteMessageVersionedMigratorFactory migratorFactory;
private final Version protocolVersion;
Expand All @@ -25,6 +29,11 @@ public VersionedAirbyteMessageBufferedWriterFactory(final AirbyteMessageSerDePro

@Override
public AirbyteMessageBufferedWriter createWriter(BufferedWriter bufferedWriter) {
final boolean needMigration = !protocolVersion.getMajorVersion().equals(migratorFactory.getMostRecentVersion().getMajorVersion());
LOGGER.info(
"Writing messages to protocol version {}{}",
protocolVersion.serialize(),
needMigration ? ", messages will be downgraded from protocol version " + migratorFactory.getMostRecentVersion().serialize() : "");
return new VersionedAirbyteMessageBufferedWriter<>(
bufferedWriter,
serDeProvider.getSerializer(protocolVersion).orElseThrow(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) {
initializeForProtocolVersion(fallbackVersion);
}
}

final boolean needMigration = !protocolVersion.getMajorVersion().equals(migratorFactory.getMostRecentVersion().getMajorVersion());
logger.info(
"Reading messages from protocol version {}{}",
protocolVersion.serialize(),
needMigration ? ", messages will be upgraded to protocol version " + migratorFactory.getMostRecentVersion().serialize() : "");
return super.create(bufferedReader);
}

Expand Down

0 comments on commit 46ec5d5

Please sign in to comment.