Skip to content

Commit

Permalink
Use different CustomerIO templates and edit slack notifs (#12674)
Browse files Browse the repository at this point in the history
* Use different CustomerIO templates and edit slack notifs

* fix unit test

* fix file naming and comments
  • Loading branch information
terencecho authored and suhomud committed May 23, 2022
1 parent c7a3ab5 commit cd61364
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,37 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.UUID;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Notification client that uses customer.io API send emails.
*
* These notifications rely on `TRANSACTION_MESSAGE_ID`, which are basically templates you create
* through customer.io. These IDs are specific to a user's account on customer.io, so they will be
* different for every user. For now they are stored as variables here, but in the future they may
* be stored in as a notification config in the database.
*
* For Airbyte Cloud, Airbyte engineers may use `DEFAULT_TRANSACTION_MESSAGE_ID = "6"` as a generic
* template for notifications.
*/
public class CustomeriolNotificationClient extends NotificationClient {
public class CustomerioNotificationClient extends NotificationClient {

private static final Logger LOGGER = LoggerFactory.getLogger(CustomeriolNotificationClient.class);
private static final Logger LOGGER = LoggerFactory.getLogger(CustomerioNotificationClient.class);

// Once the configs are editable through the UI, these should be stored in
// airbyte-config/models/src/main/resources/types/CustomerioNotificationConfiguration.yaml
// - SENDER_EMAIL
// - receiver email
// - customer.io identifier email
// - customer.io TRANSACTION_MESSAGE_ID
private static final String SENDER_EMAIL = "Airbyte Notification <[email protected]>";
private static final String TRANSACTION_MESSAGE_ID = "6";
private static final String AUTO_DISABLE_TRANSACTION_MESSAGE_ID = "7";
private static final String AUTO_DISABLE_WARNING_TRANSACTION_MESSAGE_ID = "8";

private static final String CUSTOMERIO_EMAIL_API_ENDPOINT = "https://api.customer.io/v1/send/email";
private static final String AUTO_DISABLE_NOTIFICATION_TEMPLATE_PATH = "customerio/auto_disable_notification_template.json";
private static final String AUTO_DISABLE_WARNING_NOTIFICATION_TEMPLATE_PATH = "customerio/auto_disable_warning_notification_template.json";

private final HttpClient httpClient;
private final String apiToken;
private final String emailApiEndpoint;

public CustomeriolNotificationClient(final Notification notification) {
public CustomerioNotificationClient(final Notification notification) {
super(notification);
this.apiToken = System.getenv("CUSTOMERIO_API_KEY");
this.emailApiEndpoint = CUSTOMERIO_EMAIL_API_ENDPOINT;
Expand All @@ -50,10 +52,10 @@ public CustomeriolNotificationClient(final Notification notification) {
}

@VisibleForTesting
public CustomeriolNotificationClient(final Notification notification,
final String apiToken,
final String emailApiEndpoint,
final HttpClient httpClient) {
public CustomerioNotificationClient(final Notification notification,
final String apiToken,
final String emailApiEndpoint,
final HttpClient httpClient) {
super(notification);
this.apiToken = apiToken;
this.emailApiEndpoint = emailApiEndpoint;
Expand All @@ -72,28 +74,32 @@ public boolean notifyJobSuccess(final String sourceConnector, final String desti
throw new NotImplementedException();
}

// Once the configs are editable through the UI, the reciever email should be stored in
// airbyte-config/models/src/main/resources/types/CustomerioNotificationConfiguration.yaml
// instead of being passed in
@Override
public boolean notifyConnectionDisabled(final String receiverEmail,
final String sourceConnector,
final String destinationConnector,
final String jobDescription,
final String logUrl)
final UUID workspaceId,
final UUID connectionId)
throws IOException, InterruptedException {
final String requestBody = renderTemplate(AUTO_DISABLE_NOTIFICATION_TEMPLATE_PATH, TRANSACTION_MESSAGE_ID, SENDER_EMAIL, receiverEmail,
receiverEmail, sourceConnector, destinationConnector, jobDescription, logUrl);
final String requestBody = renderTemplate(AUTO_DISABLE_NOTIFICATION_TEMPLATE_PATH, AUTO_DISABLE_TRANSACTION_MESSAGE_ID, receiverEmail,
receiverEmail, sourceConnector, destinationConnector, jobDescription, workspaceId.toString(), connectionId.toString());
return notifyByEmail(requestBody);
}

@Override
public boolean notifyConnectionDisableWarning(
final String receiverEmail,
public boolean notifyConnectionDisableWarning(final String receiverEmail,
final String sourceConnector,
final String destinationConnector,
final String jobDescription,
final String logUrl)
final UUID workspaceId,
final UUID connectionId)
throws IOException, InterruptedException {
final String requestBody = renderTemplate(AUTO_DISABLE_WARNING_NOTIFICATION_TEMPLATE_PATH, TRANSACTION_MESSAGE_ID, SENDER_EMAIL, receiverEmail,
receiverEmail, sourceConnector, destinationConnector, jobDescription, logUrl);
final String requestBody = renderTemplate(AUTO_DISABLE_NOTIFICATION_TEMPLATE_PATH, AUTO_DISABLE_WARNING_TRANSACTION_MESSAGE_ID, receiverEmail,
receiverEmail, sourceConnector, destinationConnector, jobDescription, workspaceId.toString(), connectionId.toString());
return notifyByEmail(requestBody);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

package io.airbyte.notification;

import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.Notification;
import java.io.IOException;
import java.util.UUID;

public abstract class NotificationClient {

Expand Down Expand Up @@ -35,14 +37,16 @@ public abstract boolean notifyConnectionDisabled(String receiverEmail,
String sourceConnector,
String destinationConnector,
String jobDescription,
String logUrl)
UUID workspaceId,
UUID connectionId)
throws IOException, InterruptedException;

public abstract boolean notifyConnectionDisableWarning(String receiverEmail,
String sourceConnector,
String destinationConnector,
String jobDescription,
String logUrl)
UUID workspaceId,
UUID connectionId)
throws IOException, InterruptedException;

public abstract boolean notifySuccess(String message) throws IOException, InterruptedException;
Expand All @@ -52,9 +56,14 @@ public abstract boolean notifyConnectionDisableWarning(String receiverEmail,
public static NotificationClient createNotificationClient(final Notification notification) {
return switch (notification.getNotificationType()) {
case SLACK -> new SlackNotificationClient(notification);
case CUSTOMERIO -> new CustomeriolNotificationClient(notification);
case CUSTOMERIO -> new CustomerioNotificationClient(notification);
default -> throw new IllegalArgumentException("Unknown notification type:" + notification.getNotificationType());
};
}

String renderTemplate(final String templateFile, final String... data) throws IOException {
final String template = MoreResources.readResource(templateFile);
return String.format(template, data);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.Notification;
import io.airbyte.config.SlackNotificationConfiguration;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.UUID;
import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -46,7 +46,7 @@ public SlackNotificationClient(final Notification notification) {
@Override
public boolean notifyJobFailure(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl)
throws IOException, InterruptedException {
return notifyFailure(renderJobData(
return notifyFailure(renderTemplate(
"slack/failure_slack_notification_template.txt",
sourceConnector,
destinationConnector,
Expand All @@ -57,7 +57,7 @@ public boolean notifyJobFailure(final String sourceConnector, final String desti
@Override
public boolean notifyJobSuccess(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl)
throws IOException, InterruptedException {
return notifySuccess(renderJobData(
return notifySuccess(renderTemplate(
"slack/success_slack_notification_template.txt",
sourceConnector,
destinationConnector,
Expand All @@ -70,14 +70,16 @@ public boolean notifyConnectionDisabled(final String receiverEmail,
final String sourceConnector,
final String destinationConnector,
final String jobDescription,
final String logUrl)
final UUID workspaceId,
final UUID connectionId)
throws IOException, InterruptedException {
final String message = renderJobData(
final String message = renderTemplate(
"slack/auto_disable_slack_notification_template.txt",
sourceConnector,
destinationConnector,
jobDescription,
logUrl);
workspaceId.toString(),
connectionId.toString());

final String webhookUrl = config.getWebhook();
if (!Strings.isEmpty(webhookUrl)) {
Expand All @@ -91,14 +93,16 @@ public boolean notifyConnectionDisableWarning(final String receiverEmail,
final String sourceConnector,
final String destinationConnector,
final String jobDescription,
final String logUrl)
final UUID workspaceId,
final UUID connectionId)
throws IOException, InterruptedException {
final String message = renderJobData(
final String message = renderTemplate(
"slack/auto_disable_warning_slack_notification_template.txt",
sourceConnector,
destinationConnector,
jobDescription,
logUrl);
workspaceId.toString(),
connectionId.toString());

final String webhookUrl = config.getWebhook();
if (!Strings.isEmpty(webhookUrl)) {
Expand All @@ -107,16 +111,6 @@ public boolean notifyConnectionDisableWarning(final String receiverEmail,
return false;
}

private String renderJobData(final String templateFile,
final String sourceConnector,
final String destinationConnector,
final String jobDescription,
final String logUrl)
throws IOException {
final String template = MoreResources.readResource(templateFile);
return String.format(template, sourceConnector, destinationConnector, jobDescription, logUrl);
}

private boolean notify(final String message) throws IOException, InterruptedException {
final ImmutableMap<String, String> body = new Builder<String, String>()
.put("text", message)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
{
"transactional_message_id": "%s",
"from": "%s",
"subject": "Automatic Notification: Your Airbyte connection has been disabled",
"to": "%s",
"identifiers": {
"email": "%s"
},
"message_data": {
"email_title": "Automatic Notification: Connection Disabled",
"email_body": "Your connection from <b>%s</b> to <b>%s</b> was automatically disabled because it failed 100 times consecutively or has been failing for 14 days in a row.<p>Please address the failing issues to ensure your syncs continue to run. The most recent attempted %s You can access its logs here: %s.<p>If you need help with resolving your connection, reach out to Support in-app or by emailing [email protected]."
"source": "%s",
"destination": "%s",
"job_description": "%s",
"connection_id": "%s",
"workspace_id": "%s"
},

"disable_message_retention": false,
"send_to_unsubscribed": true,
"tracked": true,
"tracked": false,
"queue_draft": false,
"disable_css_preprocessing": true
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"transactional_message_id": "%s",
"to": "%s",
"identifiers": {
"email": "%s"
},
"message_data": {
"email_title": "%s",
"email_body": "%s"
},

"disable_message_retention": false,
"send_to_unsubscribed": true,
"tracked": true,
"queue_draft": false,
"disable_css_preprocessing": true
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
Your connection from %s to %s was automatically disabled because it failed 100 times consecutively or has been failing for 14 days in a row.

Please address the failing issues to ensure your syncs continue to run. The most recent attempted %s You can access its logs here: %s.
Please address the failing issues to ensure your syncs continue to run. The most recent attempted %s

Workspace ID: %s
Connection ID: %s
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
Your connection from %s to %s is scheduled to be automatically disabled because it either failed 50 times consecutively or there were only failed jobs in the past 7 days. Once it has failed 100 times consecutively or has been failing for 14 days in a row, the connection will be automatically disabled.

Please address the failing issues to ensure your syncs continue to run. The most recent attempted %s You can access its logs here: %s.
Please address the failing issues to ensure your syncs continue to run. The most recent attempted %s

Workspace ID: %s
Connection ID: %s
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class CustomerioNotificationClientTest {
private static final String API_KEY = "api-key";
private static final String URI_BASE = "https://customer.io";
private static final UUID WORKSPACE_ID = UUID.randomUUID();
private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final StandardWorkspace WORKSPACE = new StandardWorkspace()
.withWorkspaceId(WORKSPACE_ID)
.withName("workspace-name")
Expand All @@ -45,7 +46,7 @@ void setUp() {
// this test does _not_ check the body of the request.
@Test
void notifyConnectionDisabled() throws IOException, InterruptedException {
final CustomeriolNotificationClient customeriolNotificationClient = new CustomeriolNotificationClient(new Notification()
final CustomerioNotificationClient customerioNotificationClient = new CustomerioNotificationClient(new Notification()
.withNotificationType(NotificationType.CUSTOMERIO), API_KEY, URI_BASE, mHttpClient);

final HttpRequest expectedRequest = HttpRequest.newBuilder()
Expand All @@ -60,7 +61,8 @@ void notifyConnectionDisabled() throws IOException, InterruptedException {
Mockito.when(httpResponse.statusCode()).thenReturn(200);

final boolean result =
customeriolNotificationClient.notifyConnectionDisabled(WORKSPACE.getEmail(), RANDOM_INPUT, RANDOM_INPUT, RANDOM_INPUT, RANDOM_INPUT);
customerioNotificationClient.notifyConnectionDisabled(WORKSPACE.getEmail(), RANDOM_INPUT, RANDOM_INPUT, RANDOM_INPUT, WORKSPACE_ID,
CONNECTION_ID);
Mockito.verify(mHttpClient).send(expectedRequest, HttpResponse.BodyHandlers.ofString());

assertTrue(result);
Expand Down
Loading

0 comments on commit cd61364

Please sign in to comment.