Skip to content

Commit

Permalink
add redis pooling and pipelining combo (#317)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nfrederiksen authored Nov 15, 2024
1 parent b5c8e6b commit 3627eec
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 108 deletions.
218 changes: 111 additions & 107 deletions engine/redis_state_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,24 @@ class RedisStateStore {
debug(`Overriding default, volatileKeyTTL=${opts.volatileKeyTTL}s`);
this.volatileKeyTTL = opts.volatileKeyTTL;
}
this.client = new Redis(opts.redisUrl, { enableAutoPipelining: true });
this.pool = this.createRedisPool(10, opts.redisUrl);
}

createRedisPool(size, redisUrl) {
const pool = [];
for (let i = 0; i < size; i++) {
const client = new Redis(redisUrl);
pool.push(client);
}
return pool;
}

getClientFromPool() {
return this.pool.pop();
}

returnClientToPool(client) {
this.pool.push(client);
}

async initAsync(id, initData) {
Expand All @@ -39,158 +56,145 @@ class RedisStateStore {
for (const key of Object.keys(initData)) {
debug(`${this.keyPrefix}:${id}: Initiating key ${key} with data from store`);
data[key] = await this.getAsync(id, key);
//debug(`${this.keyPrefix}:${id}: Key ${key} initiated with data from store: ${data[key]}`);
}
}
return data;
}

async resetAsync(id, initData) {
await this.setAsync(id, "_initiated", false);
await this.initAsync(id, initData);
await this.setAsync(id, "_initiated", false);
await this.initAsync(id, initData);
}

async resetAllAsync() {
const resetAsync = new Promise((resolve, reject) => {
this.client.flushall((err, reply) => {
if (!err) {
console.log("Flushed Redis db: ", reply);
resolve();
} else {
reject(err);
}
});
});
await resetAsync;
const client = this.getClientFromPool();
try {
await client.flushall();
console.log("Flushed Redis db");
} catch (err) {
console.error("Error flushing Redis db:", err);
} finally {
this.returnClientToPool(client);
}
}

async getValues(id, keys) {
const pipeline = this.client.pipeline();
const client = this.getClientFromPool();
const pipeline = client.pipeline();
let data = {};
const startMs = Date.now();

for (const key of keys) {
const storeKey = "" + this.keyPrefix + id + key;
pipeline.get(storeKey, (err, reply) => {
if (!err) {
debug(`REDIS get(pipeline) ${storeKey}:${reply ? reply.length + " chars" : "null"}`);
try {
data[key] = JSON.parse(reply);
} catch (err) {
console.error(`REDIS get(pipeline): Failed to parse ${storeKey} data: '${reply}'`);
}
}
});
const storeKey = `${this.keyPrefix}${id}${key}`;
pipeline.get(storeKey);
}

const results = await pipeline.exec();
const ops = pipeline.length;
await pipeline.exec();

results.forEach((result, index) => {
const reply = result[1];
const storeKey = `${this.keyPrefix}${id}${keys[index]}`;
debug(`REDIS get(pipeline) ${storeKey}:${reply ? reply.length + " chars" : "null"}`);
if (reply) {
try {
data[keys[index]] = JSON.parse(reply);
} catch (err) {
console.error(`REDIS get(pipeline): Failed to parse ${storeKey} data: '${reply}'`);
}
}
});

const ioTimeMs = Date.now() - startMs;
cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis',
{ event: 'getValues', operations: ops, ioTimeMs: ioTimeMs });
cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "getValues", operations: ops, ioTimeMs: ioTimeMs });
this.returnClientToPool(client);
return data;
}

async getAsync(id, key) {
const client = this.getClientFromPool();
const startMs = Date.now();
const storeKey = "" + this.keyPrefix + id + key;
const getAsync = new Promise((resolve, reject) => {
this.client.get(storeKey, (err, reply) => {
const ioTimeMs = Date.now() - startMs;
debug(`REDIS get ${storeKey}:${reply ? reply.length + " chars" : "null"} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? 'REDISSLOW!' : ''}`);
if (!err) {
let data;
try {
data = JSON.parse(reply);
} catch (err) {
console.error(`REDIS get: Failed to parse ${storeKey} data: '${reply}'`);
}
cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis',
{ event: 'get', operations: 1, ioTimeMs: ioTimeMs });
resolve(data);
} else {
reject(err);
}
});
});
const data = await getAsync;
return data;
const storeKey = `${this.keyPrefix}${id}${key}`;
const reply = await client.get(storeKey);
const ioTimeMs = Date.now() - startMs;

debug(`REDIS get ${storeKey}:${reply ? reply.length + " chars" : "null"} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`);
this.returnClientToPool(client);

if (reply) {
try {
cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "get", operations: 1, ioTimeMs: ioTimeMs });
return JSON.parse(reply);
} catch (err) {
console.error(`REDIS get: Failed to parse ${storeKey} data: '${reply}'`);
}
}
return null;
}

async setValues(id, data) {
const client = this.getClientFromPool();
const returnData = {};
const startMs = Date.now();
const pipeline = this.client.pipeline();
const pipeline = client.pipeline();

for (const key of Object.keys(data)) {
const storeKey = "" + this.keyPrefix + id + key;
const storeKey = `${this.keyPrefix}${id}${key}`;
const value = data[key];
pipeline.set(storeKey, JSON.stringify(value), (err, res) => {
if (!err) {
debug(`REDIS set(pipeline) ${storeKey}: ${res}`);
returnData[key] = value;
}
});
pipeline.set(storeKey, JSON.stringify(value));
}

const results = await pipeline.exec();
const ops = pipeline.length;
await pipeline.exec();
const ioTimeMs = Date.now() - startMs;
cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis',
{ event: 'setValues', operations: ops, ioTimeMs: ioTimeMs });

results.forEach((result, index) => {
const storeKey = `${this.keyPrefix}${id}${Object.keys(data)[index]}`;
debug(`REDIS set(pipeline) ${storeKey}: ${result[1]}`);
returnData[Object.keys(data)[index]] = data[Object.keys(data)[index]];
});

const ioTimeMs = Date.now() - startMs;
cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "setValues", operations: ops, ioTimeMs: ioTimeMs });
this.returnClientToPool(client);
return returnData;
}

async setAsync(id, key, value) {
const client = this.getClientFromPool();
const startMs = Date.now();
const storeKey = "" + this.keyPrefix + id + key;
const setAsync = new Promise((resolve, reject) => {
this.client.set(storeKey, JSON.stringify(value), (err, res) => {
const ioTimeMs = Date.now() - startMs;
debug(`REDIS set ${storeKey}: ${res} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`);
if (!err) {
cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis',
{ event: 'set', operations: 1, ioTimeMs: ioTimeMs });
resolve(value);
} else {
reject(err);
}
});
});
return await setAsync;
const storeKey = `${this.keyPrefix}${id}${key}`;
const res = await client.set(storeKey, JSON.stringify(value));
const ioTimeMs = Date.now() - startMs;

debug(`REDIS set ${storeKey}: ${res} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`);
cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "set", operations: 1, ioTimeMs: ioTimeMs });
this.returnClientToPool(client);
return value;
}

async setVolatileAsync(id, key, value) {
const data = await this.setAsync(id, key, value);
const storeKey = "" + this.keyPrefix + id + key;
const expireAsync = new Promise((resolve, reject) => {
this.client.expire(storeKey, this.volatileKeyTTL, (err, res) => {
if (!err) {
debug(`REDIS expire ${storeKey} ${this.volatileKeyTTL}s: ${res === 1 ? "OK" : "KEY DOES NOT EXIST"}`);
resolve();
} else {
reject(err);
}
});
});
await expireAsync;
const storeKey = `${this.keyPrefix}${id}${key}`;
const client = this.getClientFromPool();

await client.expire(storeKey, this.volatileKeyTTL);
debug(`REDIS expire ${storeKey} ${this.volatileKeyTTL}s`);
this.returnClientToPool(client);
return data;
}

async removeAsync(id, key) {
const client = this.getClientFromPool();
const startMs = Date.now();
const storeKey = "" + this.keyPrefix + id + key;
const delAsync = new Promise((resolve, reject) => {
this.client.del(storeKey, (err, res) => {
const ioTimeMs = Date.now() - startMs;
debug(`REDIS remove ${storeKey}: (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`);
if (!err) {
cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis',
{ event: 'remove', operations: 1, ioTimeMs: ioTimeMs });
resolve();
} else {
reject(err);
}
});
});
await delAsync;
const storeKey = `${this.keyPrefix}${id}${key}`;
const res = await client.del(storeKey);
const ioTimeMs = Date.now() - startMs;

debug(`REDIS remove ${storeKey}: (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`);
cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "remove", operations: 1, ioTimeMs: ioTimeMs });
this.returnClientToPool(client);
return res;
}
}

module.exports = RedisStateStore;
module.exports = RedisStateStore;
2 changes: 1 addition & 1 deletion engine/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ class Session {
if (this.timePositionOffset && this.diffCompensation <= 0 && this.alwaysNewSegments) {
timePosition -= this.timePositionOffset;
cloudWatchLog(!this.cloudWatchLogging, 'engine-session',
{ event: 'applyTimePositionOffset', channel: this._sessionId, offsetMs: this.timePositionOffset });
{ event: 'applyTimePositionOffset', channel: this._sessionId, offsetMs: this.timePositionOffset }); // why does this trigger and timeposoff is large? TODO
}
const diff = position - timePosition;
debug(`[${this._sessionId}]: ${timePosition}:${roundToThreeDecimals(position) }:${diff > 0 ? '+' : ''}${roundToThreeDecimals(diff) }ms`);
Expand Down

0 comments on commit 3627eec

Please sign in to comment.