Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: unsubscription metric happens only if the subs was active #493

Merged
merged 1 commit into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions packages/core/bootstrap/src/lib/ws/epics.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Execute } from '@chainlink/types'
import { AnyAction } from 'redux'
import { combineEpics, createEpicMiddleware, Epic } from 'redux-observable'
import { merge, Subject, of, race, Observable } from 'rxjs'
import { merge, Subject, of, race, Observable, EMPTY } from 'rxjs'
import {
catchError,
delay,
Expand Down Expand Up @@ -122,7 +122,7 @@ export const connectEpic: Epic<AnyAction, AnyAction, any, any> = (action$, state
() => wsHandler.unsubscribe(payload.input),
(message) => {
/**
* If the error in on the subscription, next requests will try to subscribe
* If the error happens on the subscription, it will be on subscribing state and eventually unresponsiveTimeout will take care of it (unsubs/subs)
* If the error happens during a subscription, and is only eventual, can be ignored
* If the error happens during a subscription, and the subscription stop receiving messages, the unresponsiveTimeout will take care of it (unsubs/subs)
*/
Expand Down Expand Up @@ -244,11 +244,14 @@ export const connectEpic: Epic<AnyAction, AnyAction, any, any> = (action$, state
const timeout$ = of(subscriptionError({ ...action, reason: 'WS: unsubscribe -> subscribe (unresponsive channel)' }), unsubscribe(action), subscribe(action)).pipe(
delay(config.subscriptionUnresponsiveTTL),
withLatestFrom(state$),
map(([action, state]) => ({ ...action, isActive: state.ws.subscriptions[subscriptionKey]?.active })),
// Filters by active subscription.
// The timeout could think we don't receive messages because of unresponsiveness, and it's actually unsubscribed
filter(({ isActive }) => isActive),
tap(() => logger.info('WS: unsubscribe -> subscribe (unresponsive channel)', { payload: action.subscriptionMsg })),
// isSubscribing is considered too as we want to trigger an unsubscription from a hung channel
mergeMap(([action, state]) => {
const isActive = !!state.ws.subscriptions[subscriptionKey]?.active
const isSubscribing = !!(state.ws.subscriptions[subscriptionKey]?.subscribing > 0)
return isActive || isSubscribing ? of(action) : EMPTY
krebernisak marked this conversation as resolved.
Show resolved Hide resolved
})
)

return race(reset$, timeout$).pipe(filter((a) => !messageReceived.match(a)))
Expand Down Expand Up @@ -334,10 +337,13 @@ export const metricsEpic: Epic<AnyAction, AnyAction, any, any> = (action$, state
ws_subscription_errors.labels(subscriptionErrorLabels(action.payload)).inc()
logger.error('WS: subscription error', { payload: action.payload })
break
case unsubscribed.type:
ws_subscription_active.labels(subscriptionLabels(action.payload)).dec()
logger.info('WS: unsubscribed', { payload: action.payload })
case unsubscribed.type: {
if (state.ws.subscriptions[getSubsId(action.payload.subscriptionMsg)]?.wasEverActive) {
ws_subscription_active.labels(subscriptionLabels(action.payload)).dec()
logger.info('WS: unsubscribed', { payload: action.payload })
}
break
}
case messageReceived.type:
ws_message_total.labels(messageLabels(action.payload)).inc()
break
Expand Down
8 changes: 7 additions & 1 deletion packages/core/bootstrap/src/lib/ws/reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ export interface SubscriptionsState {
/** Map of all subscriptions by key */
[key: string]: {
active: boolean
wasEverActive?: boolean
unsubscribed?: boolean
subscribing: number
input: AdapterRequest
}
Expand All @@ -75,6 +77,8 @@ export const subscriptionsReducer = createReducer<SubscriptionsState>(
const key = getSubsId(action.payload.subscriptionMsg)
state[key] = {
active: true,
wasEverActive: true,
unsubscribed: false,
subscribing: 0,
input: { ...action.payload.input },
}
Expand All @@ -96,7 +100,9 @@ export const subscriptionsReducer = createReducer<SubscriptionsState>(
builder.addCase(actions.unsubscribed, (state, action) => {
// Remove subscription
const key = getSubsId(action.payload.subscriptionMsg)
delete state[key]

state[key].active = false
state[key].unsubscribed = true
})

builder.addCase(actions.disconnected, (state) => {
Expand Down