Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Keen destination: fix for timestamp inference for complex types #5973

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "81740ce8-d764-4ea7-94df-16bb41de36ae",
"name": "Chargify (Keen)",
"dockerRepository": "airbyte/destination-keen",
"dockerImgeTag": "0.1.0",
"dockerImageTag": "0.2.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/keen"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "8aaf41d0-f6d2-46de-9e79-c9540f828142",
"name": "Keen",
"dockerRepository": "airbyte/destination-keen",
"dockerImgeTag": "0.1.0",
"dockerImageTag": "0.2.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/keen"
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@
- destinationDefinitionId: 8aaf41d0-f6d2-46de-9e79-c9540f828142
name: Keen
dockerRepository: airbyte/destination-keen
dockerImageTag: 0.1.0
dockerImageTag: 0.2.0
Copy link
Contributor Author

@maciej-nedza maciej-nedza Sep 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've bumped the versions here and in the <uuid>.json file; however, I don't see the updated version number in settings -> Destination after launching the application. Is there anything else I need to do?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instances don't auto-update versions, for stability reasons; the user must update them by hand. The versions specified in this file and in the JSON files are used the very first time Airbyte runs.

documentationUrl: https://docs.airbyte.io/integrations/destinations/keen
- destinationDefinitionId: 81740ce8-d764-4ea7-94df-16bb41de36ae
name: Chargify (Keen)
dockerRepository: airbyte/destination-keen
dockerImageTag: 0.1.0
dockerImageTag: 0.2.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/keen
- destinationDefinitionId: 8b746512-8c2e-6ac1-4adc-b59faafd473c
name: MongoDB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.name=airbyte/destination-keen
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@

import static io.airbyte.integrations.destination.keen.KeenDestination.KEEN_BASE_API_PATH;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
Expand All @@ -37,8 +35,6 @@
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -106,40 +102,4 @@ public ArrayNode extract(String streamName, String projectId, String apiKey)
return (ArrayNode) objectMapper.readTree(response.body()).get("result");
}

public List<String> getAllCollectionsForProject(String projectId, String apiKey)
throws IOException, InterruptedException {
URI listCollectionsUri = URI.create(String.format(
KEEN_BASE_API_PATH + "/projects/%s/events",
projectId));

HttpRequest request = HttpRequest.newBuilder()
.uri(listCollectionsUri)
.timeout(Duration.ofSeconds(30))
.header("Authorization", apiKey)
.header("Content-Type", "application/json")
.build();

HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

List<KeenCollection> keenCollections = objectMapper.readValue(objectMapper.createParser(response.body()),
new TypeReference<>() {});

return keenCollections.stream().map(KeenCollection::getName).collect(Collectors.toList());
}

@JsonIgnoreProperties(ignoreUnknown = true)
private static class KeenCollection {

private String name;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,10 @@
import com.joestelmach.natty.Parser;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand All @@ -49,20 +46,14 @@
*/
public class KeenTimestampService {

public enum CursorType {
STRING,
NUMBER,
UNRECOGNIZED
}

private static final Logger LOGGER = LoggerFactory.getLogger(KeenRecordsConsumer.class);

private static final long SECONDS_FROM_EPOCH_THRESHOLD = 1_000_000_000L;

private static final long MILLIS_FROM_EPOCH_THRESHOLD = 10_000_000_000L;

// Map containing stream names paired with their cursor fields
private Map<String, CursorField> streamCursorFields;
private Map<String, List<String>> streamCursorFields;
private final Parser parser;
private final boolean timestampInferenceEnabled;

Expand All @@ -76,27 +67,24 @@ public KeenTimestampService(ConfiguredAirbyteCatalog catalog, boolean timestampI
streamCursorFields = catalog.getStreams()
.stream()
.filter(stream -> stream.getCursorField().size() > 0)
.map(s -> Pair.of(s.getStream().getName(), CursorField.fromStream(s)))
.filter(
pair -> pair.getRight() != null && pair.getRight().type != CursorType.UNRECOGNIZED)
.map(s -> Pair.of(s.getStream().getName(), s.getCursorField()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
}

/**
* Tries to inject keen.timestamp field to the given message data. If the stream contains cursor
* fields of types NUMBER or STRING, this value is tried to be parsed to a timestamp. If this
* procedure fails, stream is removed from timestamp-parsable stream map, so parsing is not tried
* for future messages in the same stream. If parsing succeeds, keen.timestamp field is put as a
* JSON node to the message data and whole data is returned. Otherwise, keen.timestamp is set to
* emittedAt value
* field, its value is tried to be parsed to a timestamp. If this procedure fails, stream is removed
* from timestamp-parsable stream map, so parsing is not tried for future messages in the same stream.
* If parsing succeeds, keen.timestamp field is put as a JSON node to the message data and whole data
* is returned. Otherwise, keen.timestamp is set to emittedAt value
*
* @param message AirbyteRecordMessage containing record data
* @return Record data together with keen.timestamp field
*/
public JsonNode injectTimestamp(AirbyteRecordMessage message) {
String streamName = message.getStream();
CursorField cursorField = streamCursorFields.get(streamName);
List<String> cursorField = streamCursorFields.get(streamName);
JsonNode data = message.getData();
if (timestampInferenceEnabled && cursorField != null) {
try {
Expand All @@ -105,7 +93,7 @@ public JsonNode injectTimestamp(AirbyteRecordMessage message) {
} catch (Exception e) {
// If parsing of timestamp has failed, remove stream from timestamp-parsable stream map,
// so it won't be parsed for future messages.
LOGGER.info("Unable to parse timestamp field: {}", cursorField.path);
LOGGER.info("Unable to parse cursor field: {} into a keen.timestamp", cursorField);
streamCursorFields.remove(streamName);
injectTimestamp(data, Instant.ofEpochMilli(message.getEmittedAt()).toString());
}
Expand All @@ -120,104 +108,38 @@ private void injectTimestamp(JsonNode data, String timestamp) {
root.set("keen", JsonNodeFactory.instance.objectNode().put("timestamp", timestamp));
}

private String parseTimestamp(CursorField cursorField, JsonNode data) {
return switch (cursorField.type) {
case NUMBER -> dateFromNumber(
getNestedNode(data, cursorField.path)
.asLong());
case STRING -> parser
.parse(getNestedNode(data, cursorField.path)
.asText())
private String parseTimestamp(List<String> cursorField, JsonNode data) {
JsonNode timestamp = getNestedNode(data, cursorField);
long numberTimestamp = timestamp.asLong();
// if cursor value is below given threshold, assume that it's not epoch timestamp but ordered id
if (numberTimestamp >= SECONDS_FROM_EPOCH_THRESHOLD) {
return dateFromNumber(numberTimestamp);
}
// if timestamp is 0, then parsing it to long failed - let's try with String now
if (numberTimestamp == 0) {
return parser
.parse(timestamp.asText())
.get(0).getDates()
.get(0)
.toInstant()
.toString();
default -> throw new IllegalStateException("Unexpected value: " + cursorField.type);
};
}
throw new IllegalStateException();
}

private String dateFromNumber(Long timestamp) {
// if cursor value is below given threshold, assume that it's not epoch timestamp but ordered id
if (timestamp < SECONDS_FROM_EPOCH_THRESHOLD) {
throw new IllegalArgumentException("Number cursor field below threshold: " + timestamp);
}
// if cursor value is above given threshold, then assume that it's Unix timestamp in milliseconds
if (timestamp > MILLIS_FROM_EPOCH_THRESHOLD) {
return Instant.ofEpochMilli(timestamp).toString();
}
return Instant.ofEpochSecond(timestamp).toString();
}

public static class CursorField {

private static final Set<String> STRING_TYPES = Set.of(
"STRING", "CHAR", "NCHAR", "NVARCHAR", "VARCHAR", "LONGVARCHAR", "DATE",
"TIME", "TIMESTAMP");
private static final Set<String> NUMBER_TYPES = Set.of(
"NUMBER", "TINYINT", "SMALLINT", "INT", "INTEGER", "BIGINT", "FLOAT", "DOUBLE",
"REAL", "NUMERIC", "DECIMAL");

private final List<String> path;
private final CursorType type;

public CursorField(List<String> path, CursorType type) {
this.path = path;
this.type = type;
}

protected static CursorField fromStream(ConfiguredAirbyteStream stream) {
List<String> cursorFieldList = stream.getCursorField();
JsonNode typeNode = getNestedNode(stream.getStream().getJsonSchema()
.get("properties"), cursorFieldList).get("type");
return new CursorField(cursorFieldList, getType(typeNode));
}

private static CursorType getType(JsonNode typeNode) {
CursorType type = CursorType.UNRECOGNIZED;
if (typeNode.isArray()) {
for (JsonNode e : typeNode) {
type = getType(e.asText().toUpperCase());
if (type != CursorType.UNRECOGNIZED) {
break;
}
}
return type;
}
return getType(typeNode.asText().toUpperCase());
}

private static CursorType getType(String typeString) {
if (STRING_TYPES.contains(typeString)) {
return CursorType.STRING;
}
if (NUMBER_TYPES.contains(typeString)) {
return CursorType.NUMBER;
}
return CursorType.UNRECOGNIZED;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
CursorField that = (CursorField) o;
return Objects.equals(path, that.path) && type == that.type;
}

@Override
public int hashCode() {
return Objects.hash(path, type);
}

}

private static JsonNode getNestedNode(JsonNode data, List<String> fieldNames) {
return fieldNames.stream().reduce(data, JsonNode::get, (first, second) -> second);
}

public Map<String, CursorField> getStreamCursorFields() {
public Map<String, List<String>> getStreamCursorFields() {
return streamCursorFields;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class KeenDestinationTest extends DestinationAcceptanceTest {

private static final String SECRET_FILE_PATH = "secrets/config.json";

private final KeenHttpClient keenHttpClient = new KeenHttpClient();
private final Set<String> collectionsToDelete = new HashSet<>();

private String projectId;
private String apiKey;
Expand Down Expand Up @@ -77,6 +80,7 @@ protected JsonNode getBaseConfigJson() {
@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace, JsonNode streamSchema) throws Exception {
String accentStrippedStreamName = KeenCharactersStripper.stripSpecialCharactersFromStreamName(streamName);
collectionsToDelete.add(accentStrippedStreamName);

ArrayNode array = keenHttpClient.extract(accentStrippedStreamName, projectId, apiKey);
return Lists.newArrayList(array.elements()).stream()
Expand All @@ -100,15 +104,10 @@ protected void setup(TestDestinationEnv testEnv) throws Exception {

@Override
protected void tearDown(TestDestinationEnv testEnv) throws Exception {
// Changes for this particular operation - get all collections - can take a couple more time to
// propagate
// than standard queries for the newly created collection
Thread.sleep(5000);
List<String> keenCollections = keenHttpClient.getAllCollectionsForProject(projectId, apiKey);

for (String keenCollection : keenCollections) {
for (String keenCollection : collectionsToDelete) {
keenHttpClient.eraseStream(keenCollection, projectId, apiKey);
}
collectionsToDelete.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.destination.keen.KeenTimestampService.CursorField;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand All @@ -54,22 +53,20 @@ public class KeenTimestampServiceTest {
void shouldInitializeCursorFieldsFromCatalog() throws IOException {
ConfiguredAirbyteCatalog configuredCatalog = readConfiguredCatalogFromFile("cursors_catalog.json");

Map<String, KeenTimestampService.CursorField> expectedCursorFieldsMap = Map.ofEntries(
entry("StringTypeStream1", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.STRING)),
entry("StringTypeStream2", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.STRING)),
entry("StringTypeStream3", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.STRING)),
entry("NumberTypeStream1", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)),
entry("NumberTypeStream2", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)),
entry("ArrayTypeStream1", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)),
entry("ArrayTypeStream2", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)),
entry("ArrayTypeStream3", new KeenTimestampService.CursorField(List.of("property1"), KeenTimestampService.CursorType.NUMBER)),
entry("NestedCursorStream", new KeenTimestampService.CursorField(List.of("property1", "inside"), KeenTimestampService.CursorType.NUMBER))

);
Map<String, List<String>> expectedCursorFieldsMap = Map.ofEntries(
entry("StringTypeStream1", List.of("property1")),
entry("StringTypeStream2", List.of("property1")),
entry("StringTypeStream3", List.of("property1")),
entry("NumberTypeStream1", List.of("property1")),
entry("NumberTypeStream2", List.of("property1")),
entry("ArrayTypeStream1", List.of("property1")),
entry("ArrayTypeStream2", List.of("property1")),
entry("ArrayTypeStream3", List.of("property1")),
entry("NestedCursorStream", List.of("property1", "inside")));

KeenTimestampService keenTimestampService = new KeenTimestampService(configuredCatalog, true);

Map<String, KeenTimestampService.CursorField> cursorFieldMap = keenTimestampService.getStreamCursorFields();
Map<String, List<String>> cursorFieldMap = keenTimestampService.getStreamCursorFields();
Assertions.assertEquals(expectedCursorFieldsMap, cursorFieldMap);
}

Expand Down Expand Up @@ -134,7 +131,7 @@ void shouldInjectEmittedAtWhenCursorIsUnparsableAndRemoveFieldFromMap() throws I

KeenTimestampService keenTimestampService = new KeenTimestampService(configuredCatalog, true);

Map<String, CursorField> cursorFieldMap = keenTimestampService.getStreamCursorFields();
Map<String, List<String>> cursorFieldMap = keenTimestampService.getStreamCursorFields();
Assertions.assertEquals(cursorFieldMap.size(), 1);

AirbyteMessage message = buildMessageWithCursorValue(configuredCatalog, "some_text");
Expand Down
Loading