Skip to content

Commit

Permalink
🐛 Keen destination: fix for timestamp inference for complex types (#5973
Browse files Browse the repository at this point in the history
)
  • Loading branch information
maciej-nedza authored Sep 14, 2021
1 parent 74c9986 commit c3bdaa8
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "81740ce8-d764-4ea7-94df-16bb41de36ae",
"name": "Chargify (Keen)",
"dockerRepository": "airbyte/destination-keen",
"dockerImgeTag": "0.1.0",
"dockerImageTag": "0.2.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/keen"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "8aaf41d0-f6d2-46de-9e79-c9540f828142",
"name": "Keen",
"dockerRepository": "airbyte/destination-keen",
"dockerImgeTag": "0.1.0",
"dockerImageTag": "0.2.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/keen"
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@
- destinationDefinitionId: 8aaf41d0-f6d2-46de-9e79-c9540f828142
name: Keen
dockerRepository: airbyte/destination-keen
dockerImageTag: 0.1.0
dockerImageTag: 0.2.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/keen
- destinationDefinitionId: 81740ce8-d764-4ea7-94df-16bb41de36ae
name: Chargify (Keen)
dockerRepository: airbyte/destination-keen
dockerImageTag: 0.1.0
dockerImageTag: 0.2.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/keen
- destinationDefinitionId: 8b746512-8c2e-6ac1-4adc-b59faafd473c
name: MongoDB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.name=airbyte/destination-keen
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@

import static io.airbyte.integrations.destination.keen.KeenDestination.KEEN_BASE_API_PATH;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
Expand All @@ -37,8 +35,6 @@
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -106,40 +102,4 @@ public ArrayNode extract(String streamName, String projectId, String apiKey)
return (ArrayNode) objectMapper.readTree(response.body()).get("result");
}

public List<String> getAllCollectionsForProject(String projectId, String apiKey)
throws IOException, InterruptedException {
URI listCollectionsUri = URI.create(String.format(
KEEN_BASE_API_PATH + "/projects/%s/events",
projectId));

HttpRequest request = HttpRequest.newBuilder()
.uri(listCollectionsUri)
.timeout(Duration.ofSeconds(30))
.header("Authorization", apiKey)
.header("Content-Type", "application/json")
.build();

HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

List<KeenCollection> keenCollections = objectMapper.readValue(objectMapper.createParser(response.body()),
new TypeReference<>() {});

return keenCollections.stream().map(KeenCollection::getName).collect(Collectors.toList());
}

@JsonIgnoreProperties(ignoreUnknown = true)
private static class KeenCollection {

private String name;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,10 @@
import com.joestelmach.natty.Parser;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand All @@ -49,20 +46,14 @@
*/
public class KeenTimestampService {

public enum CursorType {
STRING,
NUMBER,
UNRECOGNIZED
}

private static final Logger LOGGER = LoggerFactory.getLogger(KeenRecordsConsumer.class);

private static final long SECONDS_FROM_EPOCH_THRESHOLD = 1_000_000_000L;

private static final long MILLIS_FROM_EPOCH_THRESHOLD = 10_000_000_000L;

// Map containing stream names paired with their cursor fields
private Map<String, CursorField> streamCursorFields;
private Map<String, List<String>> streamCursorFields;
private final Parser parser;
private final boolean timestampInferenceEnabled;

Expand All @@ -76,27 +67,24 @@ public KeenTimestampService(ConfiguredAirbyteCatalog catalog, boolean timestampI
streamCursorFields = catalog.getStreams()
.stream()
.filter(stream -> stream.getCursorField().size() > 0)
.map(s -> Pair.of(s.getStream().getName(), CursorField.fromStream(s)))
.filter(
pair -> pair.getRight() != null && pair.getRight().type != CursorType.UNRECOGNIZED)
.map(s -> Pair.of(s.getStream().getName(), s.getCursorField()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
}

/**
* Tries to inject keen.timestamp field to the given message data. If the stream contains cursor
* fields of types NUMBER or STRING, this value is tried to be parsed to a timestamp. If this
* procedure fails, stream is removed from timestamp-parsable stream map, so parsing is not tried
* for future messages in the same stream. If parsing succeeds, keen.timestamp field is put as a
* JSON node to the message data and whole data is returned. Otherwise, keen.timestamp is set to
* emittedAt value
* field, it's value is tried to be parsed to timestamp. If this procedure fails, stream is removed
* from timestamp-parsable stream map, so parsing is not tried for future messages in the same stream.
* If parsing succeeds, keen.timestamp field is put as a JSON node to the message data and whole data
* is returned. Otherwise, keen.timestamp is set to emittedAt value
*
* @param message AirbyteRecordMessage containing record data
* @return Record data together with keen.timestamp field
*/
public JsonNode injectTimestamp(AirbyteRecordMessage message) {
String streamName = message.getStream();
CursorField cursorField = streamCursorFields.get(streamName);
List<String> cursorField = streamCursorFields.get(streamName);
JsonNode data = message.getData();
if (timestampInferenceEnabled && cursorField != null) {
try {
Expand All @@ -105,7 +93,7 @@ public JsonNode injectTimestamp(AirbyteRecordMessage message) {
} catch (Exception e) {
// If parsing of timestamp has failed, remove stream from timestamp-parsable stream map,
// so it won't be parsed for future messages.
LOGGER.info("Unable to parse timestamp field: {}", cursorField.path);
LOGGER.info("Unable to parse cursor field: {} into a keen.timestamp", cursorField);
streamCursorFields.remove(streamName);
injectTimestamp(data, Instant.ofEpochMilli(message.getEmittedAt()).toString());
}
Expand All @@ -120,104 +108,38 @@ private void injectTimestamp(JsonNode data, String timestamp) {
root.set("keen", JsonNodeFactory.instance.objectNode().put("timestamp", timestamp));
}

private String parseTimestamp(CursorField cursorField, JsonNode data) {
return switch (cursorField.type) {
case NUMBER -> dateFromNumber(
getNestedNode(data, cursorField.path)
.asLong());
case STRING -> parser
.parse(getNestedNode(data, cursorField.path)
.asText())
private String parseTimestamp(List<String> cursorField, JsonNode data) {
JsonNode timestamp = getNestedNode(data, cursorField);
long numberTimestamp = timestamp.asLong();
// if cursor value is below given threshold, assume that it's not epoch timestamp but ordered id
if (numberTimestamp >= SECONDS_FROM_EPOCH_THRESHOLD) {
return dateFromNumber(numberTimestamp);
}
// if timestamp is 0, then parsing it to long failed - let's try with String now
if (numberTimestamp == 0) {
return parser
.parse(timestamp.asText())
.get(0).getDates()
.get(0)
.toInstant()
.toString();
default -> throw new IllegalStateException("Unexpected value: " + cursorField.type);
};
}
throw new IllegalStateException();
}

private String dateFromNumber(Long timestamp) {
// if cursor value is below given threshold, assume that it's not epoch timestamp but ordered id
if (timestamp < SECONDS_FROM_EPOCH_THRESHOLD) {
throw new IllegalArgumentException("Number cursor field below threshold: " + timestamp);
}
// if cursor value is above given threshold, then assume that it's Unix timestamp in milliseconds
if (timestamp > MILLIS_FROM_EPOCH_THRESHOLD) {
return Instant.ofEpochMilli(timestamp).toString();
}
return Instant.ofEpochSecond(timestamp).toString();
}

public static class CursorField {

private static final Set<String> STRING_TYPES = Set.of(
"STRING", "CHAR", "NCHAR", "NVARCHAR", "VARCHAR", "LONGVARCHAR", "DATE",
"TIME", "TIMESTAMP");
private static final Set<String> NUMBER_TYPES = Set.of(
"NUMBER", "TINYINT", "SMALLINT", "INT", "INTEGER", "BIGINT", "FLOAT", "DOUBLE",
"REAL", "NUMERIC", "DECIMAL");

private final List<String> path;
private final CursorType type;

public CursorField(List<String> path, CursorType type) {
this.path = path;
this.type = type;
}

protected static CursorField fromStream(ConfiguredAirbyteStream stream) {
List<String> cursorFieldList = stream.getCursorField();
JsonNode typeNode = getNestedNode(stream.getStream().getJsonSchema()
.get("properties"), cursorFieldList).get("type");
return new CursorField(cursorFieldList, getType(typeNode));
}

private static CursorType getType(JsonNode typeNode) {
CursorType type = CursorType.UNRECOGNIZED;
if (typeNode.isArray()) {
for (JsonNode e : typeNode) {
type = getType(e.asText().toUpperCase());
if (type != CursorType.UNRECOGNIZED) {
break;
}
}
return type;
}
return getType(typeNode.asText().toUpperCase());
}

private static CursorType getType(String typeString) {
if (STRING_TYPES.contains(typeString)) {
return CursorType.STRING;
}
if (NUMBER_TYPES.contains(typeString)) {
return CursorType.NUMBER;
}
return CursorType.UNRECOGNIZED;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
CursorField that = (CursorField) o;
return Objects.equals(path, that.path) && type == that.type;
}

@Override
public int hashCode() {
return Objects.hash(path, type);
}

}

private static JsonNode getNestedNode(JsonNode data, List<String> fieldNames) {
return fieldNames.stream().reduce(data, JsonNode::get, (first, second) -> second);
}

public Map<String, CursorField> getStreamCursorFields() {
public Map<String, List<String>> getStreamCursorFields() {
return streamCursorFields;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class KeenDestinationTest extends DestinationAcceptanceTest {

private static final String SECRET_FILE_PATH = "secrets/config.json";

private final KeenHttpClient keenHttpClient = new KeenHttpClient();
private final Set<String> collectionsToDelete = new HashSet<>();

private String projectId;
private String apiKey;
Expand Down Expand Up @@ -77,6 +80,7 @@ protected JsonNode getBaseConfigJson() {
@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace, JsonNode streamSchema) throws Exception {
String accentStrippedStreamName = KeenCharactersStripper.stripSpecialCharactersFromStreamName(streamName);
collectionsToDelete.add(accentStrippedStreamName);

ArrayNode array = keenHttpClient.extract(accentStrippedStreamName, projectId, apiKey);
return Lists.newArrayList(array.elements()).stream()
Expand All @@ -100,15 +104,10 @@ protected void setup(TestDestinationEnv testEnv) throws Exception {

@Override
protected void tearDown(TestDestinationEnv testEnv) throws Exception {
// Changes for this particular operation - get all collections - can take a couple more time to
// propagate
// than standard queries for the newly created collection
Thread.sleep(5000);
List<String> keenCollections = keenHttpClient.getAllCollectionsForProject(projectId, apiKey);

for (String keenCollection : keenCollections) {
for (String keenCollection : collectionsToDelete) {
keenHttpClient.eraseStream(keenCollection, projectId, apiKey);
}
collectionsToDelete.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.destination.keen.KeenTimestampService.CursorField;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand All @@ -54,22 +53,20 @@ public class KeenTimestampServiceTest {
void shouldInitializeCursorFieldsFromCatalog() throws IOException {
ConfiguredAirbyteCatalog configuredCatalog = readConfiguredCatalogFromFile("cursors_catalog.json");

Map<String, KeenTimestampService.CursorField> expectedCursorFieldsMap = Map.ofEntries(
entry("StringTypeStream1", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.STRING)),
entry("StringTypeStream2", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.STRING)),
entry("StringTypeStream3", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.STRING)),
entry("NumberTypeStream1", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)),
entry("NumberTypeStream2", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)),
entry("ArrayTypeStream1", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)),
entry("ArrayTypeStream2", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)),
entry("ArrayTypeStream3", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)),
entry("NestedCursorStream", new KeenTimestampService.CursorField(List.of("property1", "inside"), KeenTimestampService.CursorType.NUMBER))

);
Map<String, List<String>> expectedCursorFieldsMap = Map.ofEntries(
entry("StringTypeStream1", List.of("property1")),
entry("StringTypeStream2", List.of("property1")),
entry("StringTypeStream3", List.of("property1")),
entry("NumberTypeStream1", List.of("property1")),
entry("NumberTypeStream2", List.of("property1")),
entry("ArrayTypeStream1", List.of("property1")),
entry("ArrayTypeStream2", List.of("property1")),
entry("ArrayTypeStream3", List.of("property1")),
entry("NestedCursorStream", List.of("property1", "inside")));

KeenTimestampService keenTimestampService = new KeenTimestampService(configuredCatalog, true);

Map<String, KeenTimestampService.CursorField> cursorFieldMap = keenTimestampService.getStreamCursorFields();
Map<String, List<String>> cursorFieldMap = keenTimestampService.getStreamCursorFields();
Assertions.assertEquals(expectedCursorFieldsMap, cursorFieldMap);
}

Expand Down Expand Up @@ -134,7 +131,7 @@ void shouldInjectEmittedAtWhenCursorIsUnparsableAndRemoveFieldFromMap() throws I

KeenTimestampService keenTimestampService = new KeenTimestampService(configuredCatalog, true);

Map<String, CursorField> cursorFieldMap = keenTimestampService.getStreamCursorFields();
Map<String, List<String>> cursorFieldMap = keenTimestampService.getStreamCursorFields();
Assertions.assertEquals(cursorFieldMap.size(), 1);

AirbyteMessage message = buildMessageWithCursorValue(configuredCatalog, "some_text");
Expand Down
Loading

0 comments on commit c3bdaa8

Please sign in to comment.