diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index e7d46c8ea..7147feead 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -210,7 +210,7 @@ module Log = | Delete of Measurement /// Trimmed the Tip | Trim of Measurement - /// Queried via the Index + /// Queried via the Index; count=-1 -> aggregate operation | Index of Measurement let [] (|MetricEvent|_|) (logEvent: Serilog.Events.LogEvent): Metric voption = let mutable p = Unchecked.defaultof<_> diff --git a/src/Equinox.CosmosStore/CosmosStoreLinq.fs b/src/Equinox.CosmosStore/CosmosStoreLinq.fs index 453b90f3a..1fcf6fbc8 100644 --- a/src/Equinox.CosmosStore/CosmosStoreLinq.fs +++ b/src/Equinox.CosmosStore/CosmosStoreLinq.fs @@ -1,6 +1,7 @@ namespace Equinox.CosmosStore.Linq -open Equinox.Core.Infrastructure +open Equinox.Core +open Equinox.CosmosStore.Core // Log, JsonCompressedBase64Converter open FSharp.Control // taskSeq open Serilog open System @@ -76,8 +77,8 @@ module Internal = let m = response.Diagnostics.GetQueryMetrics().CumulativeMetrics yield struct (response.Diagnostics.GetClientElapsedTime(), response.RequestCharge, response.Resource, int m.RetrievedDocumentCount, int m.RetrievedDocumentSize, int m.OutputDocumentSize) } - let toAsyncEnum<'T> (desc: string) (iterator: FeedIterator<'T>) = taskSeq { - let sw = System.Diagnostics.Stopwatch.StartNew() + let [] toAsyncEnum<'T> log (container: Container) cat (iterator: FeedIterator<'T>) = taskSeq { + let startTicks = System.Diagnostics.Stopwatch.GetTimestamp() use _ = iterator let mutable responses, items, totalRtt, totalRu, totalRdc, totalRds, totalOds = 0, 0, TimeSpan.Zero, 0., 0, 0, 0 try for rtt, rc, response, rdc, rds, ods in enum_ iterator do @@ -90,44 +91,53 @@ module Internal = for item in response do items <- items + 1 yield item - finally Log.Information("CosmosStoreQuery.enum {desc} {count} ({trips}r {totalRtt:f0}ms; {rdc}i {rds:f2}>{ods:f2} MiB) {rc:f2} RU {latency} ms", - desc, items, responses, totalRtt.TotalMilliseconds, totalRdc, miB totalRds, miB totalOds, totalRu, sw.ElapsedMilliseconds) } + finally + let interval = StopwatchInterval(startTicks, System.Diagnostics.Stopwatch.GetTimestamp()) + let log = let evt = Log.Metric.Index { database = container.Database.Id; container = container.Id; stream = cat + FsCodec.StreamName.Category.SeparatorStr + interval = interval; bytes = totalOds; count = items; ru = totalRu } in log |> Log.event evt + log.Information("EqxCosmos {action:l} {count} ({trips}r {totalRtt:f0}ms; {rdc}i {rds:f2}>{ods:f2} MiB) {rc:f2} RU {latency} ms", + "Index", items, responses, totalRtt.TotalMilliseconds, totalRdc, miB totalRds, miB totalOds, totalRu, interval.ElapsedMilliseconds) } /// Runs a query that renders 'T, Hydrating the results as 'P (can be the same types but e.g. you might want to map an object to a JsonElement etc) - let enum<'T, 'P> desc (container: Container) (query: IQueryable<'T>): TaskSeq<'P> = + let enum<'T, 'P> (log: ILogger) (container: Container) cat (query: IQueryable<'T>): TaskSeq<'P> = let queryDefinition = query.ToQueryDefinition() - if Log.IsEnabled Serilog.Events.LogEventLevel.Debug then Log.Debug("CosmosStoreQuery.query {desc} {query}", desc, queryDefinition.QueryText) - container.GetItemQueryIterator<'P>(queryDefinition) |> toAsyncEnum<'P> desc + if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.query {cat} {query}", cat, queryDefinition.QueryText) + container.GetItemQueryIterator<'P>(queryDefinition) |> toAsyncEnum<'P> log container cat module AggregateOp = /// Runs one of the typical Cosmos SDK extensions, e.g. CountAsync, logging the costs - let exec (desc: string) (query: IQueryable<'T>) run render: System.Threading.Tasks.Task<'R> = task { - if Log.IsEnabled Serilog.Events.LogEventLevel.Debug then Log.Debug("CosmosStoreQuery.count {desc} {query}", desc, query.ToQueryDefinition().QueryText) - let sw = System.Diagnostics.Stopwatch.StartNew() + let [] exec (log: ILogger) (container: Container) (op: string) (cat: string) (query: IQueryable<'T>) run render: System.Threading.Tasks.Task<'R> = task { + let startTicks = System.Diagnostics.Stopwatch.GetTimestamp() let! (rsp: Response<'R>) = run query let res = rsp.Resource let summary = render res let m = rsp.Diagnostics.GetQueryMetrics().CumulativeMetrics - Log.Information("CosmosStoreQuery.count {desc} {count} ({rdc}i {rds:f2}>{ods:f2} MiB) {rc} RU {latency} ms", - desc, summary, m.RetrievedDocumentCount, miB m.RetrievedDocumentSize, miB m.OutputDocumentSize, rsp.RequestCharge, sw.ElapsedMilliseconds) + let interval = StopwatchInterval(startTicks, System.Diagnostics.Stopwatch.GetTimestamp()) + let totalOds, totalRu = m.OutputDocumentSize, rsp.RequestCharge + let log = let evt = Log.Metric.Index { database = container.Database.Id; container = container.Id; stream = cat + FsCodec.StreamName.Category.SeparatorStr + interval = interval; bytes = int totalOds; count = -1; ru = totalRu } in log |> Log.event evt + log.Information("EqxCosmos {action:l} {cat} {count} ({rdc}i {rds:f2}>{ods:f2} MiB) {rc} RU {latency} ms", + op, cat, summary, m.RetrievedDocumentCount, miB m.RetrievedDocumentSize, miB totalOds, totalRu, interval.ElapsedMilliseconds) return res } /// Runs query.CountAsync, with instrumentation equivalent to what query provides - let countAsync desc (query: IQueryable<'T>) ct = - exec desc query (_.CountAsync(ct)) id + let countAsync (log: ILogger) container cat (query: IQueryable<'T>) ct = + if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.count {cat} {query}", cat, query.ToQueryDefinition().QueryText) + exec log container "count" cat query (_.CountAsync(ct)) id module Scalar = /// Generates a TOP 1 SQL query let top1 (query: IQueryable<'T>) = query.Take(1) /// Handles a query that's expected to yield 0 or 1 result item - let tryHeadAsync<'T, 'R> desc (container: Container) (query: IQueryable<'T>) (_ct: CancellationToken): Task<'R option> = + let tryHeadAsync<'T, 'R> (log: ILogger) (container: Container) cat (query: IQueryable<'T>) (_ct: CancellationToken): Task<'R option> = let queryDefinition = (top1 query).ToQueryDefinition() - if Log.IsEnabled Serilog.Events.LogEventLevel.Debug then Log.Debug("CosmosStoreQuery.tryScalar {desc} {query}", desc, queryDefinition.QueryText) - container.GetItemQueryIterator<'R>(queryDefinition) |> Query.toAsyncEnum desc |> TaskSeq.tryHead - type Projection<'T, 'M>(query, description, container, enum: IQueryable<'T> -> TaskSeq<'M>) = - static member Create<'P>(q, d, c, hydrate: 'P -> 'M) = Projection<'T, 'M>(q, d, c, Query.enum<'T, 'P> d c >> TaskSeq.map hydrate) + if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.tryScalar {cat} {query}", queryDefinition.QueryText) + container.GetItemQueryIterator<'R>(queryDefinition) |> Query.toAsyncEnum log container cat |> TaskSeq.tryHead + type Projection<'T, 'M>(query, category, container, enum: IQueryable<'T> -> TaskSeq<'M>, count: IQueryable<'T> -> CancellationToken -> Task) = + static member Create<'P>(q, cat, c, log, hydrate: 'P -> 'M) = + Projection<'T, 'M>(q, cat, c, Query.enum<'T, 'P> log c cat >> TaskSeq.map hydrate, AggregateOp.countAsync log c cat) member _.Enum: TaskSeq<'M> = query |> enum member x.EnumPage(skip, take): TaskSeq<'M> = query |> Query.offsetLimit (skip, take) |> enum - member _.CountAsync: CancellationToken -> Task = query |> AggregateOp.countAsync description + member _.CountAsync: CancellationToken -> Task = query |> count [] member val Query: IQueryable<'T> = query - [] member val Description: string = description + [] member val Category: string = category [] member val Container: Container = container // We want to generate a projection statement of the shape: VALUE {"sn": root["p"], "snap": root["u"][0].["d"]} @@ -138,7 +148,7 @@ module Internal = // This hack is based on https://stackoverflow.com/a/73506241/11635 type SnAndSnap<'I>() = member val sn: FsCodec.StreamName = Unchecked.defaultof<_> with get, set - [)>] + [)>] member val snap: 'I = Unchecked.defaultof<_> with get, set static member CreateItemQueryLambda<'T>(snExpression: Expression -> MemberExpression, snapExpression: Expression>) = let param = Expression.Parameter(typeof<'T>, "x") @@ -178,8 +188,8 @@ module Index = container.GetItemLinqQueryable>().Where(fun d -> d.p.StartsWith(prefix) && d.u[0].c = caseName) /// Returns the StreamName (from the `p` field) for a 0/1 item query; only the TOP 1 item is returned - let tryGetStreamNameAsync description container (query: IQueryable>) ct = - Internal.Scalar.tryHeadAsync description container (query.Select(fun x -> x.p)) ct + let tryGetStreamNameAsync log cat container (query: IQueryable>) ct = + Internal.Scalar.tryHeadAsync log cat container (query.Select(fun x -> x.p)) ct /// Query the items, returning the Stream name and the Snapshot as a JsonElement (Decompressed if applicable) let projectStreamNameAndSnapshot<'I> snapExpression: Expression, SnAndSnap<'I>>> = @@ -197,8 +207,9 @@ type Query<'T, 'M>(inner: Internal.Projection<'T, 'M>) = /// Enables querying based on uncompressed Indexed values stored as secondary unfolds alongside the snapshot [] -type IndexContext<'I>(container, categoryName, caseName) = +type IndexContext<'I>(container, categoryName, caseName, log) = + member val Log = defaultArg log Log.Logger member val Description = $"{categoryName}/{caseName}" with get, set member val Container = container @@ -218,7 +229,7 @@ type IndexContext<'I>(container, categoryName, caseName) = /// Runs the query; yields the StreamName from the TOP 1 Item matching the criteria member x.TryGetStreamNameWhereAsync(criteria: Expressions.Expression, bool>>, ct) = - Index.tryGetStreamNameAsync x.Description container (x.ByCategory().Where criteria) ct + Index.tryGetStreamNameAsync x.Log container categoryName (x.ByCategory().Where criteria) ct /// Runs the query; yields the StreamName from the TOP 1 Item matching the criteria member x.TryGetStreamNameWhere(criteria: Expressions.Expression, bool>>): Async = @@ -227,4 +238,4 @@ type IndexContext<'I>(container, categoryName, caseName) = /// Query the items, grabbing the Stream name and the Snapshot; The StreamName and the (Decompressed if applicable) Snapshot are passed to `hydrate` member x.QueryStreamNameAndSnapshot(query: IQueryable>, selectBody: Expression, 'I>>, hydrate: SnAndSnap -> 'M) = - Internal.Projection.Create(query.Select(Index.projectStreamNameAndSnapshot<'I> selectBody), x.Description, container, hydrate) |> Query + Internal.Projection.Create(query.Select(Index.projectStreamNameAndSnapshot<'I> selectBody), categoryName, container, x.Log, hydrate) |> Query