diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/81740ce8-d764-4ea7-94df-16bb41de36ae.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/81740ce8-d764-4ea7-94df-16bb41de36ae.json index a660faf5dacb..211931630c0e 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/81740ce8-d764-4ea7-94df-16bb41de36ae.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/81740ce8-d764-4ea7-94df-16bb41de36ae.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "81740ce8-d764-4ea7-94df-16bb41de36ae", "name": "Chargify (Keen)", "dockerRepository": "airbyte/destination-keen", - "dockerImgeTag": "0.1.0", + "dockerImageTag": "0.2.0", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/keen" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8aaf41d0-f6d2-46de-9e79-c9540f828142.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8aaf41d0-f6d2-46de-9e79-c9540f828142.json index c8172d75bc45..747ef3d770ef 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8aaf41d0-f6d2-46de-9e79-c9540f828142.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8aaf41d0-f6d2-46de-9e79-c9540f828142.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "8aaf41d0-f6d2-46de-9e79-c9540f828142", "name": "Keen", "dockerRepository": "airbyte/destination-keen", - "dockerImgeTag": "0.1.0", + "dockerImageTag": "0.2.0", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/keen" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 176461a5ce0f..fd20f4da2caf 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ 
b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -88,12 +88,12 @@ - destinationDefinitionId: 8aaf41d0-f6d2-46de-9e79-c9540f828142 name: Keen dockerRepository: airbyte/destination-keen - dockerImageTag: 0.1.0 + dockerImageTag: 0.2.0 documentationUrl: https://docs.airbyte.io/integrations/destinations/keen - destinationDefinitionId: 81740ce8-d764-4ea7-94df-16bb41de36ae name: Chargify (Keen) dockerRepository: airbyte/destination-keen - dockerImageTag: 0.1.0 + dockerImageTag: 0.2.0 documentationUrl: https://docs.airbyte.io/integrations/destinations/keen - destinationDefinitionId: 8b746512-8c2e-6ac1-4adc-b59faafd473c name: MongoDB diff --git a/airbyte-integrations/connectors/destination-keen/Dockerfile b/airbyte-integrations/connectors/destination-keen/Dockerfile index f9576125b342..c0d05380fc96 100644 --- a/airbyte-integrations/connectors/destination-keen/Dockerfile +++ b/airbyte-integrations/connectors/destination-keen/Dockerfile @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.2.0 LABEL io.airbyte.name=airbyte/destination-keen diff --git a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenHttpClient.java b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenHttpClient.java index e2414218ac5b..3dfde8d98f75 100644 --- a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenHttpClient.java +++ b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenHttpClient.java @@ -26,8 +26,6 @@ import static io.airbyte.integrations.destination.keen.KeenDestination.KEEN_BASE_API_PATH; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import 
com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import java.io.IOException; @@ -37,8 +35,6 @@ import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandlers; import java.time.Duration; -import java.util.List; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,40 +102,4 @@ public ArrayNode extract(String streamName, String projectId, String apiKey) return (ArrayNode) objectMapper.readTree(response.body()).get("result"); } - public List getAllCollectionsForProject(String projectId, String apiKey) - throws IOException, InterruptedException { - URI listCollectionsUri = URI.create(String.format( - KEEN_BASE_API_PATH + "/projects/%s/events", - projectId)); - - HttpRequest request = HttpRequest.newBuilder() - .uri(listCollectionsUri) - .timeout(Duration.ofSeconds(30)) - .header("Authorization", apiKey) - .header("Content-Type", "application/json") - .build(); - - HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - - List keenCollections = objectMapper.readValue(objectMapper.createParser(response.body()), - new TypeReference<>() {}); - - return keenCollections.stream().map(KeenCollection::getName).collect(Collectors.toList()); - } - - @JsonIgnoreProperties(ignoreUnknown = true) - private static class KeenCollection { - - private String name; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - } - } diff --git a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java index 49276776be55..3b1146ec42da 100644 --- 
a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java +++ b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java @@ -30,13 +30,10 @@ import com.joestelmach.natty.Parser; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.time.Instant; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; @@ -49,12 +46,6 @@ */ public class KeenTimestampService { - public enum CursorType { - STRING, - NUMBER, - UNRECOGNIZED - } - private static final Logger LOGGER = LoggerFactory.getLogger(KeenRecordsConsumer.class); private static final long SECONDS_FROM_EPOCH_THRESHOLD = 1_000_000_000L; @@ -62,7 +53,7 @@ public enum CursorType { private static final long MILLIS_FROM_EPOCH_THRESHOLD = 10_000_000_000L; // Map containing stream names paired with their cursor fields - private Map streamCursorFields; + private Map> streamCursorFields; private final Parser parser; private final boolean timestampInferenceEnabled; @@ -76,27 +67,24 @@ public KeenTimestampService(ConfiguredAirbyteCatalog catalog, boolean timestampI streamCursorFields = catalog.getStreams() .stream() .filter(stream -> stream.getCursorField().size() > 0) - .map(s -> Pair.of(s.getStream().getName(), CursorField.fromStream(s))) - .filter( - pair -> pair.getRight() != null && pair.getRight().type != CursorType.UNRECOGNIZED) + .map(s -> Pair.of(s.getStream().getName(), s.getCursorField())) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); } } /** * Tries to inject keen.timestamp field to the given message data. 
If the stream contains cursor - fields of types NUMBER or STRING, this value is tried to be parsed to a timestamp. If this - procedure fails, stream is removed from timestamp-parsable stream map, so parsing is not tried - for future messages in the same stream. If parsing succeeds, keen.timestamp field is put as a - JSON node to the message data and whole data is returned. Otherwise, keen.timestamp is set to - emittedAt value + field, its value is tried to be parsed to a timestamp. If this procedure fails, stream is removed + from timestamp-parsable stream map, so parsing is not tried for future messages in the same stream. + If parsing succeeds, keen.timestamp field is put as a JSON node to the message data and whole data + is returned. Otherwise, keen.timestamp is set to emittedAt value * * @param message AirbyteRecordMessage containing record data * @return Record data together with keen.timestamp field */ public JsonNode injectTimestamp(AirbyteRecordMessage message) { String streamName = message.getStream(); - CursorField cursorField = streamCursorFields.get(streamName); + List cursorField = streamCursorFields.get(streamName); JsonNode data = message.getData(); if (timestampInferenceEnabled && cursorField != null) { try { @@ -105,7 +93,7 @@ public JsonNode injectTimestamp(AirbyteRecordMessage message) { } catch (Exception e) { // If parsing of timestamp has failed, remove stream from timestamp-parsable stream map, // so it won't be parsed for future messages. 
- LOGGER.info("Unable to parse timestamp field: {}", cursorField.path); + LOGGER.info("Unable to parse cursor field: {} into a keen.timestamp", cursorField); streamCursorFields.remove(streamName); injectTimestamp(data, Instant.ofEpochMilli(message.getEmittedAt()).toString()); } @@ -120,27 +108,26 @@ private void injectTimestamp(JsonNode data, String timestamp) { root.set("keen", JsonNodeFactory.instance.objectNode().put("timestamp", timestamp)); } - private String parseTimestamp(CursorField cursorField, JsonNode data) { - return switch (cursorField.type) { - case NUMBER -> dateFromNumber( - getNestedNode(data, cursorField.path) - .asLong()); - case STRING -> parser - .parse(getNestedNode(data, cursorField.path) - .asText()) + private String parseTimestamp(List cursorField, JsonNode data) { + JsonNode timestamp = getNestedNode(data, cursorField); + long numberTimestamp = timestamp.asLong(); + // if cursor value is at or above the given threshold, treat it as an epoch timestamp; smaller values are assumed to be ordered ids + if (numberTimestamp >= SECONDS_FROM_EPOCH_THRESHOLD) { + return dateFromNumber(numberTimestamp); + } + // if timestamp is 0, then parsing it to long failed - let's try with String now + if (numberTimestamp == 0) { + return parser + .parse(timestamp.asText()) .get(0).getDates() .get(0) .toInstant() .toString(); - default -> throw new IllegalStateException("Unexpected value: " + cursorField.type); - }; + } + throw new IllegalStateException(); } private String dateFromNumber(Long timestamp) { - // if cursor value is below given threshold, assume that it's not epoch timestamp but ordered id - if (timestamp < SECONDS_FROM_EPOCH_THRESHOLD) { - throw new IllegalArgumentException("Number cursor field below threshold: " + timestamp); - } // if cursor value is above given threshold, then assume that it's Unix timestamp in milliseconds if (timestamp > MILLIS_FROM_EPOCH_THRESHOLD) { return Instant.ofEpochMilli(timestamp).toString(); @@ -148,76 +135,11 @@ private String dateFromNumber(Long 
timestamp) { return Instant.ofEpochSecond(timestamp).toString(); } - public static class CursorField { - - private static final Set STRING_TYPES = Set.of( - "STRING", "CHAR", "NCHAR", "NVARCHAR", "VARCHAR", "LONGVARCHAR", "DATE", - "TIME", "TIMESTAMP"); - private static final Set NUMBER_TYPES = Set.of( - "NUMBER", "TINYINT", "SMALLINT", "INT", "INTEGER", "BIGINT", "FLOAT", "DOUBLE", - "REAL", "NUMERIC", "DECIMAL"); - - private final List path; - private final CursorType type; - - public CursorField(List path, CursorType type) { - this.path = path; - this.type = type; - } - - protected static CursorField fromStream(ConfiguredAirbyteStream stream) { - List cursorFieldList = stream.getCursorField(); - JsonNode typeNode = getNestedNode(stream.getStream().getJsonSchema() - .get("properties"), cursorFieldList).get("type"); - return new CursorField(cursorFieldList, getType(typeNode)); - } - - private static CursorType getType(JsonNode typeNode) { - CursorType type = CursorType.UNRECOGNIZED; - if (typeNode.isArray()) { - for (JsonNode e : typeNode) { - type = getType(e.asText().toUpperCase()); - if (type != CursorType.UNRECOGNIZED) { - break; - } - } - return type; - } - return getType(typeNode.asText().toUpperCase()); - } - - private static CursorType getType(String typeString) { - if (STRING_TYPES.contains(typeString)) { - return CursorType.STRING; - } - if (NUMBER_TYPES.contains(typeString)) { - return CursorType.NUMBER; - } - return CursorType.UNRECOGNIZED; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - CursorField that = (CursorField) o; - return Objects.equals(path, that.path) && type == that.type; - } - - @Override - public int hashCode() { - return Objects.hash(path, type); - } - - } - private static JsonNode getNestedNode(JsonNode data, List fieldNames) { return fieldNames.stream().reduce(data, JsonNode::get, (first, second) -> second); } - public Map 
getStreamCursorFields() { + public Map> getStreamCursorFields() { return streamCursorFields; } diff --git a/airbyte-integrations/connectors/destination-keen/src/test-integration/java/io/airbyte/integrations/destination/keen/KeenDestinationTest.java b/airbyte-integrations/connectors/destination-keen/src/test-integration/java/io/airbyte/integrations/destination/keen/KeenDestinationTest.java index 7d3db95b1fab..7e1b9cac2eb5 100644 --- a/airbyte-integrations/connectors/destination-keen/src/test-integration/java/io/airbyte/integrations/destination/keen/KeenDestinationTest.java +++ b/airbyte-integrations/connectors/destination-keen/src/test-integration/java/io/airbyte/integrations/destination/keen/KeenDestinationTest.java @@ -39,7 +39,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Comparator; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; public class KeenDestinationTest extends DestinationAcceptanceTest { @@ -47,6 +49,7 @@ public class KeenDestinationTest extends DestinationAcceptanceTest { private static final String SECRET_FILE_PATH = "secrets/config.json"; private final KeenHttpClient keenHttpClient = new KeenHttpClient(); + private final Set collectionsToDelete = new HashSet<>(); private String projectId; private String apiKey; @@ -77,6 +80,7 @@ protected JsonNode getBaseConfigJson() { @Override protected List retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace, JsonNode streamSchema) throws Exception { String accentStrippedStreamName = KeenCharactersStripper.stripSpecialCharactersFromStreamName(streamName); + collectionsToDelete.add(accentStrippedStreamName); ArrayNode array = keenHttpClient.extract(accentStrippedStreamName, projectId, apiKey); return Lists.newArrayList(array.elements()).stream() @@ -100,15 +104,10 @@ protected void setup(TestDestinationEnv testEnv) throws Exception { @Override protected void tearDown(TestDestinationEnv testEnv) 
throws Exception { - // Changes for this particular operation - get all collections - can take a couple more time to - // propagate - // than standard queries for the newly created collection - Thread.sleep(5000); - List keenCollections = keenHttpClient.getAllCollectionsForProject(projectId, apiKey); - - for (String keenCollection : keenCollections) { + for (String keenCollection : collectionsToDelete) { keenHttpClient.eraseStream(keenCollection, projectId, apiKey); } + collectionsToDelete.clear(); } @Override diff --git a/airbyte-integrations/connectors/destination-keen/src/test/java/KeenTimestampServiceTest.java b/airbyte-integrations/connectors/destination-keen/src/test/java/KeenTimestampServiceTest.java index 4484a6fff4e6..5b7321a9080a 100644 --- a/airbyte-integrations/connectors/destination-keen/src/test/java/KeenTimestampServiceTest.java +++ b/airbyte-integrations/connectors/destination-keen/src/test/java/KeenTimestampServiceTest.java @@ -32,7 +32,6 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; -import io.airbyte.integrations.destination.keen.KeenTimestampService.CursorField; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; @@ -54,22 +53,20 @@ public class KeenTimestampServiceTest { void shouldInitializeCursorFieldsFromCatalog() throws IOException { ConfiguredAirbyteCatalog configuredCatalog = readConfiguredCatalogFromFile("cursors_catalog.json"); - Map expectedCursorFieldsMap = Map.ofEntries( - entry("StringTypeStream1", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.STRING)), - entry("StringTypeStream2", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.STRING)), - entry("StringTypeStream3", new KeenTimestampService.CursorField(List.of("property1"), 
KeenTimestampService.CursorType.STRING)), - entry("NumberTypeStream1", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)), - entry("NumberTypeStream2", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)), - entry("ArrayTypeStream1", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)), - entry("ArrayTypeStream2", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)), - entry("ArrayTypeStream3", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)), - entry("NestedCursorStream", new KeenTimestampService.CursorField(List.of("property1", "inside"), KeenTimestampService.CursorType.NUMBER)) - - ); + Map> expectedCursorFieldsMap = Map.ofEntries( + entry("StringTypeStream1", List.of("property1")), + entry("StringTypeStream2", List.of("property1")), + entry("StringTypeStream3", List.of("property1")), + entry("NumberTypeStream1", List.of("property1")), + entry("NumberTypeStream2", List.of("property1")), + entry("ArrayTypeStream1", List.of("property1")), + entry("ArrayTypeStream2", List.of("property1")), + entry("ArrayTypeStream3", List.of("property1")), + entry("NestedCursorStream", List.of("property1", "inside"))); KeenTimestampService keenTimestampService = new KeenTimestampService(configuredCatalog, true); - Map cursorFieldMap = keenTimestampService.getStreamCursorFields(); + Map> cursorFieldMap = keenTimestampService.getStreamCursorFields(); Assertions.assertEquals(expectedCursorFieldsMap, cursorFieldMap); } @@ -134,7 +131,7 @@ void shouldInjectEmittedAtWhenCursorIsUnparsableAndRemoveFieldFromMap() throws I KeenTimestampService keenTimestampService = new KeenTimestampService(configuredCatalog, true); - Map cursorFieldMap = keenTimestampService.getStreamCursorFields(); + Map> cursorFieldMap = keenTimestampService.getStreamCursorFields(); 
Assertions.assertEquals(cursorFieldMap.size(), 1); AirbyteMessage message = buildMessageWithCursorValue(configuredCatalog, "some_text"); diff --git a/airbyte-integrations/connectors/destination-keen/src/test/resources/cursors_catalog.json b/airbyte-integrations/connectors/destination-keen/src/test/resources/cursors_catalog.json index d976e34297f3..df442b54ebfe 100644 --- a/airbyte-integrations/connectors/destination-keen/src/test/resources/cursors_catalog.json +++ b/airbyte-integrations/connectors/destination-keen/src/test/resources/cursors_catalog.json @@ -10,30 +10,6 @@ } } }, - { - "name": "WrongTypeStream", - "source_defined_cursor": true, - "default_cursor_field": ["property1"], - "json_schema": { - "properties": { - "property1": { - "type": "wrong" - } - } - } - }, - { - "name": "BooleanTypeStream", - "source_defined_cursor": true, - "default_cursor_field": ["property1"], - "json_schema": { - "properties": { - "property1": { - "type": "boolean" - } - } - } - }, { "name": "StringTypeStream1", "source_defined_cursor": true, diff --git a/docs/integrations/destinations/keen.md b/docs/integrations/destinations/keen.md index 5dfc31649364..05d6695340b3 100644 --- a/docs/integrations/destinations/keen.md +++ b/docs/integrations/destinations/keen.md @@ -62,3 +62,8 @@ Now you should have all the parameters needed to configure Keen destination. ## CHANGELOG +| Version | Date | Pull Request | Subject | +| :------ | :-------- | :----- | :------ | +| 0.2.0 | 2021-09-10 | [#5973](https://github.com/airbytehq/airbyte/pull/5973) | Fix timestamp inference for complex schemas | +| 0.1.0 | 2021-08-18 | [#5339](https://github.com/airbytehq/airbyte/pull/5339) | Keen Destination Release! | +