Skip to content

Commit

Permalink
Normalization logs: remove json parse warnings (#34978)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Feb 20, 2024
1 parent 95b05a8 commit 2a369e8
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.21.2 | 2024-02-20 | [\#34978](https://github.com/airbytehq/airbyte/pull/34978) | Reduce log noise in NormalizationLogParser. |
| 0.21.1 | 2024-02-20 | [\#35199](https://github.com/airbytehq/airbyte/pull/35199) | Add thread names to the logs. |
| 0.21.0 | 2024-02-16 | [\#35314](https://github.com/airbytehq/airbyte/pull/35314) | Delete S3StreamCopier classes. These have been superseded by the async destinations framework. |
| 0.20.9 | 2024-02-15 | [\#35240](https://github.com/airbytehq/airbyte/pull/35240) | Make state emission to platform inside state manager itself. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Stream<AirbyteMessage> toMessages(final String line) {
if (Strings.isEmpty(line)) {
return Stream.of(logMessage(Level.INFO, ""));
}
final Optional<JsonNode> json = Jsons.tryDeserialize(line);
final Optional<JsonNode> json = Jsons.tryDeserializeWithoutWarn(line);
if (json.isPresent()) {
return jsonToMessage(json.get());
} else {
Expand Down Expand Up @@ -96,7 +96,7 @@ private Stream<AirbyteMessage> jsonToMessage(final JsonNode jsonLine) {
*/
final String logLevel = (jsonLine.hasNonNull("level")) ? jsonLine.get("level").asText() : "";
String logMsg = jsonLine.hasNonNull("msg") ? jsonLine.get("msg").asText() : "";
Level level;
final Level level;
switch (logLevel) {
case "debug" -> level = Level.DEBUG;
case "info" -> level = Level.INFO;
Expand All @@ -117,15 +117,15 @@ private Stream<AirbyteMessage> jsonToMessage(final JsonNode jsonLine) {
}
}

private static AirbyteMessage logMessage(Level level, String message) {
private static AirbyteMessage logMessage(final Level level, final String message) {
return new AirbyteMessage()
.withType(Type.LOG)
.withLog(new AirbyteLogMessage()
.withLevel(level)
.withMessage(message));
}

public static void main(String[] args) {
public static void main(final String[] args) {
final NormalizationLogParser normalizationLogParser = new NormalizationLogParser();
final Stream<AirbyteMessage> airbyteMessageStream =
normalizationLogParser.create(new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8)));
Expand All @@ -135,8 +135,8 @@ public static void main(String[] args) {
final String dbtErrorStack = String.join("\n", errors);
if (!"".equals(dbtErrorStack)) {
final Map<ErrorMapKeys, String> errorMap = SentryExceptionHelper.getUsefulErrorMessageAndTypeFromDbtError(dbtErrorStack);
String internalMessage = errorMap.get(ErrorMapKeys.ERROR_MAP_MESSAGE_KEY);
AirbyteMessage traceMessage = new AirbyteMessage()
final String internalMessage = errorMap.get(ErrorMapKeys.ERROR_MAP_MESSAGE_KEY);
final AirbyteMessage traceMessage = new AirbyteMessage()
.withType(Type.TRACE)
.withTrace(new AirbyteTraceMessage()
.withType(AirbyteTraceMessage.Type.ERROR)
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.21.1
version=0.21.2

0 comments on commit 2a369e8

Please sign in to comment.