-
Notifications
You must be signed in to change notification settings - Fork 135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Making UDS error handling and recovery more robust. #189
Changes from all commits
8d7fe2b
5eb2e19
700fb83
ccc83e4
eef577c
9db6aa0
46da650
5d194ab
a8845b2
23b6aaa
572cb0c
0fc3a68
7b36227
7cf8ad8
1e364f5
15e9eb1
f3d3fd6
884c980
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,35 @@ | ||
const os = require('os'), | ||
process = require('process'); | ||
|
||
exports.PROTOCOL = { | ||
TCP: 'tcp', | ||
UDS: 'uds', | ||
UDP: 'udp', | ||
STREAM: 'stream' | ||
}; | ||
|
||
/** | ||
* Determines error codes that signify a connection to a Unix Domain Socket (UDS) | ||
* has failed in a way that can be retried. This codes are OS-specific. | ||
* @returns {number[]} An array of the error codes. | ||
*/ | ||
function udsErrors() { | ||
if (process.platform === 'linux') { | ||
return [ | ||
os.constants.errno.ENOTCONN, | ||
os.constants.errno.ECONNREFUSED, | ||
]; | ||
} | ||
|
||
if (process.platform === 'darwin') { | ||
return [ | ||
os.constants.errno.EDESTADDRREQ, | ||
os.constants.errno.ECONNRESET, | ||
]; | ||
} | ||
|
||
// Unknown / not yet implemented | ||
return []; | ||
} | ||
|
||
exports.udsErrors = udsErrors; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,13 @@ | ||
const util = require('util'), | ||
const process = require('process'), | ||
util = require('util'), | ||
helpers = require('./helpers'), | ||
applyStatsFns = require('./statsFunctions'); | ||
|
||
const { PROTOCOL } = require('./constants'); | ||
const constants = require('./constants'); | ||
const createTransport = require('./transport'); | ||
|
||
const PROTOCOL = constants.PROTOCOL; | ||
const UDS_ERROR_CODES = constants.udsErrors(); | ||
const UDS_DEFAULT_GRACEFUL_RESTART_LIMIT = 1000; | ||
const CACHE_DNS_TTL_DEFAULT = 60000; | ||
|
||
|
@@ -48,6 +51,8 @@ const Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, | |
this.cacheDnsTtl = options.cacheDnsTtl || CACHE_DNS_TTL_DEFAULT; | ||
this.host = options.host || process.env.DD_AGENT_HOST; | ||
this.port = options.port || parseInt(process.env.DD_DOGSTATSD_PORT, 10) || 8125; | ||
this.path = options.path; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's fine, and you shouldn't add these to TypeScript definitions, others ones aren't mentioned in there like this (I don't think) |
||
this.stream = options.stream; | ||
this.prefix = options.prefix || ''; | ||
this.suffix = options.suffix || ''; | ||
this.tagPrefix = options.tagPrefix || '#'; | ||
|
@@ -68,6 +73,8 @@ const Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, | |
this.bufferHolder = options.isChild ? options.bufferHolder : { buffer: '' }; | ||
this.errorHandler = options.errorHandler; | ||
this.udsGracefulErrorHandling = 'udsGracefulErrorHandling' in options ? options.udsGracefulErrorHandling : true; | ||
this.udsGracefulRestartRateLimit = options.udsGracefulRestartRateLimit || UDS_DEFAULT_GRACEFUL_RESTART_LIMIT; // only recreate once per second | ||
this.isChild = options.isChild; | ||
|
||
// If we're mocking the client, create a buffer to record the outgoing calls. | ||
if (this.mock) { | ||
|
@@ -94,16 +101,8 @@ const Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, | |
} | ||
} | ||
|
||
if (! this.socket) { | ||
this.socket = createTransport(this, { | ||
host: this.host, | ||
cacheDns: this.cacheDns, | ||
cacheDnsTtl: this.cacheDnsTtl, | ||
path: options.path, | ||
port: this.port, | ||
protocol: this.protocol, | ||
stream: options.stream | ||
}); | ||
if (!this.socket) { | ||
trySetNewSocket(this); | ||
} | ||
|
||
if (this.socket && !options.isChild && options.errorHandler) { | ||
|
@@ -119,43 +118,11 @@ const Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, | |
// will gracefully (attempt) to re-open the socket with a small delay | ||
// options.udsGracefulRestartRateLimit is the minimum time (ms) between creating sockets | ||
// does not support options.isChild (how to re-create a socket you didn't create?) | ||
if (this.socket && !options.isChild && options.protocol === PROTOCOL.UDS && this.udsGracefulErrorHandling) { | ||
const socketCreateLimit = options.udsGracefulRestartRateLimit || UDS_DEFAULT_GRACEFUL_RESTART_LIMIT; // only recreate once per second | ||
if (this.socket && options.protocol === PROTOCOL.UDS) { | ||
const lastSocketCreateTime = Date.now(); | ||
this.socket.on('error', (err) => { | ||
const code = err.code; | ||
switch (code) { | ||
case 107: | ||
case 111: { | ||
if (Date.now() - lastSocketCreateTime >= socketCreateLimit) { | ||
// recreate the socket, but only once per 30 seconds | ||
if (this.errorHandler) { | ||
this.socket.removeListener('error', this.errorHandler); | ||
} | ||
this.socket.close(); | ||
this.socket = createTransport(this, { | ||
host: this.host, | ||
path: options.path, | ||
port: this.port, | ||
protocol: this.protocol | ||
}); | ||
|
||
if (this.errorHandler) { | ||
this.socket.on('error', this.errorHandler); | ||
} else { | ||
this.socket.on('error', error => console.error(`hot-shots UDS error: ${error}`)); | ||
} | ||
} | ||
break; | ||
} | ||
default: { | ||
break; | ||
} | ||
} | ||
}); | ||
maybeAddUDSErrorHandler(this, lastSocketCreateTime); | ||
} | ||
|
||
|
||
this.messagesInFlight = 0; | ||
this.CHECKS = { | ||
OK: 0, | ||
|
@@ -354,7 +321,16 @@ Client.prototype.sendMessage = function (message, callback) { | |
return; | ||
} | ||
|
||
if (!this.socket) { | ||
const socketWasMissing = !this.socket; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may be considered a breaking change that I've added an attempt to recover here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, thanks. Not sure yet if this will be a minor or major release, but I think this part by itself would fit in ok with what has been minor releases. |
||
if (socketWasMissing && this.protocol === PROTOCOL.UDS) { | ||
trySetNewSocket(this); | ||
if (this.socket) { | ||
// On success, add custom UDS error handling. | ||
maybeAddUDSErrorHandler(this, Date.now()); | ||
} | ||
} | ||
|
||
if (socketWasMissing) { | ||
const error = new Error('Socket not created properly. Check previous errors for details.'); | ||
if (callback) { | ||
return callback(error); | ||
|
@@ -526,3 +502,85 @@ Client.prototype.childClient = function (options) { | |
|
||
exports = module.exports = Client; | ||
exports.StatsD = Client; | ||
|
||
/** | ||
* Handle an error connecting to a Unix Domain Socket (UDS). This will | ||
* attempt to create a new socket and replace and close the client's current | ||
* socket, registering a **new** `udsErrorHandler()` on the newly created socket. | ||
* If a new socket can't be created (e.g. if no UDS currently exists at | ||
* `client.path`) then this will leave the existing socket intact. | ||
* | ||
* Note that this will no-op with an early exit if the last socket create time | ||
* was too recent (within the UDS graceful restart rate limit). | ||
* @param client Client The statsd Client that may be getting a UDS error handler. | ||
* @param lastSocketCreateTime number The timestamp (in milliseconds since the | ||
* epoch) when the current socket was created. | ||
*/ | ||
function udsErrorHandler(client, lastSocketCreateTime) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's fine as is. Great to see the nice docs there. |
||
// recreate the socket, but only once within `udsGracefulRestartRateLimit`. | ||
if (Date.now() - lastSocketCreateTime < client.udsGracefulRestartRateLimit) { | ||
return; | ||
} | ||
|
||
if (client.errorHandler) { | ||
client.socket.removeListener('error', client.errorHandler); | ||
} | ||
|
||
const newSocket = createTransport(client, { | ||
host: client.host, | ||
path: client.path, | ||
port: client.port, | ||
protocol: client.protocol, | ||
}); | ||
if (newSocket) { | ||
client.socket.close(); | ||
client.socket = newSocket; | ||
maybeAddUDSErrorHandler(client, Date.now()); | ||
} else { | ||
console.error('Could not replace UDS connection with new socket'); | ||
return; | ||
} | ||
|
||
if (client.errorHandler) { | ||
client.socket.on('error', client.errorHandler); | ||
} else { | ||
client.socket.on('error', (error) => console.error(`hot-shots UDS error: ${error}`)); | ||
} | ||
} | ||
|
||
/** | ||
* Add a Unix Domain Socket (UDS) error handler to the client's socket, if the | ||
* client is not a "child" client and has graceful error handling enabled for | ||
* UDS. | ||
* @param client Client The statsd Client that may be getting a UDS error handler. | ||
* @param lastSocketCreateTime number The timestamp (in milliseconds since the | ||
* epoch) when the current socket was created. | ||
*/ | ||
function maybeAddUDSErrorHandler(client, lastSocketCreateTime) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm very open to a naming convention other than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't get too hung up on names, unless I think it doesn't fit the style. Not too much of a naming style in here so it's fine. :) Spent a bunch of time making sure I understood the new paths in here, for this function and others. I didn't see any issues, looks good to me. |
||
if (client.isChild || !client.udsGracefulErrorHandling) { | ||
return; | ||
} | ||
|
||
client.socket.on('error', (err) => { | ||
if (UDS_ERROR_CODES.includes(-err.code)) { | ||
udsErrorHandler(client, lastSocketCreateTime); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
* Try to replace a client's socket with a new transport. If `createTransport()` | ||
* returns `null` this will still set the client's socket to `null`. | ||
* @param client Client The statsd Client that will be getting a new socket | ||
*/ | ||
function trySetNewSocket(client) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm very open to a naming convention other |
||
client.socket = createTransport(client, { | ||
host: client.host, | ||
cacheDns: client.cacheDns, | ||
cacheDnsTtl: client.cacheDnsTtl, | ||
path: client.path, | ||
port: client.port, | ||
protocol: client.protocol, | ||
stream: client.stream, | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Huh, I didn't realize there was a constant here available on Node 8 and above. Good to know.