diff --git a/engine/redis_state_store.js b/engine/redis_state_store.js index 5b4f617..e9d92ba 100644 --- a/engine/redis_state_store.js +++ b/engine/redis_state_store.js @@ -24,7 +24,24 @@ class RedisStateStore { debug(`Overriding default, volatileKeyTTL=${opts.volatileKeyTTL}s`); this.volatileKeyTTL = opts.volatileKeyTTL; } - this.client = new Redis(opts.redisUrl, { enableAutoPipelining: true }); + this.pool = this.createRedisPool(10, opts.redisUrl); + } + + createRedisPool(size, redisUrl) { + const pool = []; + for (let i = 0; i < size; i++) { + const client = new Redis(redisUrl); + pool.push(client); + } + return pool; + } + + getClientFromPool() { + return this.pool.pop(); + } + + returnClientToPool(client) { + this.pool.push(client); } async initAsync(id, initData) { @@ -39,158 +56,145 @@ class RedisStateStore { for (const key of Object.keys(initData)) { debug(`${this.keyPrefix}:${id}: Initiating key ${key} with data from store`); data[key] = await this.getAsync(id, key); - //debug(`${this.keyPrefix}:${id}: Key ${key} initiated with data from store: ${data[key]}`); } } return data; } async resetAsync(id, initData) { - await this.setAsync(id, "_initiated", false); - await this.initAsync(id, initData); + await this.setAsync(id, "_initiated", false); + await this.initAsync(id, initData); } async resetAllAsync() { - const resetAsync = new Promise((resolve, reject) => { - this.client.flushall((err, reply) => { - if (!err) { - console.log("Flushed Redis db: ", reply); - resolve(); - } else { - reject(err); - } - }); - }); - await resetAsync; + const client = this.getClientFromPool(); + try { + await client.flushall(); + console.log("Flushed Redis db"); + } catch (err) { + console.error("Error flushing Redis db:", err); + } finally { + this.returnClientToPool(client); + } } async getValues(id, keys) { - const pipeline = this.client.pipeline(); + const client = this.getClientFromPool(); + const pipeline = client.pipeline(); let data = {}; const startMs = Date.now(); + for (const key of keys) { - const storeKey = "" + this.keyPrefix + id + key; - pipeline.get(storeKey, (err, reply) => { - if (!err) { - debug(`REDIS get(pipeline) ${storeKey}:${reply ? reply.length + " chars" : "null"}`); - try { - data[key] = JSON.parse(reply); - } catch (err) { - console.error(`REDIS get(pipeline): Failed to parse ${storeKey} data: '${reply}'`); - } - } - }); + const storeKey = `${this.keyPrefix}${id}${key}`; + pipeline.get(storeKey); } + + const results = await pipeline.exec(); const ops = pipeline.length; - await pipeline.exec(); + + results.forEach((result, index) => { + const reply = result[1]; + const storeKey = `${this.keyPrefix}${id}${keys[index]}`; + debug(`REDIS get(pipeline) ${storeKey}:${reply ? reply.length + " chars" : "null"}`); + if (reply) { + try { + data[keys[index]] = JSON.parse(reply); + } catch (err) { + console.error(`REDIS get(pipeline): Failed to parse ${storeKey} data: '${reply}'`); + } + } + }); + const ioTimeMs = Date.now() - startMs; - cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis', - { event: 'getValues', operations: ops, ioTimeMs: ioTimeMs }); + cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "getValues", operations: ops, ioTimeMs: ioTimeMs }); + this.returnClientToPool(client); return data; } async getAsync(id, key) { + const client = this.getClientFromPool(); const startMs = Date.now(); - const storeKey = "" + this.keyPrefix + id + key; - const getAsync = new Promise((resolve, reject) => { - this.client.get(storeKey, (err, reply) => { - const ioTimeMs = Date.now() - startMs; - debug(`REDIS get ${storeKey}:${reply ? reply.length + " chars" : "null"} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? 'REDISSLOW!' : ''}`); - if (!err) { - let data; - try { - data = JSON.parse(reply); - } catch (err) { - console.error(`REDIS get: Failed to parse ${storeKey} data: '${reply}'`); - } - cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis', - { event: 'get', operations: 1, ioTimeMs: ioTimeMs }); - resolve(data); - } else { - reject(err); - } - }); - }); - const data = await getAsync; - return data; + const storeKey = `${this.keyPrefix}${id}${key}`; + const reply = await client.get(storeKey); + const ioTimeMs = Date.now() - startMs; + + debug(`REDIS get ${storeKey}:${reply ? reply.length + " chars" : "null"} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`); + this.returnClientToPool(client); + + if (reply) { + try { + cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "get", operations: 1, ioTimeMs: ioTimeMs }); + return JSON.parse(reply); + } catch (err) { + console.error(`REDIS get: Failed to parse ${storeKey} data: '${reply}'`); + } + } + return null; } async setValues(id, data) { + const client = this.getClientFromPool(); const returnData = {}; const startMs = Date.now(); - const pipeline = this.client.pipeline(); + const pipeline = client.pipeline(); + for (const key of Object.keys(data)) { - const storeKey = "" + this.keyPrefix + id + key; + const storeKey = `${this.keyPrefix}${id}${key}`; const value = data[key]; - pipeline.set(storeKey, JSON.stringify(value), (err, res) => { - if (!err) { - debug(`REDIS set(pipeline) ${storeKey}: ${res}`); - returnData[key] = value; - } - }); + pipeline.set(storeKey, JSON.stringify(value)); } + + const results = await pipeline.exec(); const ops = pipeline.length; - await pipeline.exec(); - const ioTimeMs = Date.now() - startMs; - cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis', - { event: 'setValues', operations: ops, ioTimeMs: ioTimeMs }); + + results.forEach((result, index) => { + const storeKey = `${this.keyPrefix}${id}${Object.keys(data)[index]}`; + debug(`REDIS set(pipeline) ${storeKey}: ${result[1]}`); + returnData[Object.keys(data)[index]] = data[Object.keys(data)[index]]; + }); + + const ioTimeMs = Date.now() - startMs; + cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "setValues", operations: ops, ioTimeMs: ioTimeMs }); + this.returnClientToPool(client); return returnData; } async setAsync(id, key, value) { + const client = this.getClientFromPool(); const startMs = Date.now(); - const storeKey = "" + this.keyPrefix + id + key; - const setAsync = new Promise((resolve, reject) => { - this.client.set(storeKey, JSON.stringify(value), (err, res) => { - const ioTimeMs = Date.now() - startMs; - debug(`REDIS set ${storeKey}: ${res} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`); - if (!err) { - cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis', - { event: 'set', operations: 1, ioTimeMs: ioTimeMs }); - resolve(value); - } else { - reject(err); - } - }); - }); - return await setAsync; + const storeKey = `${this.keyPrefix}${id}${key}`; + const res = await client.set(storeKey, JSON.stringify(value)); + const ioTimeMs = Date.now() - startMs; + + debug(`REDIS set ${storeKey}: ${res} (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`); + cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "set", operations: 1, ioTimeMs: ioTimeMs }); + this.returnClientToPool(client); + return value; } async setVolatileAsync(id, key, value) { const data = await this.setAsync(id, key, value); - const storeKey = "" + this.keyPrefix + id + key; - const expireAsync = new Promise((resolve, reject) => { - this.client.expire(storeKey, this.volatileKeyTTL, (err, res) => { - if (!err) { - debug(`REDIS expire ${storeKey} ${this.volatileKeyTTL}s: ${res === 1 ? "OK" : "KEY DOES NOT EXIST"}`); - resolve(); - } else { - reject(err); - } - }); - }); - await expireAsync; + const storeKey = `${this.keyPrefix}${id}${key}`; + const client = this.getClientFromPool(); + + await client.expire(storeKey, this.volatileKeyTTL); + debug(`REDIS expire ${storeKey} ${this.volatileKeyTTL}s`); + this.returnClientToPool(client); return data; } async removeAsync(id, key) { + const client = this.getClientFromPool(); const startMs = Date.now(); - const storeKey = "" + this.keyPrefix + id + key; - const delAsync = new Promise((resolve, reject) => { - this.client.del(storeKey, (err, res) => { - const ioTimeMs = Date.now() - startMs; - debug(`REDIS remove ${storeKey}: (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`); - if (!err) { - cloudWatchLog(!REDIS_VERBOSE_LOG, 'redis', - { event: 'remove', operations: 1, ioTimeMs: ioTimeMs }); - resolve(); - } else { - reject(err); - } - }); - }); - await delAsync; + const storeKey = `${this.keyPrefix}${id}${key}`; + const res = await client.del(storeKey); + const ioTimeMs = Date.now() - startMs; + + debug(`REDIS remove ${storeKey}: (${ioTimeMs}ms) ${ioTimeMs > 1000 ? "REDISSLOW!" : ""}`); + cloudWatchLog(!REDIS_VERBOSE_LOG, "redis", { event: "remove", operations: 1, ioTimeMs: ioTimeMs }); + this.returnClientToPool(client); + return res; } } -module.exports = RedisStateStore; \ No newline at end of file +module.exports = RedisStateStore; diff --git a/engine/session.js b/engine/session.js index d5214f2..a289923 100644 --- a/engine/session.js +++ b/engine/session.js @@ -240,7 +240,7 @@ class Session { if (this.timePositionOffset && this.diffCompensation <= 0 && this.alwaysNewSegments) { timePosition -= this.timePositionOffset; cloudWatchLog(!this.cloudWatchLogging, 'engine-session', - { event: 'applyTimePositionOffset', channel: this._sessionId, offsetMs: this.timePositionOffset }); + { event: 'applyTimePositionOffset', channel: this._sessionId, offsetMs: this.timePositionOffset }); // why does this trigger and timeposoff is large? TODO } const diff = position - timePosition; debug(`[${this._sessionId}]: ${timePosition}:${roundToThreeDecimals(position) }:${diff > 0 ? '+' : ''}${roundToThreeDecimals(diff) }ms`);