Skip to content

Commit

Permalink
AirbyteConfigMessage -> AirbyteConnectorConfigMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
evantahler committed Oct 24, 2022
1 parent 49ec370 commit eaeb260
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 35 deletions.
8 changes: 4 additions & 4 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Type(Enum):
CONNECTION_STATUS = "CONNECTION_STATUS"
CATALOG = "CATALOG"
TRACE = "TRACE"
CONFIG = "CONFIG"
CONNECTOR_CONFIG = "CONNECTOR_CONFIG"


class AirbyteRecordMessage(BaseModel):
Expand Down Expand Up @@ -98,7 +98,7 @@ class Config:
failure_type: Optional[FailureType] = Field(None, description="The type of error")


class AirbyteConfigMessage(BaseModel):
class AirbyteConnectorConfigMessage(BaseModel):
class Config:
extra = Extra.allow

Expand Down Expand Up @@ -342,9 +342,9 @@ class Config:
None,
description="trace message: a message to communicate information about the status and performance of a connector",
)
config: Optional[AirbyteConfigMessage] = Field(
connectorConfig: Optional[AirbyteConnectorConfigMessage] = Field(
None,
description="config message: a message to communicate an updated configuration from a connector that should be persisted",
description="connector config message: a message to communicate an updated configuration from a connector that should be persisted",
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.FailureReason;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteConfigMessage;
import io.airbyte.protocol.models.AirbyteConnectorConfigMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
Expand Down Expand Up @@ -107,7 +107,7 @@ public void acceptFromSource(final AirbyteMessage message) {
case TRACE -> handleEmittedTrace(message.getTrace(), ConnectorType.SOURCE);
case RECORD -> handleSourceEmittedRecord(message.getRecord());
case STATE -> handleSourceEmittedState(message.getState());
case CONFIG -> handleEmittedConfig(message.getConfig(), ConnectorType.SOURCE);
case CONNECTOR_CONFIG -> handleEmittedConfig(message.getConnectorConfig(), ConnectorType.SOURCE);
default -> log.warn("Invalid message type for message: {}", message);
}
}
Expand All @@ -119,7 +119,7 @@ public void acceptFromDestination(final AirbyteMessage message) {
switch (message.getType()) {
case TRACE -> handleEmittedTrace(message.getTrace(), ConnectorType.DESTINATION);
case STATE -> handleDestinationEmittedState(message.getState());
case CONFIG -> handleEmittedConfig(message.getConfig(), ConnectorType.DESTINATION);
case CONNECTOR_CONFIG -> handleEmittedConfig(message.getConnectorConfig(), ConnectorType.DESTINATION);
default -> log.warn("Invalid message type for message: {}", message);
}
}
Expand Down Expand Up @@ -218,7 +218,7 @@ private void handleDestinationEmittedState(final AirbyteStateMessage stateMessag
* When a connector needs to update its configuration
*/
@SuppressWarnings("PMD") // until method is implemented
private void handleEmittedConfig(final AirbyteConfigMessage configMessage, final ConnectorType connectorType) {
private void handleEmittedConfig(final AirbyteConnectorConfigMessage configMessage, final ConnectorType connectorType) {
// TODO: Update config here
/**
* Pseudocode: for (key in configMessage.getConfig()) { validateIsReallyConfig(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ definitions:
- CONNECTION_STATUS
- CATALOG
- TRACE
- CONFIG
- CONNECTOR_CONFIG
log:
description: "log message: any kind of logging you want the platform to know about."
"$ref": "#/definitions/AirbyteLogMessage"
Expand All @@ -49,9 +49,9 @@ definitions:
trace:
description: "trace message: a message to communicate information about the status and performance of a connector"
"$ref": "#/definitions/AirbyteTraceMessage"
config:
description: "config message: a message to communicate an updated configuration from a connector that should be persisted"
"$ref": "#/definitions/AirbyteConfigMessage"
connectorConfig:
description: "connector config message: a message to communicate an updated configuration from a connector that should be persisted"
"$ref": "#/definitions/AirbyteConnectorConfigMessage"
AirbyteRecordMessage:
type: object
additionalProperties: true
Expand Down Expand Up @@ -201,7 +201,7 @@ definitions:
enum:
- system_error
- config_error
AirbyteConfigMessage:
AirbyteConnectorConfigMessage:
type: object
additionalProperties: true
required:
Expand Down
43 changes: 21 additions & 22 deletions docs/understanding-airbyte/airbyte-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ Each of these concepts is described in greater depth in their respective section
The Airbyte Protocol is versioned independently of the Airbyte Platform, and the version number is used to determine the compatibility between connectors and the Airbyte Platform.

| Version | Date of Change | Pull Request(s) | Subject |
|:---------|:---------------|:--------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------|
| `v0.3.1` | 2022-10-12 | [17907](https://github.com/airbytehq/airbyte/pull/17907) | `AirbyteConfigMessage` added |
| :------- | :------------- | :------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------- |
| `v0.3.1` | 2022-10-12 | [17907](https://github.com/airbytehq/airbyte/pull/17907) | `AirbyteConnectorConfigMessage` added |
| `v0.3.0` | 2022-09-09 | [16479](https://github.com/airbytehq/airbyte/pull/16479) | `AirbyteLogMessage.stack_trace` added |
| `v0.2.0` | 2022-06-10 | [13573](https://github.com/airbytehq/airbyte/pull/13573) & [12586](https://github.com/airbytehq/airbyte/pull/12586) | `STREAM` and `GLOBAL` STATE messages |
| `v0.1.1` | 2022-06-06 | [13356](https://github.com/airbytehq/airbyte/pull/13356) | Add a namespace in association with the stream name |
Expand Down Expand Up @@ -804,33 +804,32 @@ AirbyteErrorTraceMessage:
- config_error
```

## AirbyteConfigMessage
## AirbyteConnectorConfigMessage

An `AirbyteConfigMessage` allows a connector to update its configuration in the middle of a sync. This is valuable for connectors with short-lived or single-use credentials.
An `AirbyteConnectorConfigMessage` allows a connector to update its configuration in the middle of a sync. This is valuable for connectors with short-lived or single-use credentials.

Emitting this message signals to the orchestrator process (for example, the Airbyte platform) that it should update its persistence layer, replacing the connector's current configuration with the config present in the `.config` field of the message.
Emitting this message signals to the orchestrator process (for example, the Airbyte platform) that it should update its persistence layer, replacing the connector's current configuration with the config present in the `.config` field of the message.

The config in the `AirbyteConfigMessage` must conform to connector's specification's schema, and the orchestrator process is expected to validate this invariant. If the output config does not conform to the specification's schema, the orchestrator process should raise an exception and terminate the sync.
The config in the `AirbyteConnectorConfigMessage` must conform to connector's specification's schema, and the orchestrator process is expected to validate these messages. If the output config does not conform to the specification's schema, the orchestrator process should raise an exception and terminate the sync.

```yaml
AirbyteConfigMessage:
type: object
additionalProperties: true
required:
- config
- emitted_at
properties:
emitted_at:
description: "the time in ms that the message was emitted"
type: number
config:
description: "the config items from this connector's spec to update"
type: object
additionalProperties: true
AirbyteConnectorConfigMessage:
type: object
additionalProperties: true
required:
- config
- emitted_at
properties:
emitted_at:
description: "the time in ms that the message was emitted"
type: number
config:
description: "the config items from this connector's spec to update"
type: object
additionalProperties: true
```

For example, if the currently persisted config file is `{"api_key": 123, start_date: "01-01-2022"}` and the following `AirbyteConfigMessage` is output `{"config": {"api_key": 456}, "emitted_at": <current_time>}` then the persisted configuration will become `{"api_key": 456, start_date: "01-01-2022"}`.

For example, if the currently persisted config file is `{"api_key": 123, start_date: "01-01-2022"}` and the following `AirbyteConnectorConfigMessage` is output `{"config": {"api_key": 456}, "emitted_at": <current_time>}` then the persisted configuration is merged, and will become `{"api_key": 456, start_date: "01-01-2022"}`.

# Acknowledgements

Expand Down

0 comments on commit eaeb260

Please sign in to comment.