Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add redis pooling and pipelining combo #317

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 111 additions & 107 deletions engine/redis_state_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,24 @@ class RedisStateStore {
debug(`Overriding default, volatileKeyTTL=${opts.volatileKeyTTL}s`);
this.volatileKeyTTL = opts.volatileKeyTTL;
}
this.client = new Redis(opts.redisUrl, { enableAutoPipelining: true });
this.pool = this.createRedisPool(10, opts.redisUrl);
}

createRedisPool(size, redisUrl) {
const pool = [];
for (let i = 0; i < size; i++) {
const client = new Redis(redisUrl);
pool.push(client);
}
return pool;
}

getClientFromPool() {
return this.pool.pop();
}

returnClientToPool(client) {
this.pool.push(client);
}

async initAsync(id, initData) {
Expand All @@ -39,158 +56,145 @@ class RedisStateStore {
for (const key of Object.keys(initData)) {
debug(`${this.keyPrefix}:${id}: Initiating key ${key} with data from store`);
data[key] = await this.getAsync(id, key);
//debug(`${this.keyPrefix}:${id}: Key ${key} initiated with data from store: ${data[key]}`);
}
}
return data;
}

async resetAsync(id, initData) {
await this.setAsync(id, "_initiated", false);
await this.initAsync(id, initData);
await this.setAsync(id, "_initiated", false);
await this.initAsync(id, initData);
}

async resetAllAsync() {
const resetAsync = new Promise((resolve, reject) => {
this.client.flushall((err, reply) => {
if (!err) {
console.log("Flushed Redis db: ", reply);
resolve();
} else {
reject(err);
}
});
});
await resetAsync;
const client = this.getClientFromPool();
try {
await client.flushall();
console.log("Flushed Redis db");
} catch (err) {
console.error("Error flushing Redis db:", err);
} finally {
this.returnClientToPool(client);
}
}

async getValues(id, keys) {
const pipeline = this.client.pipeline();
const client = this.getClientFromPool();
const pipeline = client.pipeline();
let data = {};
const startMs = Date.now();

for (const key of keys) {
const storeKey = "" + this.keyPrefix + id + key;
pipeline.get(storeKey, (err, reply) => {
if (!err) {
debug(`REDIS get(pipeline) ${storeKey}:${reply ? reply.length + " chars" : "null"}`);
try {
data[key] = JSON.parse(reply);
} catch (err) {
console.error(`REDIS get(pipeline): Failed to parse ${storeKey} data: '${reply}'`);
}
}
});
const storeKey = `${this.keyPrefix}${id}${key}`;
pipeline.get(storeKey);
}

const results = await pipeline.exec();
const ops = pipeline.length;
await pipeline.exec();

results.forEach((result, index) => {
const reply = result[1];
const storeKey = `${this.keyPrefix}${id}${keys[index]}`;
debug(`REDIS get(pipeline) ${storeKey}:${reply ? reply.length + " chars" : "null"}`);
if (reply) {
try {
data[keys[index]] = JSON.parse(reply);
} catch (err) {
console.error(`REDIS get(pipeline): Failed to parse ${storeKey} data: '${reply}'`);
}
}
});

const ioTimeMs = Date.now() - startMs;
cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis',
{ event: 'getValues', operations: ops, ioTimeMs: ioTimeMs });
cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "getValues", operations: ops, ioTimeMs: ioTimeMs });
this.returnClientToPool(client);
return data;
}

async getAsync(id, key) {
const client = this.getClientFromPool();
const startMs = Date.now();
const storeKey = "" + this.keyPrefix + id + key;
const getAsync = new Promise((resolve, reject) => {
this.client.get(storeKey, (err, reply) => {
const ioTimeMs = Date.now() - startMs;
debug(`REDIS get ${storeKey}:${reply ? reply.length + " chars" : "null"} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? 'REDISSLOW!' : ''}`);
if (!err) {
let data;
try {
data = JSON.parse(reply);
} catch (err) {
console.error(`REDIS get: Failed to parse ${storeKey} data: '${reply}'`);
}
cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis',
{ event: 'get', operations: 1, ioTimeMs: ioTimeMs });
resolve(data);
} else {
reject(err);
}
});
});
const data = await getAsync;
return data;
const storeKey = `${this.keyPrefix}${id}${key}`;
const reply = await client.get(storeKey);
const ioTimeMs = Date.now() - startMs;

debug(`REDIS get ${storeKey}:${reply ? reply.length + " chars" : "null"} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`);
this.returnClientToPool(client);

if (reply) {
try {
cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "get", operations: 1, ioTimeMs: ioTimeMs });
return JSON.parse(reply);
} catch (err) {
console.error(`REDIS get: Failed to parse ${storeKey} data: '${reply}'`);
}
}
return null;
}

async setValues(id, data) {
const client = this.getClientFromPool();
const returnData = {};
const startMs = Date.now();
const pipeline = this.client.pipeline();
const pipeline = client.pipeline();

for (const key of Object.keys(data)) {
const storeKey = "" + this.keyPrefix + id + key;
const storeKey = `${this.keyPrefix}${id}${key}`;
const value = data[key];
pipeline.set(storeKey, JSON.stringify(value), (err, res) => {
if (!err) {
debug(`REDIS set(pipeline) ${storeKey}: ${res}`);
returnData[key] = value;
}
});
pipeline.set(storeKey, JSON.stringify(value));
}

const results = await pipeline.exec();
const ops = pipeline.length;
await pipeline.exec();
const ioTimeMs = Date.now() - startMs;
cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis',
{ event: 'setValues', operations: ops, ioTimeMs: ioTimeMs });

results.forEach((result, index) => {
const storeKey = `${this.keyPrefix}${id}${Object.keys(data)[index]}`;
debug(`REDIS set(pipeline) ${storeKey}: ${result[1]}`);
returnData[Object.keys(data)[index]] = data[Object.keys(data)[index]];
});

const ioTimeMs = Date.now() - startMs;
cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "setValues", operations: ops, ioTimeMs: ioTimeMs });
this.returnClientToPool(client);
return returnData;
}

async setAsync(id, key, value) {
const client = this.getClientFromPool();
const startMs = Date.now();
const storeKey = "" + this.keyPrefix + id + key;
const setAsync = new Promise((resolve, reject) => {
this.client.set(storeKey, JSON.stringify(value), (err, res) => {
const ioTimeMs = Date.now() - startMs;
debug(`REDIS set ${storeKey}: ${res} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`);
if (!err) {
cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis',
{ event: 'set', operations: 1, ioTimeMs: ioTimeMs });
resolve(value);
} else {
reject(err);
}
});
});
return await setAsync;
const storeKey = `${this.keyPrefix}${id}${key}`;
const res = await client.set(storeKey, JSON.stringify(value));
const ioTimeMs = Date.now() - startMs;

debug(`REDIS set ${storeKey}: ${res} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`);
cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "set", operations: 1, ioTimeMs: ioTimeMs });
this.returnClientToPool(client);
return value;
}

async setVolatileAsync(id, key, value) {
const data = await this.setAsync(id, key, value);
const storeKey = "" + this.keyPrefix + id + key;
const expireAsync = new Promise((resolve, reject) => {
this.client.expire(storeKey, this.volatileKeyTTL, (err, res) => {
if (!err) {
debug(`REDIS expire ${storeKey} ${this.volatileKeyTTL}s: ${res === 1 ? "OK" : "KEY DOES NOT EXIST"}`);
resolve();
} else {
reject(err);
}
});
});
await expireAsync;
const storeKey = `${this.keyPrefix}${id}${key}`;
const client = this.getClientFromPool();

await client.expire(storeKey, this.volatileKeyTTL);
debug(`REDIS expire ${storeKey} ${this.volatileKeyTTL}s`);
this.returnClientToPool(client);
return data;
}

async removeAsync(id, key) {
const client = this.getClientFromPool();
const startMs = Date.now();
const storeKey = "" + this.keyPrefix + id + key;
const delAsync = new Promise((resolve, reject) => {
this.client.del(storeKey, (err, res) => {
const ioTimeMs = Date.now() - startMs;
debug(`REDIS remove ${storeKey}: (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`);
if (!err) {
cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis',
{ event: 'remove', operations: 1, ioTimeMs: ioTimeMs });
resolve();
} else {
reject(err);
}
});
});
await delAsync;
const storeKey = `${this.keyPrefix}${id}${key}`;
const res = await client.del(storeKey);
const ioTimeMs = Date.now() - startMs;

debug(`REDIS remove ${storeKey}: (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`);
cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "remove", operations: 1, ioTimeMs: ioTimeMs });
this.returnClientToPool(client);
return res;
}
}

module.exports = RedisStateStore;
module.exports = RedisStateStore;
2 changes: 1 addition & 1 deletion engine/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ class Session {
if (this.timePositionOffset && this.diffCompensation <= 0 && this.alwaysNewSegments) {
timePosition -= this.timePositionOffset;
cloudWatchLog(!this.cloudWatchLogging, 'engine-session',
{ event: 'applyTimePositionOffset', channel: this._sessionId, offsetMs: this.timePositionOffset });
{ event: 'applyTimePositionOffset', channel: this._sessionId, offsetMs: this.timePositionOffset }); // why does this trigger and timeposoff is large? TODO
}
const diff = position - timePosition;
debug(`[${this._sessionId}]: ${timePosition}:${roundToThreeDecimals(position) }:${diff > 0 ? '+' : ''}${roundToThreeDecimals(diff) }ms`);
Expand Down
Loading