Protocol Change: AirbyteControlMessage.ConnectorConfig #17907

Merged (10 commits) on Oct 28, 2022
28 changes: 28 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
@@ -21,6 +21,7 @@ class Type(Enum):
    CONNECTION_STATUS = "CONNECTION_STATUS"
    CATALOG = "CATALOG"
    TRACE = "TRACE"
    CONTROL = "CONTROL"


class AirbyteRecordMessage(BaseModel):
@@ -97,6 +98,17 @@ class Config:
    failure_type: Optional[FailureType] = Field(None, description="The type of error")


class OrchestratorType(Enum):
    CONNECTOR_CONFIG = "CONNECTOR_CONFIG"


class AirbyteControlConnectorConfigMessage(BaseModel):
    class Config:
        extra = Extra.allow

    config: Dict[str, Any] = Field(..., description="the config items from this connector's spec to update")

class Status(Enum):
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
@@ -203,6 +215,18 @@ class Config:
    error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object")


class AirbyteControlMessage(BaseModel):
    class Config:
        extra = Extra.allow

    type: OrchestratorType = Field(..., description="the type of orchestrator message", title="orchestrator type")
    emitted_at: float = Field(..., description="the time in ms that the message was emitted")
    connectorConfig: Optional[AirbyteControlConnectorConfigMessage] = Field(
        None,
        description="connector config orchestrator message: the updated config for the platform to store for this connector",
    )


class AirbyteStream(BaseModel):
    class Config:
        extra = Extra.allow
@@ -333,6 +357,10 @@ class Config:
        None,
        description="trace message: a message to communicate information about the status and performance of a connector",
    )
    control: Optional[AirbyteControlMessage] = Field(
        None,
        description="connector config message: a message to communicate an updated configuration from a connector that should be persisted",
    )


class AirbyteProtocol(BaseModel):
@@ -17,6 +17,8 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.FailureReason;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
import io.airbyte.protocol.models.AirbyteControlMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
@@ -110,6 +112,7 @@ public void acceptFromSource(final AirbyteMessage message) {
      case TRACE -> handleEmittedTrace(message.getTrace(), ConnectorType.SOURCE);
      case RECORD -> handleSourceEmittedRecord(message.getRecord());
      case STATE -> handleSourceEmittedState(message.getState());
      case CONTROL -> handleEmittedOrchestratorMessage(message.getControl(), ConnectorType.SOURCE);
      default -> log.warn("Invalid message type for message: {}", message);
    }
  }
@@ -122,6 +125,7 @@ public void acceptFromDestination(final AirbyteMessage message) {
    switch (message.getType()) {
      case TRACE -> handleEmittedTrace(message.getTrace(), ConnectorType.DESTINATION);
      case STATE -> handleDestinationEmittedState(message.getState());
      case CONTROL -> handleEmittedOrchestratorMessage(message.getControl(), ConnectorType.DESTINATION);
      default -> log.warn("Invalid message type for message: {}", message);
    }
  }
@@ -216,6 +220,30 @@ private void handleDestinationEmittedState(final AirbyteStateMessage stateMessag
    }
  }

  /**
   * When a connector signals that the platform should persist an update
   */
  private void handleEmittedOrchestratorMessage(final AirbyteControlMessage controlMessage, final ConnectorType connectorType) {
    switch (controlMessage.getType()) {
      case CONNECTOR_CONFIG -> handleEmittedOrchestratorConnectorConfig(controlMessage.getConnectorConfig(), connectorType);
      default -> log.warn("Invalid orchestrator message type for message: {}", controlMessage);
    }
  }

  /**
   * When a connector needs to update its configuration
   */
  @SuppressWarnings("PMD") // until method is implemented
  private void handleEmittedOrchestratorConnectorConfig(final AirbyteControlConnectorConfigMessage configMessage,
                                                        final ConnectorType connectorType) {
    // TODO: Update config here
    /*
     * Pseudocode:
     *
     * for (key in configMessage.getConfig()) {
     *   validateIsReallyConfig(key);
     *   // nuance here for secret storage or not; may need to be async over API for replication orchestrator
     *   persistConfigChange(connectorType, key, configMessage.getConfig().get(key));
     * }
     */
  }

  /**
   * When a connector emits a trace message, check the type and call the correct function. If it is an
   * error trace message, add it to the list of errorTraceMessages for the connector type
@@ -4,7 +4,7 @@
title: AirbyteProtocol
type: object
description: AirbyteProtocol structs
-version: 0.3.0
+version: 0.3.1
properties:
  airbyte_message:
    "$ref": "#/definitions/AirbyteMessage"
@@ -28,6 +28,7 @@ definitions:
          - CONNECTION_STATUS
          - CATALOG
          - TRACE
          - CONTROL
      log:
        description: "log message: any kind of logging you want the platform to know about."
        "$ref": "#/definitions/AirbyteLogMessage"
@@ -48,6 +49,9 @@ definitions:
      trace:
        description: "trace message: a message to communicate information about the status and performance of a connector"
        "$ref": "#/definitions/AirbyteTraceMessage"
      control:
        description: "connector config message: a message to communicate an updated configuration from a connector that should be persisted"
        "$ref": "#/definitions/AirbyteControlMessage"
  AirbyteRecordMessage:
    type: object
    additionalProperties: true
@@ -197,6 +201,35 @@ definitions:
        enum:
          - system_error
          - config_error
  AirbyteControlMessage:
    type: object
    additionalProperties: true
    required:
      - type
      - emitted_at
    properties:
      type:
        title: orchestrator type
        description: "the type of orchestrator message"
        type: string
        enum:
          - CONNECTOR_CONFIG
      emitted_at:
        description: "the time in ms that the message was emitted"
        type: number
      connectorConfig:
        description: "connector config orchestrator message: the updated config for the platform to store for this connector"
        "$ref": "#/definitions/AirbyteControlConnectorConfigMessage"
  AirbyteControlConnectorConfigMessage:
    type: object
    additionalProperties: true
    required:
      - config
    properties:
      config:
        description: "the config items from this connector's spec to update"
        type: object
        additionalProperties: true
  AirbyteConnectionStatus:
    type: object
    description: Airbyte connection status
50 changes: 50 additions & 0 deletions docs/understanding-airbyte/airbyte-protocol.md
@@ -28,6 +28,7 @@ The Airbyte Protocol is versioned independently of the Airbyte Platform, and the

| Version | Date of Change | Pull Request(s) | Subject |
| :------- | :------------- | :------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------- |
| `v0.3.1` | 2022-10-12 | [17907](https://github.com/airbytehq/airbyte/pull/17907) | `AirbyteControlMessage.ConnectorConfig` added |
| `v0.3.0` | 2022-09-09 | [16479](https://github.com/airbytehq/airbyte/pull/16479) | `AirbyteLogMessage.stack_trace` added |
| `v0.2.0` | 2022-06-10 | [13573](https://github.com/airbytehq/airbyte/pull/13573) & [12586](https://github.com/airbytehq/airbyte/pull/12586) | `STREAM` and `GLOBAL` STATE messages |
| `v0.1.1` | 2022-06-06 | [13356](https://github.com/airbytehq/airbyte/pull/13356) | Add a namespace in association with the stream name |
@@ -803,6 +804,55 @@ AirbyteErrorTraceMessage:
        - config_error
```

## AirbyteControlMessage

An `AirbyteControlMessage` is for connectors to signal to the Airbyte Platform or Orchestrator that an action with a side-effect should be taken. This means that the Orchestrator will likely be altering some stored data about the connector, connection, or sync.

```yaml
AirbyteControlMessage:
  type: object
  additionalProperties: true
  required:
    - type
    - emitted_at
  properties:
    type:
      title: orchestrator type
      description: "the type of orchestrator message"
      type: string
      enum:
        - CONNECTOR_CONFIG
    emitted_at:
      description: "the time in ms that the message was emitted"
      type: number
    connectorConfig:
      description: "connector config orchestrator message: the updated config for the platform to store for this connector"
      "$ref": "#/definitions/AirbyteControlConnectorConfigMessage"
```

Review comment (Contributor), on the `connectorConfig` property: updateConnectorConfig ?

Reply (Contributor Author): I read our message names as nouns, e.g. the content of this message is a "record", "state", etc. So this is a connector config, rather than what we are asking the platform to do ("storeState" or "updateConnectorConfig").
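To make the nesting in the schema above concrete, here is a minimal Python sketch of a connector building a `CONTROL` message. The helper name `build_connector_config_message` is hypothetical and not part of the CDK; the field names come from the schema in this PR, and the sketch assumes the message is serialized as a single JSON line on stdout like other protocol messages.

```python
import json
import time


def build_connector_config_message(config: dict) -> dict:
    """Build an AirbyteMessage envelope of type CONTROL (hypothetical helper)."""
    return {
        "type": "CONTROL",
        "control": {
            "type": "CONNECTOR_CONFIG",
            # The schema documents emitted_at as a time in milliseconds
            "emitted_at": time.time() * 1000,
            "connectorConfig": {"config": config},
        },
    }


if __name__ == "__main__":
    # Assumption: emitted as one JSON object per line on stdout
    print(json.dumps(build_connector_config_message({"api_key": 456})))
```

Note that `emitted_at` sits on the control message itself, while the updated values sit one level deeper under `connectorConfig.config`.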

### AirbyteControlConnectorConfigMessage

`AirbyteControlConnectorConfigMessage` allows a connector to update its configuration in the middle of a sync. This is valuable for connectors with short-lived or single-use credentials.

Emitting this message signals to the orchestrator process that it should update its persistence layer, replacing the connector's current configuration with the config present in the `.config` field of the message.

The config in the `AirbyteControlConnectorConfigMessage` must conform to the schema in the connector's specification, and the orchestrator process is expected to validate these messages. If the emitted config does not conform to that schema, the orchestrator process should raise an exception and terminate the sync.

```yaml
AirbyteControlConnectorConfigMessage:
  type: object
  additionalProperties: true
  required:
    - config
  properties:
    config:
      description: "the config items from this connector's spec to update"
      type: object
      additionalProperties: true
```
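The validation requirement described above could look roughly like the following Python sketch. It is a deliberately simplified conformance check (required keys and primitive types only), not full JSON Schema validation, and `find_config_errors` and `validate_or_fail` are hypothetical names rather than platform APIs. Whether the orchestrator validates the emitted config on its own or the config that results after persisting is not specified here; the sketch checks whatever dict it is given.

```python
from typing import Any, Dict, List

# Subset of JSON Schema primitive types mapped to Python types (illustration only)
_JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}


def find_config_errors(spec_schema: Dict[str, Any], config: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means no problem was found."""
    errors = []
    for key in spec_schema.get("required", []):
        if key not in config:
            errors.append(f"missing required property: {key}")
    properties = spec_schema.get("properties", {})
    for key, value in config.items():
        expected = properties.get(key, {}).get("type")
        if not isinstance(expected, str) or expected not in _JSON_TYPES:
            continue  # undeclared property or unsupported type constraint: not checked here
        # bool is a subclass of int in Python, so reject it explicitly for numeric types
        is_bool_as_number = isinstance(value, bool) and expected in ("integer", "number")
        if is_bool_as_number or not isinstance(value, _JSON_TYPES[expected]):
            errors.append(f"property {key} should be of type {expected}")
    return errors


def validate_or_fail(spec_schema: Dict[str, Any], config: Dict[str, Any]) -> None:
    """Raise instead of continuing, mirroring 'raise an exception and terminate the sync'."""
    errors = find_config_errors(spec_schema, config)
    if errors:
        raise ValueError("config does not conform to the spec: " + "; ".join(errors))
```

A real implementation would more likely delegate to a complete JSON Schema validator; the hand-rolled check only keeps the example free of third-party dependencies.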

For example, if the currently persisted config is `{"api_key": 123, "start_date": "01-01-2022"}` and the connector emits the message `{"type": "CONTROL", "control": {"type": "CONNECTOR_CONFIG", "emitted_at": <current_time>, "connectorConfig": {"config": {"api_key": 456}}}}`, then the persisted configuration is merged with the emitted config and becomes `{"api_key": 456, "start_date": "01-01-2022"}`.
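The merge in this example can be sketched in Python as follows. The function name `merge_config` is hypothetical, and the sketch assumes a shallow merge in which top-level keys from the emitted config overwrite persisted ones; how nested objects should be merged is not stated in this document.

```python
def merge_config(persisted: dict, update: dict) -> dict:
    """Shallow merge: keys in `update` overwrite keys in `persisted` (assumption)."""
    return {**persisted, **update}


persisted = {"api_key": 123, "start_date": "01-01-2022"}
control_message = {
    "type": "CONNECTOR_CONFIG",
    "emitted_at": 1666000000000.0,  # placeholder timestamp in ms
    "connectorConfig": {"config": {"api_key": 456}},
}

merged = merge_config(persisted, control_message["connectorConfig"]["config"])
print(merged)  # {'api_key': 456, 'start_date': '01-01-2022'}
```

The persisted dict is left untouched and a new dict is returned, which makes it easy to validate the merged result before writing it back.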

# Acknowledgements

We'd like to note that we were initially inspired by Singer.io's [specification](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#singer-specification) and would like to acknowledge that some of their design choices helped us bootstrap our project. We've since made a lot of modernizations to our protocol and specification, but don't want to forget the tools that helped us get started.