Skip to content

Commit

Permalink
feat: deal monitor alert
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Nov 27, 2023
1 parent 96d4431 commit 802f54b
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 7 deletions.
3 changes: 1 addition & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"fzstd": "^0.1.0",
"multiformats": "12.0.1",
"uint8arrays": "^4.0.4",
"pretty-ms": "^8.0.0",
"p-all": "^5.0.0",
"p-retry": "^5.1.2",
"stream-read-all": "^4.0.0"
Expand Down
174 changes: 174 additions & 0 deletions packages/core/src/dealer/deal-monitor-alert-tick.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import prettyMilliseconds from 'pretty-ms'

/**
* @typedef {import('@web3-storage/data-segment').PieceLink} PieceLink
* @typedef {import('@web3-storage/filecoin-api/aggregator/api').AggregateStore} AggregatorAggregateStore
* @typedef {import('@web3-storage/filecoin-api/dealer/api').AggregateStore} DealerAggregateStore
* @typedef {import('@web3-storage/filecoin-api/dealer/api').AggregateRecord} AggregateRecord
*
* @typedef {object} MonitorContext
* @property {AggregatorAggregateStore} context.aggregatorAggregateStore
* @property {DealerAggregateStore} context.dealerAggregateStore
* @property {number} context.oldestPieceCriticalThresholdMs
* @property {number} context.oldestPieceWarnThresholdMs
* @property {number} context.aggregateMonitorThresholdMs
* @property {string} context.monitoringNotificationsEndpoint
*
* @typedef {object} Alert
* @property {PieceLink} aggregate
* @property {number} duration
* @property {string} severity
*/

/**
* On CRON tick, get aggregates without deals, and verify if there
*
* @param {MonitorContext} context
*/
export async function dealMonitorAlertTick (context) {
// Get offered deals pending approval/rejection
const offeredAggregates = await context.dealerAggregateStore.query({
status: 'offered',
})
if (offeredAggregates.error) {
return {
error: offeredAggregates.error,
}
}

// Get offered aggregates to monitor
const offeredAggregatesToMonitor = []
const currentTime = Date.now()
for (const offeredAggregate of offeredAggregates.ok) {
const offerTime = (new Date(offeredAggregate.insertedAt)).getTime()
// Monitor if offer time + monitor threshold is bigger than current time
if (offerTime + context.aggregateMonitorThresholdMs > currentTime) {
offeredAggregatesToMonitor.push(offeredAggregate)
}
}

// Get aggregates duration
const monitoredAggregatesResponse = await Promise.all(
offeredAggregatesToMonitor.map(aggregate => monitorAggregate(aggregate, context))
)
// Fail if any failed to get information
const monitoredAggregatesErrorResponse = monitoredAggregatesResponse.find(r => r?.error)
if (monitoredAggregatesErrorResponse) {
return {
error: monitoredAggregatesErrorResponse.error
}
}

const alerts = /** @typedef {Alert[]} */ ([])

// Verify if monitored aggregates should create notifications
for (const res of monitoredAggregatesResponse) {
// @ts-ignore if not ok, should have failed before
const duration = /** @type {number} */ (res.ok?.duration)
// @ts-ignore if not ok, should have failed before
const aggregate = /** @type {import('@web3-storage/data-segment').PieceLink} */ (res.ok?.aggregate)

if (duration > context.oldestPieceCriticalThresholdMs) {
alerts.push({
aggregate,
duration,
severity: 'critical'
})
} else if (duration > context.oldestPieceWarnThresholdMs) {
alerts.push({
aggregate,
duration,
severity: 'warn'
})
}
}

if (!alerts.length) {
return {
ok: {}
}
}

// Send alerts
const alertPayload = getAlertPayload(alerts)
const alertResponse = await fetch(
context.monitoringNotificationsEndpoint,
{
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(alertPayload)
}
)
if (!alertResponse.ok) {
return {
error: new Error(`failed to send alert to ${context.monitoringNotificationsEndpoint} with ${alerts.length}`)
}
}

return {
ok: {}
}
}

/**
* @param {AggregateRecord} aggregateRecord
* @param {MonitorContext} context
*/
async function monitorAggregate (aggregateRecord, context) {
const getAggregateInfo = await context.aggregatorAggregateStore.get({
aggregate: aggregateRecord.aggregate
})
if (getAggregateInfo.error) {
return {
error: getAggregateInfo.error
}
}

// Get aggregate current duration
const currentTime = Date.now()
// @ts-expect-error needs updated dep
const offerTime = (new Date(getAggregateInfo.ok.oldestPieceInsertedAt)).getTime()

return {
ok: {
aggregate: aggregateRecord.aggregate,
duration: currentTime - offerTime
}
}
}

/**
* Construct alert based on payload from Grafana Alerting.
*
* @see https://grafana.com/docs/oncall/latest/integrations/grafana-alerting/
* @see https://prometheus.io/docs/alerting/latest/notifications/#data
*
* @param {Alert[]} alerts
*/
function getAlertPayload (alerts) {
return {
alerts: alerts.map(a => ({
labels: {
aggregate: a.aggregate.toString(),
duration: prettyMilliseconds(a.duration),
severity: a.severity,
},
status: 'firing',
fingerprint: a.aggregate.toString()
})),
status: 'firing',
version: '4',
groupKey: '{}:{alertname=\\FilecoinDealDelay\\}',
receiver: 'combo',
groupLabels: {
alertname: 'FilecoinDealDelay'
},
commonLabels: {
job: 'deal-monitor-alert',
group: 'production',
alertname: 'FilecoinDealDelay'
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import * as Sentry from '@sentry/serverless'
import { Table } from 'sst/node/table'

// store clients
import { createClient as createAggregatorAggregateStoreClient } from '@w3filecoin/core/src/store/aggregator-aggregate-store.js'
import { createClient as createDealerAggregateStoreClient } from '@w3filecoin/core/src/store/dealer-aggregate-store.js'

import { dealMonitorAlertTick } from '@w3filecoin/core/src/dealer/deal-monitor-alert-tick.js'

import { mustGetEnv } from '../utils.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

async function handleEvent () {
const {
aggregatorAggregateStoreTableName,
aggregatorAggregateStoreTableRegion,
dealerAggregateStoreTableName,
dealerAggregateStoreTableRegion,
oldestPieceCriticalThresholdMs,
oldestPieceWarnThresholdMs,
aggregateMonitorThresholdMs,
monitoringNotificationsEndpoint
} = getEnv()

// stores
const aggregatorAggregateStore = createAggregatorAggregateStoreClient(
{ region: aggregatorAggregateStoreTableRegion },
{ tableName: aggregatorAggregateStoreTableName.tableName }
)
const dealerAggregateStore = createDealerAggregateStoreClient({
region: dealerAggregateStoreTableRegion
}, {
tableName: dealerAggregateStoreTableName.tableName
})

const { error } = await dealMonitorAlertTick({
aggregatorAggregateStore,
dealerAggregateStore,
oldestPieceCriticalThresholdMs,
oldestPieceWarnThresholdMs,
aggregateMonitorThresholdMs,
monitoringNotificationsEndpoint
})

if (error) {
console.error(error)
return {
statusCode: 500,
body: error.message
}
}

return {
statusCode: 200,
}
}

/**
* Get Env validating it is set.
*/
function getEnv () {
return {
aggregatorAggregateStoreTableName: Table['aggregator-aggregate-store'],
aggregatorAggregateStoreTableRegion: mustGetEnv('AWS_REGION'),
dealerAggregateStoreTableName: Table['dealer-aggregate-store'],
dealerAggregateStoreTableRegion: mustGetEnv('AWS_REGION'),
oldestPieceCriticalThresholdMs: Number.parseInt(mustGetEnv('OLDEST_PIECE_CRITICAL_THRESHOLD_MS')),
oldestPieceWarnThresholdMs: Number.parseInt(mustGetEnv('OLDEST_PIECE_WARN_THRESHOLD_MS')),
aggregateMonitorThresholdMs: Number.parseInt(mustGetEnv('AGGREGATE_MONITOR_THRESHOLD_MS')),
monitoringNotificationsEndpoint: mustGetEnv('MONITORING_NOTIFICATIONS_ENDPOINT')
}
}

export const main = Sentry.AWSLambda.wrapHandler(handleEvent)
37 changes: 35 additions & 2 deletions stacks/api-stack.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { Api, Config, use } from 'sst/constructs'
import { Api, Config, Cron, use } from 'sst/constructs'

import { DataStack } from './data-stack.js'
import { AggregatorStack } from './aggregator-stack.js'
import {
getApiPackageJson,
getGitInfo,
getCustomDomain,
getEnv,
getAggregatorEnv,
getDealerEnv,
getDealTrackerEnv,
setupSentry
setupSentry,
getResourceName
} from './config.js'

/**
Expand All @@ -30,6 +32,12 @@ export function ApiStack({ app, stack }) {
DEAL_API_HOSTED_ZONE,
DEALER_DID,
} = getDealerEnv()
const {
OLDEST_PIECE_CRITICAL_THRESHOLD_MS,
OLDEST_PIECE_WARN_THRESHOLD_MS,
AGGREGATE_MONITOR_THRESHOLD_MS,
MONITORING_NOTIFICATIONS_ENDPOINT
} = getEnv()

// Setup app monitoring with Sentry
setupSentry(app, stack)
Expand Down Expand Up @@ -174,6 +182,31 @@ export function ApiStack({ app, stack }) {
},
})

// Setup `monitoring`
// only needed for production
if (stack.stage === 'prod') {
const dealMonitorAlertCronName = getResourceName('deal-monitor-alert-cron', stack.stage)
new Cron(stack, dealMonitorAlertCronName, {
schedule: 'rate(30 minutes)',
job: {
function: {
timeout: '5 minutes',
handler: 'packages/functions/src/monitor/handle-deal-monitor-alert-cron-tick.main',
environment: {
OLDEST_PIECE_CRITICAL_THRESHOLD_MS,
OLDEST_PIECE_WARN_THRESHOLD_MS,
AGGREGATE_MONITOR_THRESHOLD_MS,
MONITORING_NOTIFICATIONS_ENDPOINT
},
bind: [
dealerAggregateStoreTable,
aggregatorAggregateStoreTable,
],
}
}
})
}

stack.addOutputs({
AggregatorApiEndpoint: api.url,
AggregatorApiCustomDomain: aggregatorApiCustomDomain ? `https://${aggregatorApiCustomDomain.domainName}` : 'Set AGGREGATOR_HOSTED_ZONE in env to deploy to a custom domain',
Expand Down
14 changes: 11 additions & 3 deletions stacks/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ import { RemovalPolicy } from 'aws-cdk-lib'
import git from 'git-rev-sync'
import * as pack from '../package.json'

export const DEFAULT_FERRY_CARGO_MAX_SIZE = 127*(1<<28)
export const DEFAULT_FERRY_CARGO_MIN_SIZE = 1+127*(1<<27)
// 72 Hours
export const DEFAULT_OLDEST_PIECE_CRITICAL_THRESHOLD_MS = String(72 * 60 * 60 * 1000)
// 60 Hours
export const DEFAULT_OLDEST_PIECE_WARN_THRESHOLD_MS = String(60 * 60 * 60 * 1000)
// 48 Hours
export const DEFAULT_AGGREGATE_MONITOR_THRESHOLD_MS = String(48 * 60 * 60 * 1000)

/**
* Get nicer resources name
Expand Down Expand Up @@ -105,7 +109,11 @@ export function setupSentry (app, stack) {
export function getEnv() {
return {
SENTRY_DSN: mustGetEnv('SENTRY_DSN'),
UCAN_LOG_URL: mustGetEnv('UCAN_LOG_URL')
UCAN_LOG_URL: mustGetEnv('UCAN_LOG_URL'),
OLDEST_PIECE_CRITICAL_THRESHOLD_MS: process.env.OLDEST_PIECE_CRITICAL_THRESHOLD_MS || DEFAULT_OLDEST_PIECE_CRITICAL_THRESHOLD_MS,
OLDEST_PIECE_WARN_THRESHOLD_MS: process.env.OLDEST_PIECE_WARN_THRESHOLD_MS || DEFAULT_OLDEST_PIECE_WARN_THRESHOLD_MS,
AGGREGATE_MONITOR_THRESHOLD_MS: process.env.AGGREGATE_MONITOR_THRESHOLD_MS || DEFAULT_AGGREGATE_MONITOR_THRESHOLD_MS,
MONITORING_NOTIFICATIONS_ENDPOINT: mustGetEnv('MONITORING_NOTIFICATIONS_ENDPOINT')
}
}

Expand Down

0 comments on commit 802f54b

Please sign in to comment.