add datadog.parse_record_headers property #49

Merged: 4 commits on May 22, 2024
README.md: 35 changes (18 additions, 17 deletions)
@@ -1,6 +1,6 @@
# Datadog Kafka Connect Logs

`datadog-kafka-connect-logs` is a [Kafka Connector](http://kafka.apache.org/documentation.html#connect) for sending
records from Kafka as logs to the [Datadog Logs Intake API](https://docs.datadoghq.com/api/v1/logs/).

It is a plugin meant to be installed on a [Kafka Connect Cluster](https://docs.confluent.io/current/connect/) running
@@ -12,7 +12,7 @@ besides a [Kafka Broker](https://www.confluent.io/what-is-apache-kafka/).
2. Java 8 and above.
3. Confluent Platform 4.0.x and above (optional).

To install the plugin, one must have a working instance of Kafka Connect connected to a Kafka Broker. See also
[Confluent's](https://www.confluent.io/product/confluent-platform/) documentation for easily setting this up.

## Installation and Setup
@@ -24,25 +24,25 @@ See [Confluent's documentation](https://docs.confluent.io/current/connect/managi
### Download from Github

Download the latest version from the GitHub [releases page](https://github.com/DataDog/datadog-kafka-connect-logs/releases).
Also see [Confluent's documentation](https://docs.confluent.io/current/connect/managing/community.html) on installing
community connectors.

### Build from Source

1. Clone the repo from https://github.com/DataDog/datadog-kafka-connect-logs
2. Verify that a Java 8 JRE or JDK is installed.
3. Run `mvn clean compile package`. This will build the jar in the `/target` directory. The name will be
`datadog-kafka-connect-logs-[VERSION].jar`.
4. The zip file for use on [Confluent Hub](https://www.confluent.io/hub/) can be found in `target/components/packages`.
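
Putting steps 1-3 together, a minimal build sequence might look like the following sketch (it assumes `git` and Maven are available on your `PATH`):

```
git clone https://github.com/DataDog/datadog-kafka-connect-logs
cd datadog-kafka-connect-logs
mvn clean compile package
# jar lands in target/, zip for Confluent Hub in target/components/packages/
ls target/datadog-kafka-connect-logs-*.jar target/components/packages/
```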

## Quick Start

1. To install the plugin, place the plugin's jar file (see [previous section](#installation-and-setup) on how to download or build it)
in or under the location specified in `plugin.path`. If you use Confluent Platform, simply run
`confluent-hub install target/components/packages/<connector-zip-file>`.
2. Restart your Kafka Connect instance.
3. Run the following command to manually create connector tasks. Adjust `topics` to configure the Kafka topic to be
ingested and set your Datadog `api_key`.

```
curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d '{
@@ -56,8 +56,8 @@ ingested and set your Datadog `api_key`.
}'
```

4. You can verify that data is ingested to the Datadog platform by searching for `source:kafka-connect` in the Log
Explorer tab.
5. Use the following commands to check status, and manage connectors and tasks:

```
Expand Down Expand Up @@ -107,6 +107,7 @@ A REST call can be executed against one of the cluster instances, and the config
| `datadog.retry.max` | The number of retries before the output plugin stops. | `5` ||
| `datadog.retry.backoff_ms` | The time in milliseconds to wait following an error before a retry attempt is made. | `3000` ||
| `datadog.add_published_date` | Valid settings are true or false. When set to `true`, the timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`. ||
| `datadog.parse_record_headers` | Valid settings are true or false. When set to `true`, Kafka record headers are parsed and passed to Datadog as a `kafkaheaders` object. |`false`|
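
For example, to enable header parsing alongside the published date, the connector configuration could include the following (a minimal sketch in `.properties` form; the topic name is a placeholder, and required settings such as the API key and connector class are omitted):

```properties
topics=someTopic
datadog.add_published_date=true
datadog.parse_record_headers=true
```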

### Troubleshooting performance

@@ -126,7 +127,7 @@ To improve performance of the connector, you can try the following options:

## Single Message Transforms

Kafka Connect supports Single Message Transforms that let you change the structure or content of a message. To
experiment with this feature, try adding these lines to your sink connector configuration:

```properties
@@ -135,7 +136,7 @@ transforms.addExtraField.type=org.apache.kafka.connect.transforms.InsertField$Va
transforms.addExtraField.static.field=extraField
transforms.addExtraField.static.value=extraValue
```
Now if you restart the sink connector and send some more test messages, each new record should have an `extraField` field
with value `extraValue`. For a more in-depth video, see [Confluent's documentation](https://docs.confluent.io/current/connect/transforms/index.html).
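
With the transform applied, a delivered batch would look roughly like this sketch (the `field1` value stands in for the record's original fields; the envelope follows the connector's `message`/`ddsource`/`ddtags` format):

```
[{"message":{"field1":"value1","extraField":"extraValue"},"ddsource":"kafka-connect","ddtags":"topic:someTopic"}]
```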

## Testing
@@ -146,14 +147,14 @@ To run the supplied unit tests, run `mvn test` from the root of the project.

### System Tests

We use Confluent Platform for a batteries-included Kafka environment for local testing. Follow the guide
[here](https://docs.confluent.io/current/quickstart/ce-quickstart.html) to install the Confluent Platform.

Then, install the [Confluent Kafka Datagen Connector](https://github.com/confluentinc/kafka-connect-datagen) to create
sample data of arbitrary types. Install this Datadog Logs Connector by running
`confluent-hub install target/components/packages/<connector-zip-file>`.

In the `/test` directory there are some `.json` configuration files to make it easy to create Connectors. There are
configurations for both the Datagen Connector (with various datatypes) and the Datadog Logs Connector. To the latter,
you will need to add a valid Datadog API key before you upload the `.json` to Confluent Platform.
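
For example, assuming one of those `.json` files follows the Connect REST request format, it can also be submitted to a local Connect worker directly (the file name below is a placeholder):

```
curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d @test/<connector-config>.json
```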

DatadogLogsApiWriter.java
@@ -6,34 +6,53 @@ This product includes software developed at Datadog (https://www.datadoghq.com/)
package com.datadoghq.connect.logs.sink;

import com.datadoghq.connect.logs.util.Project;
import com.google.gson.*;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import javax.ws.rs.core.Response;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream;
import javax.ws.rs.core.Response;

import static java.util.stream.Collectors.toMap;
import static java.util.stream.StreamSupport.stream;

public class DatadogLogsApiWriter {
private final DatadogLogsSinkConnectorConfig config;
private static final Logger log = LoggerFactory.getLogger(DatadogLogsApiWriter.class);
private final DatadogLogsSinkConnectorConfig config;
private final Map<String, List<SinkRecord>> batches;
private final JsonConverter jsonConverter;
private final Gson gson;

public DatadogLogsApiWriter(DatadogLogsSinkConnectorConfig config) {
this.config = config;
this.batches = new HashMap<>();
this.jsonConverter = new JsonConverter();
this.gson = new Gson();

Map<String,String> jsonConverterConfig = new HashMap<String,String>();
Map<String, String> jsonConverterConfig = new HashMap<>();
jsonConverterConfig.put("schemas.enable", "false");
jsonConverterConfig.put("decimal.format", "NUMERIC");

@@ -42,13 +61,14 @@ public DatadogLogsApiWriter(DatadogLogsSinkConnectorConfig config) {

/**
* Writes records to the Datadog Logs API.
*
* @param records to be written from the Source Broker to the Datadog Logs API.
* @throws IOException may be thrown if the connection to the API fails.
*/
public void write(Collection<SinkRecord> records) throws IOException {
for (SinkRecord record : records) {
if (!batches.containsKey(record.topic())) {
batches.put(record.topic(), new ArrayList<> (Collections.singletonList(record)));
batches.put(record.topic(), new ArrayList<>(Collections.singletonList(record)));
} else {
batches.get(record.topic()).add(record);
}
@@ -64,7 +84,7 @@ public void write(Collection<SinkRecord> records) throws IOException {

private void flushBatches() throws IOException {
// send any outstanding batches
for(Map.Entry<String,List<SinkRecord>> entry: batches.entrySet()) {
for (Map.Entry<String, List<SinkRecord>> entry : batches.entrySet()) {
sendBatch(entry.getKey());
}

@@ -97,20 +117,29 @@ private JsonArray formatBatch(String topic) {
}

JsonElement recordJSON = recordToJSON(record);
JsonObject message = populateMetadata(topic, recordJSON, record.timestamp());
JsonObject message = populateMetadata(topic, recordJSON, record.timestamp(), () -> kafkaHeadersToJsonElement(record));
batchRecords.add(message);
}

return batchRecords;
}

private JsonElement kafkaHeadersToJsonElement(SinkRecord sinkRecord) {
Map<String, Object> headerMap = stream(sinkRecord.headers().spliterator(), false)
.collect(toMap(Header::key, Header::value));

String jsonString = gson.toJson(headerMap);

return gson.fromJson(jsonString, JsonElement.class);
}

private JsonElement recordToJSON(SinkRecord record) {
byte[] rawJSONPayload = jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
String jsonPayload = new String(rawJSONPayload, StandardCharsets.UTF_8);
return new Gson().fromJson(jsonPayload, JsonElement.class);
return gson.fromJson(jsonPayload, JsonElement.class);
}

private JsonObject populateMetadata(String topic, JsonElement message, Long timestamp) {
private JsonObject populateMetadata(String topic, JsonElement message, Long timestamp, Supplier<JsonElement> kafkaHeaders) {
JsonObject content = new JsonObject();
String tags = "topic:" + topic;
content.add("message", message);
@@ -119,6 +148,10 @@ private JsonObject populateMetadata(String topic, JsonElement message, Long time
content.add("published_date", new JsonPrimitive(timestamp));
}

if (config.parseRecordHeaders) {
content.add("kafkaheaders", kafkaHeaders.get());
}

if (config.ddTags != null) {
tags += "," + config.ddTags;
}
@@ -161,7 +194,7 @@ private void sendRequest(JsonArray content, URL url) throws IOException {
if (Response.Status.Family.familyOf(status) != Response.Status.Family.SUCCESSFUL) {
InputStream stream = con.getErrorStream();
String error = "";
if (stream != null ) {
if (stream != null) {
error = getOutput(stream);
}
con.disconnect();
DatadogLogsSinkConnectorConfig.java
@@ -34,6 +34,7 @@ public class DatadogLogsSinkConnectorConfig extends AbstractConfig {
private static final String DEFAULT_DD_SITE = "datadoghq.com";
public static final String DEFAULT_DD_URL = String.format(DD_URL_FORMAT_FROM_SITE, DEFAULT_DD_SITE);
public static final String ADD_PUBLISHED_DATE = "datadog.add_published_date";
public static final String PARSE_RECORD_HEADERS = "datadog.parse_record_headers";

// Respect limit documented at https://docs.datadoghq.com/api/?lang=bash#logs
public final Integer ddMaxBatchLength;
@@ -53,6 +54,7 @@ public class DatadogLogsSinkConnectorConfig extends AbstractConfig {
public final Integer retryMax;
public final Integer retryBackoffMs;
public final boolean addPublishedDate;
public final boolean parseRecordHeaders;

public static final ConfigDef CONFIG_DEF = baseConfigDef();

@@ -75,6 +77,7 @@ public DatadogLogsSinkConnectorConfig(Boolean useSSL, Integer ddMaxBatchLength,
this.ddSite = getString(DD_SITE);
this.ddMaxBatchLength = ddMaxBatchLength;
this.addPublishedDate = getBoolean(ADD_PUBLISHED_DATE);
this.parseRecordHeaders = getBoolean(PARSE_RECORD_HEADERS);
validateConfig();
}

@@ -175,7 +178,13 @@ private static void addMetadataConfigs(ConfigDef configDef) {
false,
null,
Importance.MEDIUM,
"Valid settings are true or false. When set to `true`, The timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`");
"Valid settings are true or false. When set to `true`, The timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`"
).define(PARSE_RECORD_HEADERS,
Type.BOOLEAN,
false,
null,
Importance.MEDIUM,
"Valid settings are true or false. When set to `true`, Kafka Record Headers will be parsed and passed to DataDog as `kafkaheaders` object");
}

private static void addProxyConfigs(ConfigDef configDef) {
@@ -250,4 +259,4 @@ private String getTags(String key) {

return null;
}
}
DatadogLogsApiWriterTest.java
@@ -8,31 +8,34 @@ This product includes software developed at Datadog (https://www.datadoghq.com/)
import com.datadoghq.connect.logs.sink.util.RequestInfo;
import com.datadoghq.connect.logs.sink.util.RestHelper;
import com.datadoghq.connect.logs.util.Project;

import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.math.BigDecimal;
import java.math.BigInteger;

public class DatadogLogsApiWriterTest {
private static String apiKey = "API_KEY";
private Map<String, String> props;
private List<SinkRecord> records;
private RestHelper restHelper;
private static String apiKey = "API_KEY";

@Before
public void setUp() throws Exception {
@@ -153,7 +156,7 @@ public void writer_IOException_for_status_429() throws Exception {
DatadogLogsApiWriter writer = new DatadogLogsApiWriter(config);

restHelper.setHttpStatusCode(429);
records.add(new SinkRecord("someTopic", 0, null, "someKey", null, "someValue1", 0));
writer.write(records);
}

@@ -194,4 +197,50 @@ public void writer_withUseRecordTimeStampEnabled_shouldPopulateRecordTimestamp()
System.out.println(request.getBody());
Assert.assertEquals("[{\"message\":\"someValue1\",\"ddsource\":\"kafka-connect\",\"published_date\":1713974401224,\"ddtags\":\"topic:someTopic\"},{\"message\":\"someValue2\",\"ddsource\":\"kafka-connect\",\"published_date\":1713974401224,\"ddtags\":\"topic:someTopic\"}]", request.getBody());
}

@Test
public void writer_parse_record_headers_enabled() throws IOException {
props.put(DatadogLogsSinkConnectorConfig.PARSE_RECORD_HEADERS, "true");
DatadogLogsSinkConnectorConfig config = new DatadogLogsSinkConnectorConfig(false, 2, props);
DatadogLogsApiWriter writer = new DatadogLogsApiWriter(config);


Schema keySchema = Schema.INT32_SCHEMA;
Schema valueSchema = SchemaBuilder.struct()
.field("field1", Schema.STRING_SCHEMA)
.field("field2", Schema.INT32_SCHEMA)
.build();

Integer key = 123;
Struct value = new Struct(valueSchema)
.put("field1", "value1")
.put("field2", 456);

Headers headers = new ConnectHeaders();
headers.addString("headerKey", "headerValue");

long recordTime = 1713974401224L;

SinkRecord sinkRecord = new SinkRecord("topicName", 0, keySchema, key, valueSchema, value,
100L, recordTime, null, headers);

records.add(sinkRecord);
records.add(new SinkRecord("someTopic", 0, null, "someKey", null,
"someValue1", 0, recordTime, TimestampType.CREATE_TIME));
writer.write(records);

Assert.assertEquals(2, restHelper.getCapturedRequests().size());

RequestInfo requestWithHeaders = restHelper.getCapturedRequests().get(0);
RequestInfo requestWithoutHeaders = restHelper.getCapturedRequests().get(1);

Set<String> requestBodySetActual = new HashSet<>();
requestBodySetActual.add(requestWithHeaders.getBody());
requestBodySetActual.add(requestWithoutHeaders.getBody());
Set<String> requestBodySetExpected = new HashSet<>();
requestBodySetExpected.add("[{\"message\":{\"field1\":\"value1\",\"field2\":456},\"ddsource\":\"kafka-connect\",\"kafkaheaders\":{\"headerKey\":\"headerValue\"},\"ddtags\":\"topic:topicName\"}]");
requestBodySetExpected.add("[{\"message\":\"someValue1\",\"ddsource\":\"kafka-connect\",\"kafkaheaders\":{},\"ddtags\":\"topic:someTopic\"}]");
Assert.assertEquals(requestBodySetExpected, requestBodySetActual);
props.remove(DatadogLogsSinkConnectorConfig.PARSE_RECORD_HEADERS);
}
}