Skip to content

Commit

Permalink
AirbyteOrchestratorMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
evantahler committed Oct 24, 2022
1 parent eaeb260 commit f3ef6c5
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 13 deletions.
23 changes: 19 additions & 4 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Type(Enum):
CONNECTION_STATUS = "CONNECTION_STATUS"
CATALOG = "CATALOG"
TRACE = "TRACE"
CONNECTOR_CONFIG = "CONNECTOR_CONFIG"
ORCHESTRATOR = "ORCHESTRATOR"


class AirbyteRecordMessage(BaseModel):
Expand Down Expand Up @@ -98,11 +98,14 @@ class Config:
failure_type: Optional[FailureType] = Field(None, description="The type of error")


class AirbyteConnectorConfigMessage(BaseModel):
class OrchestratorType(Enum):
CONNECTOR_CONFIG = "CONNECTOR_CONFIG"


class AirbyteOrchestratorConnectorConfigMessage(BaseModel):
class Config:
extra = Extra.allow

emitted_at: float = Field(..., description="the time in ms that the message was emitted")
config: Dict[str, Any] = Field(..., description="the config items from this connector's spec to update")


Expand Down Expand Up @@ -212,6 +215,18 @@ class Config:
error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object")


class AirbyteOrchestratorMessage(BaseModel):
class Config:
extra = Extra.allow

type: OrchestratorType = Field(..., description="the type of orchestrator message", title="orchestrator type")
emitted_at: float = Field(..., description="the time in ms that the message was emitted")
connectorConfig: Optional[AirbyteOrchestratorConnectorConfigMessage] = Field(
None,
description="connector config orchestrator message: the updated config for the platform to store for this connector",
)


class AirbyteStream(BaseModel):
class Config:
extra = Extra.allow
Expand Down Expand Up @@ -342,7 +357,7 @@ class Config:
None,
description="trace message: a message to communicate information about the status and performance of a connector",
)
connectorConfig: Optional[AirbyteConnectorConfigMessage] = Field(
orchestrator: Optional[AirbyteOrchestratorMessage] = Field(
None,
description="connector config message: a message to communicate an updated configuration from a connector that should be persisted",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.FailureReason;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteConnectorConfigMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteOrchestratorConnectorConfigMessage;
import io.airbyte.protocol.models.AirbyteOrchestratorMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void acceptFromSource(final AirbyteMessage message) {
case TRACE -> handleEmittedTrace(message.getTrace(), ConnectorType.SOURCE);
case RECORD -> handleSourceEmittedRecord(message.getRecord());
case STATE -> handleSourceEmittedState(message.getState());
case CONNECTOR_CONFIG -> handleEmittedConfig(message.getConnectorConfig(), ConnectorType.SOURCE);
case ORCHESTRATOR -> handleEmittedOrchestratorMessage(message.getOrchestrator(), ConnectorType.SOURCE);
default -> log.warn("Invalid message type for message: {}", message);
}
}
Expand All @@ -119,7 +120,7 @@ public void acceptFromDestination(final AirbyteMessage message) {
switch (message.getType()) {
case TRACE -> handleEmittedTrace(message.getTrace(), ConnectorType.DESTINATION);
case STATE -> handleDestinationEmittedState(message.getState());
case CONNECTOR_CONFIG -> handleEmittedConfig(message.getConnectorConfig(), ConnectorType.DESTINATION);
case ORCHESTRATOR -> handleEmittedOrchestratorMessage(message.getOrchestrator(), ConnectorType.DESTINATION);
default -> log.warn("Invalid message type for message: {}", message);
}
}
Expand Down Expand Up @@ -214,11 +215,22 @@ private void handleDestinationEmittedState(final AirbyteStateMessage stateMessag
}
}

/**
* When a connector signals that the platform should update persist an update
*/
private void handleEmittedOrchestratorMessage(final AirbyteOrchestratorMessage orchestratorMessage, final ConnectorType connectorType) {
switch (orchestratorMessage.getType()) {
case CONNECTOR_CONFIG -> handleEmittedOrchestratorConnectorConfig(orchestratorMessage.getConnectorConfig(), connectorType);
default -> log.warn("Invalid orchestrator message type for message: {}", orchestratorMessage);
}
}

/**
* When a connector needs to update its configuration
*/
@SuppressWarnings("PMD") // until method is implemented
private void handleEmittedConfig(final AirbyteConnectorConfigMessage configMessage, final ConnectorType connectorType) {
private void handleEmittedOrchestratorConnectorConfig(final AirbyteOrchestratorConnectorConfigMessage orchestratorMessage,
final ConnectorType connectorType) {
// TODO: Update config here
/**
* Pseudocode: for (key in configMessage.getConfig()) { validateIsReallyConfig(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ definitions:
- CONNECTION_STATUS
- CATALOG
- TRACE
- CONNECTOR_CONFIG
- ORCHESTRATOR
log:
description: "log message: any kind of logging you want the platform to know about."
"$ref": "#/definitions/AirbyteLogMessage"
Expand All @@ -49,9 +49,9 @@ definitions:
trace:
description: "trace message: a message to communicate information about the status and performance of a connector"
"$ref": "#/definitions/AirbyteTraceMessage"
connectorConfig:
orchestrator:
description: "connector config message: a message to communicate an updated configuration from a connector that should be persisted"
"$ref": "#/definitions/AirbyteConnectorConfigMessage"
"$ref": "#/definitions/AirbyteOrchestratorMessage"
AirbyteRecordMessage:
type: object
additionalProperties: true
Expand Down Expand Up @@ -201,16 +201,31 @@ definitions:
enum:
- system_error
- config_error
AirbyteConnectorConfigMessage:
AirbyteOrchestratorMessage:
type: object
additionalProperties: true
required:
- config
- type
- emitted_at
properties:
type:
title: orchestrator type
description: "the type of orchestrator message"
type: string
enum:
- CONNECTOR_CONFIG
emitted_at:
description: "the time in ms that the message was emitted"
type: number
connectorConfig:
description: "connector config orchestrator message: the updated config for the platform to store for this connector"
"$ref": "#/definitions/AirbyteOrchestratorConnectorConfigMessage"
AirbyteOrchestratorConnectorConfigMessage:
type: object
additionalProperties: true
required:
- config
properties:
config:
description: "the config items from this connector's spec to update"
type: object
Expand Down

0 comments on commit f3ef6c5

Please sign in to comment.