Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lwcapi: use subv2 type for events #1635

Merged
merged 1 commit into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.netflix.atlas.core.model.Expr
import com.netflix.atlas.core.model.FilterExpr
import com.netflix.atlas.core.model.ModelExtractors
import com.netflix.atlas.core.model.StatefulExpr
import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.core.model.TraceQuery
import com.netflix.atlas.core.model.TraceVocabulary
import com.netflix.atlas.core.stacklang.Interpreter
Expand All @@ -37,10 +38,12 @@ import com.typesafe.config.Config

import scala.util.Success

private[stream] class ExprInterpreter(config: Config) {
class ExprInterpreter(config: Config) {

import ExprInterpreter.*

private val tsInterpreter = Interpreter(new CustomVocabulary(config).allWords)

private val eventInterpreter = Interpreter(
new CustomVocabulary(config, List(EventVocabulary)).allWords
)
Expand All @@ -60,23 +63,32 @@ private[stream] class ExprInterpreter(config: Config) {
// time shifts, filters, and integral. The filters and integral are excluded because
// they can be confusing as the time window for evaluation is not bounded.
val results = graphCfg.exprs.flatMap(_.perOffset)
results.foreach { result =>
results.foreach(validate)

// Perform host rewrites based on the Atlas hostname
val host = uri.authority.host.toString()
val rewritten = hostRewriter.rewrite(host, results)
graphCfg.copy(query = rewritten.mkString(","), parsedQuery = Success(rewritten))
}

/**
* Check that data expressions are supported. The streaming path doesn't support
* time shifts, filters, and integral. The filters and integral are excluded because
* they can be confusing as the time window for evaluation is not bounded.
*/
private def validate(styleExpr: StyleExpr): Unit = {
styleExpr.perOffset.foreach { s =>
// Use rewrite as a helper for searching the expression for invalid operations
result.expr.rewrite {
s.expr.rewrite {
case op: StatefulExpr.Integral => invalidOperator(op); op
case op: FilterExpr => invalidOperator(op); op
case op: DataExpr if !op.offset.isZero => invalidOperator(op); op
}

// Double check all data expressions do not have an offset. In some cases for named rewrites
// the check above may not detect the offset.
result.expr.dataExprs.filterNot(_.offset.isZero).foreach(invalidOperator)
s.expr.dataExprs.filterNot(_.offset.isZero).foreach(invalidOperator)
}

// Perform host rewrites based on the Atlas hostname
val host = uri.authority.host.toString()
val rewritten = hostRewriter.rewrite(host, results)
graphCfg.copy(query = rewritten.mkString(","), parsedQuery = Success(rewritten))
}

private def invalidOperator(expr: Expr): Unit = {
Expand All @@ -103,45 +115,80 @@ private[stream] class ExprInterpreter(config: Config) {
}

private def invalidValue(value: Any): IllegalArgumentException = {
new IllegalArgumentException(s"invalid value on stack; $value")
new IllegalArgumentException(s"invalid value on stack: $value")
}

private def parseTimeSeries(query: String): List[StyleExpr] = {
val exprs = tsInterpreter.execute(query).stack.map {
case ModelExtractors.PresentationType(t) => t
case value => throw invalidValue(value)
}
exprs.foreach(validate)
exprs
}

private def parseEvents(query: String): List[EventExpr] = {
eventInterpreter.execute(query).stack.map {
case ModelExtractors.EventExprType(t) => t
case value => throw invalidValue(value)
}
}

private def evalEvents(uri: Uri): List[EventExpr] = {
uri.query().get("q") match {
case Some(query) =>
eventInterpreter.execute(query).stack.map {
case ModelExtractors.EventExprType(t) => t
case value => throw invalidValue(value)
}
parseEvents(query)
case None =>
throw new IllegalArgumentException(s"missing required parameter: q ($uri)")
}
}

private def parseTraceEvents(query: String): List[TraceQuery.SpanFilter] = {
traceInterpreter.execute(query).stack.map {
case ModelExtractors.TraceFilterType(t) => t
case value => throw invalidValue(value)
}
}

private def evalTraceEvents(uri: Uri): List[TraceQuery.SpanFilter] = {
uri.query().get("q") match {
case Some(query) =>
traceInterpreter.execute(query).stack.map {
case ModelExtractors.TraceFilterType(t) => t
case value => throw invalidValue(value)
}
parseTraceEvents(query)
case None =>
throw new IllegalArgumentException(s"missing required parameter: q ($uri)")
}
}

private def parseTraceTimeSeries(query: String): List[TraceQuery.SpanTimeSeries] = {
val exprs = traceInterpreter.execute(query).stack.map {
case ModelExtractors.TraceTimeSeriesType(t) => t
case value => throw invalidValue(value)
}
exprs.foreach(t => validate(t.expr))
exprs
}

private def evalTraceTimeSeries(uri: Uri): List[TraceQuery.SpanTimeSeries] = {
uri.query().get("q") match {
case Some(query) =>
traceInterpreter.execute(query).stack.map {
case ModelExtractors.TraceTimeSeriesType(t) => t
case value => throw invalidValue(value)
}
parseTraceTimeSeries(query)
case None =>
throw new IllegalArgumentException(s"missing required parameter: q ($uri)")
}
}

/** Parse an expression based on the type. */
def parseQuery(expr: String, exprType: ExprType): List[Expr] = {
val exprs = exprType match {
case ExprType.TIME_SERIES => parseTimeSeries(expr)
case ExprType.EVENTS => parseEvents(expr)
case ExprType.TRACE_EVENTS => parseTraceEvents(expr)
case ExprType.TRACE_TIME_SERIES => parseTraceTimeSeries(expr)
}
exprs.distinct
}

/** Parse an expression that is part of a URI. */
def parseQuery(uri: Uri): (ExprType, List[Expr]) = {
val exprType = determineExprType(uri)
val exprs = exprType match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ case class ExpressionMetadata(
object ExpressionMetadata {

def apply(expression: String, exprType: ExprType, step: Long): ExpressionMetadata = {
val f = if (step > 0) step else ApiSettings.defaultStep
new ExpressionMetadata(expression, exprType, f, computeId(expression, f))
val dfltStep = if (exprType.isTimeSeriesType) ApiSettings.defaultStep else 0L
val f = if (step > 0) step else dfltStep
new ExpressionMetadata(expression, exprType, f, computeId(expression, exprType, f))
}

def apply(expression: String): ExpressionMetadata = {
apply(expression, ExprType.TIME_SERIES, ApiSettings.defaultStep)
}

def computeId(e: String, f: Long): String = {
Strings.zeroPad(Hash.sha1bytes(s"$f~$e"), 40)
def computeId(e: String, t: ExprType, f: Long): String = {
Strings.zeroPad(Hash.sha1bytes(s"$f~$t~$e"), 40)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ package com.netflix.atlas.lwcapi

import java.util.concurrent.TimeUnit
import com.github.benmanes.caffeine.cache.Caffeine
import com.netflix.atlas.core.model.CustomVocabulary
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.ModelExtractors
import com.netflix.atlas.core.model.EventExpr
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.Query.KeyQuery
import com.netflix.atlas.core.stacklang.Interpreter
import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.eval.model.ExprType
import com.netflix.atlas.eval.stream.ExprInterpreter
import com.netflix.spectator.ipc.ServerGroup
import com.typesafe.config.Config

Expand All @@ -42,7 +41,7 @@ class ExpressionSplitter(config: Config) {

private val keepKeys = Set("nf.app", "nf.cluster", "nf.shard1", "nf.shard2", "nf.stack")

private val interpreter = Interpreter(new CustomVocabulary(config).allWords)
private val interpreter = new ExprInterpreter(config)

/**
* Processing the expressions can be quite expensive. In particular compiling regular
Expand Down Expand Up @@ -115,25 +114,31 @@ class ExpressionSplitter(config: Config) {
}
}

private def parse(expression: String): Try[List[DataExprMeta]] = Try {
val context = interpreter.execute(expression)
val dataExprs = context.stack.flatMap {
case ModelExtractors.PresentationType(t) => t.perOffset.flatMap(_.expr.dataExprs)
case _ => throw new IllegalArgumentException("expression is invalid")
}

// Offsets are not supported
dataExprs.foreach { dataExpr =>
if (!dataExpr.offset.isZero) {
throw new IllegalArgumentException(
s":offset not supported for streaming evaluation [[$dataExpr]]"
)
}
}

dataExprs.distinct.map { e =>
val q = intern(compress(e.query))
DataExprMeta(e, e.toString, q)
private def parse(expression: String, exprType: ExprType): Try[List[DataExprMeta]] = Try {
val parsedExpressions = interpreter.parseQuery(expression, exprType)
exprType match {
case ExprType.EVENTS =>
parsedExpressions.collect {
case e: EventExpr =>
val q = intern(compress(e.query))
DataExprMeta(e.toString, q)
}
case ExprType.TIME_SERIES =>
parsedExpressions
.collect {
case se: StyleExpr => se.expr.dataExprs
}
.flatten
.distinct
.map { e =>
val q = intern(compress(e.query))
DataExprMeta(e.toString, q)
}
case ExprType.TRACE_EVENTS | ExprType.TRACE_TIME_SERIES =>
parsedExpressions.map { e =>
// Tracing cannot be scoped to specific infrastructure, always use True
DataExprMeta(e.toString, Query.True)
}
}
}

Expand All @@ -142,19 +147,20 @@ class ExpressionSplitter(config: Config) {
* contention and most threads are blocked. This just does and get/put which potentially
* recomputes some values, but for this case that is preferable.
*/
private def getFromCache(k: String): Try[List[DataExprMeta]] = {
val value = exprCache.getIfPresent(k)
private def getFromCache(k: String, exprType: ExprType): Try[List[DataExprMeta]] = {
val key = s"$k:$exprType"
val value = exprCache.getIfPresent(key)
if (value == null) {
val tmp = parse(k)
exprCache.put(k, tmp)
val tmp = parse(k, exprType)
exprCache.put(key, tmp)
tmp
} else {
value
}
}

def split(expression: String, exprType: ExprType, frequency: Long): List[Subscription] = {
getFromCache(expression) match {
getFromCache(expression, exprType) match {
case Success(exprs: List[?]) => exprs.map(e => toSubscription(e, exprType, frequency))
case Failure(t) => throw t
}
Expand Down Expand Up @@ -207,5 +213,5 @@ class ExpressionSplitter(config: Config) {
}

object ExpressionSplitter {
private case class DataExprMeta(expr: DataExpr, exprString: String, compressedQuery: Query)
private case class DataExprMeta(exprString: String, compressedQuery: Query)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.atlas.lwcapi

import com.netflix.atlas.eval.model.ExprType
import org.apache.pekko.NotUsed
import org.apache.pekko.http.scaladsl.model.ws.BinaryMessage
import org.apache.pekko.http.scaladsl.model.ws.Message
Expand All @@ -31,6 +32,7 @@ import com.netflix.atlas.eval.model.LwcDataExpr
import com.netflix.atlas.eval.model.LwcHeartbeat
import com.netflix.atlas.eval.model.LwcMessages
import com.netflix.atlas.eval.model.LwcSubscription
import com.netflix.atlas.eval.model.LwcSubscriptionV2
import com.netflix.atlas.json.JsonSupport
import com.netflix.atlas.pekko.CustomDirectives.*
import com.netflix.atlas.pekko.DiagnosticMessage
Expand Down Expand Up @@ -163,7 +165,11 @@ class SubscribeApi(
.flatMapConcat { _ =>
val steps = sm
.subscriptionsForStream(streamId)
.map(_.metadata.frequency)
.map { sub =>
// For events where step doesn't really matter use 5s as that is the typical heartbeat
// frequency. This only gets used for the time associated with the heartbeat messages.
if (sub.metadata.frequency == 0L) 5_000L else sub.metadata.frequency
}
.distinct
.map { step =>
// To account for some delays for data coming from real systems, the heartbeat
Expand Down Expand Up @@ -198,7 +204,13 @@ class SubscribeApi(
val subMessages = addedSubs.map { sub =>
val meta = sub.metadata
val exprInfo = LwcDataExpr(meta.id, meta.expression, meta.frequency)
LwcSubscription(expr.expression, List(exprInfo))
if (expr.exprType == ExprType.TIME_SERIES) {
// For backwards compatibility for older versions of eval library, use v1
// subscription response when it is a time series type
LwcSubscription(expr.expression, List(exprInfo))
} else {
LwcSubscriptionV2(expr.expression, expr.exprType, List(exprInfo))
}
}
queue.offer(subMessages)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,11 @@ class ExpressionApiSuite extends MUnitRouteSuite {
}

private val skanCount =
"""{"expression":"nf.cluster,skan,:eq,:count","exprType":"TIME_SERIES","frequency":60000,"id":"6278fa6047c07316d7e265a1004882ab9e1007af"}"""
"""{"expression":"nf.cluster,skan,:eq,:count","exprType":"TIME_SERIES","frequency":60000,"id":"039722fefa66c2cdd0147595fb9b9a50351f90f0"}"""

private val skanSum =
"""{"expression":"nf.cluster,skan,:eq,:sum","exprType":"TIME_SERIES","frequency":60000,"id":"36e0a2c61b48e062bba5361d059afd313c82c674"}"""
"""{"expression":"nf.cluster,skan,:eq,:sum","exprType":"TIME_SERIES","frequency":60000,"id":"17e0dc5b1224c825c81cf033c46d0f0490c1ca7f"}"""

private val brhMax =
"""{"expression":"nf.app,brh,:eq,:max","exprType":"TIME_SERIES","frequency":60000,"id":"16f1b0930c0eeae0225374ea88c01e161e589aff"}"""
"""{"expression":"nf.app,brh,:eq,:max","exprType":"TIME_SERIES","frequency":60000,"id":"b19b2aa2c802c29216d4aa36024f71a3c92f84db"}"""
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ExpressionMetadataSuite extends FunSuite {

test("computes id") {
val expr = ExpressionMetadata("test")
assertEquals(expr.id, "2684d3c5cb245bd2fd6ee4ea30a500e97ace8141")
assertEquals(expr.id, "59ec3895749cc3fb279d41dabc1d4943361de999")
}

test("id computation considers frequency") {
Expand Down Expand Up @@ -114,14 +114,14 @@ class ExpressionMetadataSuite extends FunSuite {

test("renders as json with default frequency") {
val expected =
"{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"fc3a081088771e05bdc3aa99ffd8770157dfe6ce\"}"
"{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"8189ff24a1e801c924689aeb0490d5d840f23582\"}"
val json = ExpressionMetadata("this").toJson
assertEquals(expected, json)
}

test("renders as json with frequency of 0") {
val expected =
"{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"fc3a081088771e05bdc3aa99ffd8770157dfe6ce\"}"
"{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"8189ff24a1e801c924689aeb0490d5d840f23582\"}"
val json = ExpressionMetadata("this", ExprType.TIME_SERIES, 0).toJson
assertEquals(expected, json)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ExpressionSplitterSuite extends FunSuite {
val msg = intercept[IllegalArgumentException] {
splitter.split("foo", ExprType.TIME_SERIES, frequency1)
}
assertEquals(msg.getMessage, "expression is invalid")
assertEquals(msg.getMessage, "invalid value on stack: foo")
}

test("throws IAE for expressions with offset") {
Expand Down
Loading
Loading