diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala index d87cc391c..7cd27f423 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala @@ -24,6 +24,7 @@ import com.netflix.atlas.core.model.Expr import com.netflix.atlas.core.model.FilterExpr import com.netflix.atlas.core.model.ModelExtractors import com.netflix.atlas.core.model.StatefulExpr +import com.netflix.atlas.core.model.StyleExpr import com.netflix.atlas.core.model.TraceQuery import com.netflix.atlas.core.model.TraceVocabulary import com.netflix.atlas.core.stacklang.Interpreter @@ -37,10 +38,12 @@ import com.typesafe.config.Config import scala.util.Success -private[stream] class ExprInterpreter(config: Config) { +class ExprInterpreter(config: Config) { import ExprInterpreter.* + private val tsInterpreter = Interpreter(new CustomVocabulary(config).allWords) + private val eventInterpreter = Interpreter( new CustomVocabulary(config, List(EventVocabulary)).allWords ) @@ -60,9 +63,23 @@ private[stream] class ExprInterpreter(config: Config) { // time shifts, filters, and integral. The filters and integral are excluded because // they can be confusing as the time window for evaluation is not bounded. val results = graphCfg.exprs.flatMap(_.perOffset) - results.foreach { result => + results.foreach(validate) + + // Perform host rewrites based on the Atlas hostname + val host = uri.authority.host.toString() + val rewritten = hostRewriter.rewrite(host, results) + graphCfg.copy(query = rewritten.mkString(","), parsedQuery = Success(rewritten)) + } + + /** + * Check that data expressions are supported. The streaming path doesn't support + * time shifts, filters, and integral. The filters and integral are excluded because + * they can be confusing as the time window for evaluation is not bounded. + */ + private def validate(styleExpr: StyleExpr): Unit = { + styleExpr.perOffset.foreach { s => // Use rewrite as a helper for searching the expression for invalid operations - result.expr.rewrite { + s.expr.rewrite { case op: StatefulExpr.Integral => invalidOperator(op); op case op: FilterExpr => invalidOperator(op); op case op: DataExpr if !op.offset.isZero => invalidOperator(op); op @@ -70,13 +87,8 @@ private[stream] class ExprInterpreter(config: Config) { // Double check all data expressions do not have an offset. In some cases for named rewrites // the check above may not detect the offset. - result.expr.dataExprs.filterNot(_.offset.isZero).foreach(invalidOperator) + s.expr.dataExprs.filterNot(_.offset.isZero).foreach(invalidOperator) } - - // Perform host rewrites based on the Atlas hostname - val host = uri.authority.host.toString() - val rewritten = hostRewriter.rewrite(host, results) - graphCfg.copy(query = rewritten.mkString(","), parsedQuery = Success(rewritten)) } private def invalidOperator(expr: Expr): Unit = { @@ -103,45 +115,80 @@ private[stream] class ExprInterpreter(config: Config) { } private def invalidValue(value: Any): IllegalArgumentException = { - new IllegalArgumentException(s"invalid value on stack; $value") + new IllegalArgumentException(s"invalid value on stack: $value") + } + + private def parseTimeSeries(query: String): List[StyleExpr] = { + val exprs = tsInterpreter.execute(query).stack.map { + case ModelExtractors.PresentationType(t) => t + case value => throw invalidValue(value) + } + exprs.foreach(validate) + exprs + } + + private def parseEvents(query: String): List[EventExpr] = { + eventInterpreter.execute(query).stack.map { + case ModelExtractors.EventExprType(t) => t + case value => throw invalidValue(value) + } } private def evalEvents(uri: Uri): List[EventExpr] = { uri.query().get("q") match { case Some(query) => - eventInterpreter.execute(query).stack.map { - case ModelExtractors.EventExprType(t) => t - case value => throw invalidValue(value) - } + parseEvents(query) case None => throw new IllegalArgumentException(s"missing required parameter: q ($uri)") } } + private def parseTraceEvents(query: String): List[TraceQuery.SpanFilter] = { + traceInterpreter.execute(query).stack.map { + case ModelExtractors.TraceFilterType(t) => t + case value => throw invalidValue(value) + } + } + private def evalTraceEvents(uri: Uri): List[TraceQuery.SpanFilter] = { uri.query().get("q") match { case Some(query) => - traceInterpreter.execute(query).stack.map { - case ModelExtractors.TraceFilterType(t) => t - case value => throw invalidValue(value) - } + parseTraceEvents(query) case None => throw new IllegalArgumentException(s"missing required parameter: q ($uri)") } } + private def parseTraceTimeSeries(query: String): List[TraceQuery.SpanTimeSeries] = { + val exprs = traceInterpreter.execute(query).stack.map { + case ModelExtractors.TraceTimeSeriesType(t) => t + case value => throw invalidValue(value) + } + exprs.foreach(t => validate(t.expr)) + exprs + } + private def evalTraceTimeSeries(uri: Uri): List[TraceQuery.SpanTimeSeries] = { uri.query().get("q") match { case Some(query) => - traceInterpreter.execute(query).stack.map { - case ModelExtractors.TraceTimeSeriesType(t) => t - case value => throw invalidValue(value) - } + parseTraceTimeSeries(query) case None => throw new IllegalArgumentException(s"missing required parameter: q ($uri)") } } + /** Parse an expression based on the type. */ + def parseQuery(expr: String, exprType: ExprType): List[Expr] = { + val exprs = exprType match { + case ExprType.TIME_SERIES => parseTimeSeries(expr) + case ExprType.EVENTS => parseEvents(expr) + case ExprType.TRACE_EVENTS => parseTraceEvents(expr) + case ExprType.TRACE_TIME_SERIES => parseTraceTimeSeries(expr) + } + exprs.distinct + } + + /** Parse an expression that is part of a URI. */ def parseQuery(uri: Uri): (ExprType, List[Expr]) = { val exprType = determineExprType(uri) val exprs = exprType match { diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionMetadata.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionMetadata.scala index 6423614d0..cbdaed881 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionMetadata.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionMetadata.scala @@ -40,15 +40,16 @@ case class ExpressionMetadata( object ExpressionMetadata { def apply(expression: String, exprType: ExprType, step: Long): ExpressionMetadata = { - val f = if (step > 0) step else ApiSettings.defaultStep - new ExpressionMetadata(expression, exprType, f, computeId(expression, f)) + val dfltStep = if (exprType.isTimeSeriesType) ApiSettings.defaultStep else 0L + val f = if (step > 0) step else dfltStep + new ExpressionMetadata(expression, exprType, f, computeId(expression, exprType, f)) } def apply(expression: String): ExpressionMetadata = { apply(expression, ExprType.TIME_SERIES, ApiSettings.defaultStep) } - def computeId(e: String, f: Long): String = { - Strings.zeroPad(Hash.sha1bytes(s"$f~$e"), 40) + def computeId(e: String, t: ExprType, f: Long): String = { + Strings.zeroPad(Hash.sha1bytes(s"$f~$t~$e"), 40) } } diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala index 448500915..f797836bb 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala @@ -17,13 +17,12 @@ package com.netflix.atlas.lwcapi import java.util.concurrent.TimeUnit import com.github.benmanes.caffeine.cache.Caffeine -import com.netflix.atlas.core.model.CustomVocabulary -import com.netflix.atlas.core.model.DataExpr -import com.netflix.atlas.core.model.ModelExtractors +import com.netflix.atlas.core.model.EventExpr import com.netflix.atlas.core.model.Query import com.netflix.atlas.core.model.Query.KeyQuery -import com.netflix.atlas.core.stacklang.Interpreter +import com.netflix.atlas.core.model.StyleExpr import com.netflix.atlas.eval.model.ExprType +import com.netflix.atlas.eval.stream.ExprInterpreter import com.netflix.spectator.ipc.ServerGroup import com.typesafe.config.Config @@ -42,7 +41,7 @@ class ExpressionSplitter(config: Config) { private val keepKeys = Set("nf.app", "nf.cluster", "nf.shard1", "nf.shard2", "nf.stack") - private val interpreter = Interpreter(new CustomVocabulary(config).allWords) + private val interpreter = new ExprInterpreter(config) /** * Processing the expressions can be quite expensive. In particular compiling regular @@ -115,25 +114,31 @@ class ExpressionSplitter(config: Config) { } } - private def parse(expression: String): Try[List[DataExprMeta]] = Try { - val context = interpreter.execute(expression) - val dataExprs = context.stack.flatMap { - case ModelExtractors.PresentationType(t) => t.perOffset.flatMap(_.expr.dataExprs) - case _ => throw new IllegalArgumentException("expression is invalid") - } - - // Offsets are not supported - dataExprs.foreach { dataExpr => - if (!dataExpr.offset.isZero) { - throw new IllegalArgumentException( - s":offset not supported for streaming evaluation [[$dataExpr]]" - ) - } - } - - dataExprs.distinct.map { e => - val q = intern(compress(e.query)) - DataExprMeta(e, e.toString, q) + private def parse(expression: String, exprType: ExprType): Try[List[DataExprMeta]] = Try { + val parsedExpressions = interpreter.parseQuery(expression, exprType) + exprType match { + case ExprType.EVENTS => + parsedExpressions.collect { + case e: EventExpr => + val q = intern(compress(e.query)) + DataExprMeta(e.toString, q) + } + case ExprType.TIME_SERIES => + parsedExpressions + .collect { + case se: StyleExpr => se.expr.dataExprs + } + .flatten + .distinct + .map { e => + val q = intern(compress(e.query)) + DataExprMeta(e.toString, q) + } + case ExprType.TRACE_EVENTS | ExprType.TRACE_TIME_SERIES => + parsedExpressions.map { e => + // Tracing cannot be scoped to specific infrastructure, always use True + DataExprMeta(e.toString, Query.True) + } } } @@ -142,11 +147,12 @@ class ExpressionSplitter(config: Config) { * contention and most threads are blocked. This just does and get/put which potentially * recomputes some values, but for this case that is preferable. */ - private def getFromCache(k: String): Try[List[DataExprMeta]] = { - val value = exprCache.getIfPresent(k) + private def getFromCache(k: String, exprType: ExprType): Try[List[DataExprMeta]] = { + val key = s"$k:$exprType" + val value = exprCache.getIfPresent(key) if (value == null) { - val tmp = parse(k) - exprCache.put(k, tmp) + val tmp = parse(k, exprType) + exprCache.put(key, tmp) tmp } else { value @@ -154,7 +160,7 @@ class ExpressionSplitter(config: Config) { } def split(expression: String, exprType: ExprType, frequency: Long): List[Subscription] = { - getFromCache(expression) match { + getFromCache(expression, exprType) match { case Success(exprs: List[?]) => exprs.map(e => toSubscription(e, exprType, frequency)) case Failure(t) => throw t } @@ -207,5 +213,5 @@ class ExpressionSplitter(config: Config) { } object ExpressionSplitter { - private case class DataExprMeta(expr: DataExpr, exprString: String, compressedQuery: Query) + private case class DataExprMeta(exprString: String, compressedQuery: Query) } diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala index 2d2f92457..a224c6145 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala @@ -15,6 +15,7 @@ */ package com.netflix.atlas.lwcapi +import com.netflix.atlas.eval.model.ExprType import org.apache.pekko.NotUsed import org.apache.pekko.http.scaladsl.model.ws.BinaryMessage import org.apache.pekko.http.scaladsl.model.ws.Message @@ -31,6 +32,7 @@ import com.netflix.atlas.eval.model.LwcDataExpr import com.netflix.atlas.eval.model.LwcHeartbeat import com.netflix.atlas.eval.model.LwcMessages import com.netflix.atlas.eval.model.LwcSubscription +import com.netflix.atlas.eval.model.LwcSubscriptionV2 import com.netflix.atlas.json.JsonSupport import com.netflix.atlas.pekko.CustomDirectives.* import com.netflix.atlas.pekko.DiagnosticMessage @@ -163,7 +165,11 @@ class SubscribeApi( .flatMapConcat { _ => val steps = sm .subscriptionsForStream(streamId) - .map(_.metadata.frequency) + .map { sub => + // For events where step doesn't really matter use 5s as that is the typical heartbeat + // frequency. This only gets used for the time associated with the heartbeat messages. + if (sub.metadata.frequency == 0L) 5_000L else sub.metadata.frequency + } .distinct .map { step => // To account for some delays for data coming from real systems, the heartbeat @@ -198,7 +204,13 @@ class SubscribeApi( val subMessages = addedSubs.map { sub => val meta = sub.metadata val exprInfo = LwcDataExpr(meta.id, meta.expression, meta.frequency) - LwcSubscription(expr.expression, List(exprInfo)) + if (expr.exprType == ExprType.TIME_SERIES) { + // For backwards compatibility for older versions of eval library, use v1 + // subscription response when it is a time series type + LwcSubscription(expr.expression, List(exprInfo)) + } else { + LwcSubscriptionV2(expr.expression, expr.exprType, List(exprInfo)) + } } queue.offer(subMessages) diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala index abb3a7971..aa98bf523 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala @@ -172,11 +172,11 @@ class ExpressionApiSuite extends MUnitRouteSuite { } private val skanCount = - """{"expression":"nf.cluster,skan,:eq,:count","exprType":"TIME_SERIES","frequency":60000,"id":"6278fa6047c07316d7e265a1004882ab9e1007af"}""" + """{"expression":"nf.cluster,skan,:eq,:count","exprType":"TIME_SERIES","frequency":60000,"id":"039722fefa66c2cdd0147595fb9b9a50351f90f0"}""" private val skanSum = - """{"expression":"nf.cluster,skan,:eq,:sum","exprType":"TIME_SERIES","frequency":60000,"id":"36e0a2c61b48e062bba5361d059afd313c82c674"}""" + """{"expression":"nf.cluster,skan,:eq,:sum","exprType":"TIME_SERIES","frequency":60000,"id":"17e0dc5b1224c825c81cf033c46d0f0490c1ca7f"}""" private val brhMax = - """{"expression":"nf.app,brh,:eq,:max","exprType":"TIME_SERIES","frequency":60000,"id":"16f1b0930c0eeae0225374ea88c01e161e589aff"}""" + """{"expression":"nf.app,brh,:eq,:max","exprType":"TIME_SERIES","frequency":60000,"id":"b19b2aa2c802c29216d4aa36024f71a3c92f84db"}""" } diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionMetadataSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionMetadataSuite.scala index 5eda35da7..0a3fed3e5 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionMetadataSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionMetadataSuite.scala @@ -48,7 +48,7 @@ class ExpressionMetadataSuite extends FunSuite { test("computes id") { val expr = ExpressionMetadata("test") - assertEquals(expr.id, "2684d3c5cb245bd2fd6ee4ea30a500e97ace8141") + assertEquals(expr.id, "59ec3895749cc3fb279d41dabc1d4943361de999") } test("id computation considers frequency") { @@ -114,14 +114,14 @@ class ExpressionMetadataSuite extends FunSuite { test("renders as json with default frequency") { val expected = - "{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"fc3a081088771e05bdc3aa99ffd8770157dfe6ce\"}" + "{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"8189ff24a1e801c924689aeb0490d5d840f23582\"}" val json = ExpressionMetadata("this").toJson assertEquals(expected, json) } test("renders as json with frequency of 0") { val expected = - "{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"fc3a081088771e05bdc3aa99ffd8770157dfe6ce\"}" + "{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"8189ff24a1e801c924689aeb0490d5d840f23582\"}" val json = ExpressionMetadata("this", ExprType.TIME_SERIES, 0).toJson assertEquals(expected, json) } diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala index 53fa1b6c7..89803badf 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala @@ -54,7 +54,7 @@ class ExpressionSplitterSuite extends FunSuite { val msg = intercept[IllegalArgumentException] { splitter.split("foo", ExprType.TIME_SERIES, frequency1) } - assertEquals(msg.getMessage, "expression is invalid") + assertEquals(msg.getMessage, "invalid value on stack: foo") } test("throws IAE for expressions with offset") { diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscribeApiSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscribeApiSuite.scala index 30101390c..1c217ed55 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscribeApiSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscribeApiSuite.scala @@ -15,15 +15,19 @@ */ package com.netflix.atlas.lwcapi +import com.fasterxml.jackson.databind.JsonNode import org.apache.pekko.http.scaladsl.model.ws.Message import org.apache.pekko.http.scaladsl.testkit.RouteTestTimeout import org.apache.pekko.http.scaladsl.testkit.WSProbe import com.netflix.atlas.eval.model.ExprType import com.netflix.atlas.eval.model.LwcDatapoint +import com.netflix.atlas.eval.model.LwcEvent import com.netflix.atlas.eval.model.LwcExpression import com.netflix.atlas.eval.model.LwcHeartbeat import com.netflix.atlas.eval.model.LwcMessages import com.netflix.atlas.eval.model.LwcSubscription +import com.netflix.atlas.eval.model.LwcSubscriptionV2 +import com.netflix.atlas.json.Json import com.netflix.atlas.pekko.DiagnosticMessage import com.netflix.atlas.pekko.RequestHandler import com.netflix.atlas.pekko.testkit.MUnitRouteSuite @@ -90,4 +94,40 @@ class SubscribeApiSuite extends MUnitRouteSuite { } } } + + test("subscribe websocket event") { + val client = WSProbe() + WS("/api/v2/subscribe/222", client.flow) ~> routes ~> check { + assert(isWebSocketUpgrade) + + // Send list of expressions to subscribe to + val exprs = List(LwcExpression("name,disk,:eq", ExprType.EVENTS, 0L)) + client.sendMessage(LwcMessages.encodeBatch(exprs)) + + // Look for subscription messages, one for sum and one for count + var subscriptions = List.empty[LwcSubscriptionV2] + while (subscriptions.size < 1) { + parseBatch(client.expectMessage()).foreach { + case _: DiagnosticMessage => + case sub: LwcSubscriptionV2 => subscriptions = sub :: subscriptions + case h: LwcHeartbeat => assertEquals(h.step, 5000L) + case v => throw new MatchError(v) + } + } + + // Verify subscription is in the manager, push a message to the queue check that it + // is received by the client + assertEquals(subscriptions.flatMap(_.subExprs).size, 1) + subscriptions.flatMap(_.subExprs).foreach { m => + val tags = Map("name" -> "disk") + val json = Json.decode[JsonNode](Json.encode(tags)) + val event = LwcEvent(m.id, json) + val handlers = sm.handlersForSubscription(m.id) + assertEquals(handlers.size, 1) + handlers.head.offer(Seq(event)) + + assertEquals(parseBatch(client.expectMessage()), List(event)) + } + } + } }