Skip to content

Commit

Permalink
API.shutdown now forces final harvest on new aggregators when configu…
Browse files Browse the repository at this point in the history
…red to.
  • Loading branch information
michaelgoin committed Oct 8, 2019
1 parent ac92ad4 commit 27eaf71
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 136 deletions.
4 changes: 2 additions & 2 deletions api.js
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,7 @@ function _doShutdown(api, options, callback) {
}

agent.on('started', function shutdownHarvest() {
agent.harvest(afterHarvest)
agent.forceHarvestAll(afterHarvest)
})
agent.on('errored', function logShutdownError(error) {
agent.stop(callback)
Expand All @@ -1417,7 +1417,7 @@ function _doShutdown(api, options, callback) {
}
})
} else if (options.collectPendingData) {
agent.harvest(afterHarvest)
agent.forceHarvestAll(afterHarvest)
} else {
agent.stop(callback)
}
Expand Down
247 changes: 129 additions & 118 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -264,124 +264,7 @@ Agent.prototype.start = function start(callback) {
setTimeout(function afterTimeout() {
logger.info(`Starting initial ${INITIAL_HARVEST_DELAY_MS}ms harvest.`)

const promises = []

const metricPromise = new Promise((resolve) => {
agent.metrics.once(
'finished metric_data data send.',
function onMetricsFinished() {
resolve()
}
)
agent.metrics.send()
})

promises.push(metricPromise)

// TODO: plumb config through to aggregators so they can do their own checking.
if (agent.config.distributed_tracing.enabled &&
agent.config.span_events.enabled) {
const spanPromise = new Promise((resolve) => {
agent.spanEventAggregator.once(
'finished span_event_data data send.',
function onSpansFinished() {
resolve()
}
)
agent.spanEventAggregator.send()
})

promises.push(spanPromise)
}

if (agent.config.custom_insights_events.enabled) {
const customEventPromise = new Promise((resolve) => {
agent.customEventAggregator.once(
'finished custom_event_data data send.',
function onCustomEventsFinished() {
resolve()
}
)
agent.customEventAggregator.send()
})

promises.push(customEventPromise)
}

if (agent.config.transaction_events.enabled) {
const transactionEventPromise = new Promise((resolve) => {
agent.transactionEventAggregator.once(
'finished analytic_event_data data send.',
function onTransactionEventsFinished() {
resolve()
}
)
agent.transactionEventAggregator.send()
})

promises.push(transactionEventPromise)
}

if (agent.config.transaction_tracer.enabled && agent.config.collect_traces) {
const transactionTracePromise = new Promise((resolve) => {
agent.traces.once(
'finished transaction_sample_data data send.',
function onTracesFinished() {
resolve()
}
)
agent.traces.send()
})

promises.push(transactionTracePromise)
}

if (agent.config.slow_sql.enabled) {
const sqlTracePromise = new Promise((resolve) => {
agent.queries.once(
'finished sql_trace_data data send.',
function onSqlTracesFinished() {
resolve()
}
)
agent.queries.send()
})

promises.push(sqlTracePromise)
}

const errorCollectorEnabled =
agent.config.error_collector && agent.config.error_collector.enabled

if (errorCollectorEnabled && agent.config.collect_errors) {
const errorTracePromise = new Promise((resolve) => {
agent.errors.traceAggregator.once(
'finished error_data data send.',
function onErrorTracesFinished() {
resolve()
}
)
agent.errors.traceAggregator.send()
})

promises.push(errorTracePromise)
}

if (errorCollectorEnabled && agent.config.error_collector.capture_events) {
const errorEventPromise = new Promise((resolve) => {
agent.errors.eventAggregator.once(
'finished error_event_data data send.',
function onErrorEventsFinished() {
resolve()
}
)
agent.errors.eventAggregator.send()
})

promises.push(errorEventPromise)
}

Promise.all(promises).then(function afterAllAggregatorsSend() {
agent.forceHarvestAll(function afterAllAggregatorsSend() {
agent.startAggregators()
callback(null, config)
})
Expand All @@ -393,6 +276,134 @@ Agent.prototype.start = function start(callback) {
})
}

/**
* Forces all aggregators to send the data collected.
* @param {Function} callback The callback to invoke when all data types have been sent.
*/
Agent.prototype.forceHarvestAll = function forceHarvestAll(callback) {
const agent = this
const promises = []

const metricPromise = new Promise((resolve) => {
agent.metrics.once(
'finished metric_data data send.',
function onMetricsFinished() {
resolve()
}
)
agent.metrics.send()
})

promises.push(metricPromise)

// TODO: plumb config through to aggregators so they can do their own checking.
if (agent.config.distributed_tracing.enabled &&
agent.config.span_events.enabled) {
const spanPromise = new Promise((resolve) => {
agent.spanEventAggregator.once(
'finished span_event_data data send.',
function onSpansFinished() {
resolve()
}
)
agent.spanEventAggregator.send()
})

promises.push(spanPromise)
}

if (agent.config.custom_insights_events.enabled) {
const customEventPromise = new Promise((resolve) => {
agent.customEventAggregator.once(
'finished custom_event_data data send.',
function onCustomEventsFinished() {
resolve()
}
)
agent.customEventAggregator.send()
})

promises.push(customEventPromise)
}

if (agent.config.transaction_events.enabled) {
const transactionEventPromise = new Promise((resolve) => {
agent.transactionEventAggregator.once(
'finished analytic_event_data data send.',
function onTransactionEventsFinished() {
resolve()
}
)
agent.transactionEventAggregator.send()
})

promises.push(transactionEventPromise)
}

if (agent.config.transaction_tracer.enabled && agent.config.collect_traces) {
const transactionTracePromise = new Promise((resolve) => {
agent.traces.once(
'finished transaction_sample_data data send.',
function onTracesFinished() {
resolve()
}
)
agent.traces.send()
})

promises.push(transactionTracePromise)
}

if (agent.config.slow_sql.enabled) {
const sqlTracePromise = new Promise((resolve) => {
agent.queries.once(
'finished sql_trace_data data send.',
function onSqlTracesFinished() {
resolve()
}
)
agent.queries.send()
})

promises.push(sqlTracePromise)
}

const errorCollectorEnabled =
agent.config.error_collector && agent.config.error_collector.enabled

if (errorCollectorEnabled && agent.config.collect_errors) {
const errorTracePromise = new Promise((resolve) => {
agent.errors.traceAggregator.once(
'finished error_data data send.',
function onErrorTracesFinished() {
resolve()
}
)
agent.errors.traceAggregator.send()
})

promises.push(errorTracePromise)
}

if (errorCollectorEnabled && agent.config.error_collector.capture_events) {
const errorEventPromise = new Promise((resolve) => {
agent.errors.eventAggregator.once(
'finished error_event_data data send.',
function onErrorEventsFinished() {
resolve()
}
)
agent.errors.eventAggregator.send()
})

promises.push(errorEventPromise)
}

Promise.all(promises).then(() => {
callback()
})
}

Agent.prototype.stopAggregators = function stopAggregators() {
this.metrics.stop()
this.errors.stop()
Expand Down
32 changes: 16 additions & 16 deletions test/unit/api/api.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1414,27 +1414,27 @@ describe('the New Relic agent API', function() {
})

describe('when `options.collectPendingData` is `true`', () => {
it('calls harvest when state is `started`', () => {
it('calls forceHarvestAll when state is `started`', () => {
var mock = sinon.mock(agent)
agent.setState('started')
mock.expects('harvest').once()
mock.expects('forceHarvestAll').once()
api.shutdown({collectPendingData: true})
mock.verify()
})

it('calls harvest when state is not "started" and changes to "started"', () => {
it('calls forceHarvestAll when state changes to "started"', () => {
var mock = sinon.mock(agent)
agent.setState('starting')
mock.expects('harvest').once()
mock.expects('forceHarvestAll').once()
api.shutdown({collectPendingData: true})
agent.setState('started')
mock.verify()
})

it('does not call harvest when state is not "started" and not changed', () => {
it('does not call forceHarvestAll when state is not "started"', () => {
var mock = sinon.mock(agent)
agent.setState('starting')
mock.expects('harvest').never()
mock.expects('forceHarvestAll').never()
api.shutdown({collectPendingData: true})
mock.verify()
})
Expand Down Expand Up @@ -1487,29 +1487,29 @@ describe('the New Relic agent API', function() {
})
})

it('calls harvest when a timeout is given and not reached', function() {
it('calls forceHarvestAll when a timeout is given and not reached', function() {
var mock = sinon.mock(agent)
agent.setState('starting')
mock.expects('harvest').once()
mock.expects('forceHarvestAll').once()
api.shutdown({collectPendingData: true, timeout: 1000})
agent.setState('started')
mock.verify()
})

it('calls stop when timeout is reached and does not harvest', function() {
it('calls stop when timeout is reached and does not forceHarvestAll', function() {
var mock = sinon.mock(agent)
agent.setState('starting')
mock.expects('harvest').never()
mock.expects('forceHarvestAll').never()
mock.expects('stop').once()
api.shutdown({collectPendingData: true, timeout: 1000}, function() {
mock.verify()
})
})

it('calls harvest when timeout is not a number', function() {
it('calls forceHarvestAll when timeout is not a number', function() {
var mock = sinon.mock(agent)
agent.setState('starting')
mock.expects('harvest').once()
mock.expects('forceHarvestAll').once()
api.shutdown({collectPendingData: true, timeout: "xyz"}, function() {
mock.verify()
})
Expand All @@ -1530,8 +1530,8 @@ describe('the New Relic agent API', function() {
it('calls stop after harvest', function() {
var mock = sinon.mock(agent)

agent.harvest = function(cb) {
process.nextTick(cb)
agent.forceHarvestAll = function(cb) {
setImmediate(cb)
}

mock.expects('stop').once()
Expand All @@ -1543,8 +1543,8 @@ describe('the New Relic agent API', function() {
it('calls stop when harvest errors', function() {
var mock = sinon.mock(agent)

agent.harvest = function(cb) {
process.nextTick(function() {
agent.forceHarvestAll = function(cb) {
setImmediate(function() {
cb(new Error('some error'))
})
}
Expand Down

0 comments on commit 27eaf71

Please sign in to comment.