Skip to content

Commit

Permalink
🚨 Destination Snowflake: checkpointing flush/commit and emit STATE me…
Browse files Browse the repository at this point in the history
…ssage 🚨 (#20631)

* Checkpointing flush/commit and emit STATE message

* Fixed tests for SerializedBufferingStrategy

* Updates BigQuery to support checkpointing and consolidates method naming for uploading from staging (#21028)

* Updates BigQuery to support checkpointing and consolidates method naming for uploading from staging

* Updated messages to reflect method changes

* Updates createTable to include mimic replication by calling createPartitionTable and removes unused copyIntoTargetTable

* Updates the COPY INTO methods to match writing to table

* Fixed comments and non-executed path

* Fixed BufferedStreamConsumerTest to support new logic for checkpointing

* Removed cleanup logic that no longer applies with checkpointing changes

* Checkpointing flush/commit and emit STATE message

* Updates BigQuery to support checkpointing and consolidates method naming for uploading from staging (#21028)

* Updates BigQuery to support checkpointing and consolidates method naming for uploading from staging

* Updated messages to reflect method changes

* Updates createTable to include mimic replication by calling createPartitionTable and removes unused copyIntoTargetTable

* Updates the COPY INTO methods to match writing to table

* Fixed comments and non-executed path

* Resolved BigQuery partitioning tests and parameterized GCS Staging test

* Fixed review comments and bumps version number

* Definition generation
  • Loading branch information
ryankfu authored Jan 26, 2023
1 parent ad8a632 commit aa6afd0
Show file tree
Hide file tree
Showing 42 changed files with 692 additions and 402 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 1.2.12
dockerImageTag: 1.2.13
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
icon: bigquery.svg
normalizationConfig:
Expand Down Expand Up @@ -290,7 +290,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.54
dockerImageTag: 0.3.55
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
icon: redshift.svg
normalizationConfig:
Expand Down Expand Up @@ -348,7 +348,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.45
dockerImageTag: 0.4.46
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
icon: snowflake.svg
normalizationConfig:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-bigquery:1.2.12"
- dockerImage: "airbyte/destination-bigquery:1.2.13"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/bigquery"
connectionSpecification:
Expand Down Expand Up @@ -5123,7 +5123,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.54"
- dockerImage: "airbyte/destination-redshift:0.3.55"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/redshift"
connectionSpecification:
Expand Down Expand Up @@ -6109,7 +6109,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.45"
- dockerImage: "airbyte/destination-snowflake:0.4.46"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ public abstract String uploadRecordsToBucket(SerializableBuffer recordsData, Str
*/
public abstract void cleanUpBucketObject(String objectPath, List<String> stagedFiles) throws Exception;

/**
* Deletes all the bucket objects for the specified bucket path
*
* @param namespace Optional source-defined namespace name
* @param streamName Name of the stream
* @param objectPath file path to where staging files are stored
* @param pathFormat formatted string for the path
*/
public abstract void cleanUpBucketObject(String namespace, String streamName, String objectPath, String pathFormat);

public abstract void dropBucketObject(String objectPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ public void start() throws Exception {
}

/**
* Wraps actual processing of each {@link AirbyteMessage}
* Processing of AirbyteMessages with general functionality of storing STATE messages, serializing
* RECORD messages and storage within a buffer
*
* NOTE: Not all the functionality mentioned above is always true but generally applies
*
* @param msg {@link AirbyteMessage} to be processed
* @throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.destination.dest_state_lifecycle_manager.DefaultDestStateLifecycleManager;
import io.airbyte.integrations.destination.dest_state_lifecycle_manager.DestStateLifecycleManager;
import io.airbyte.integrations.destination.record_buffer.BufferFlushType;
import io.airbyte.integrations.destination.record_buffer.BufferingStrategy;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
Expand All @@ -22,6 +23,7 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -125,6 +127,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
final AirbyteRecordMessage recordMessage = message.getRecord();
final AirbyteStreamNameNamespacePair stream = AirbyteStreamNameNamespacePair.fromRecordMessage(recordMessage);

// if stream is not part of list of streams to sync to then throw invalid stream exception
if (!streamNames.contains(stream)) {
throwUnrecognizedStream(catalog, message);
}
Expand All @@ -134,22 +137,47 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
return;
}

// if the buffer flushes, update the states appropriately.
// TODO: ryankfu (if true, this is where bundling up logic to also commit to airbyte_raw table)
if (bufferingStrategy.addRecord(stream, message)) {
markStatesAsFlushedToTmpDestination();
final Optional<BufferFlushType> flushType = bufferingStrategy.addRecord(stream, message);
// if present means that a flush occurred
if (flushType.isPresent()) {
if (BufferFlushType.FLUSH_ALL.equals(flushType.get())) {
// when all buffers have been flushed then we can update all states as flushed
markStatesAsFlushedToDestination();
} else if (BufferFlushType.FLUSH_SINGLE_STREAM.equals(flushType.get())) {
if (stateManager.supportsPerStreamFlush()) {
// per-stream instance can handle flush of just a single stream
markStatesAsFlushedToDestination();
}
/*
* We don't mark {@link AirbyteStateMessage} as committed in the case with GLOBAL/LEGACY because
* within a single stream being flushed it is not deterministic that all the AirbyteRecordMessages
* have been committed
*/
}
}

/*
* TODO: (ryankfu) after record has added and time has been met then to see if time has elapsed then
* flush the buffer
*
* The reason that this is where time component should exist here is primarily due to #acceptTracked
* is the method that processes each AirbyteMessage. Method to call is
* bufferingStrategy.flushWriter(stream, streamBuffer)
*/
} else if (message.getType() == Type.STATE) {
stateManager.addState(message);
} else {
LOGGER.warn("Unexpected message: " + message.getType());
}

}

private void markStatesAsFlushedToTmpDestination() {
stateManager.markPendingAsFlushed();
/**
* After marking states as committed, emit the state message to platform then clear state messages
* to avoid resending the same state message to the platform
*/
private void markStatesAsFlushedToDestination() {
stateManager.markPendingAsCommitted();
stateManager.listCommitted().forEach(outputRecordCollector);
stateManager.clearCommitted();
}

private static void throwUnrecognizedStream(final ConfiguredAirbyteCatalog catalog, final AirbyteMessage message) {
Expand All @@ -158,6 +186,15 @@ private static void throwUnrecognizedStream(final ConfiguredAirbyteCatalog catal
Jsons.serialize(catalog), Jsons.serialize(message)));
}

/**
* Cleans up buffer based on whether the sync was successful or some exception occurred. In the case
* where a failure occurred we do a simple clean up any lingering data. Otherwise, flush any
* remaining data that has been stored. This is fine even if the state has not been received since
* this Airbyte promises at least once delivery
*
* @param hasFailed true if the stream replication failed partway through, false otherwise
* @throws Exception
*/
@Override
protected void close(final boolean hasFailed) throws Exception {
Preconditions.checkState(hasStarted, "Cannot close; has not started.");
Expand All @@ -170,17 +207,22 @@ protected void close(final boolean hasFailed) throws Exception {
LOGGER.error("executing on failed close procedure.");
} else {
LOGGER.info("executing on success close procedure.");
// When flushing the buffer, this will call the respective #flushBufferFunction which bundles
// the flush and commit operation, so if successful then mark state as committed
bufferingStrategy.flushAll();
markStatesAsFlushedToTmpDestination();
markStatesAsFlushedToDestination();
}
bufferingStrategy.close();

try {
// flushed is empty in 2 cases:
// 1. either it is full refresh (no state is emitted necessarily).
// 2. it is stream but no states were flushed.
// in both of these cases, if there was a failure, we should not bother committing. otherwise,
// attempt to commit.
/*
* TODO: (ryankfu) Remove usage of hasFailed with onClose after all destination connectors have been
* updated to support checkpointing
*
* flushed is empty in 2 cases: 1. either it is full refresh (no state is emitted necessarily) 2. it
* is stream but no states were flushed in both of these cases, if there was a failure, we should
* not bother committing. otherwise attempt to commit
*/
if (stateManager.listFlushed().isEmpty()) {
onClose.accept(hasFailed);
} else {
Expand All @@ -192,10 +234,6 @@ protected void close(final boolean hasFailed) throws Exception {
onClose.accept(false);
}

// TODO: (ryankfu) at this section for when we close the stream and mark stream as committed
// if onClose succeeds without exception then we can emit the state record because it means its
// records were not only flushed, but committed.
stateManager.markFlushedAsCommitted();
stateManager.listCommitted().forEach(outputRecordCollector);
} catch (final Exception e) {
LOGGER.error("Close failed.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,24 @@ public void markFlushedAsCommitted() {
internalStateManagerSupplier.get().markFlushedAsCommitted();
}

@Override
public void markPendingAsCommitted() {
internalStateManagerSupplier.get().markPendingAsCommitted();
}

@Override
public void clearCommitted() {
internalStateManagerSupplier.get().clearCommitted();
}

@Override
public Queue<AirbyteMessage> listCommitted() {
return internalStateManagerSupplier.get().listCommitted();
}

@Override
public boolean supportsPerStreamFlush() {
return internalStateManagerSupplier.get().supportsPerStreamFlush();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ public void markFlushedAsCommitted() {
}
}

@Override
public void clearCommitted() {
lastCommittedState = null;
}

@Override
public void markPendingAsCommitted() {
if (lastPendingState != null) {
lastCommittedState = lastPendingState;
lastPendingState = null;
}
}

@Override
public Queue<AirbyteMessage> listCommitted() {
return stateMessageToQueue(lastCommittedState);
Expand All @@ -67,4 +80,9 @@ private static Queue<AirbyteMessage> stateMessageToQueue(final AirbyteMessage st
return new LinkedList<>(stateMessage == null ? Collections.emptyList() : List.of(stateMessage));
}

@Override
public boolean supportsPerStreamFlush() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* been committed</li>
* <li>committed - associated records have been committed</li>
* </ol>
*
*/
public interface DestStateLifecycleManager {

Expand All @@ -28,6 +29,9 @@ public interface DestStateLifecycleManager {

/**
* Moves any tracked state messages that are currently pending to flushed.
*
* @Deprecated since destination checkpointing will be bundling flush & commit into the same
* operation
*/
void markPendingAsFlushed();

Expand All @@ -40,14 +44,39 @@ public interface DestStateLifecycleManager {

/**
* Moves any tracked state messages that are currently flushed to committed.
*
* @Deprecated since destination checkpointing will be bundling flush and commit into the same
* operation
*/
void markFlushedAsCommitted();

/**
* Clears any committed state messages, this is called after returning the state message to the
* platform. The rationale behind this logic is to avoid returning duplicated state messages that
* would otherwise be held in the `committed` state
*/
void clearCommitted();

/**
* Moves any tracked state messages that are currently pending to committed.
*
* Note: that this is skipping "flushed" state since flushed meant that this was using a staging
* area to hold onto files, for the changes with checkpointing this step is skipped. It follows
* under the guiding principle that destination needs to commit
* {@link io.airbyte.protocol.models.AirbyteRecordMessage} more frequently to checkpoint. The new
* transaction logic will be:
*
* Buffer -(flush)-> Staging (Blob Storage) -(commit to airbyte_raw)-> Destination table
*/
void markPendingAsCommitted();

/**
* List all tracked state messages that are committed.
*
* @return list of state messages
*/
Queue<AirbyteMessage> listCommitted();

boolean supportsPerStreamFlush();

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ Queue<AirbyteMessage> listPending() {
return listStatesInOrder(streamToLastPendingState);
}

/*
* Similar to #markFlushedAsCommmitted, this method should no longer be used to align with the
* changes to destination checkpointing where flush/commit operations will be bundled
*/
@Deprecated
@Override
public void markPendingAsFlushed() {
moveToNextPhase(streamToLastPendingState, streamToLastFlushedState);
Expand All @@ -62,16 +67,37 @@ public Queue<AirbyteMessage> listFlushed() {
return listStatesInOrder(streamToLastFlushedState);
}

/*
* During the process of migration to destination checkpointing, this method should no longer be in
* use in favor of #markPendingAsCommitted where states will be flushed/committed as a singular
* transaction
*/
@Deprecated
@Override
public void markFlushedAsCommitted() {
moveToNextPhase(streamToLastFlushedState, streamToLastCommittedState);
}

@Override
public void clearCommitted() {
streamToLastCommittedState.clear();
}

@Override
public void markPendingAsCommitted() {
moveToNextPhase(streamToLastPendingState, streamToLastCommittedState);
}

@Override
public Queue<AirbyteMessage> listCommitted() {
return listStatesInOrder(streamToLastCommittedState);
}

@Override
public boolean supportsPerStreamFlush() {
return true;
}

/**
* Lists out the states in the stream to state maps. Guarantees a deterministic sort order, which is
* handy because we are going from a map (unsorted) to a queue. The sort order primary sort on
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.record_buffer;

public enum BufferFlushType {
FLUSH_ALL,
FLUSH_SINGLE_STREAM
}
Loading

0 comments on commit aa6afd0

Please sign in to comment.