From eaeb2608d76bdcbe66b5b60f152002791df96735 Mon Sep 17 00:00:00 2001 From: evantahler Date: Wed, 19 Oct 2022 13:23:15 -0700 Subject: [PATCH] `AirbyteConfigMessage` -> `AirbyteConnectorConfigMessage` --- .../airbyte_cdk/models/airbyte_protocol.py | 8 ++-- .../internal/AirbyteMessageTracker.java | 8 ++-- .../airbyte_protocol/airbyte_protocol.yaml | 10 ++--- .../understanding-airbyte/airbyte-protocol.md | 43 +++++++++---------- 4 files changed, 34 insertions(+), 35 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py index 2bdd1abbcd83..12f10d8241ed 100644 --- a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py +++ b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py @@ -21,7 +21,7 @@ class Type(Enum): CONNECTION_STATUS = "CONNECTION_STATUS" CATALOG = "CATALOG" TRACE = "TRACE" - CONFIG = "CONFIG" + CONNECTOR_CONFIG = "CONNECTOR_CONFIG" class AirbyteRecordMessage(BaseModel): @@ -98,7 +98,7 @@ class Config: failure_type: Optional[FailureType] = Field(None, description="The type of error") -class AirbyteConfigMessage(BaseModel): +class AirbyteConnectorConfigMessage(BaseModel): class Config: extra = Extra.allow @@ -342,9 +342,9 @@ class Config: None, description="trace message: a message to communicate information about the status and performance of a connector", ) - config: Optional[AirbyteConfigMessage] = Field( + connectorConfig: Optional[AirbyteConnectorConfigMessage] = Field( None, - description="config message: a message to communicate an updated configuration from a connector that should be persisted", + description="connector config message: a message to communicate an updated configuration from a connector that should be persisted", ) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index e1d6229ac38a..f5c84b9798a9 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -14,7 +14,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.config.FailureReason; import io.airbyte.config.State; -import io.airbyte.protocol.models.AirbyteConfigMessage; +import io.airbyte.protocol.models.AirbyteConnectorConfigMessage; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; @@ -107,7 +107,7 @@ public void acceptFromSource(final AirbyteMessage message) { case TRACE -> handleEmittedTrace(message.getTrace(), ConnectorType.SOURCE); case RECORD -> handleSourceEmittedRecord(message.getRecord()); case STATE -> handleSourceEmittedState(message.getState()); - case CONFIG -> handleEmittedConfig(message.getConfig(), ConnectorType.SOURCE); + case CONNECTOR_CONFIG -> handleEmittedConfig(message.getConnectorConfig(), ConnectorType.SOURCE); default -> log.warn("Invalid message type for message: {}", message); } } @@ -119,7 +119,7 @@ public void acceptFromDestination(final AirbyteMessage message) { switch (message.getType()) { case TRACE -> handleEmittedTrace(message.getTrace(), ConnectorType.DESTINATION); case STATE -> handleDestinationEmittedState(message.getState()); - case CONFIG -> handleEmittedConfig(message.getConfig(), ConnectorType.DESTINATION); + case CONNECTOR_CONFIG -> handleEmittedConfig(message.getConnectorConfig(), ConnectorType.DESTINATION); default -> log.warn("Invalid message type for message: {}", message); } } @@ -218,7 +218,7 @@ private void handleDestinationEmittedState(final AirbyteStateMessage stateMessag * When a connector needs to update its configuration */ @SuppressWarnings("PMD") // until method is implemented - private void handleEmittedConfig(final AirbyteConfigMessage configMessage, final ConnectorType connectorType) { + private void handleEmittedConfig(final AirbyteConnectorConfigMessage configMessage, final ConnectorType connectorType) { // TODO: Update config here /** * Pseudocode: for (key in configMessage.getConfig()) { validateIsReallyConfig(key); diff --git a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml index ea7a50653dbc..f9c694113059 100644 --- a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml +++ b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml @@ -28,7 +28,7 @@ definitions: - CONNECTION_STATUS - CATALOG - TRACE - - CONFIG + - CONNECTOR_CONFIG log: description: "log message: any kind of logging you want the platform to know about." "$ref": "#/definitions/AirbyteLogMessage" @@ -49,9 +49,9 @@ definitions: trace: description: "trace message: a message to communicate information about the status and performance of a connector" "$ref": "#/definitions/AirbyteTraceMessage" - config: - description: "config message: a message to communicate an updated configuration from a connector that should be persisted" - "$ref": "#/definitions/AirbyteConfigMessage" + connectorConfig: + description: "connector config message: a message to communicate an updated configuration from a connector that should be persisted" + "$ref": "#/definitions/AirbyteConnectorConfigMessage" AirbyteRecordMessage: type: object additionalProperties: true @@ -201,7 +201,7 @@ definitions: enum: - system_error - config_error - AirbyteConfigMessage: + AirbyteConnectorConfigMessage: type: object additionalProperties: true required: diff --git a/docs/understanding-airbyte/airbyte-protocol.md b/docs/understanding-airbyte/airbyte-protocol.md index 97c68fe1d0b7..452e98e4619d 100644 --- a/docs/understanding-airbyte/airbyte-protocol.md +++ b/docs/understanding-airbyte/airbyte-protocol.md @@ -27,8 +27,8 @@ Each of these concepts is described in greater depth in their respective section The Airbyte Protocol is versioned independently of the Airbyte Platform, and the version number is used to determine the compatibility between connectors and the Airbyte Platform. | Version | Date of Change | Pull Request(s) | Subject | -|:---------|:---------------|:--------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------| -| `v0.3.1` | 2022-10-12 | [17907](https://github.com/airbytehq/airbyte/pull/17907) | `AirbyteConfigMessage` added | +| :------- | :------------- | :------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------- | +| `v0.3.1` | 2022-10-12 | [17907](https://github.com/airbytehq/airbyte/pull/17907) | `AirbyteConnectorConfigMessage` added | | `v0.3.0` | 2022-09-09 | [16479](https://github.com/airbytehq/airbyte/pull/16479) | `AirbyteLogMessage.stack_trace` added | | `v0.2.0` | 2022-06-10 | [13573](https://github.com/airbytehq/airbyte/pull/13573) & [12586](https://github.com/airbytehq/airbyte/pull/12586) | `STREAM` and `GLOBAL` STATE messages | | `v0.1.1` | 2022-06-06 | [13356](https://github.com/airbytehq/airbyte/pull/13356) | Add a namespace in association with the stream name | @@ -804,33 +804,32 @@ AirbyteErrorTraceMessage: - config_error ``` -## AirbyteConfigMessage +## AirbyteConnectorConfigMessage -An `AirbyteConfigMessage` allows a connector to update its configuration in the middle of a sync. This is valuable for connectors with short-lived or single-use credentials. +An `AirbyteConnectorConfigMessage` allows a connector to update its configuration in the middle of a sync. This is valuable for connectors with short-lived or single-use credentials. -Emitting this message signals to the orchestrator process (for example, the Airbyte platform) that it should update its persistence layer, replacing the connector's current configuration with the config present in the `.config` field of the message. +Emitting this message signals to the orchestrator process (for example, the Airbyte platform) that it should update its persistence layer, replacing the connector's current configuration with the config present in the `.config` field of the message. -The config in the `AirbyteConfigMessage` must conform to connector's specification's schema, and the orchestrator process is expected to validate this invariant. If the output config does not conform to the specification's schema, the orchestrator process should raise an exception and terminate the sync. +The config in the `AirbyteConnectorConfigMessage` must conform to connector's specification's schema, and the orchestrator process is expected to validate these messages. If the output config does not conform to the specification's schema, the orchestrator process should raise an exception and terminate the sync. ```yaml - AirbyteConfigMessage: - type: object - additionalProperties: true - required: - - config - - emitted_at - properties: - emitted_at: - description: "the time in ms that the message was emitted" - type: number - config: - description: "the config items from this connector's spec to update" - type: object - additionalProperties: true +AirbyteConnectorConfigMessage: + type: object + additionalProperties: true + required: + - config + - emitted_at + properties: + emitted_at: + description: "the time in ms that the message was emitted" + type: number + config: + description: "the config items from this connector's spec to update" + type: object + additionalProperties: true ``` -For example, if the currently persisted config file is `{"api_key": 123, start_date: "01-01-2022"}` and the following `AirbyteConfigMessage` is output `{"config": {"api_key": 456}, "emitted_at": }` then the persisted configuration will become `{"api_key": 456, start_date: "01-01-2022"}`. - +For example, if the currently persisted config file is `{"api_key": 123, start_date: "01-01-2022"}` and the following `AirbyteConnectorConfigMessage` is output `{"config": {"api_key": 456}, "emitted_at": }` then the persisted configuration is merged, and will become `{"api_key": 456, start_date: "01-01-2022"}`. # Acknowledgements