Skip to content

Commit

Permalink
Polish Rate Limit implementation #2
Browse files Browse the repository at this point in the history
  • Loading branch information
krebernisak authored and justinkaseman committed Mar 22, 2021
1 parent 9337de4 commit ff67798
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 91 deletions.
22 changes: 5 additions & 17 deletions bootstrap/src/lib/rate-limit/actions.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,16 @@
import { AdapterRequest } from '@chainlink/types'
import { createAction, nanoid } from '@reduxjs/toolkit'
import { getParticipantId } from './util'
import { createAction } from '@reduxjs/toolkit'
import { ActionBase, toActionPayload } from '../store'

const DEFAULT_COST = 1

const toActionPayload = <A extends ActionBase>(data: any): A => ({
id: nanoid(),
createdAt: new Date().toISOString(),
...data,
})

export interface ActionBase {
id: string
createdAt: string
}

export interface RequestObservedPayload extends ActionBase {
requestId: string
typeId: string
cost: number
}

export const requestObserved = createAction(
'RL/REQUEST_OBSERVED',
(input: AdapterRequest, cost = DEFAULT_COST) => ({
payload: toActionPayload({ requestId: getParticipantId(input), cost }),
(typeId: string, cost = DEFAULT_COST) => ({
payload: toActionPayload({ typeId, cost }),
}),
)
72 changes: 49 additions & 23 deletions bootstrap/src/lib/rate-limit/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,55 @@
import hash from 'object-hash'
import { AdapterRequest } from '@chainlink/types'
import { Middleware } from '../..'
import { requestObserved } from './actions'
import rootReducer, { Heartbeat, IntervalNames, Intervals } from './reducer'
import rootReducer, {
Heartbeat,
Heartbeats,
IntervalNames,
Intervals,
selectObserved,
} from './reducer'
import { configureStore } from './store'
import { getParticipantId, getMaxReqAllowed, getParticipantCost } from './util'
import * as config from './config'
import { WARMUP_REQUEST_ID } from '../cache-warmer/config'

export * as actions from './actions'
export const { store } = configureStore(rootReducer)

const { store } = configureStore(rootReducer)
export const computeThroughput = (
state: Heartbeats,
interval: IntervalNames,
id: string,
): number => {
// All observed in interval
const observedRequests = selectObserved(state, interval)
const throughput = observedRequests.length + 1
const cost = getAverageCost(observedRequests) || 1
// All of type observed in interval
const observedRequestsOfType = selectObserved(state, interval, id)
const throughputOfType = observedRequestsOfType.length + 1
const costOfType = getAverageCost(observedRequestsOfType) || 1
// Compute max throughput by weight
const weight = (throughputOfType * costOfType) / (throughput * cost)
return maxThroughput(weight)
}

export const getThroughput = (interval: IntervalNames, input?: AdapterRequest): Heartbeat[] => {
const { heartbeats } = store.getState()
const getAverageCost = (requests: Heartbeat[]): number => {
if (!requests || requests.length === 0) return 0
return requests.reduce((totalCost, h) => totalCost + h.cost, 0) / requests.length
}

if (input) {
const participantId = getParticipantId(input)
return heartbeats.participants[participantId]?.[interval] || []
}
return heartbeats.total[interval] || []
const maxThroughput = (weight: number): number => {
const maxAllowedCapacity = 0.9 * config.get().totalCapacity // Interval.Minute
return weight * maxAllowedCapacity
}

/**
* Returns hash of the input request payload excluding some volatile paths
*
* @param request payload
*/
const makeId = (request: AdapterRequest): string => hash(request, config.get().hashOpts)

/**
* Calculate maxAge to keep the item cached so we allow the specified throughput.
*
Expand All @@ -29,18 +60,13 @@ const maxAgeFor = (throughput: number, interval: number) =>
throughput <= 0 ? interval : Math.floor(interval / throughput)

export const withRateLimit: Middleware = async (execute) => async (input) => {
const totalReqPerMin = getThroughput(IntervalNames.MINUTE)
const participantReqPerMin = getThroughput(IntervalNames.MINUTE, input)
const cost = getParticipantCost(participantReqPerMin)
const maxReqPerMin = getMaxReqAllowed(
totalReqPerMin.length + 1,
participantReqPerMin.length + 1,
cost,
)
const maxAge = maxAgeFor(maxReqPerMin, Intervals[IntervalNames.MINUTE])
const state = store.getState()
const { heartbeats } = state
const requestTypeId = makeId(input)
const maxThroughput = computeThroughput(heartbeats, IntervalNames.MINUTE, requestTypeId)
const maxAge = maxAgeFor(maxThroughput, Intervals[IntervalNames.MINUTE])
const result = await execute({ ...input, data: { ...input.data, maxAge } })
store.dispatch(requestObserved(input, result.data.cost))
if (input.id !== WARMUP_REQUEST_ID)
store.dispatch(requestObserved(makeId(input), result.data.cost))
return result
}

export { store }
31 changes: 18 additions & 13 deletions bootstrap/src/lib/rate-limit/reducer.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import { createReducer, combineReducers } from '@reduxjs/toolkit'
import { requestObserved } from './actions'

export interface Interval {
[key: string]: number
}

export enum IntervalNames {
SEC = 'SEC',
MINUTE = 'MINUTE',
HOUR = 'HOUR',
DAY = 'DAY',
}

export const Intervals: Interval = {
export const Intervals: { [key: string]: number } = {
[IntervalNames.SEC]: 1000,
[IntervalNames.MINUTE]: 60 * 1000,
[IntervalNames.HOUR]: 60 * 60 * 1000,
Expand All @@ -25,6 +21,10 @@ export interface Heartbeat {
}

export interface StateTree {
heartbeats: Heartbeats
}

export interface Heartbeats {
total: {
[interval: string]: Heartbeat[]
}
Expand All @@ -35,31 +35,30 @@ export interface StateTree {
}
}

const initialTimeWindowsState = () => ({
const initialIntervalsState = () => ({
SEC: [],
MINUTE: [],
HOUR: [],
DAY: [],
})

const initialState: StateTree = {
total: initialTimeWindowsState(),
const initialHeartbeatsState: Heartbeats = {
total: initialIntervalsState(),
participants: {},
}

const heartbeatReducer = createReducer<StateTree>(initialState, (builder) => {
const heartbeatReducer = createReducer<Heartbeats>(initialHeartbeatsState, (builder) => {
builder.addCase(requestObserved, (state, action) => {
const heartbeat: Heartbeat = {
id: action.payload.requestId,
cost: action.payload.cost,
timestamp: Date.parse(action.payload.createdAt),
}

if (!state.participants[heartbeat.id]) {
state.participants[heartbeat.id] = initialTimeWindowsState()
}

const { id } = heartbeat
// Init if first time seeing this id
if (!state.participants[id]) state.participants[id] = initialIntervalsState()

for (const [intervalName, interval] of Object.entries(Intervals)) {
state.total[intervalName].push(heartbeat)
state.participants[id][intervalName].push(heartbeat)
Expand All @@ -75,6 +74,12 @@ const heartbeatReducer = createReducer<StateTree>(initialState, (builder) => {
})
})

export const selectObserved = (
state: Heartbeats,
interval: IntervalNames,
id?: string,
): Heartbeat[] => (id ? state.participants[id]?.[interval] || [] : state.total[interval] || [])

export default combineReducers({
heartbeats: heartbeatReducer,
})
38 changes: 0 additions & 38 deletions bootstrap/src/lib/rate-limit/util.ts

This file was deleted.

12 changes: 12 additions & 0 deletions bootstrap/src/lib/store/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { nanoid } from '@reduxjs/toolkit'

export const toActionPayload = <A extends ActionBase>(data: any): A => ({
id: nanoid(),
createdAt: new Date().toISOString(),
...data,
})

export interface ActionBase {
id: string
createdAt: string
}

0 comments on commit ff67798

Please sign in to comment.