Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cluster): add NAT support #758

Merged
merged 3 commits into from
Dec 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ used in the world's biggest online commerce company [Alibaba](http://www.alibaba
0. Support for ES6 types, such as `Map` and `Set`.
0. Support for GEO commands (Redis 3.2 Unstable).
0. Sophisticated error handling strategy.
0. Support for NAT mapping.

# Links
* [API Documentation](API.md)
Expand Down Expand Up @@ -836,6 +837,26 @@ Promise.all(masters.map(function (node) {
});
```

### NAT Mapping
Sometimes the cluster is hosted within a internal network that can only be accessed via a NAT (Network Address Translation) instance. See [Accessing ElastiCache from outside AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/accessing-elasticache.html) as an example.

You can specify nat mapping rules via `natMap` option:

```javascript
const cluster = new Redis.Cluster([{
host: '203.0.113.73',
port: 30001
}], {
natMap: {
'10.0.1.230:30001': {host: '203.0.113.73', port: 30001},
'10.0.1.231:30001': {host: '203.0.113.73', port: 30002},
'10.0.1.232:30001': {host: '203.0.113.73', port: 30003}
}
})
```

This option is also useful when the cluster is running inside a Docker container.

### Transaction and pipeline in Cluster mode
Almost all features that are supported by `Redis` are also supported by `Redis.Cluster`, e.g. custom commands, transaction and pipeline.
However there are some differences when using transaction and pipeline in Cluster mode:
Expand Down
2 changes: 2 additions & 0 deletions lib/cluster/ClusterOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {NodeRole} from './util'
import {lookup} from 'dns'

export type DNSLookupFunction = (hostname: string, callback: (err: NodeJS.ErrnoException, address: string, family: number) => void) => void
export type NatMap = {[key: string]: {host: string, port: number}}

/**
* Options for Cluster constructor
Expand Down Expand Up @@ -116,6 +117,7 @@ export interface IClusterOptions {
* @default require('dns').lookup
*/
dnsLookup?: DNSLookupFunction
natMap?: NatMap
}

export const DEFAULT_CLUSTER_OPTIONS: IClusterOptions = {
Expand Down
25 changes: 18 additions & 7 deletions lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {EventEmitter} from 'events'
import ClusterAllFailedError from '../errors/ClusterAllFailedError'
import {defaults, noop} from '../utils/lodash'
import ConnectionPool from './ConnectionPool'
import {NodeKey, IRedisOptions, normalizeNodeOptions, NodeRole, getUniqueHostnamesFromOptions} from './util'
import {NodeKey, IRedisOptions, normalizeNodeOptions, NodeRole, getUniqueHostnamesFromOptions, nodeKeyToRedisOptions} from './util'
import ClusterSubscriber from './ClusterSubscriber'
import DelayQueue from './DelayQueue'
import ScanStream from '../ScanStream'
Expand Down Expand Up @@ -416,6 +416,18 @@ class Cluster extends EventEmitter {
}
}

natMapper(nodeKey: NodeKey | IRedisOptions): IRedisOptions {
if (this.options.natMap && typeof this.options.natMap === 'object') {
const key = typeof nodeKey === 'string' ? nodeKey : `${nodeKey.host}:${nodeKey.port}`
const mapped = this.options.natMap[key]
if (mapped) {
debug('NAT mapping %s -> %O', key, mapped)
return mapped
}
}
return typeof nodeKey === 'string' ? nodeKeyToRedisOptions(nodeKey) : nodeKey
}

sendCommand(command, stream, node) {
if (this.status === 'wait') {
this.connect().catch(noop)
Expand Down Expand Up @@ -449,16 +461,15 @@ class Cluster extends EventEmitter {
} else {
_this.slots[slot] = [key]
}
const splitKey = key.split(':')
_this.connectionPool.findOrCreate({host: splitKey[0], port: Number(splitKey[1])})
_this.connectionPool.findOrCreate(_this.natMapper(key))
tryConnection()
_this.refreshSlotsCache()
},
ask: function (slot, key) {
debug('command %s is required to ask %s:%s', command.name, key)
const splitKey = key.split(':')
_this.connectionPool.findOrCreate({host: splitKey[0], port: Number(splitKey[1])})
tryConnection(false, key)
const mapped = _this.natMapper(key)
_this.connectionPool.findOrCreate(mapped)
tryConnection(false, `${mapped.host}:${mapped.port}`)
},
tryagain: partialTry,
clusterDown: partialTry,
Expand Down Expand Up @@ -610,7 +621,7 @@ class Cluster extends EventEmitter {

const keys = []
for (let j = 2; j < items.length; j++) {
items[j] = {host: items[j][0], port: items[j][1]}
items[j] = this.natMapper({host: items[j][0], port: items[j][1]})
items[j].readOnly = j !== 2
nodes.push(items[j])
keys.push(items[j].host + ':' + items[j].port)
Expand Down
11 changes: 11 additions & 0 deletions lib/cluster/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ export function getNodeKey(node: IRedisOptions): NodeKey {
return node.host + ':' + node.port
}

export function nodeKeyToRedisOptions(nodeKey: NodeKey): IRedisOptions {
const portIndex = nodeKey.lastIndexOf(':')
if (portIndex === -1) {
throw new Error(`Invalid node key ${nodeKey}`)
}
return {
host: nodeKey.slice(0, portIndex),
port: Number(nodeKey.slice(portIndex + 1))
}
}

export function normalizeNodeOptions(nodes: Array<string | number | object>): IRedisOptions[] {
return nodes.map((node) => {
const options: any = {}
Expand Down
127 changes: 127 additions & 0 deletions test/functional/cluster/nat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
const calculateSlot = require('cluster-key-slot')

describe('NAT', () => {
it('works for normal case', (done) => {
const slotTable = [
[0, 1, ['192.168.1.1', 30001]],
[2, 16383, ['192.168.1.2', 30001]]
]

let cluster
new MockServer(30001, null, slotTable)
new MockServer(30002, ([command, arg]) => {
if (command === 'get' && arg === 'foo') {
cluster.disconnect()
done()
}
}, slotTable)

cluster = new Redis.Cluster([{
host: '127.0.0.1',
port: 30001
}], {
natMap: {
'192.168.1.1:30001': {host: '127.0.0.1', port: 30001},
'192.168.1.2:30001': {host: '127.0.0.1', port: 30002}
}
})

cluster.get('foo')
})

it('works if natMap does not match all the cases', (done) => {
const slotTable = [
[0, 1, ['192.168.1.1', 30001]],
[2, 16383, ['127.0.0.1', 30002]]
]

let cluster
new MockServer(30001, null, slotTable)
new MockServer(30002, ([command, arg]) => {
if (command === 'get' && arg === 'foo') {
cluster.disconnect()
done()
}
}, slotTable)

cluster = new Redis.Cluster([{
host: '127.0.0.1',
port: 30001
}], {
natMap: {
'192.168.1.1:30001': {host: '127.0.0.1', port: 30001}
}
})

cluster.get('foo')
})

it('works for moved', (done) => {
const slotTable = [
[0, 16383, ['192.168.1.1', 30001]]
]

let cluster
new MockServer(30001, ([command, arg]) => {
if (command === 'get' && arg === 'foo') {
return new Error('MOVED ' + calculateSlot('foo') + ' 192.168.1.2:30001');
}
}, slotTable)
new MockServer(30002, ([command, arg]) => {
if (command === 'get' && arg === 'foo') {
cluster.disconnect()
done()
}
}, slotTable)

cluster = new Redis.Cluster([{
host: '127.0.0.1',
port: 30001
}], {
natMap: {
'192.168.1.1:30001': {host: '127.0.0.1', port: 30001},
'192.168.1.2:30001': {host: '127.0.0.1', port: 30002}
}
})

cluster.get('foo')
})

it('works for ask', (done) => {
const slotTable = [
[0, 16383, ['192.168.1.1', 30001]]
]

let cluster
let asked = false
new MockServer(30001, ([command, arg]) => {
if (command === 'get' && arg === 'foo') {
return new Error('ASK ' + calculateSlot('foo') + ' 192.168.1.2:30001');
}
}, slotTable)
new MockServer(30002, ([command, arg]) => {
if (command === 'asking') {
asked = true
}
if (command === 'get' && arg === 'foo') {
if (!asked) {
throw new Error('expected asked to be true')
}
cluster.disconnect()
done()
}
}, slotTable)

cluster = new Redis.Cluster([{
host: '127.0.0.1',
port: 30001
}], {
natMap: {
'192.168.1.1:30001': {host: '127.0.0.1', port: 30001},
'192.168.1.2:30001': {host: '127.0.0.1', port: 30002}
}
})

cluster.get('foo')
})
})
10 changes: 10 additions & 0 deletions test/unit/cluster.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

var Cluster = require('../../lib/cluster').default;
const {nodeKeyToRedisOptions} = require('../../lib/cluster/util')

describe('cluster', function () {
beforeEach(function () {
Expand Down Expand Up @@ -36,3 +37,12 @@ describe('cluster', function () {
});
});
});

describe('nodeKeyToRedisOptions()', () => {
it('returns correct result', () => {
expect(nodeKeyToRedisOptions('127.0.0.1:6379')).to.eql({port: 6379, host: '127.0.0.1'})
expect(nodeKeyToRedisOptions('192.168.1.1:30001')).to.eql({port: 30001, host: '192.168.1.1'})
expect(nodeKeyToRedisOptions('::0:6379')).to.eql({port: 6379, host: '::0'})
expect(nodeKeyToRedisOptions('0:0:6379')).to.eql({port: 6379, host: '0:0'})
})
})