Skip to content

Commit

Permalink
feat: Add autopipeline for commands and allow multi slot pipelines.
Browse files Browse the repository at this point in the history
Fixes #536.

Co-Authored-By: Matteo Collina <[email protected]>
  • Loading branch information
2 people authored and AVVS committed Oct 23, 2020
1 parent c2f634f commit aba3c74
Show file tree
Hide file tree
Showing 24 changed files with 1,514 additions and 173 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ node_modules
built

.vscode
benchmarks/fixtures/*.txt
4 changes: 3 additions & 1 deletion API.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ Creates a Redis instance
| [options.reconnectOnError] | <code>function</code> | | See "Quick Start" section |
| [options.readOnly] | <code>boolean</code> | <code>false</code> | Enable READONLY mode for the connection. Only available for cluster mode. |
| [options.stringNumbers] | <code>boolean</code> | <code>false</code> | Force numbers to be always returned as JavaScript strings. This option is necessary when dealing with big numbers (exceed the [-2^53, +2^53] range). |

| [options.enableAutoPipelining] | <code>boolean</code> | <code>false</code> | When enabled, all commands issued during an event loop iteration are automatically wrapped in a pipeline and sent to the server at the same time. This can improve performance by 30-50%. |
| [options.autoPipeliningIgnoredCommands] | <code>string[]</code> | <code>[]</code> | The list of commands which must not be automatically wrapped in pipelines. |
| [options.maxScriptsCachingTime] | <code>number</code> | <code>60000</code> | Default script definition caching time. |
**Example**
```js
var Redis = require('ioredis');
Expand Down
98 changes: 96 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ used in the world's biggest online commerce company [Alibaba](http://www.alibaba
10. Support for GEO commands (Redis 3.2 Unstable).
11. Sophisticated error handling strategy.
12. Support for NAT mapping.
13. Support for autopipelining

# Links

Expand Down Expand Up @@ -961,9 +962,8 @@ This option is also useful when the cluster is running inside a Docker container
Almost all features that are supported by `Redis` are also supported by `Redis.Cluster`, e.g. custom commands, transaction and pipeline.
However there are some differences when using transaction and pipeline in Cluster mode:
0. All keys in a pipeline should belong to the same slot since ioredis sends all commands in a pipeline to the same node.
0. All keys in a pipeline should belong to slots served by the same node, since ioredis sends all commands in a pipeline to the same node.
1. You can't use `multi` without pipeline (aka `cluster.multi({ pipeline: false })`). This is because when you call `cluster.multi({ pipeline: false })`, ioredis doesn't know which node the `multi` command should be sent to.
2. Chaining custom commands in the pipeline is not supported in Cluster mode.
When any commands in a pipeline receives a `MOVED` or `ASK` error, ioredis will resend the whole pipeline to the specified node automatically if all of the following conditions are satisfied:
Expand Down Expand Up @@ -1061,6 +1061,100 @@ const cluster = new Redis.Cluster(
<hr>
## Autopipelining
In standard mode, when you issue multiple commands, ioredis sends them to the server one by one. As described in Redis pipeline documentation, this is a suboptimal use of the network link, especially when such link is not very performant.
The TCP and network overhead negatively affects performance. Commands are stuck in the send queue until the previous ones are correctly delivered to the server. This is a problem known as Head-Of-Line blocking (HOL).
ioredis supports a feature called “auto pipelining”. It can be enabled by setting the option `enableAutoPipelining` to `true`. No other code change is necessary.
In auto pipelining mode, all commands issued during an event loop are enqueued in a pipeline automatically managed by ioredis. At the end of the iteration, the pipeline is executed and thus all commands are sent to the server at the same time.
This feature can dramatically improve throughput and avoids HOL blocking. In our benchmarks, the improvement was between 35% and 50%.
While an automatic pipeline is executing, all new commands will be enqueued in a new pipeline which will be executed as soon as the previous finishes.
When using Redis Cluster, one pipeline per node is created. Commands are assigned to pipelines according to which node serves the slot.
A pipeline will thus contain commands using different slots but that ultimately are assigned to the same node.
Note that the same slot limitation within a single command still holds, as it is a Redis limitation.
### Example of automatic pipeline enqueuing
This sample code uses ioredis with automatic pipeline enabled.
```javascript
const Redis = require('./built');
const http = require('http');

const db = new Redis({ enableAutoPipelining: true });

const server = http.createServer((request, response) => {
const key = new URL(request.url, 'https://localhost:3000/').searchParams.get('key');

db.get(key, (err, value) => {
response.writeHead(200, { 'Content-Type': 'text/plain' });
response.end(value);
});
})

server.listen(3000);
```
When Node receives requests, it schedules them to be processed in one or more iterations of the events loop.
All commands issued by requests processing during one iteration of the loop will be wrapped in a pipeline automatically created by ioredis.
In the example above, the pipeline will have the following contents:
```
GET key1
GET key2
GET key3
...
GET keyN
```
When all events in the current loop have been processed, the pipeline is executed and thus all commands are sent to the server at the same time.
While waiting for pipeline response from Redis, Node will still be able to process requests. All commands issued by request handler will be enqueued in a new automatically created pipeline. This pipeline will not be sent to the server yet.
As soon as a previous automatic pipeline has received all responses from the server, the new pipeline is immediately sent without waiting for the events loop iteration to finish.
This approach increases the utilization of the network link, reduces the TCP overhead and idle times and therefore improves throughput.
### Benchmarks
Here's some of the results of our tests for a single node.
Each iteration of the test runs 1000 random commands on the server.
╔═══════════════════════════╤═════════╤═══════════════╤═══════════╤═════════════════════════╗
║ Slower tests │ Samples │ Result │ Tolerance │ Difference with slowest ║
╟───────────────────────────┼─────────┼───────────────┼───────────┼─────────────────────────╢
║ default │ 1000 │ 174.62 op/sec │ ± 0.45 % │ ║
╟───────────────────────────┼─────────┼───────────────┼───────────┼─────────────────────────╢
║ Fastest test │ Samples │ Result │ Tolerance │ Difference with slowest ║
╟───────────────────────────┼─────────┼───────────────┼───────────┼─────────────────────────╢
║ enableAutoPipelining=true │ 1500 │ 233.33 op/sec │ ± 0.88 % │ + 33.62 % ║
╚═══════════════════════════╧═════════╧═══════════════╧═══════════╧═════════════════════════╝
And here's the same test for a cluster of 3 masters and 3 replicas:
╔═══════════════════════════╤═════════╤═══════════════╤═══════════╤═════════════════════════╗
║ Slower tests │ Samples │ Result │ Tolerance │ Difference with slowest ║
╟───────────────────────────┼─────────┼───────────────┼───────────┼─────────────────────────╢
║ default │ 1000 │ 164.05 op/sec │ ± 0.42 % │ ║
╟───────────────────────────┼─────────┼───────────────┼───────────┼─────────────────────────╢
║ Fastest test │ Samples │ Result │ Tolerance │ Difference with slowest ║
╟───────────────────────────┼─────────┼───────────────┼───────────┼─────────────────────────╢
║ enableAutoPipelining=true │ 3000 │ 235.31 op/sec │ ± 0.94 % │ + 43.44 % ║
╚═══════════════════════════╧═════════╧═══════════════╧═══════════╧═════════════════════════╝
# Error Handling
All the errors returned by the Redis server are instances of `ReplyError`, which can be accessed via `Redis`:
Expand Down
60 changes: 60 additions & 0 deletions benchmarks/autopipelining-cluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { cronometro } from 'cronometro'
import { readFileSync } from 'fs'
import { join } from 'path'
import Cluster from '../lib/cluster'

const numNodes = parseInt(process.env.NODES || '3', 10)
const iterations = parseInt(process.env.ITERATIONS || '10000', 10)
const batchSize = parseInt(process.env.BATCH_SIZE || '1000', 10)
const keys = readFileSync(join(__dirname, `fixtures/cluster-${numNodes}.txt`), 'utf-8').split('\n')
const configuration = Array.from(Array(numNodes), (_, i) => ({ host: '127.0.0.1', port: 30000 + i + 1 }))
let cluster

function command(): string {
const choice = Math.random()

if (choice < 0.3) {
return 'ttl'
} else if (choice < 0.6) {
return 'exists'
}

return 'get'
}

function test() {
const index = Math.floor(Math.random() * keys.length)

return Promise.all(Array.from(Array(batchSize)).map(() => cluster[command()](keys[index])))
}

function after(cb) {
cluster.quit()
cb()
}

cronometro(
{
default: {
test,
before(cb) {
cluster = new Cluster(configuration)

cb()
},
after
},
'enableAutoPipelining=true': {
test,
before(cb) {
cluster = new Cluster(configuration, { enableAutoPipelining: true })
cb()
},
after
}
},
{
iterations,
print: { compare: true }
}
)
58 changes: 58 additions & 0 deletions benchmarks/autopipelining-single.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { cronometro } from 'cronometro'
import { readFileSync } from 'fs'
import { join } from 'path'
import Redis from '../lib/redis'

const iterations = parseInt(process.env.ITERATIONS || '10000', 10)
const batchSize = parseInt(process.env.BATCH_SIZE || '1000', 10)
const keys = readFileSync(join(__dirname, 'fixtures/cluster-3.txt'), 'utf-8').split('\n')
let redis

function command(): string {
const choice = Math.random()

if (choice < 0.3) {
return 'ttl'
} else if (choice < 0.6) {
return 'exists'
}

return 'get'
}

function test() {
const index = Math.floor(Math.random() * keys.length)

return Promise.all(Array.from(Array(batchSize)).map(() => redis[command()](keys[index])))
}

function after(cb) {
redis.quit()
cb()
}

cronometro(
{
default: {
test,
before(cb) {
redis = new Redis()

cb()
},
after
},
'enableAutoPipelining=true': {
test,
before(cb) {
redis = new Redis({ enableAutoPipelining: true })
cb()
},
after
}
},
{
iterations,
print: { compare: true }
}
)
38 changes: 38 additions & 0 deletions benchmarks/dropBuffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { cronometro } from 'cronometro'
import Redis from '../lib/redis'

let redis

cronometro(
{
default: {
test() {
return redis.set('foo', 'bar')
},
before(cb) {
redis = new Redis()
cb()
},
after(cb) {
redis.quit()
cb()
}
},
'dropBufferSupport=true': {
test() {
return redis.set('foo', 'bar')
},
before(cb) {
redis = new Redis({ dropBufferSupport: true })
cb()
},
after(cb) {
redis.quit()
cb()
}
}
},
{
print: { compare: true }
}
)
67 changes: 67 additions & 0 deletions benchmarks/fixtures/generate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
'use strict'

const start = process.hrtime.bigint()

import * as calculateSlot from 'cluster-key-slot'
import { writeFileSync } from 'fs'
import { join } from 'path'
import { v4 as uuid } from 'uuid'

// Input parameters
const numKeys = parseInt(process.env.KEYS || '1000000', 10)
const numNodes = parseInt(process.env.NODES || '3', 10)

// Prepare topology
const maxSlot = 16384
const destination = join(__dirname, `cluster-${numNodes}.txt`)
const counts = Array.from(Array(numNodes), () => 0)
const keys = []

/*
This algorithm is taken and adapted from Redis source code
See: https://github.com/redis/redis/blob/d9f970d8d3f0b694f1e8915cab6d4eab9cfb2ef1/src/redis-cli.c#L5453
*/
const nodes = [] // This only holds starting slot, since the ending slot can be computed out of the next one
let first = 0
let cursor = 0
const slotsPerNode = maxSlot / numNodes

for (let i = 0; i < numNodes; i++) {
let last = Math.round(cursor + slotsPerNode - 1)

if (last > maxSlot || i === numNodes - 1) {
last = maxSlot - 1
}

if (last < first) {
last = first
}

nodes.push(first)
first = last + 1
cursor += slotsPerNode
}

// Generate keys and also track slot allocations
for (let i = 0; i < numKeys; i++) {
const key = uuid()
const slot = calculateSlot(key)
const node = nodes.findIndex((start, i) => i === numNodes - 1 || (slot >= start && slot < nodes[i + 1]))

counts[node]++
keys.push(key)
}

// Save keys
writeFileSync(destination, keys.join('\n'))

// Print summary
console.log(`Generated ${numKeys} keys in ${(Number(process.hrtime.bigint() - start) / 1e6).toFixed(2)} ms `)

for (let i = 0; i < numNodes; i++) {
const from = nodes[i]
const to = (i === numNodes - 1 ? maxSlot : nodes[i + 1]) - 1
console.log(
` - Generated ${counts[i]} keys for node(s) serving slots ${from}-${to} (${((counts[i] * 100) / numKeys).toFixed(2)} %)`
)
}
Loading

0 comments on commit aba3c74

Please sign in to comment.