Skip to content

Commit

Permalink
🕸️ Implement cluster commands (#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
tumile authored Jul 8, 2020
1 parent b3fb1ac commit 7a2876d
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 16 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ const plz = msgFV.get("yes");
const thx = msgFV.get("no");
```

**Cluster**

```ts
await redis.meet("127.0.0.1", 6380);
await redis.nodes();
// ... 127.0.0.1:6379@16379 myself,master - 0 1593978765000 0 connected
// ... 127.0.0.1:6380@16380 master - 0 1593978766503 1 connected
```

## Advanced Usage

### Retriable connection
Expand Down
32 changes: 27 additions & 5 deletions command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,6 @@ export type RedisCommands = {
pubsub_channels(pattern: string): Promise<BulkString[]>;
pubsub_numsubs(...channels: string[]): Promise<[BulkString, Integer][]>;
pubsub_numpat(): Promise<Integer>;
// Cluster
readonly(): Promise<Status>;
readwrite(): Promise<Status>;
// Set
sadd(key: string, ...members: string[]): Promise<Integer>;
scard(key: string): Promise<Integer>;
Expand Down Expand Up @@ -505,7 +502,7 @@ XGROUP DESTROY test-man-000 test-group
<pre>
XGROUP SETID mystream consumer-group-name 0
</pre>
*
*
* @param key stream key
* @param groupName the consumer group
* @param xid the XId to use for the next message delivered
Expand Down Expand Up @@ -768,7 +765,32 @@ XRANGE somestream - +
},
): Promise<Integer>;
// Cluster
// cluster //
cluster_addslots(...slots: number[]): Promise<Status>;
cluster_countfailurereports(node_id: string): Promise<Integer>;
cluster_countkeysinslot(slot: number): Promise<Integer>;
cluster_delslots(...slots: number[]): Promise<Status>;
cluster_failover(opt?: "FORCE" | "TAKEOVER"): Promise<Status>;
cluster_flushslots(): Promise<Status>;
cluster_forget(node_id: string): Promise<Status>;
cluster_getkeysinslot(slot: number, count: number): Promise<BulkString[]>;
cluster_info(): Promise<BulkString>;
cluster_keyslot(key: string): Promise<Integer>;
cluster_meet(ip: string, port: number): Promise<Status>;
cluster_myid(): Promise<BulkString>;
cluster_nodes(): Promise<BulkString>;
cluster_replicas(node_id: string): Promise<BulkString[]>;
cluster_replicate(node_id: string): Promise<Status>;
cluster_reset(opt?: "HARD" | "SOFT"): Promise<Status>;
cluster_saveconfig(): Promise<Status>;
cluster_setslot(
slot: number,
subcommand: "IMPORTING" | "MIGRATING" | "NODE" | "STABLE",
node_id?: string,
): Promise<Status>;
cluster_slaves(node_id: string): Promise<BulkString[]>;
cluster_slots(): Promise<ConditionalArray>;
readonly(): Promise<Status>;
readwrite(): Promise<Status>;
// Server
acl_cat(parameter?: string): Promise<BulkString[]>;
acl_deluser(parameter: string): Promise<Integer>;
Expand Down
105 changes: 104 additions & 1 deletion redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,110 @@ class RedisImpl implements RedisCommands {
}
}

cluster_addslots(...slots: number[]): Promise<Status> {
return this.execStatusReply("CLUSTER", "ADDSLOTS", ...slots);
}

cluster_countfailurereports(node_id: string): Promise<Integer> {
return this.execIntegerReply("CLUSTER", "COUNT-FAILURE-REPORTS", node_id);
}

cluster_countkeysinslot(slot: number): Promise<Integer> {
return this.execIntegerReply("CLUSTER", "COUNTKEYSINSLOT", slot);
}

cluster_delslots(...slots: number[]): Promise<Status> {
return this.execStatusReply("CLUSTER", "DELSLOTS", ...slots);
}

cluster_failover(opt?: "FORCE" | "TAKEOVER"): Promise<Status> {
if (opt) {
return this.execStatusReply("CLUSTER", "FAILOVER", opt);
}
return this.execStatusReply("CLUSTER", "FAILOVER");
}

cluster_flushslots(): Promise<Status> {
return this.execStatusReply("CLUSTER", "FLUSHSLOTS");
}

cluster_forget(node_id: string): Promise<Status> {
return this.execStatusReply("CLUSTER", "FORGET", node_id);
}

cluster_getkeysinslot(slot: number, count: number): Promise<BulkString[]> {
return this.execArrayReply<BulkString>(
"CLUSTER",
"GETKEYSINSLOT",
slot,
count,
);
}

cluster_info(): Promise<BulkString> {
return this.execStatusReply("CLUSTER", "INFO");
}

cluster_keyslot(key: string): Promise<Integer> {
return this.execIntegerReply("CLUSTER", "KEYSLOT", key);
}

cluster_meet(ip: string, port: number): Promise<Status> {
return this.execStatusReply("CLUSTER", "MEET", ip, port);
}

cluster_myid(): Promise<BulkString> {
return this.execStatusReply("CLUSTER", "MYID");
}

cluster_nodes(): Promise<BulkString> {
return this.execBulkReply("CLUSTER", "NODES");
}

cluster_replicas(node_id: string): Promise<BulkString[]> {
return this.execArrayReply<BulkString>("CLUSTER", "REPLICAS", node_id);
}

cluster_replicate(node_id: string): Promise<Status> {
return this.execStatusReply("CLUSTER", "REPLICATE", node_id);
}

cluster_reset(opt?: "HARD" | "SOFT"): Promise<Status> {
if (opt) {
return this.execStatusReply("CLUSTER", "RESET", opt);
}
return this.execStatusReply("CLUSTER", "RESET");
}

cluster_saveconfig(): Promise<Status> {
return this.execStatusReply("CLUSTER", "SAVECONFIG");
}

cluster_setslot(
slot: number,
subcommand: "IMPORTING" | "MIGRATING" | "NODE" | "STABLE",
node_id?: string,
): Promise<Status> {
if (node_id) {
return this.execStatusReply(
"CLUSTER",
"SETSLOT",
slot,
subcommand,
node_id,
);
}
return this.execStatusReply("CLUSTER", "SETSLOT", slot, subcommand);
}

cluster_slaves(node_id: string): Promise<BulkString[]> {
return this.execArrayReply<BulkString>("CLUSTER", "SLAVES", node_id);
}

cluster_slots(): Promise<ConditionalArray> {
return this.execArrayReply("CLUSTER", "SLOTS");
}

command() {
return this.execArrayReply("COMMAND") as Promise<
[BulkString, Integer, BulkString[], Integer, Integer, Integer]
Expand Down Expand Up @@ -2036,7 +2140,6 @@ export async function connect({
);

await connection.connect();

return new RedisImpl(connection);
}

Expand Down
1 change: 1 addition & 0 deletions redis_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ import "./tests/string_test.ts";
import "./tests/key_test.ts";
import "./tests/stream_test.ts";
import "./tests/acl_cmd_test.ts";
import "./tests/cluster_test.ts";
6 changes: 6 additions & 0 deletions testdata/redis.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Base configuration
daemonize no
appendonly yes
cluster-config-file nodes.conf
cluster-node-timeout 30000
maxclients 1001
131 changes: 131 additions & 0 deletions tests/cluster_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import {
assert,
assertEquals,
assertStringContains,
} from "../vendor/https/deno.land/std/testing/asserts.ts";
import { newClient, startRedis, stopRedis, TestSuite } from "./test_util.ts";

const suite = new TestSuite("cluster");

const s7000 = await startRedis({ port: 7000, clusterEnabled: true });
const s7001 = await startRedis({ port: 7001, clusterEnabled: true });
const client = await newClient(7000);

suite.afterAll(() => {
stopRedis(s7000);
stopRedis(s7001);
client.close();
});

suite.test("addslots", async () => {
await client.cluster_flushslots();
assertEquals(await client.cluster_addslots(1, 2, 3), "OK");
});

suite.test("myid", async () => {
assert(!!(await client.cluster_myid()));
});

suite.test("countfailurereports", async () => {
const node_id = await client.cluster_myid();
assertEquals(await client.cluster_countfailurereports(node_id), 0);
});

suite.test("countkeysinslot", async () => {
assertEquals(await client.cluster_countkeysinslot(1), 0);
});

suite.test("delslots", async () => {
await client.cluster_flushslots();
assertEquals(await client.cluster_delslots(1, 2, 3), "OK");
});

suite.test("getkeysinslot", async () => {
assertEquals(await client.cluster_getkeysinslot(1, 1), []);
});

suite.test("flushslots", async () => {
assertEquals(await client.cluster_flushslots(), "OK");
});

suite.test("info", async () => {
assertStringContains(await client.cluster_info(), "cluster_state");
});

suite.test("keyslot", async () => {
assertEquals(await client.cluster_keyslot("somekey"), 11058);
});

suite.test("meet", async () => {
assertEquals(await client.cluster_meet("127.0.0.1", 7001), "OK");
});

suite.test("nodes", async () => {
const node_id = await client.cluster_myid();
const nodes = await client.cluster_nodes();
assertStringContains(nodes, node_id);
});

suite.test("replicas", async () => {
const node_id = await client.cluster_myid();
assertEquals(await client.cluster_replicas(node_id), []);
});

suite.test("slaves", async () => {
const node_id = await client.cluster_myid();
assertEquals(await client.cluster_slaves(node_id), []);
});

suite.test("forget", async () => {
const node_id = await client.cluster_myid();
const other_node = (await client.cluster_nodes())
.split("\n")
.find((n) => !n.startsWith(node_id))
?.split(" ")[0];
if (other_node) {
assertEquals(await client.cluster_forget(other_node), "OK");
}
});

suite.test("saveconfig", async () => {
assertEquals(await client.cluster_saveconfig(), "OK");
});

suite.test("setslot", async () => {
const node_id = await client.cluster_myid();
assertEquals(await client.cluster_setslot(1, "NODE", node_id), "OK");
assertEquals(await client.cluster_setslot(1, "MIGRATING", node_id), "OK");
assertEquals(await client.cluster_setslot(1, "STABLE"), "OK");
});

suite.test("slots", async () => {
assert(Array.isArray(await client.cluster_slots()));
});

suite.test("replicate", async () => {
const node_id = await client.cluster_myid();
const other_node = (await client.cluster_nodes())
.split("\n")
.find((n) => !n.startsWith(node_id))
?.split(" ")[0];
if (other_node) {
assertEquals(await client.cluster_replicate(other_node), "OK");
}
});

suite.test("failover", async () => {
const node_id = await client.cluster_myid();
const other_node = (await client.cluster_nodes())
.split("\n")
.find((n) => !n.startsWith(node_id))
?.split(" ")[0];
if (other_node) {
assertEquals(await client.cluster_failover(), "OK");
}
});

suite.test("reset", async () => {
assertEquals(await client.cluster_reset(), "OK");
});

await suite.runTests();
Loading

0 comments on commit 7a2876d

Please sign in to comment.