From cd613641761034e9841ab37f3511431404536323 Mon Sep 17 00:00:00 2001 From: terencecho Date: Mon, 9 May 2022 10:28:49 -0400 Subject: [PATCH] Use different CustomerIO templates and edit slack notifs (#12674) * Use different CustomerIO templates and edit slack notifs * fix unit test * fix file naming and comments --- ...java => CustomerioNotificationClient.java} | 54 ++++++++++--------- .../notification/NotificationClient.java | 15 ++++-- .../notification/SlackNotificationClient.java | 32 +++++------ .../auto_disable_notification_template.json | 11 ++-- ...disable_warning_notification_template.json | 19 ------- .../customerio/default_template.json | 17 ++++++ ...to_disable_slack_notification_template.txt | 5 +- ...le_warning_slack_notification_template.txt | 5 +- .../CustomerioNotificationClientTest.java | 6 ++- .../SlackNotificationClientTest.java | 26 ++++++--- .../scheduler/persistence/JobNotifier.java | 4 +- 11 files changed, 110 insertions(+), 84 deletions(-) rename airbyte-notification/src/main/java/io/airbyte/notification/{CustomeriolNotificationClient.java => CustomerioNotificationClient.java} (70%) delete mode 100644 airbyte-notification/src/main/resources/customerio/auto_disable_warning_notification_template.json create mode 100644 airbyte-notification/src/main/resources/customerio/default_template.json diff --git a/airbyte-notification/src/main/java/io/airbyte/notification/CustomeriolNotificationClient.java b/airbyte-notification/src/main/java/io/airbyte/notification/CustomerioNotificationClient.java similarity index 70% rename from airbyte-notification/src/main/java/io/airbyte/notification/CustomeriolNotificationClient.java rename to airbyte-notification/src/main/java/io/airbyte/notification/CustomerioNotificationClient.java index 1d7ca81bd5f4..8e966d9766a0 100644 --- a/airbyte-notification/src/main/java/io/airbyte/notification/CustomeriolNotificationClient.java +++ b/airbyte-notification/src/main/java/io/airbyte/notification/CustomerioNotificationClient.java @@ -12,35 +12,37 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.util.UUID; import org.apache.commons.lang3.NotImplementedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Notification client that uses customer.io API send emails. + * + * These notifications rely on `TRANSACTION_MESSAGE_ID`, which are basically templates you create + * through customer.io. These IDs are specific to a user's account on customer.io, so they will be + * different for every user. For now they are stored as variables here, but in the future they may + * be stored in as a notification config in the database. + * + * For Airbyte Cloud, Airbyte engineers may use `DEFAULT_TRANSACTION_MESSAGE_ID = "6"` as a generic + * template for notifications. */ -public class CustomeriolNotificationClient extends NotificationClient { +public class CustomerioNotificationClient extends NotificationClient { - private static final Logger LOGGER = LoggerFactory.getLogger(CustomeriolNotificationClient.class); + private static final Logger LOGGER = LoggerFactory.getLogger(CustomerioNotificationClient.class); - // Once the configs are editable through the UI, these should be stored in - // airbyte-config/models/src/main/resources/types/CustomerioNotificationConfiguration.yaml - // - SENDER_EMAIL - // - receiver email - // - customer.io identifier email - // - customer.io TRANSACTION_MESSAGE_ID - private static final String SENDER_EMAIL = "Airbyte Notification "; - private static final String TRANSACTION_MESSAGE_ID = "6"; + private static final String AUTO_DISABLE_TRANSACTION_MESSAGE_ID = "7"; + private static final String AUTO_DISABLE_WARNING_TRANSACTION_MESSAGE_ID = "8"; private static final String CUSTOMERIO_EMAIL_API_ENDPOINT = "https://api.customer.io/v1/send/email"; private static final String AUTO_DISABLE_NOTIFICATION_TEMPLATE_PATH = "customerio/auto_disable_notification_template.json"; - private static final String AUTO_DISABLE_WARNING_NOTIFICATION_TEMPLATE_PATH = "customerio/auto_disable_warning_notification_template.json"; private final HttpClient httpClient; private final String apiToken; private final String emailApiEndpoint; - public CustomeriolNotificationClient(final Notification notification) { + public CustomerioNotificationClient(final Notification notification) { super(notification); this.apiToken = System.getenv("CUSTOMERIO_API_KEY"); this.emailApiEndpoint = CUSTOMERIO_EMAIL_API_ENDPOINT; @@ -50,10 +52,10 @@ public CustomeriolNotificationClient(final Notification notification) { } @VisibleForTesting - public CustomeriolNotificationClient(final Notification notification, - final String apiToken, - final String emailApiEndpoint, - final HttpClient httpClient) { + public CustomerioNotificationClient(final Notification notification, + final String apiToken, + final String emailApiEndpoint, + final HttpClient httpClient) { super(notification); this.apiToken = apiToken; this.emailApiEndpoint = emailApiEndpoint; @@ -72,28 +74,32 @@ public boolean notifyJobSuccess(final String sourceConnector, final String desti throw new NotImplementedException(); } + // Once the configs are editable through the UI, the reciever email should be stored in + // airbyte-config/models/src/main/resources/types/CustomerioNotificationConfiguration.yaml + // instead of being passed in @Override public boolean notifyConnectionDisabled(final String receiverEmail, final String sourceConnector, final String destinationConnector, final String jobDescription, - final String logUrl) + final UUID workspaceId, + final UUID connectionId) throws IOException, InterruptedException { - final String requestBody = renderTemplate(AUTO_DISABLE_NOTIFICATION_TEMPLATE_PATH, TRANSACTION_MESSAGE_ID, SENDER_EMAIL, receiverEmail, - receiverEmail, sourceConnector, destinationConnector, jobDescription, logUrl); + final String requestBody = renderTemplate(AUTO_DISABLE_NOTIFICATION_TEMPLATE_PATH, AUTO_DISABLE_TRANSACTION_MESSAGE_ID, receiverEmail, + receiverEmail, sourceConnector, destinationConnector, jobDescription, workspaceId.toString(), connectionId.toString()); return notifyByEmail(requestBody); } @Override - public boolean notifyConnectionDisableWarning( - final String receiverEmail, + public boolean notifyConnectionDisableWarning(final String receiverEmail, final String sourceConnector, final String destinationConnector, final String jobDescription, - final String logUrl) + final UUID workspaceId, + final UUID connectionId) throws IOException, InterruptedException { - final String requestBody = renderTemplate(AUTO_DISABLE_WARNING_NOTIFICATION_TEMPLATE_PATH, TRANSACTION_MESSAGE_ID, SENDER_EMAIL, receiverEmail, - receiverEmail, sourceConnector, destinationConnector, jobDescription, logUrl); + final String requestBody = renderTemplate(AUTO_DISABLE_NOTIFICATION_TEMPLATE_PATH, AUTO_DISABLE_WARNING_TRANSACTION_MESSAGE_ID, receiverEmail, + receiverEmail, sourceConnector, destinationConnector, jobDescription, workspaceId.toString(), connectionId.toString()); return notifyByEmail(requestBody); } diff --git a/airbyte-notification/src/main/java/io/airbyte/notification/NotificationClient.java b/airbyte-notification/src/main/java/io/airbyte/notification/NotificationClient.java index 65bd08b67c8f..02b7465dc37d 100644 --- a/airbyte-notification/src/main/java/io/airbyte/notification/NotificationClient.java +++ b/airbyte-notification/src/main/java/io/airbyte/notification/NotificationClient.java @@ -4,8 +4,10 @@ package io.airbyte.notification; +import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.Notification; import java.io.IOException; +import java.util.UUID; public abstract class NotificationClient { @@ -35,14 +37,16 @@ public abstract boolean notifyConnectionDisabled(String receiverEmail, String sourceConnector, String destinationConnector, String jobDescription, - String logUrl) + UUID workspaceId, + UUID connectionId) throws IOException, InterruptedException; public abstract boolean notifyConnectionDisableWarning(String receiverEmail, String sourceConnector, String destinationConnector, String jobDescription, - String logUrl) + UUID workspaceId, + UUID connectionId) throws IOException, InterruptedException; public abstract boolean notifySuccess(String message) throws IOException, InterruptedException; @@ -52,9 +56,14 @@ public abstract boolean notifyConnectionDisableWarning(String receiverEmail, public static NotificationClient createNotificationClient(final Notification notification) { return switch (notification.getNotificationType()) { case SLACK -> new SlackNotificationClient(notification); - case CUSTOMERIO -> new CustomeriolNotificationClient(notification); + case CUSTOMERIO -> new CustomerioNotificationClient(notification); default -> throw new IllegalArgumentException("Unknown notification type:" + notification.getNotificationType()); }; } + String renderTemplate(final String templateFile, final String... data) throws IOException { + final String template = MoreResources.readResource(templateFile); + return String.format(template, data); + } + } diff --git a/airbyte-notification/src/main/java/io/airbyte/notification/SlackNotificationClient.java b/airbyte-notification/src/main/java/io/airbyte/notification/SlackNotificationClient.java index 9f4837b646e8..9a48fda4c2ed 100644 --- a/airbyte-notification/src/main/java/io/airbyte/notification/SlackNotificationClient.java +++ b/airbyte-notification/src/main/java/io/airbyte/notification/SlackNotificationClient.java @@ -7,7 +7,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.Notification; import io.airbyte.config.SlackNotificationConfiguration; import java.io.IOException; @@ -15,6 +14,7 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.util.UUID; import org.apache.logging.log4j.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +46,7 @@ public SlackNotificationClient(final Notification notification) { @Override public boolean notifyJobFailure(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl) throws IOException, InterruptedException { - return notifyFailure(renderJobData( + return notifyFailure(renderTemplate( "slack/failure_slack_notification_template.txt", sourceConnector, destinationConnector, @@ -57,7 +57,7 @@ public boolean notifyJobFailure(final String sourceConnector, final String desti @Override public boolean notifyJobSuccess(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl) throws IOException, InterruptedException { - return notifySuccess(renderJobData( + return notifySuccess(renderTemplate( "slack/success_slack_notification_template.txt", sourceConnector, destinationConnector, @@ -70,14 +70,16 @@ public boolean notifyConnectionDisabled(final String receiverEmail, final String sourceConnector, final String destinationConnector, final String jobDescription, - final String logUrl) + final UUID workspaceId, + final UUID connectionId) throws IOException, InterruptedException { - final String message = renderJobData( + final String message = renderTemplate( "slack/auto_disable_slack_notification_template.txt", sourceConnector, destinationConnector, jobDescription, - logUrl); + workspaceId.toString(), + connectionId.toString()); final String webhookUrl = config.getWebhook(); if (!Strings.isEmpty(webhookUrl)) { @@ -91,14 +93,16 @@ public boolean notifyConnectionDisableWarning(final String receiverEmail, final String sourceConnector, final String destinationConnector, final String jobDescription, - final String logUrl) + final UUID workspaceId, + final UUID connectionId) throws IOException, InterruptedException { - final String message = renderJobData( + final String message = renderTemplate( "slack/auto_disable_warning_slack_notification_template.txt", sourceConnector, destinationConnector, jobDescription, - logUrl); + workspaceId.toString(), + connectionId.toString()); final String webhookUrl = config.getWebhook(); if (!Strings.isEmpty(webhookUrl)) { @@ -107,16 +111,6 @@ public boolean notifyConnectionDisableWarning(final String receiverEmail, return false; } - private String renderJobData(final String templateFile, - final String sourceConnector, - final String destinationConnector, - final String jobDescription, - final String logUrl) - throws IOException { - final String template = MoreResources.readResource(templateFile); - return String.format(template, sourceConnector, destinationConnector, jobDescription, logUrl); - } - private boolean notify(final String message) throws IOException, InterruptedException { final ImmutableMap body = new Builder() .put("text", message) diff --git a/airbyte-notification/src/main/resources/customerio/auto_disable_notification_template.json b/airbyte-notification/src/main/resources/customerio/auto_disable_notification_template.json index 545d0b737f72..7cfd0bee4071 100644 --- a/airbyte-notification/src/main/resources/customerio/auto_disable_notification_template.json +++ b/airbyte-notification/src/main/resources/customerio/auto_disable_notification_template.json @@ -1,19 +1,20 @@ { "transactional_message_id": "%s", - "from": "%s", - "subject": "Automatic Notification: Your Airbyte connection has been disabled", "to": "%s", "identifiers": { "email": "%s" }, "message_data": { - "email_title": "Automatic Notification: Connection Disabled", - "email_body": "Your connection from %s to %s was automatically disabled because it failed 100 times consecutively or has been failing for 14 days in a row.

Please address the failing issues to ensure your syncs continue to run. The most recent attempted %s You can access its logs here: %s.

If you need help with resolving your connection, reach out to Support in-app or by emailing cloud-support@airbyte.io." + "source": "%s", + "destination": "%s", + "job_description": "%s", + "connection_id": "%s", + "workspace_id": "%s" }, "disable_message_retention": false, "send_to_unsubscribed": true, - "tracked": true, + "tracked": false, "queue_draft": false, "disable_css_preprocessing": true } diff --git a/airbyte-notification/src/main/resources/customerio/auto_disable_warning_notification_template.json b/airbyte-notification/src/main/resources/customerio/auto_disable_warning_notification_template.json deleted file mode 100644 index bdfde3a9e702..000000000000 --- a/airbyte-notification/src/main/resources/customerio/auto_disable_warning_notification_template.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "transactional_message_id": "%s", - "from": "%s", - "subject": "Warning: Your Airbyte connection will be disabled", - "to": "%s", - "identifiers": { - "email": "%s" - }, - "message_data": { - "email_title": "Warning: Connection Failing", - "email_body": "Your connection from %s to %s is scheduled to be automatically disabled because it either failed 50 times consecutively or there were only failed jobs in the past 7 days. Once it has failed 100 times consecutively or has been failing for 14 days in a row, the connection will be automatically disabled.

Please address the failing issues to ensure your syncs continue to run. The most recent attempted %s You can access its logs here: %s.

If you need help with resolving your connection, reach out to Support in-app or by emailing cloud-support@airbyte.io." - }, - - "disable_message_retention": false, - "send_to_unsubscribed": true, - "tracked": true, - "queue_draft": false, - "disable_css_preprocessing": true -} diff --git a/airbyte-notification/src/main/resources/customerio/default_template.json b/airbyte-notification/src/main/resources/customerio/default_template.json new file mode 100644 index 000000000000..d394666cd763 --- /dev/null +++ b/airbyte-notification/src/main/resources/customerio/default_template.json @@ -0,0 +1,17 @@ +{ + "transactional_message_id": "%s", + "to": "%s", + "identifiers": { + "email": "%s" + }, + "message_data": { + "email_title": "%s", + "email_body": "%s" + }, + + "disable_message_retention": false, + "send_to_unsubscribed": true, + "tracked": true, + "queue_draft": false, + "disable_css_preprocessing": true +} diff --git a/airbyte-notification/src/main/resources/slack/auto_disable_slack_notification_template.txt b/airbyte-notification/src/main/resources/slack/auto_disable_slack_notification_template.txt index 011308b714a2..132d03bbf6be 100644 --- a/airbyte-notification/src/main/resources/slack/auto_disable_slack_notification_template.txt +++ b/airbyte-notification/src/main/resources/slack/auto_disable_slack_notification_template.txt @@ -1,3 +1,6 @@ Your connection from %s to %s was automatically disabled because it failed 100 times consecutively or has been failing for 14 days in a row. -Please address the failing issues to ensure your syncs continue to run. The most recent attempted %s You can access its logs here: %s. +Please address the failing issues to ensure your syncs continue to run. The most recent attempted %s + +Workspace ID: %s +Connection ID: %s diff --git a/airbyte-notification/src/main/resources/slack/auto_disable_warning_slack_notification_template.txt b/airbyte-notification/src/main/resources/slack/auto_disable_warning_slack_notification_template.txt index e2a1ce8da6d6..ad325bfaf8f7 100644 --- a/airbyte-notification/src/main/resources/slack/auto_disable_warning_slack_notification_template.txt +++ b/airbyte-notification/src/main/resources/slack/auto_disable_warning_slack_notification_template.txt @@ -1,3 +1,6 @@ Your connection from %s to %s is scheduled to be automatically disabled because it either failed 50 times consecutively or there were only failed jobs in the past 7 days. Once it has failed 100 times consecutively or has been failing for 14 days in a row, the connection will be automatically disabled. -Please address the failing issues to ensure your syncs continue to run. The most recent attempted %s You can access its logs here: %s. +Please address the failing issues to ensure your syncs continue to run. The most recent attempted %s + +Workspace ID: %s +Connection ID: %s diff --git a/airbyte-notification/src/test/java/io/airbyte/notification/CustomerioNotificationClientTest.java b/airbyte-notification/src/test/java/io/airbyte/notification/CustomerioNotificationClientTest.java index d6f342ee5c95..f01cee0e7367 100644 --- a/airbyte-notification/src/test/java/io/airbyte/notification/CustomerioNotificationClientTest.java +++ b/airbyte-notification/src/test/java/io/airbyte/notification/CustomerioNotificationClientTest.java @@ -26,6 +26,7 @@ class CustomerioNotificationClientTest { private static final String API_KEY = "api-key"; private static final String URI_BASE = "https://customer.io"; private static final UUID WORKSPACE_ID = UUID.randomUUID(); + private static final UUID CONNECTION_ID = UUID.randomUUID(); private static final StandardWorkspace WORKSPACE = new StandardWorkspace() .withWorkspaceId(WORKSPACE_ID) .withName("workspace-name") @@ -45,7 +46,7 @@ void setUp() { // this test does _not_ check the body of the request. @Test void notifyConnectionDisabled() throws IOException, InterruptedException { - final CustomeriolNotificationClient customeriolNotificationClient = new CustomeriolNotificationClient(new Notification() + final CustomerioNotificationClient customerioNotificationClient = new CustomerioNotificationClient(new Notification() .withNotificationType(NotificationType.CUSTOMERIO), API_KEY, URI_BASE, mHttpClient); final HttpRequest expectedRequest = HttpRequest.newBuilder() @@ -60,7 +61,8 @@ void notifyConnectionDisabled() throws IOException, InterruptedException { Mockito.when(httpResponse.statusCode()).thenReturn(200); final boolean result = - customeriolNotificationClient.notifyConnectionDisabled(WORKSPACE.getEmail(), RANDOM_INPUT, RANDOM_INPUT, RANDOM_INPUT, RANDOM_INPUT); + customerioNotificationClient.notifyConnectionDisabled(WORKSPACE.getEmail(), RANDOM_INPUT, RANDOM_INPUT, RANDOM_INPUT, WORKSPACE_ID, + CONNECTION_ID); Mockito.verify(mHttpClient).send(expectedRequest, HttpResponse.BodyHandlers.ofString()); assertTrue(result); diff --git a/airbyte-notification/src/test/java/io/airbyte/notification/SlackNotificationClientTest.java b/airbyte-notification/src/test/java/io/airbyte/notification/SlackNotificationClientTest.java index bbdeb356c652..072a53eb48b2 100644 --- a/airbyte-notification/src/test/java/io/airbyte/notification/SlackNotificationClientTest.java +++ b/airbyte-notification/src/test/java/io/airbyte/notification/SlackNotificationClientTest.java @@ -33,6 +33,8 @@ public class SlackNotificationClientTest { private static final Logger LOGGER = LoggerFactory.getLogger(SlackNotificationClientTest.class); + private static final UUID WORKSPACE_ID = UUID.randomUUID(); + private static final UUID CONNECTION_ID = UUID.randomUUID(); public static final String WEBHOOK_URL = "http://localhost:"; private static final String EXPECTED_FAIL_MESSAGE = "Your connection from source-test to destination-test just failed...\n" @@ -120,12 +122,16 @@ void testNotifyJobSuccess() throws IOException, InterruptedException { @Test void testNotifyConnectionDisabled() throws IOException, InterruptedException { - final String expectedNotificationMessage = + final String expectedNotificationMessage = String.format( """ Your connection from source-test to destination-test was automatically disabled because it failed 100 times consecutively or has been failing for 14 days in a row. - Please address the failing issues to ensure your syncs continue to run. The most recent attempted job description You can access its logs here: logUrl. - """; + Please address the failing issues to ensure your syncs continue to run. The most recent attempted job description. + + Workspace ID: %s + Connection ID: %s + """, + WORKSPACE_ID, CONNECTION_ID); server.createContext("/test", new ServerHandler(expectedNotificationMessage)); final SlackNotificationClient client = @@ -133,17 +139,21 @@ void testNotifyConnectionDisabled() throws IOException, InterruptedException { .withNotificationType(NotificationType.SLACK) .withSendOnSuccess(true) .withSlackConfiguration(new SlackNotificationConfiguration().withWebhook(WEBHOOK_URL + server.getAddress().getPort() + "/test"))); - assertTrue(client.notifyConnectionDisabled("", "source-test", "destination-test", "job description", "logUrl")); + assertTrue(client.notifyConnectionDisabled("", "source-test", "destination-test", "job description.", WORKSPACE_ID, CONNECTION_ID)); } @Test void testNotifyConnectionDisabledWarning() throws IOException, InterruptedException { - final String expectedNotificationWarningMessage = + final String expectedNotificationWarningMessage = String.format( """ Your connection from source-test to destination-test is scheduled to be automatically disabled because it either failed 50 times consecutively or there were only failed jobs in the past 7 days. Once it has failed 100 times consecutively or has been failing for 14 days in a row, the connection will be automatically disabled. - Please address the failing issues to ensure your syncs continue to run. The most recent attempted job description You can access its logs here: logUrl. - """; + Please address the failing issues to ensure your syncs continue to run. The most recent attempted job description. + + Workspace ID: %s + Connection ID: %s + """, + WORKSPACE_ID, CONNECTION_ID); server.createContext("/test", new ServerHandler(expectedNotificationWarningMessage)); final SlackNotificationClient client = @@ -151,7 +161,7 @@ void testNotifyConnectionDisabledWarning() throws IOException, InterruptedExcept .withNotificationType(NotificationType.SLACK) .withSendOnSuccess(true) .withSlackConfiguration(new SlackNotificationConfiguration().withWebhook(WEBHOOK_URL + server.getAddress().getPort() + "/test"))); - assertTrue(client.notifyConnectionDisableWarning("", "source-test", "destination-test", "job description", "logUrl")); + assertTrue(client.notifyConnectionDisableWarning("", "source-test", "destination-test", "job description.", WORKSPACE_ID, CONNECTION_ID)); } static class ServerHandler implements HttpHandler { diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java index b49236b9beec..afeb83f52679 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java @@ -118,13 +118,13 @@ private void notifyJob(final String reason, break; case CONNECTION_DISABLED_NOTIFICATION: if (!notificationClient.notifyConnectionDisabled(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription, - logUrl)) { + workspaceId, connectionId)) { LOGGER.warn("Failed to successfully notify auto-disable connection: {}", notification); } break; case CONNECTION_DISABLED_WARNING_NOTIFICATION: if (!notificationClient.notifyConnectionDisableWarning(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription, - logUrl)) { + workspaceId, connectionId)) { LOGGER.warn("Failed to successfully notify auto-disable connection warning: {}", notification); } }