Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lwc-events: fix sync for time series #1646

Merged
merged 1 commit into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
subHandlers.put(sub, q -> handler)
flushableHandlers += handler
}
diff.unchanged.timeSeries.foreach { sub =>
val handlerMeta = subHandlers.get(sub)
if (handlerMeta != null)
flushableHandlers += handlerMeta._2
}
diff.removed.timeSeries.foreach(removeSubscription)

// Trace pass-through
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
package com.netflix.atlas.lwc.events

import com.netflix.atlas.core.util.SortedTagMap
import com.netflix.atlas.json.Json
import com.netflix.spectator.api.Clock
import com.netflix.spectator.api.ManualClock
import munit.FunSuite

import java.io.StringWriter
import scala.util.Using

class LwcEventClientSuite extends FunSuite {

import LwcEventSuite.*
import LwcEventClientSuite.*

private val clock = new ManualClock()
private val step = 5_000L
Expand Down Expand Up @@ -121,6 +127,41 @@ class LwcEventClientSuite extends FunSuite {
assert(output.result().isEmpty)
}

test("analytics, sync") {
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", step, "app,foo,:eq,:sum", Subscriptions.TimeSeries),
Subscription("2", step, "app,www,:eq,:sum", Subscriptions.TimeSeries)
)
)
val output = List.newBuilder[String]
val client = TestLwcEventClient(subs, output.addOne, clock)
client.process(sampleLwcEvent)
clock.setWallTime(step)
client.process(LwcEvent.HeartbeatLwcEvent(step))
assertEquals(output.result().size, 2)

// Sync expressions, same set
(2 until 10).foreach { i =>
output.clear()
client.sync(subs)
client.process(sampleLwcEvent)
clock.setWallTime(step * i)
client.process(LwcEvent.HeartbeatLwcEvent(step * i))
assertEquals(output.result().size, 2)
}

// Sync expressions, subset
(10 until 20).foreach { i =>
output.clear()
client.sync(subs.copy(timeSeries = subs.timeSeries.tail))
client.process(sampleLwcEvent)
clock.setWallTime(step * i)
client.process(LwcEvent.HeartbeatLwcEvent(step * i))
assertEquals(output.result().size, 1)
}
}

test("trace analytics, basic aggregate") {
val subs = Subscriptions.fromTypedList(
List(
Expand All @@ -136,3 +177,32 @@ class LwcEventClientSuite extends FunSuite {
assertEquals(vs.size, 1)
}
}

object LwcEventClientSuite {

case class TestLwcEventClient(
subscriptions: Subscriptions,
consumer: String => Unit,
clock: Clock
) extends AbstractLwcEventClient(clock) {

sync(subscriptions)

override def sync(subscriptions: Subscriptions): Unit = {
super.sync(subscriptions)
}

override def submit(id: String, event: LwcEvent): Unit = {
Using.resource(new StringWriter()) { w =>
Using.resource(Json.newJsonGenerator(w)) { gen =>
gen.writeStartObject()
gen.writeStringField("id", id)
gen.writeFieldName("event")
event.encode(gen)
gen.writeEndObject()
}
consumer(s"data: ${w.toString}")
}
}
}
}
Loading