Skip to content

Commit

Permalink
Func test for aggregate records (#98)
Browse files Browse the repository at this point in the history
Adding setting  to ensure valid json can be enforced in the output of the http requests.

Update kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/JsonTidyTest.scala

Co-authored-by: Mati Urban <[email protected]>

Update kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala

Co-authored-by: Mati Urban <[email protected]>

Update kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala

Co-authored-by: Mati Urban <[email protected]>

Test fix

Tweaks

Formatting
  • Loading branch information
davidsloan authored Sep 20, 2024
1 parent e39ec93 commit 6caac97
Show file tree
Hide file tree
Showing 17 changed files with 305 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ trait HttpConfiguration extends LazyLogging {
headerTemplates: Seq[(String, String)],
topicName: String,
converters: Map[String, String],
batchSize: Int,
jsonTidy: Boolean,
): ConnectorConfiguration = {
val configMap: Map[String, ConfigValue[_]] = converters.view.mapValues(new ConfigValue[String](_)).toMap ++
Map(
Expand All @@ -29,7 +31,8 @@ trait HttpConfiguration extends LazyLogging {
HttpSinkConfigDef.HttpRequestContentProp -> ConfigValue(contentTemplate),
HttpSinkConfigDef.HttpRequestHeadersProp -> ConfigValue(headerTemplates.mkString(",")),
HttpSinkConfigDef.AuthenticationTypeProp -> ConfigValue("none"), //NoAuthentication
HttpSinkConfigDef.BatchCountProp -> ConfigValue(1),
HttpSinkConfigDef.BatchCountProp -> ConfigValue(batchSize),
HttpSinkConfigDef.JsonTidyProp -> ConfigValue(jsonTidy),
ERROR_REPORTING_ENABLED_PROP -> ConfigValue("false"),
SUCCESS_REPORTING_ENABLED_PROP -> ConfigValue("false"),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.lenses.streamreactor.connect.test

import cats.implicits._
import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.testing.scalatest.AsyncIOSpec
Expand Down Expand Up @@ -34,6 +35,8 @@ class HttpSinkTest
with LazyLogging
with EitherValues
with HttpConfiguration {
private val BatchSizeSingleRecord = 1
private val BatchSizeMultipleRecords = 2

private val stringSerializer = classOf[StringSerializer]
private val stringProducer = createProducer[String, String](stringSerializer, stringSerializer)
Expand Down Expand Up @@ -63,11 +66,12 @@ class HttpSinkTest
.withNetwork(network)

override val connectorModule: String = "http"
private var randomTestId = UUID.randomUUID().toString
private def topic = "topic" + randomTestId
private var randomTestId: String = _
private var topic: String = _

before {
randomTestId = UUID.randomUUID().toString
topic = "topic" + randomTestId
}

override def beforeAll(): Unit = {
Expand All @@ -85,12 +89,14 @@ class HttpSinkTest
setUpWiremockResponse()

val record = "My First Record"
sendRecordWithProducer(stringProducer,
stringConverters,
randomTestId,
topic,
record,
"My Static Content Template",
sendRecordsWithProducer(stringProducer,
stringConverters,
randomTestId,
topic,
"My Static Content Template",
BatchSizeSingleRecord,
false,
record,
).asserting {
requests =>
requests.size should be(1)
Expand All @@ -105,26 +111,36 @@ class HttpSinkTest
setUpWiremockResponse()

val record = "My First Record"
sendRecordWithProducer(stringProducer, stringConverters, randomTestId, topic, record, "{{value}}").asserting {
sendRecordsWithProducer(stringProducer,
stringConverters,
randomTestId,
topic,
record,
BatchSizeSingleRecord,
false,
"{{value}}",
).asserting {
requests =>
requests.size should be(1)
val firstRequest = requests.head
new String(firstRequest.getBody) should be(record)
firstRequest.getMethod should be(RequestMethod.POST)
new String(firstRequest.getBody) should be("My First Record")
}
}

test("dynamic string template containing json message fields should be sent to endpoint") {

setUpWiremockResponse()

sendRecordWithProducer[String, Order](
sendRecordsWithProducer[String, Order](
orderProducer,
jsonConverters,
randomTestId,
topic,
Order(1, "myOrder product", 1.3d, 10),
"product: {{value.product}}",
BatchSizeSingleRecord,
false,
Order(1, "myOrder product", 1.3d, 10),
).asserting {
requests =>
requests.size should be(1)
Expand All @@ -138,13 +154,15 @@ class HttpSinkTest

setUpWiremockResponse()

sendRecordWithProducer[String, Order](
sendRecordsWithProducer[String, Order](
orderProducer,
stringConverters,
randomTestId,
topic,
Order(1, "myOrder product", 1.3d, 10),
"whole product message: {{value}}",
BatchSizeSingleRecord,
false,
Order(1, "myOrder product", 1.3d, 10),
).asserting {
requests =>
requests.size should be(1)
Expand All @@ -156,18 +174,45 @@ class HttpSinkTest
}
}

test("batched dynamic string template containing whole json message should be sent to endpoint") {

setUpWiremockResponse()

sendRecordsWithProducer[String, Order](
orderProducer,
stringConverters,
randomTestId,
topic,
"{\"data\":[{{#message}}{{value}},{{/message}}]}",
BatchSizeMultipleRecords,
true,
Order(1, "myOrder product", 1.3d, 10),
Order(2, "another product", 1.4d, 109),
).asserting {
requests =>
requests.size should be(1)
val firstRequest = requests.head
firstRequest.getMethod should be(RequestMethod.POST)
new String(firstRequest.getBody) should be(
"{\"data\":[{\"id\":1,\"product\":\"myOrder product\",\"price\":1.3,\"qty\":10,\"created\":null},{\"id\":2,\"product\":\"another product\",\"price\":1.4,\"qty\":109,\"created\":null}]}",
)
}
}

test("dynamic string template containing avro message fields should be sent to endpoint") {

setUpWiremockResponse()

val order = Order(1, "myOrder product", 1.3d, 10, "March").toRecord
sendRecordWithProducer[String, GenericRecord](
sendRecordsWithProducer[String, GenericRecord](
avroOrderProducer,
avroConverters,
randomTestId,
topic,
order,
"product: {{value.product}}",
BatchSizeSingleRecord,
false,
order,
).asserting {
requests =>
requests.size should be(1)
Expand All @@ -188,25 +233,30 @@ class HttpSinkTest
()
}

private def sendRecordWithProducer[K, V](
private def sendRecordsWithProducer[K, V](
producer: Resource[IO, KafkaProducer[K, V]],
converters: Map[String, String],
randomTestId: String,
topic: String,
record: V,
contentTemplate: String,
batchSize: Int,
jsonTidy: Boolean,
record: V*,
): IO[List[LoggedRequest]] =
producer.use {
producer =>
createConnectorResource(randomTestId, topic, contentTemplate, converters).use {
createConnectorResource(randomTestId, topic, contentTemplate, converters, batchSize, jsonTidy).use {
_ =>
IO {
sendRecord[K, V](topic, producer, record)
eventually {
verify(postRequestedFor(urlEqualTo(s"/$randomTestId")))
findAll(postRequestedFor(urlEqualTo(s"/$randomTestId"))).asScala.toList
record.map {
rec => IO(sendRecord[K, V](topic, producer, rec))
}.sequence
.map { _ =>
eventually {
verify(postRequestedFor(urlEqualTo(s"/$randomTestId")))
findAll(postRequestedFor(urlEqualTo(s"/$randomTestId"))).asScala.toList
}
}
}

}
}
private def sendRecord[K, V](topic: String, producer: KafkaProducer[K, V], record: V): Unit = {
Expand All @@ -219,6 +269,8 @@ class HttpSinkTest
topic: String,
contentTemplate: String,
converters: Map[String, String],
batchSize: Int,
jsonTidy: Boolean,
): Resource[IO, String] =
createConnector(
sinkConfig(
Expand All @@ -229,6 +281,8 @@ class HttpSinkTest
Seq(),
topic,
converters,
batchSize,
jsonTidy,
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class HttpWriter(
recordsQueueRef: Ref[IO, Queue[RenderedRecord]],
commitContextRef: Ref[IO, HttpCommitContext],
errorThreshold: Int,
tidyJson: Boolean,
errorReporter: ReportingController,
successReporter: ReportingController,
) extends LazyLogging {
Expand Down Expand Up @@ -177,7 +178,7 @@ class HttpWriter(

private def flush(records: Seq[RenderedRecord]): IO[ProcessedTemplate] =
for {
processed <- IO.fromEither(template.process(records))
processed <- IO.fromEither(template.process(records, tidyJson))
_ <- reportResult(records, processed, sender.sendHttpRequest(processed))
} yield processed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ object HttpWriterManager extends StrictLogging {
terminate,
config.errorThreshold,
config.uploadSyncPeriod,
config.tidyJson,
config.errorReportingController,
config.successReportingController,
)
Expand All @@ -130,6 +131,7 @@ class HttpWriterManager(
deferred: Deferred[IO, Either[Throwable, Unit]],
errorThreshold: Int,
uploadSyncPeriod: Int,
tidyJson: Boolean,
errorReportingController: ReportingController,
successReportingController: ReportingController,
)(
Expand All @@ -146,6 +148,7 @@ class HttpWriterManager(
Ref.unsafe[IO, Queue[RenderedRecord]](Queue()),
Ref.unsafe[IO, HttpCommitContext](HttpCommitContext.default(sinkName)),
errorThreshold,
tidyJson,
errorReportingController,
successReportingController,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ case class HttpSinkConfig(
uploadSyncPeriod: Int,
retries: RetriesConfig,
timeout: TimeoutConfig,
tidyJson: Boolean,
errorReportingController: ReportingController,
successReportingController: ReportingController,
)
Expand Down Expand Up @@ -120,6 +121,7 @@ object HttpSinkConfig {
retries = RetriesConfig(maxRetries, maxTimeoutMs, onStatusCodes)
connectionTimeoutMs = connectConfig.getInt(HttpSinkConfigDef.ConnectionTimeoutMsProp)
timeout = TimeoutConfig(connectionTimeoutMs)
jsonTidy = connectConfig.getBoolean(HttpSinkConfigDef.JsonTidyProp)
errorReportingController = createAndStartController(new ErrorReportingController(connectConfig))
successReportingController = createAndStartController(new SuccessReportingController(connectConfig))
} yield HttpSinkConfig(
Expand All @@ -134,6 +136,7 @@ object HttpSinkConfig {
uploadSyncPeriod,
retries,
timeout,
jsonTidy,
errorReportingController,
successReportingController,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ object HttpSinkConfigDef {
|The password for basic authentication.
|""".stripMargin

val JsonTidyProp: String = "connect.http.json.tidy"
val JsonTidyPropDoc: String =
"""
|Tidy the output json.
|""".stripMargin

val config: ConfigDef = {
val configDef = new ConfigDef()
.withClientSslSupport()
Expand Down Expand Up @@ -253,6 +259,13 @@ object HttpSinkConfigDef {
Importance.HIGH,
BasicAuthenticationPasswordDoc,
)
.define(
JsonTidyProp,
Type.BOOLEAN,
false,
Importance.HIGH,
JsonTidyPropDoc,
)
ReporterConfig.withErrorRecordReportingSupport(configDef)
ReporterConfig.withSuccessRecordReportingSupport(configDef)
OAuth2Config.append(configDef)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.tpl

import com.typesafe.scalalogging.LazyLogging
import org.json4s._
import org.json4s.native.JsonMethods._

object JsonTidy extends LazyLogging {
implicit val formats: Formats = DefaultFormats

def cleanUp(jsonString: String): String =
try {
val json = parse(jsonString)
compact(render(json))
} catch {
case e: Exception =>
logger.error("Error formatting json", e)
jsonString
}
}
Loading

0 comments on commit 6caac97

Please sign in to comment.