Skip to content

Commit

Permalink
Merge pull request #2270 from DataDog/xgouchet/RUM-6235/sse_request
Browse files Browse the repository at this point in the history
RUM-6235 Handle sse request
  • Loading branch information
xgouchet authored Sep 19, 2024
2 parents 529e791 + 4fc8a54 commit a6bd6a5
Show file tree
Hide file tree
Showing 13 changed files with 295 additions and 33 deletions.
1 change: 1 addition & 0 deletions detekt_custom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1237,6 +1237,7 @@ datadog:
- "okhttp3.Response.code()"
- "okhttp3.Response.header(kotlin.String, kotlin.String?)"
- "okhttp3.ResponseBody.contentLength()"
- "okhttp3.ResponseBody.contentType()"
- "okio.Buffer.constructor()"
# endregion
# region org.json
Expand Down
15 changes: 11 additions & 4 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ timber = "5.0.1"
coroutines = "1.4.2"

# Local Server
ktor = "1.6.0"
ktor = "1.6.8"
ktorServer = "3.0.0-rc-1"

# Otel
jctools = "3.3.0"
Expand Down Expand Up @@ -233,8 +234,10 @@ coroutinesCore = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", ver

# Local Server
ktorCore = { module = "io.ktor:ktor", version.ref = "ktor" }
ktorNetty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor" }
ktorGson = { module = "io.ktor:ktor-gson", version.ref = "ktor" }
ktorServerCore = { module = "io.ktor:ktor-server-core", version.ref = "ktorServer" }
ktorServerNetty = { module = "io.ktor:ktor-server-netty", version.ref = "ktorServer" }
ktorServerSSE = { module = "io.ktor:ktor-server-sse", version.ref = "ktorServer" }

# Otel
jctools = { module = "org.jctools:jctools-core", version.ref = "jctools" }
Expand Down Expand Up @@ -306,8 +309,12 @@ glide = [

ktor = [
"ktorCore",
"ktorNetty",
"ktorGson"
"ktorGson",
]
ktorServer = [
"ktorServerCore",
"ktorServerNetty",
"ktorServerSSE"
]

traceCore = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,13 +357,21 @@ internal constructor(

private fun getBodyLength(response: Response, internalLogger: InternalLogger): Long? {
return try {
val body = response.body ?: return null
val body = response.body
val contentType = body?.contentType()?.let {
// manually rebuild the mimetype as `toString()` can also include the charsets
it.type + "/" + it.subtype
}
val isStream = contentType in STREAM_CONTENT_TYPES
val isWebSocket = !response.header(WEBSOCKET_ACCEPT_HEADER, null).isNullOrBlank()
if (body == null || isStream || isWebSocket) {
return null
}
// if there is a Content-Length available, we can read it directly
// however, OkHttp will drop Content-Length header if transparent compression is
// used (since the value reported cannot be applied to decompressed body), so to be
// able to still read it, we force decompression by calling peekBody
body.contentLengthOrNull() ?: response.peekBody(MAX_BODY_PEEK)
.contentLengthOrNull()
body.contentLengthOrNull() ?: response.peekBody(MAX_BODY_PEEK).contentLengthOrNull()
} catch (e: IOException) {
internalLogger.log(
InternalLogger.Level.ERROR,
Expand Down Expand Up @@ -481,6 +489,15 @@ internal constructor(

internal companion object {

internal val STREAM_CONTENT_TYPES = setOf(
"text/event-stream",
"application/grpc",
"application/grpc+proto",
"application/grpc+json"
)

internal const val WEBSOCKET_ACCEPT_HEADER = "Sec-WebSocket-Accept"

internal const val WARN_RUM_DISABLED =
"You set up a DatadogInterceptor for %s, but RUM features are disabled. " +
"Make sure you initialized the Datadog SDK with a valid Application Id, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import fr.xgouchet.elmyr.junit5.ForgeConfiguration
import fr.xgouchet.elmyr.junit5.ForgeExtension
import io.opentracing.Tracer
import okhttp3.MediaType
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.Protocol
import okhttp3.Response
import okhttp3.ResponseBody
Expand Down Expand Up @@ -221,6 +222,88 @@ internal class DatadogInterceptorTest : TracingInterceptorNotSendingSpanTest() {
}
}

@Test
fun `M start and stop RUM Resource W intercept() {successful streaming request}`(
@IntForgery(min = 200, max = 300) statusCode: Int,
forge: Forge
) {
// Given
val mimeType = forge.anElementFrom(DatadogInterceptor.STREAM_CONTENT_TYPES)
fakeMediaType = mimeType.toMediaType()
stubChain(mockChain, statusCode)
val expectedStartAttrs = emptyMap<String, Any?>()
val expectedStopAttrs = mapOf(
RumAttributes.TRACE_ID to fakeTraceIdAsString,
RumAttributes.SPAN_ID to fakeSpanId,
RumAttributes.RULE_PSR to fakeTracingSampleRate
) + fakeAttributes
val kind = RumResourceKind.NATIVE

// When
testedInterceptor.intercept(mockChain)

// Then
inOrder(rumMonitor.mockInstance) {
argumentCaptor<ResourceId> {
verify(rumMonitor.mockInstance).startResource(
capture(),
eq(fakeMethod),
eq(fakeUrl),
eq(expectedStartAttrs)
)
verify(rumMonitor.mockInstance).stopResource(
capture(),
eq(statusCode),
eq(null),
eq(kind),
eq(expectedStopAttrs)
)
assertThat(firstValue).isEqualTo(secondValue)
}
}
}

@Test
fun `M start and stop RUM Resource W intercept() {successful websocket request}`(
@IntForgery(min = 200, max = 300) statusCode: Int,
@StringForgery websocketHash: String
) {
// Given
stubChain(mockChain, statusCode) {
header("Sec-WebSocket-Accept", websocketHash)
}
val expectedStartAttrs = emptyMap<String, Any?>()
val expectedStopAttrs = mapOf(
RumAttributes.TRACE_ID to fakeTraceIdAsString,
RumAttributes.SPAN_ID to fakeSpanId,
RumAttributes.RULE_PSR to fakeTracingSampleRate
) + fakeAttributes
val kind = RumResourceKind.NATIVE

// When
testedInterceptor.intercept(mockChain)

// Then
inOrder(rumMonitor.mockInstance) {
argumentCaptor<ResourceId> {
verify(rumMonitor.mockInstance).startResource(
capture(),
eq(fakeMethod),
eq(fakeUrl),
eq(expectedStartAttrs)
)
verify(rumMonitor.mockInstance).stopResource(
capture(),
eq(statusCode),
eq(null),
eq(kind),
eq(expectedStopAttrs)
)
assertThat(firstValue).isEqualTo(secondValue)
}
}
}

@Test
fun `M start and stop RUM Resource W intercept() {successful request, unknown method}`(
@IntForgery(min = 200, max = 300) statusCode: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1346,8 +1346,12 @@ internal open class TracingInterceptorNotSendingSpanTest {

// region Internal

internal fun stubChain(chain: Interceptor.Chain, statusCode: Int) {
return stubChain(chain) { forgeResponse(statusCode) }
internal fun stubChain(
chain: Interceptor.Chain,
statusCode: Int,
responseBuilder: Response.Builder.() -> Unit = {}
) {
return stubChain(chain) { forgeResponse(statusCode, responseBuilder) }
}

internal fun stubChain(
Expand Down Expand Up @@ -1400,7 +1404,7 @@ internal open class TracingInterceptorNotSendingSpanTest {
return builder.build()
}

private fun forgeResponse(statusCode: Int): Response {
private fun forgeResponse(statusCode: Int, additionalConfig: Response.Builder.() -> Unit = {}): Response {
val builder = Response.Builder()
.request(fakeRequest)
.protocol(Protocol.HTTP_2)
Expand All @@ -1410,6 +1414,7 @@ internal open class TracingInterceptorNotSendingSpanTest {
if (fakeMediaType != null) {
builder.header(TracingInterceptor.HEADER_CT, fakeMediaType?.type.orEmpty())
}
builder.additionalConfig()
return builder.build()
}

Expand Down
1 change: 1 addition & 0 deletions sample/kotlin/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ dependencies {
implementation("com.squareup.retrofit2:converter-gson:2.9.0")
implementation(libs.okHttp)
implementation(libs.gson)
implementation("com.launchdarkly:okhttp-eventsource:2.5.0")

// Misc
implementation(libs.timber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal class TracesFragment : Fragment(), View.OnClickListener {
rootView.findViewById<Button>(R.id.start_coroutine_operation).setOnClickListener(this)
rootView.findViewById<Button>(R.id.start_request).setOnClickListener(this)
rootView.findViewById<Button>(R.id.start_404_request).setOnClickListener(this)
rootView.findViewById<Button>(R.id.start_sse_request).setOnClickListener(this)
progressBarAsync = rootView.findViewById(R.id.spinner_async)
progressBarCoroutine = rootView.findViewById(R.id.spinner_coroutine)
progressBarRequest = rootView.findViewById(R.id.spinner_request)
Expand Down Expand Up @@ -73,6 +74,7 @@ internal class TracesFragment : Fragment(), View.OnClickListener {

// region View.OnClickListener

@Suppress("LongMethod")
override fun onClick(v: View?) {
when (v?.id) {
R.id.start_async_operation -> {
Expand Down Expand Up @@ -123,6 +125,17 @@ internal class TracesFragment : Fragment(), View.OnClickListener {
}
)
}
R.id.start_sse_request -> {
setInProgress()
viewModel.startSseRequest(
onResponse = {
setCompleteStatus(R.drawable.ic_check_circle_green_24dp)
},
onException = {
setCompleteStatus(R.drawable.ic_error_red_24dp)
}
)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import com.datadog.android.trace.coroutines.launchTraced
import com.datadog.android.trace.coroutines.withContextTraced
import com.datadog.android.trace.withinSpan
import com.datadog.android.vendor.sample.LocalServer
import com.launchdarkly.eventsource.EventHandler
import com.launchdarkly.eventsource.EventSource
import com.launchdarkly.eventsource.MessageEvent
import io.opentracing.Span
import io.opentracing.log.Fields
import io.opentracing.util.GlobalTracer
Expand All @@ -33,10 +36,12 @@ import kotlinx.coroutines.flow.map
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import java.net.URI
import java.time.Duration
import java.util.Locale
import java.util.Random

@Suppress("DEPRECATION")
@Suppress("DEPRECATION", "StringLiteralDuplication", "TooManyFunctions")
internal class TracesViewModel(
private val okHttpClient: OkHttpClient,
private val localServer: LocalServer
Expand Down Expand Up @@ -80,7 +85,7 @@ internal class TracesViewModel(
onException: (Throwable) -> Unit,
onCancel: () -> Unit
) {
networkRequestTask = RequestTask(
networkRequestTask = GetRequestTask(
localServer.getUrl(),
okHttpClient,
onResponse,
Expand All @@ -95,7 +100,7 @@ internal class TracesViewModel(
onException: (Throwable) -> Unit,
onCancel: () -> Unit
) {
networkRequestTask = RequestTask(
networkRequestTask = GetRequestTask(
"https://www.datadoghq.com/notfound",
okHttpClient,
onResponse,
Expand All @@ -105,6 +110,19 @@ internal class TracesViewModel(
networkRequestTask?.execute()
}

fun startSseRequest(
onResponse: () -> Unit,
onException: (Throwable) -> Unit
) {
networkRequestTask = SSERequestTask(
localServer.sseUrl(),
okHttpClient,
onResponse,
onException
)
networkRequestTask?.execute()
}

fun stopAsyncOperations() {
asyncOperationTask?.cancel(true)
networkRequestTask?.cancel(true)
Expand Down Expand Up @@ -184,9 +202,9 @@ internal class TracesViewModel(

// endregion

// region RequestTask
// region GetRequestTask

private class RequestTask(
private class GetRequestTask(
private val url: String,
private val okHttpClient: OkHttpClient,
private val onResponse: (Response) -> Unit,
Expand Down Expand Up @@ -259,6 +277,64 @@ internal class TracesViewModel(

// endregion

// region SSERequestTask

private class SSERequestTask(
private val url: String,
private val okHttpClient: OkHttpClient,
private val onResponse: () -> Unit,
private val onException: (Throwable) -> Unit
) : AsyncTask<Unit, Unit, Result>(), EventHandler {
private var currentActiveMainSpan: Span? = null

@Deprecated("Deprecated in Java")
override fun onPreExecute() {
super.onPreExecute()
currentActiveMainSpan = GlobalTracer.get().activeSpan()
}

@Deprecated("Deprecated in Java")
@Suppress("TooGenericExceptionCaught", "LogNotTimber", "MagicNumber")
override fun doInBackground(vararg params: Unit?): Result {
return try {
val eventSourceSse = EventSource.Builder(this, URI.create(url))
.client(okHttpClient)
.connectTimeout(Duration.ofSeconds(3))
.backoffResetThreshold(Duration.ofSeconds(3))
.build()

eventSourceSse?.start()
Result.Success("")
} catch (e: Exception) {
Log.e("Response", "Error", e)
Result.Failure(throwable = e)
}
}

override fun onOpen() {
Log.i("SSE", "onOpen")
}

override fun onError(e: Throwable?) {
Log.e("SSE", "onError", e)
e?.let { onException(it) }
}

override fun onComment(comment: String?) {
Log.i("SSE", "onComment: $comment")
}

override fun onMessage(message: String?, event: MessageEvent?) {
Log.i("SSE", "onMessage: $message | $event")
}

override fun onClosed() {
onResponse()
}
}

// endregion

// region AsyncOperationTask

private class AsyncOperationTask(
Expand Down
Loading

0 comments on commit a6bd6a5

Please sign in to comment.