It is a fast, reliable, and scalable message broker implementation based on Redis streams.
Suitable for IoT applications with massive network traffic, pub/sub-use cases, or any implementation with multiple producers/consumers.
Allows simple integration with SSE(Server-Sent Event)/WebSocket implementation. See our SSE Fastify based example.
See a live SSE frontend implementation used in our HEARIT.IO
project here
Implements a possibility to scale the message processing across different consumers, without single consumers having to process all the messages. A group of consumers, working as a team, can cooperate and consume a separate portion of those messages from the same channel.
It can be used with a single Redis instance and later updated easily to a cluster configuration without the need for any application change. Under the hood ioredis is used as a client.
Simple integration in web frameworks, already available plug-in fastify-redis-channels for our favorite framework Fastify. See all step-by-step examples there.
The implementation uses native Promises.
Do you want your project to grow? Then start right from the begging.
- Install
- Usage example
- API Documentation
- Error handling
- Running tests
- Tunning Redis and OS
- TODO
- Benchmarks
- Project status
- Authors and acknowledgment
- License
$ npm install @hearit-io/redis-channels --save
Requires running Redis server on a host localhost
and a port 6379.
'use strict'
const {RedisChannels} = require('@hearit-io/redis-channels')
// A consumer implementation
async function consume(tunnel, channels) {
for await (const messages of channels.consume(tunnel)) {
for (const i in messages) {
// Do something with messages
console.log(messages[i].data)
}
}
}
async function main () {
try {
const channels = new RedisChannels()
const tunnelProducer = await channels.use('room')
const tunnelConsumerOne = await channels.use('room')
const tunnelConsumerTwo = await channels.use('room')
// Subscribe consumer tunnels
await channels.subscribe(tunnelConsumerOne)
await channels.subscribe(tunnelConsumerTwo)
// Start consuming
consume(tunnelConsumerOne).catch((error) => {
console.error(error)
})
consume(tunnelConsumerTwo).catch((error) => {
console.error(error)
})
// Produce messages
await channels.produce(tunnelProducer, 'Hello')
await channels.produce(tunnelProducer, 'Wold')
// Unsubscribe all consumers
await channels.unsubscribe(tunnelConsumerOne)
await channels.unsubscribe(tunnelConsumerTwo)
// Delete a channel related to the 'room'.
await channels.delete('room')
}
catch (error) {
console.error(error)
}
}
main()
The result on the console is:
Hello
World
Hello
World
In order to create a group of consumers (a team) which cooperates consuming a different portion of the messages form the same channel, just add a team
parameter in the subscribe
method like this:
// Subscribe consumer tunnels
await channels.subscribe(tunnelConsumerOne, 'team')
await channels.subscribe(tunnelConsumerTwo, 'team')
The result on the console is:
Hello
World
Creates an instance of a RedisChannels class. It uses ioredis to manage connections to a Redis.
Property | Type | Default | Description |
---|---|---|---|
channels | Object | Options related to channels | |
channels.application | string | 'app' | Application name, used as a prefix part in the Redis key |
channels.version | number | 1 | Version number, used as a prefix part in the Redis key |
channels.schema | string | 'channel' | Schema name, used as a prefix part in the Redis key |
channels.overflow | number | 100 | Trims the channel size (approximately), but never less than the specified number of elements. You must choose a value suitable for your use case and available memory |
channels.slots | number | 32 | The number of pre-sharded slots. Possible values are 32 or 64 |
channels.sharded | boolean | false | Whether the channels will be spread even over the shards. It makes sense if your application connects to a Redis cluster |
channels.log | Object | A logger instance, by default logging is disabled, we use abstract-logging for this purpose | |
redis | Object | Options for a Redis clients | |
redis.nodes | Array | An array of nodes in the cluster, [{ port: number, host: string }]. If nodes are defined a connection to a cluster will be tried. | |
redis.url | string | A URL string to a single Redis server, for example 'redis://user:[email protected]:6379/' |
|
redis.host | string | 'localhost' | A host name of a single Redis server |
redis.port | number | 6379 | A port number to connect to a single Redis server |
redis.... | Rest properites | Any other options will be passed to the Redis or the Cluster client |
Channels will use a Redis cluster connected to three nodes with enabled offline queue in a sharded mode.
'use strict'
const {RedisChannels} = require('@hearit-io/redis-channels')
const options = {
channels: {
sharded: true
},
redis: {
nodes: [
{ host: '127.0.0.1', port: 6380 },
{ host: '127.0.0.1', port: 6381 },
{ host: '127.0.0.1', port: 6382 }
],
options: {
enableOfflineQueue: true
}
}
}
const channels = new RedisChannels(options)
Channels will use a single Redis connection defined with a URL and keep alive set to 10 seconds.
'use strict'
const {RedisChannels} = require('@hearit-io/redis-channels')
const options = {
redis: {
url: 'redis://10.101.201.100:3333',
keepAlive: 10000
}
}
const channels = new RedisChannels(options)
The same as above but defined with a host and port.
'use strict'
const {RedisChannels} = require('@hearit-io/redis-channels')
const options = {
redis: {
host: '10.101.201.100',
port: 3333,
keepAlive: 10000
}
}
const channels = new RedisChannels(options)
Creates a tunnel
Object required to do any further operation with the channel associated with the group
.
Parameter | Type | Default | Description |
---|---|---|---|
group | string | A name of a channel group (i.e. chat room etc.) |
Returns a Promise to a tunnel
Object.
Produces a message to a channel.
Parameter | Type | Default | Description |
---|---|---|---|
tunnel | Object | A tunnel required to peform any operation with the channel | |
message | string | A message to produce | |
type | string | 'all' | Identifies a message source (origination) |
Returns a Promise.
Subscribes a tunnel
to make possible a consume operation.
Parameter | Type | Default | Description |
---|---|---|---|
tunnel | Object | A tunnel required to perform any operation with the channel | |
team | string | the same as a consumer | A name of a consumer team. If specified, every consumer within a team will receive different part of the messages arrived in the channels |
consumer | string | a generated v4 uuid | A unique consumer name within a team |
Returns a Promise.
channels.consume(tunnel[, type = 'all', count = 100, timeout = 10000, fromId = time_in_milisecounds-sequence, messageOnTimeOut = false])
It is an asynchronous iterator which returns an array of messages.
Every message is a couple of { id: <string>, data: <string> }
Parameter | Type | Default | Description |
---|---|---|---|
tunnel | Object | A tunnel required to peform any operation with the channel | |
type | string | 'all' | Identifies a message source (origination) to consume |
count | number | 100 | Defines a maximum number of messages consumed per iteration |
timeout | number | 10000 | A blocking timeout in milliseconds |
fromId | string | '>' or '*' | Starts consuming messages newer then a given id. Default value is set to > or * whether if it is consumed in a team or not. This means staring form messages that were never delivered to any other consumer. The format is time_in_milisecounds-sequence or only the miliseconds part of the id. |
messageOnTimeOut | boolean | false | If set, in a case of a timeout a message array [{id: data: null}] will be returned to indecate it. If there were no consumed messages the id value will be undefined. This feature is usefull if you want to build in your consumer an additional functionality - for example sending to an SSE(Service Send Event)/websocket ping messages on every consumer timeout. |
Returns a Promise to an Array of Objects.
Unsubscribes form a tunnel
. This causes the consume
method to finish with the iteration and cleans up the associated consumer.
Parameter | Type | Default | Description |
---|---|---|---|
tunnel | Object | A tunnel required to peform any operation with the channel. Use only subscribed tunnels, otherways an exception will throw. |
Returns a Promise.
Deletes all data in the Redis database associated with the group
.
Parameter | Type | Default | Description |
---|---|---|---|
group | string | A name of a channel group (i.e. chat room etc.) |
Returns a Promise .
Closes all redis clients and deletes all consumers/consumer groups. Useful in agraceful application shutdown.
Returns a Promise .
In an error all methods throw an instance of RedisChannelsError
class.
It has following properties:
Property | Type | Default | Description |
---|---|---|---|
message | string | An error description | |
error | Object | An error object caused the exception |
A usage example of an error handling:
'use strict'
const {RedisChannels, RedisChannelsError} = require('@hearit-io/redis-channels')
try {
// Your code
}
catch (exception) {
if (exception instanceof RedisChannelsError) {
// Handle the error
console.error(exception.message)
console.error (exception.error)
}
}
We want to deliver the highest possible quality to our valuable open source community by implementing 100 % test coverage for all packages.
To run the test cases a running Redis server is required.
In the environment variable REDIS_NODES you can set a list of Redis nodes separated by space as example:
export REDIS_NODES="127.0.0.1:6380 127.0.0.1:6381 127.0.0.1:6382"
If there are more than one entry in the list a connection to a Redis Cluster will be used for the tests. If REDIS_NODES is not set a connection to 127.0.0.1:6379 will be tried.
Similar to a variable REDIS_NODES
, a variable REDIS_DOWN_NODES
is used in the test cases covering missing connection to a Redis server. A default value is '127.0.0.1:3333'.
After each test, the data base will be FLUSHED. Please make sure there is no valuable data in the data base before running the tests!
Start the unit tests with:
npm test
The purpose of those tests is to verify the behavior of your system in a situation of a massive traffic - go to the limits.
This can be used to estimate the maximum number of consumer/producers, the throughput, and the need of operation system tunning.
You can use following environment variables to configure your tests:
NUMBER_OF_GROUPS
- defines the number of channel groups (i. e. chat rooms, ...). Default value is 10.NUMBER_OF_MESSAGES
- number of messages to produce per group. Default value is 100NUMBER_OF_CONSUMERS
- number of consumers per group. Default value is 10SHARDED
- If channels sharded mode will be used. Default value is 'false', to enable it set to 'true'GROUP_PREFIX
- The prefix for the group name. Default value is 'GROUP'.
Following command starts four node
processes:
cd ./test/heavy-load
./heavy-run.sh
The test results for each process can be found in the ./test/heavy-load/log
directory.
To clean-up the data base call:
./heavy-run-cleanup.sh
Make sure there is no valuable data in the data base before running the tests!
You may need to adapt maxclients
configuration parameter in the Redis server confg file (usually /etc/redis.conf
). For more details see the Redis documentation about this topic.
On a Unix based system increase the maximum allowed number of temporary ports.
Check the current system value of local port range with:
cat /proc/sys/net/ipv4/ip_local_port_range
Increase the value by defining a bigger range, for example with the command:
echo "1024 65535" > /proc/sys/net/ipv4/ip_local_port_range
To make the change permanent add following line in /etc/sysctl.conf
file:
net.ipv4.ip_local_port_range = 1024 65535
Check the used TCP and UDP connection with the command:
netstat -an | grep -e tcp -e udp | wc -l
or
ss -s
We reached follwing values in our benchmark tests on a Linux Debain server with 64 GB memory, single i7-7700 CPU 3.60GHz:
- Message size -
4 000
bytes. - Number of Redis servers -
1
. - Number of node processes -
4
. - Number of channels groups (i.e. chat rooms, ...) -
3 000
. - Number of consumers -
30 000
. - Number of produced messages -
300 000
. - Number of consumed messages -
3 000 000
. - Avarage processed messages per second -
24 200
.
The list of already implemented / planed features:
- Limit the maximum number of channel elements in the
produce
method (capped streams). - Implement a scenario where all consumers are served with the same messages arrived in the channels.
- Add Benchmarks.
- Implement a scenario where consumers are served with the different part of the messages arrived in the channels (a work in a team).
- Introduce a parameter in the
consume
method which allows starting message consuming form a given period in the past. - Implement consume with acknowledge functionality.
- Implement a channel monitoring capability.
- TypeScript support.
Smart home automatization designed for visually impaired people. |
---|
@heart-io/redis-channels is used productive in our progressive web app. You can try it with a user [email protected]
and password: 11223344
The package will be updated and maintained regularly.
The main goal of hearit.io is to make accessible the world of IoT to everyone. We created a speaking home suitable for all.
We will be grateful to you if you make awareness to other people of our project.
To finance our idea, we would be glad to support and work on your projects. Contact Emil [email protected] for details and our availability.
Other open-source packages, part of our project, will be available soon. We use Fastify as an application framework.
Emil Usunov
MIT