return better structured error logs to connector builder #46963

Merged
merged 8 commits into from
Oct 23, 2024
Changes from 4 commits
@@ -107,8 +107,16 @@ def get_message_groups(
                 log_messages.append(LogMessage(**{"message": message_group.message, "level": message_group.level.value}))
             elif isinstance(message_group, AirbyteTraceMessage):
                 if message_group.type == TraceType.ERROR:
-                    error_message = f"{message_group.error.message} - {message_group.error.stack_trace}"
-                    log_messages.append(LogMessage(**{"message": error_message, "level": "ERROR"}))
+                    log_messages.append(
+                        LogMessage(
+                            **{
+                                "message": message_group.error.message,
+                                "level": "ERROR",
+                                "internal_message": message_group.error.internal_message,
+                                "stacktrace": message_group.error.stack_trace,
+                            }
+                        )
+                    )
             elif isinstance(message_group, AirbyteControlMessage):
                 if not latest_config_update or latest_config_update.emitted_at <= message_group.emitted_at:
                     latest_config_update = message_group
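
For illustration, here is a minimal self-contained sketch of the shape this hunk produces. It assumes simplified stand-ins for the CDK types: `FakeTraceError` and `to_log_message` are hypothetical names used only in this example and are not part of the CDK.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LogMessage:
    # Mirrors the fields of LogMessage after this PR (see models.py).
    message: str
    level: str
    internal_message: Optional[str] = None
    stacktrace: Optional[str] = None


@dataclass
class FakeTraceError:
    # Hypothetical stand-in for the error payload of an AirbyteTraceMessage.
    message: str
    internal_message: Optional[str] = None
    stack_trace: Optional[str] = None


def to_log_message(error: FakeTraceError) -> LogMessage:
    # Keep the user-facing message separate from the debugging details
    # instead of concatenating them into a single string.
    return LogMessage(
        message=error.message,
        level="ERROR",
        internal_message=error.internal_message,
        stacktrace=error.stack_trace,
    )
```

With the fields kept apart, a consumer such as the Builder UI can show `message` prominently and reveal `internal_message` and `stacktrace` only on demand.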
@@ -300,6 +308,17 @@ def _read_stream(
         # iterate over the generated messages. if next raise an exception, catch it and yield it as an AirbyteLogMessage
         try:
             yield from AirbyteEntrypoint(source).read(source.spec(self.logger), config, configured_catalog, state)
+        except AirbyteTracedException as traced_exception:
+            # Look for this message which indicates that it is the "final exception" raised by AbstractSource.
+            # If it matches, don't yield this as we don't need to show this in the Builder.
+            # This is somewhat brittle as it relies on the message string, but if they drift then the worst case
+            # is that this message will be shown in the Builder.
+            if (
+                traced_exception.message is not None
+                and "During the sync, the following streams did not sync successfully" in traced_exception.message
Contributor

❓ Can we modify AbstractSource to indicate it's a "Final Exception"?

Contributor Author

@bnchrch Do you have a specific approach in mind? We could, for example, set internal_message to some known value to indicate this, but that approach would still rely on comparing strings.

I'm wary of raising a different exception in abstract_source here because I'm not sure what relies on this being an AirbyteTracedException, and I don't feel familiar enough with that fairly central CDK code to know what is safe to change.

Contributor

I would also add that any raised AirbyteTracedException is already more or less implied to be the final exception. Within abstract_source.py, https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py#L171-L177 , during a read of a stream we always try to catch any Exception or AirbyteTracedException thrown by the stream and wrap it into an emitted trace message. We only formally raise the "final" AirbyteTracedException because we need to end the process with a non-zero exit code.

Granted, the small gap is an exception raised outside of that try/except block. But from my perspective, adjusting the protocol or creating new abstractions to capture what is effectively a workaround feels like over-engineering. The ideal design is for the platform to treat receiving any AirbyteTracedException as enough to know that the sync was unsuccessful; then we could get rid of this workaround.

So TLDR: I think how we do it here is fine.

+            ):
+                return
+            yield traced_exception.as_airbyte_message()
         except Exception as e:
             error_message = f"{e.args[0] if len(e.args) > 0 else str(e)}"
             yield AirbyteTracedException.from_exception(e, message=error_message).as_airbyte_message()
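
The control flow of this hunk can be sketched in isolation. The following is a rough illustration only: `FakeTracedException`, `read_with_error_messages`, and the `TRACE:` string output are hypothetical stand-ins for `AirbyteTracedException`, `_read_stream`, and real Airbyte messages. Only the marker string is taken from the diff.

```python
from typing import Callable, Iterator, Optional

# Substring copied from the diff; it identifies the "final exception".
FINAL_EXCEPTION_MARKER = "During the sync, the following streams did not sync successfully"


class FakeTracedException(Exception):
    # Hypothetical stand-in for AirbyteTracedException.
    def __init__(self, message: Optional[str] = None):
        super().__init__(message)
        self.message = message


def read_with_error_messages(read: Callable[[], Iterator[str]]) -> Iterator[str]:
    try:
        yield from read()
    except FakeTracedException as traced_exception:
        # Suppress the summary exception: the per-stream errors were already emitted.
        if traced_exception.message is not None and FINAL_EXCEPTION_MARKER in traced_exception.message:
            return
        yield f"TRACE: {traced_exception.message}"
    except Exception as e:
        # Any other exception is converted into a message rather than propagated.
        error_message = f"{e.args[0] if len(e.args) > 0 else str(e)}"
        yield f"TRACE: {error_message}"


def failing_with_final_exception() -> Iterator[str]:
    yield "record"
    raise FakeTracedException(f"{FINAL_EXCEPTION_MARKER}: users")


def failing_with_other_traced_exception() -> Iterator[str]:
    yield "record"
    raise FakeTracedException("Bad credentials")


def failing_with_plain_exception() -> Iterator[str]:
    yield "record"
    raise ValueError("boom")
```

As the code comment in the diff notes, the substring match is brittle, but if the wording drifts the only consequence is that the summary message shows up in the Builder again.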
2 changes: 2 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/connector_builder/models.py
@@ -39,6 +39,8 @@ class StreamReadSlices:
 class LogMessage:
     message: str
     level: str
+    internal_message: Optional[str] = None
Contributor

The trade-off here is that, since this does not map to the protocol error message model, when we move to using the source-declarative-manifest container as the manifest runner for Builder, we'd need to make sure that the protocol has these fields; otherwise error reporting will break.

Should we do the work and align the models in this file with protocol-level models now instead? Or ship the small improvement and record a TODO for later?
/cc @bnchrch

Contributor

That's a good callout, Natik!

Looking into it, this does already align with the protocol model AirbyteTraceMessage:

https://github.com/airbytehq/airbyte-protocol/blob/e24ee151574a7f0f4e997b90063d7c9ffed2b158/protocol-models/src/main/resources/airbyte_protocol/v0/airbyte_protocol.yaml#L256

@lmossman can you confirm I'm reading this correctly, and that this is already a protocol-blessed type?

If so, should LogMessage become a union type of AirbyteTraceMessage and AirbyteLogMessage?

Contributor Author

> The trade off here is that since this does not map to the protocol error message model, when we move to using source-declarative-manifest container as manifest runner for Builder, we'd need to make sure that the protocol has these fields, otherwise error reporting will break.

This LogMessage class is only used in the message_grouper logic, which wraps a call to the read protocol method.

Even if we swap out the runtime to use source-declarative-manifest to execute the read, we will still need all of this extra message_grouper logic around it to create the StreamRead output object that the connector builder server expects. That logic shouldn't need to change, because the get_message_groups function already operates on the Airbyte protocol and just normalizes those protocol messages into a form that is easier for the connector builder server to understand.

If we don't want to directly execute any python anymore at all and therefore delete this message_grouper logic, then the connector builder server will require a bunch of other changes anyway to do similar logic to what that python code is doing now.

So, I think I'd prefer to keep this PR simple right now, especially because I don't think we can easily reference the protocol AirbyteLogMessage and AirbyteTraceMessage in our connector-builder-server and airbyte-server openapi specs, and I'd rather not repeat those entire schemas as they are more complicated than the LogMessage schema I've defined here.

But let me know if anything I said above sounds off @natikgadzhi @bnchrch !

+    stacktrace: Optional[str] = None


@dataclass
@@ -45,4 +45,7 @@ def parse_response_error_message(self, response: requests.Response) -> Optional[
             body = response.json()
             return self._try_get_error(body)
         except requests.exceptions.JSONDecodeError:
-            return None
+            try:
+                return response.content.decode("utf-8")
+            except Exception:
+                return None
Comment on lines +48 to +51
Contributor Author

This change lets the method also handle response bodies that are plain strings rather than JSON, which currently cause a JSONDecodeError.
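
The fallback can be sketched without the `requests` dependency. This is a simplified illustration under stated assumptions: `parse_error_message` is a hypothetical helper that works on raw bytes, and looking up an `"error"` key stands in for whatever `_try_get_error` actually does.

```python
import json
from typing import Optional


def parse_error_message(content: bytes) -> Optional[str]:
    # Hypothetical helper: takes the raw body instead of a requests.Response.
    try:
        body = json.loads(content)
    except ValueError:
        # Not valid JSON (json.JSONDecodeError is a ValueError subclass):
        # fall back to treating the body as plain text.
        try:
            return content.decode("utf-8")
        except UnicodeDecodeError:
            return None
    # Assumption for this sketch: the error lives under an "error" key.
    if isinstance(body, dict):
        value = body.get("error")
        return value if isinstance(value, str) else None
    return None
```

A JSON body yields the extracted error, a plain-text body is returned as-is, and bytes that are neither valid JSON nor valid UTF-8 still produce `None`, which matches the pre-change behavior for undecodable responses.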