add datadog.parse_record_headers property (#49)
* add datadog.parse_record_headers property

* add README.md suggestions

* add README.md suggestions

* revert recordToJSON
jm1221d authored May 22, 2024
1 parent 3ece8b1 commit 923c4d4
Showing 4 changed files with 138 additions and 47 deletions.
56 changes: 28 additions & 28 deletions README.md
@@ -1,6 +1,6 @@
# Datadog Kafka Connect Logs

`datadog-kafka-connect-logs` is a [Kafka Connector](http://kafka.apache.org/documentation.html#connect) for sending
records from Kafka as logs to the [Datadog Logs Intake API](https://docs.datadoghq.com/api/v1/logs/).

It is a plugin meant to be installed on a [Kafka Connect Cluster](https://docs.confluent.io/current/connect/) running
@@ -12,7 +12,7 @@ besides a [Kafka Broker](https://www.confluent.io/what-is-apache-kafka/).
2. Java 8 and above.
3. Confluent Platform 4.0.x and above (optional).

To install the plugin, one must have a working instance of Kafka Connect connected to a Kafka Broker. See also
[Confluent's](https://www.confluent.io/product/confluent-platform/) documentation for easily setting this up.

## Installation and Setup
@@ -24,25 +24,24 @@ See [Confluent's documentation](https://docs.confluent.io/current/connect/managi
### Download from GitHub

Download the latest version from the GitHub [releases page](https://github.com/DataDog/datadog-kafka-connect-logs/releases).
Also see [Confluent's documentation](https://docs.confluent.io/current/connect/managing/community.html) on installing
community connectors.

### Build from Source

1. Clone the repo from https://github.com/DataDog/datadog-kafka-connect-logs
2. Verify that a Java 8 JRE or JDK is installed.
3. Run `mvn clean compile package`. This builds the jar in the `/target` directory. The file name has the format `datadog-kafka-connect-logs-[VERSION].jar`.
4. The zip file for use on [Confluent Hub](https://www.confluent.io/hub/) can be found in `target/components/packages`.

## Quick Start

1. To install the plugin, place the plugin's jar file (see the [previous section](#installation-and-setup) on how to download or build it)
in or under the location specified in `plugin.path`. If you use Confluent Platform, run
`confluent-hub install target/components/packages/<connector-zip-file>`.
2. Restart your Kafka Connect instance.
3. Run the following command to manually create connector tasks. Adjust `topics` to configure the Kafka topic to be
ingested and set your Datadog `api_key`.

```
curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d '{
@@ -56,8 +55,8 @@
}'
```

4. You can verify that data is ingested into the Datadog platform by searching for `source:kafka-connect` in the Log
Explorer tab.
5. Use the following commands to check status, and manage connectors and tasks:

```
@@ -95,18 +94,19 @@ A REST call can be executed against one of the cluster instances, and the config
| `topics` | Comma separated list of Kafka topics for Datadog to consume. `prod-topic1,prod-topic2,prod-topic3`||
| `datadog.api_key` | The API key of your Datadog platform.||
#### General Optional Parameters
| Name | Description | Default Value |
|-------- |-------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------|
| `datadog.site` | The site of the Datadog intake to send logs to (for example 'datadoghq.eu' to send data to the EU site) | `datadoghq.com` |
| `datadog.url` | Custom Datadog URL endpoint where your logs will be sent. `datadog.url` takes precedence over `datadog.site`. Example: `http-intake.logs.datadoghq.com:443` ||
| `datadog.tags` | Tags associated with your logs in a comma separated tag:value format. ||
| `datadog.service` | The name of the application or service generating the log events. ||
| `datadog.hostname` | The name of the originating host of the log. ||
| `datadog.proxy.url` | Proxy endpoint when logs are not directly forwarded to Datadog. ||
| `datadog.proxy.port` | Proxy port when logs are not directly forwarded to Datadog. ||
| `datadog.retry.max` | The number of retries before the output plugin stops. | `5` |
| `datadog.retry.backoff_ms` | The time in milliseconds to wait following an error before a retry attempt is made. | `3000` |
| `datadog.add_published_date` | Valid settings are true or false. When set to `true`, the timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`. ||
| `datadog.parse_record_headers` | Valid settings are true or false. When set to `true`, Kafka record headers are parsed and passed to Datadog as a `kafkaheaders` object. | `false` |
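
For example, to try out header parsing you could set the new property alongside the required ones in your sink connector configuration, as in the minimal sketch below (property names are taken from the tables above; the topic, API key, and tag values are illustrative placeholders):

```properties
# Sketch of the Datadog-specific portion of a sink connector configuration.
# Property names come from the tables above; all values here are placeholders.
topics=prod-topic1
datadog.api_key=<YOUR_DD_API_KEY>
datadog.site=datadoghq.com
datadog.tags=env:dev,team:example
datadog.add_published_date=true
datadog.parse_record_headers=true
```

With `datadog.parse_record_headers=true`, a record carrying a header such as `trace_id=abc123` should surface in the log event under a `kafkaheaders` object (for example `"kafkaheaders": {"trace_id": "abc123"}`), as implemented in `DatadogLogsApiWriter` below.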

### Troubleshooting performance

@@ -126,7 +126,7 @@ To improve performance of the connector, you can try the following options:

## Single Message Transforms

Kafka Connect supports Single Message Transforms that let you change the structure or content of a message. To
experiment with this feature, try adding these lines to your sink connector configuration:

```properties
@@ -135,7 +135,7 @@ transforms.addExtraField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addExtraField.static.field=extraField
transforms.addExtraField.static.value=extraValue
```
If you restart the sink connector and send some more test messages, each new record should have an `extraField` field
with the value `extraValue`. For a more in-depth walkthrough, see [Confluent's documentation](https://docs.confluent.io/current/connect/transforms/index.html).

## Testing
@@ -146,14 +146,14 @@ To run the supplied unit tests, run `mvn test` from the root of the project.

### System Tests

We use Confluent Platform for a batteries-included Kafka environment for local testing. Follow the guide
[here](https://docs.confluent.io/current/quickstart/ce-quickstart.html) to install the Confluent Platform.

Then, install the [Confluent Kafka Datagen Connector](https://github.com/confluentinc/kafka-connect-datagen) to create
sample data of arbitrary types. Install this Datadog Logs Connector by running
`confluent-hub install target/components/packages/<connector-zip-file>`.

In the `/test` directory, there are some `.json` configuration files to make it easy to create Connectors. There are
configurations for both the Datagen Connector with various datatypes and the Datadog Logs Connector. To the latter,
you will need to add a valid Datadog API key before you upload the `.json` to Confluent Platform.

DatadogLogsApiWriter.java
@@ -6,25 +6,42 @@ This product includes software developed at Datadog (https://www.datadoghq.com/)
package com.datadoghq.connect.logs.sink;

import com.datadoghq.connect.logs.util.Project;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream;
import javax.ws.rs.core.Response;

import static java.util.stream.Collectors.toMap;
import static java.util.stream.StreamSupport.stream;

public class DatadogLogsApiWriter {
private static final Logger log = LoggerFactory.getLogger(DatadogLogsApiWriter.class);
private final DatadogLogsSinkConnectorConfig config;
private final Map<String, List<SinkRecord>> batches;
private final JsonConverter jsonConverter;

@@ -33,7 +50,7 @@ public DatadogLogsApiWriter(DatadogLogsSinkConnectorConfig config) {
this.batches = new HashMap<>();
this.jsonConverter = new JsonConverter();

Map<String, String> jsonConverterConfig = new HashMap<>();
jsonConverterConfig.put("schemas.enable", "false");
jsonConverterConfig.put("decimal.format", "NUMERIC");

@@ -42,13 +59,14 @@

/**
* Writes records to the Datadog Logs API.
*
* @param records to be written from the Source Broker to the Datadog Logs API.
* @throws IOException may be thrown if the connection to the API fails.
*/
public void write(Collection<SinkRecord> records) throws IOException {
for (SinkRecord record : records) {
if (!batches.containsKey(record.topic())) {
batches.put(record.topic(), new ArrayList<>(Collections.singletonList(record)));
} else {
batches.get(record.topic()).add(record);
}
@@ -64,7 +82,7 @@ public void write(Collection<SinkRecord> records) throws IOException {

private void flushBatches() throws IOException {
// send any outstanding batches
for (Map.Entry<String, List<SinkRecord>> entry : batches.entrySet()) {
sendBatch(entry.getKey());
}

@@ -97,20 +115,31 @@ private JsonArray formatBatch(String topic) {
}

JsonElement recordJSON = recordToJSON(record);
JsonObject message = populateMetadata(topic, recordJSON, record.timestamp(), () -> kafkaHeadersToJsonElement(record));
batchRecords.add(message);
}

return batchRecords;
}

private JsonElement kafkaHeadersToJsonElement(SinkRecord sinkRecord) {
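// Flatten the record headers into a key/value map, then round-trip it through Gson
// so it can be attached to the log payload as the `kafkaheaders` object.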
Map<String, Object> headerMap = stream(sinkRecord.headers().spliterator(), false)
.collect(toMap(Header::key, Header::value));

Gson gson = new Gson();

String jsonString = gson.toJson(headerMap);

return gson.fromJson(jsonString, JsonElement.class);
}

private JsonElement recordToJSON(SinkRecord record) {
byte[] rawJSONPayload = jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
String jsonPayload = new String(rawJSONPayload, StandardCharsets.UTF_8);
return new Gson().fromJson(jsonPayload, JsonElement.class);
}

private JsonObject populateMetadata(String topic, JsonElement message, Long timestamp, Supplier<JsonElement> kafkaHeaders) {
JsonObject content = new JsonObject();
String tags = "topic:" + topic;
content.add("message", message);
@@ -119,6 +148,10 @@ private JsonObject populateMetadata(String topic, JsonElement message, Long time
content.add("published_date", new JsonPrimitive(timestamp));
}

if (config.parseRecordHeaders) {
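// kafkaHeaders is a Supplier, so the Gson conversion only runs when header parsing is enabled.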
content.add("kafkaheaders", kafkaHeaders.get());
}

if (config.ddTags != null) {
tags += "," + config.ddTags;
}
@@ -161,7 +194,7 @@ private void sendRequest(JsonArray content, URL url) throws IOException {
if (Response.Status.Family.familyOf(status) != Response.Status.Family.SUCCESSFUL) {
InputStream stream = con.getErrorStream();
String error = "";
if (stream != null) {
error = getOutput(stream);
}
con.disconnect();
DatadogLogsSinkConnectorConfig.java
@@ -34,6 +34,7 @@ public class DatadogLogsSinkConnectorConfig extends AbstractConfig {
private static final String DEFAULT_DD_SITE = "datadoghq.com";
public static final String DEFAULT_DD_URL = String.format(DD_URL_FORMAT_FROM_SITE, DEFAULT_DD_SITE);
public static final String ADD_PUBLISHED_DATE = "datadog.add_published_date";
public static final String PARSE_RECORD_HEADERS = "datadog.parse_record_headers";

// Respect limit documented at https://docs.datadoghq.com/api/?lang=bash#logs
public final Integer ddMaxBatchLength;
@@ -53,6 +54,7 @@ public class DatadogLogsSinkConnectorConfig extends AbstractConfig {
public final Integer retryMax;
public final Integer retryBackoffMs;
public final boolean addPublishedDate;
public final boolean parseRecordHeaders;

public static final ConfigDef CONFIG_DEF = baseConfigDef();

@@ -75,6 +77,7 @@ public DatadogLogsSinkConnectorConfig(Boolean useSSL, Integer ddMaxBatchLength,
this.ddSite = getString(DD_SITE);
this.ddMaxBatchLength = ddMaxBatchLength;
this.addPublishedDate = getBoolean(ADD_PUBLISHED_DATE);
this.parseRecordHeaders = getBoolean(PARSE_RECORD_HEADERS);
validateConfig();
}

@@ -175,7 +178,13 @@ private static void addMetadataConfigs(ConfigDef configDef) {
false,
null,
Importance.MEDIUM,
"Valid settings are true or false. When set to `true`, The timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`");
"Valid settings are true or false. When set to `true`, The timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`"
).define(PARSE_RECORD_HEADERS,
Type.BOOLEAN,
false,
null,
Importance.MEDIUM,
"Valid settings are true or false. When set to `true`, Kafka Record Headers will be parsed and passed to DataDog as `kafkaheaders` object");
}

private static void addProxyConfigs(ConfigDef configDef) {
Expand Down Expand Up @@ -250,4 +259,4 @@ private String getTags(String key) {

return null;
}
}