Skip to content

Commit

Permalink
Add metrics wiring
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 22, 2024
1 parent 2775fe4 commit d95335e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ module Log =
| Delete of Measurement
/// Trimmed the Tip
| Trim of Measurement
/// Queried via the Index
/// Queried via the Index; count=-1 -> aggregate operation
| Index of Measurement
let [<return: Struct>] (|MetricEvent|_|) (logEvent: Serilog.Events.LogEvent): Metric voption =
let mutable p = Unchecked.defaultof<_>
Expand Down
67 changes: 39 additions & 28 deletions src/Equinox.CosmosStore/CosmosStoreLinq.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace Equinox.CosmosStore.Linq

open Equinox.Core.Infrastructure
open Equinox.Core
open Equinox.CosmosStore.Core // Log, JsonCompressedBase64Converter
open FSharp.Control // taskSeq
open Serilog
open System
Expand Down Expand Up @@ -76,8 +77,8 @@ module Internal =
let m = response.Diagnostics.GetQueryMetrics().CumulativeMetrics
yield struct (response.Diagnostics.GetClientElapsedTime(), response.RequestCharge, response.Resource,
int m.RetrievedDocumentCount, int m.RetrievedDocumentSize, int m.OutputDocumentSize) }
let toAsyncEnum<'T> (desc: string) (iterator: FeedIterator<'T>) = taskSeq {
let sw = System.Diagnostics.Stopwatch.StartNew()
let [<EditorBrowsable(EditorBrowsableState.Never)>] toAsyncEnum<'T> log (container: Container) cat (iterator: FeedIterator<'T>) = taskSeq {
let startTicks = System.Diagnostics.Stopwatch.GetTimestamp()
use _ = iterator
let mutable responses, items, totalRtt, totalRu, totalRdc, totalRds, totalOds = 0, 0, TimeSpan.Zero, 0., 0, 0, 0
try for rtt, rc, response, rdc, rds, ods in enum_ iterator do
Expand All @@ -90,44 +91,53 @@ module Internal =
for item in response do
items <- items + 1
yield item
finally Log.Information("CosmosStoreQuery.enum {desc} {count} ({trips}r {totalRtt:f0}ms; {rdc}i {rds:f2}>{ods:f2} MiB) {rc:f2} RU {latency} ms",
desc, items, responses, totalRtt.TotalMilliseconds, totalRdc, miB totalRds, miB totalOds, totalRu, sw.ElapsedMilliseconds) }
finally
let interval = StopwatchInterval(startTicks, System.Diagnostics.Stopwatch.GetTimestamp())
let log = let evt = Log.Metric.Index { database = container.Database.Id; container = container.Id; stream = cat + FsCodec.StreamName.Category.SeparatorStr
interval = interval; bytes = totalOds; count = items; ru = totalRu } in log |> Log.event evt
log.Information("EqxCosmos {action:l} {count} ({trips}r {totalRtt:f0}ms; {rdc}i {rds:f2}>{ods:f2} MiB) {rc:f2} RU {latency} ms",
"Index", items, responses, totalRtt.TotalMilliseconds, totalRdc, miB totalRds, miB totalOds, totalRu, interval.ElapsedMilliseconds) }
/// Runs a query that renders 'T, Hydrating the results as 'P (can be the same types but e.g. you might want to map an object to a JsonElement etc)
let enum<'T, 'P> desc (container: Container) (query: IQueryable<'T>): TaskSeq<'P> =
let enum<'T, 'P> (log: ILogger) (container: Container) cat (query: IQueryable<'T>): TaskSeq<'P> =
let queryDefinition = query.ToQueryDefinition()
if Log.IsEnabled Serilog.Events.LogEventLevel.Debug then Log.Debug("CosmosStoreQuery.query {desc} {query}", desc, queryDefinition.QueryText)
container.GetItemQueryIterator<'P>(queryDefinition) |> toAsyncEnum<'P> desc
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.query {cat} {query}", cat, queryDefinition.QueryText)
container.GetItemQueryIterator<'P>(queryDefinition) |> toAsyncEnum<'P> log container cat
module AggregateOp =
/// Runs one of the typical Cosmos SDK extensions, e.g. CountAsync, logging the costs
let exec (desc: string) (query: IQueryable<'T>) run render: System.Threading.Tasks.Task<'R> = task {
if Log.IsEnabled Serilog.Events.LogEventLevel.Debug then Log.Debug("CosmosStoreQuery.count {desc} {query}", desc, query.ToQueryDefinition().QueryText)
let sw = System.Diagnostics.Stopwatch.StartNew()
let [<EditorBrowsable(EditorBrowsableState.Never)>] exec (log: ILogger) (container: Container) (op: string) (cat: string) (query: IQueryable<'T>) run render: System.Threading.Tasks.Task<'R> = task {
let startTicks = System.Diagnostics.Stopwatch.GetTimestamp()
let! (rsp: Response<'R>) = run query
let res = rsp.Resource
let summary = render res
let m = rsp.Diagnostics.GetQueryMetrics().CumulativeMetrics
Log.Information("CosmosStoreQuery.count {desc} {count} ({rdc}i {rds:f2}>{ods:f2} MiB) {rc} RU {latency} ms",
desc, summary, m.RetrievedDocumentCount, miB m.RetrievedDocumentSize, miB m.OutputDocumentSize, rsp.RequestCharge, sw.ElapsedMilliseconds)
let interval = StopwatchInterval(startTicks, System.Diagnostics.Stopwatch.GetTimestamp())
let totalOds, totalRu = m.OutputDocumentSize, rsp.RequestCharge
let log = let evt = Log.Metric.Index { database = container.Database.Id; container = container.Id; stream = cat + FsCodec.StreamName.Category.SeparatorStr
interval = interval; bytes = int totalOds; count = -1; ru = totalRu } in log |> Log.event evt
log.Information("EqxCosmos {action:l} {cat} {count} ({rdc}i {rds:f2}>{ods:f2} MiB) {rc} RU {latency} ms",
op, cat, summary, m.RetrievedDocumentCount, miB m.RetrievedDocumentSize, miB totalOds, totalRu, interval.ElapsedMilliseconds)
return res }
/// Runs query.CountAsync, with instrumentation equivalent to what query provides
let countAsync desc (query: IQueryable<'T>) ct =
exec desc query (_.CountAsync(ct)) id<int>
let countAsync (log: ILogger) container cat (query: IQueryable<'T>) ct =
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.count {cat} {query}", cat, query.ToQueryDefinition().QueryText)
exec log container "count" cat query (_.CountAsync(ct)) id<int>
module Scalar =
/// Generates a TOP 1 SQL query
let top1 (query: IQueryable<'T>) =
query.Take(1)
/// Handles a query that's expected to yield 0 or 1 result item
let tryHeadAsync<'T, 'R> desc (container: Container) (query: IQueryable<'T>) (_ct: CancellationToken): Task<'R option> =
let tryHeadAsync<'T, 'R> (log: ILogger) (container: Container) cat (query: IQueryable<'T>) (_ct: CancellationToken): Task<'R option> =
let queryDefinition = (top1 query).ToQueryDefinition()
if Log.IsEnabled Serilog.Events.LogEventLevel.Debug then Log.Debug("CosmosStoreQuery.tryScalar {desc} {query}", desc, queryDefinition.QueryText)
container.GetItemQueryIterator<'R>(queryDefinition) |> Query.toAsyncEnum desc |> TaskSeq.tryHead
type Projection<'T, 'M>(query, description, container, enum: IQueryable<'T> -> TaskSeq<'M>) =
static member Create<'P>(q, d, c, hydrate: 'P -> 'M) = Projection<'T, 'M>(q, d, c, Query.enum<'T, 'P> d c >> TaskSeq.map hydrate)
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.tryScalar {cat} {query}", queryDefinition.QueryText)
container.GetItemQueryIterator<'R>(queryDefinition) |> Query.toAsyncEnum log container cat |> TaskSeq.tryHead
type Projection<'T, 'M>(query, category, container, enum: IQueryable<'T> -> TaskSeq<'M>, count: IQueryable<'T> -> CancellationToken -> Task<int>) =
static member Create<'P>(q, cat, c, log, hydrate: 'P -> 'M) =
Projection<'T, 'M>(q, cat, c, Query.enum<'T, 'P> log c cat >> TaskSeq.map hydrate, AggregateOp.countAsync log c cat)
member _.Enum: TaskSeq<'M> = query |> enum
member x.EnumPage(skip, take): TaskSeq<'M> = query |> Query.offsetLimit (skip, take) |> enum
member _.CountAsync: CancellationToken -> Task<int> = query |> AggregateOp.countAsync description
member _.CountAsync: CancellationToken -> Task<int> = query |> count
[<EditorBrowsable(EditorBrowsableState.Never)>] member val Query: IQueryable<'T> = query
[<EditorBrowsable(EditorBrowsableState.Never)>] member val Description: string = description
[<EditorBrowsable(EditorBrowsableState.Never)>] member val Category: string = category
[<EditorBrowsable(EditorBrowsableState.Never)>] member val Container: Container = container

// We want to generate a projection statement of the shape: VALUE {"sn": root["p"], "snap": root["u"][0].["d"]}
Expand All @@ -138,7 +148,7 @@ module Internal =
// This hack is based on https://stackoverflow.com/a/73506241/11635
type SnAndSnap<'I>() =
member val sn: FsCodec.StreamName = Unchecked.defaultof<_> with get, set
[<System.Text.Json.Serialization.JsonConverter(typeof<Equinox.CosmosStore.Core.JsonCompressedBase64Converter>)>]
[<System.Text.Json.Serialization.JsonConverter(typeof<JsonCompressedBase64Converter>)>]
member val snap: 'I = Unchecked.defaultof<_> with get, set
static member CreateItemQueryLambda<'T>(snExpression: Expression -> MemberExpression, snapExpression: Expression<Func<'T, 'I>>) =
let param = Expression.Parameter(typeof<'T>, "x")
Expand Down Expand Up @@ -178,8 +188,8 @@ module Index =
container.GetItemLinqQueryable<Item<'I>>().Where(fun d -> d.p.StartsWith(prefix) && d.u[0].c = caseName)

/// Returns the StreamName (from the `p` field) for a 0/1 item query; only the TOP 1 item is returned
let tryGetStreamNameAsync description container (query: IQueryable<Item<'I>>) ct =
Internal.Scalar.tryHeadAsync<string, FsCodec.StreamName> description container (query.Select(fun x -> x.p)) ct
let tryGetStreamNameAsync log cat container (query: IQueryable<Item<'I>>) ct =
Internal.Scalar.tryHeadAsync<string, FsCodec.StreamName> log cat container (query.Select(fun x -> x.p)) ct

/// Query the items, returning the Stream name and the Snapshot as a JsonElement (Decompressed if applicable)
let projectStreamNameAndSnapshot<'I> snapExpression: Expression<Func<Item<'I>, SnAndSnap<'I>>> =
Expand All @@ -197,8 +207,9 @@ type Query<'T, 'M>(inner: Internal.Projection<'T, 'M>) =

/// Enables querying based on uncompressed Indexed values stored as secondary unfolds alongside the snapshot
[<NoComparison; NoEquality>]
type IndexContext<'I>(container, categoryName, caseName) =
type IndexContext<'I>(container, categoryName, caseName, log) =

member val Log = defaultArg log Log.Logger
member val Description = $"{categoryName}/{caseName}" with get, set
member val Container = container

Expand All @@ -218,7 +229,7 @@ type IndexContext<'I>(container, categoryName, caseName) =

/// Runs the query; yields the StreamName from the TOP 1 Item matching the criteria
member x.TryGetStreamNameWhereAsync(criteria: Expressions.Expression<Func<Index.Item<'I>, bool>>, ct) =
Index.tryGetStreamNameAsync x.Description container (x.ByCategory().Where criteria) ct
Index.tryGetStreamNameAsync x.Log container categoryName (x.ByCategory().Where criteria) ct

/// Runs the query; yields the StreamName from the TOP 1 Item matching the criteria
member x.TryGetStreamNameWhere(criteria: Expressions.Expression<Func<Index.Item<'I>, bool>>): Async<FsCodec.StreamName option> =
Expand All @@ -227,4 +238,4 @@ type IndexContext<'I>(container, categoryName, caseName) =
/// Query the items, grabbing the Stream name and the Snapshot; The StreamName and the (Decompressed if applicable) Snapshot are passed to `hydrate`
member x.QueryStreamNameAndSnapshot(query: IQueryable<Index.Item<'I>>, selectBody: Expression<Func<Index.Item<'I>, 'I>>,
hydrate: SnAndSnap<System.Text.Json.JsonElement> -> 'M) =
Internal.Projection.Create(query.Select(Index.projectStreamNameAndSnapshot<'I> selectBody), x.Description, container, hydrate) |> Query
Internal.Projection.Create(query.Select(Index.projectStreamNameAndSnapshot<'I> selectBody), categoryName, container, x.Log, hydrate) |> Query

0 comments on commit d95335e

Please sign in to comment.