feat: add maxMemoryLimitBeforeRecycle options
AriPerkkio committed Jun 19, 2023
1 parent 977df66 commit 8e39223
Showing 6 changed files with 77 additions and 5 deletions.
5 changes: 3 additions & 2 deletions README.md
@@ -46,8 +46,9 @@ We have a similar API to Piscina, so for more information, you can read Piscina'
#### Pool constructor options
- `isolateWorkers`: Default to `false`. Always starts with a fresh worker when running tasks to isolate the environment.
- `terminateTimeout`: Defaults to `null`. If terminating a worker takes `terminateTimeout` amount of milliseconds to execute, an error is raised.
- `isolateWorkers`: Disabled by default. Always starts with a fresh worker when running tasks to isolate the environment.
- `terminateTimeout`: Disabled by default. If terminating a worker takes `terminateTimeout` amount of milliseconds to execute, an error is raised.
- `maxMemoryLimitBeforeRecycle`: Disabled by default. When defined, the worker's heap memory usage is compared against this value after a task has finished. If the current memory usage exceeds the limit, the worker is terminated and a new one is started to take its place (see the sketch below). This option is useful when your tasks leak memory and you don't want to enable the `isolateWorkers` option.
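
As an illustration, a minimal usage sketch (not part of this commit; the `worker.js` filename and the 100 MB threshold are made-up example values) could look like this:

```ts
import { dirname, resolve } from 'node:path'
import { fileURLToPath } from 'node:url'
import Tinypool from 'tinypool'

const __dirname = dirname(fileURLToPath(import.meta.url))

const pool = new Tinypool({
  // Hypothetical worker file
  filename: resolve(__dirname, 'worker.js'),
  // Recycle a worker whose heap usage exceeds ~100 MB (in bytes) after it finishes a task
  maxMemoryLimitBeforeRecycle: 100 * 1024 * 1024,
})

// Tasks run as usual; a worker that crosses the limit is replaced by a fresh one
await pool.run({ some: 'data' })
```

Unlike `isolateWorkers`, a worker is only replaced when the limit is actually exceeded, so warm workers keep being reused for tasks that don't leak.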
#### Pool methods
1 change: 1 addition & 0 deletions src/common.ts
@@ -23,6 +23,7 @@ export interface ResponseMessage {
taskId: number
result: any
error: unknown | null
usedMemory: number
}

export interface TinypoolPrivateData {
17 changes: 15 additions & 2 deletions src/index.ts
@@ -144,6 +144,7 @@ interface Options {
concurrentTasksPerWorker?: number
useAtomics?: boolean
resourceLimits?: ResourceLimits
maxMemoryLimitBeforeRecycle?: number
argv?: string[]
execArgv?: string[]
env?: EnvSpecifier
@@ -443,6 +444,7 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
port: MessagePort
sharedBuffer: Int32Array
lastSeenResponseCount: number = 0
usedMemory?: number
onMessage: ResponseCallback

constructor(
@@ -521,6 +523,7 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
}

_handleResponse(message: ResponseMessage): void {
this.usedMemory = message.usedMemory
this.onMessage(message)

if (this.taskInfos.size === 0) {
@@ -892,8 +895,18 @@ class ThreadPool {
}

// When `isolateWorkers` is enabled, remove the worker after the task is finished
if (this.options.isolateWorkers && taskInfo.workerInfo) {
this._removeWorker(taskInfo.workerInfo)
const shouldIsolateWorker =
this.options.isolateWorkers && taskInfo.workerInfo

// When `maxMemoryLimitBeforeRecycle` is enabled, remove workers that have exceeded the memory limit
const shouldRecycleWorker =
!this.options.isolateWorkers &&
this.options.maxMemoryLimitBeforeRecycle !== undefined &&
(taskInfo.workerInfo?.usedMemory || 0) >
this.options.maxMemoryLimitBeforeRecycle

if (shouldIsolateWorker || shouldRecycleWorker) {
this._removeWorker(taskInfo.workerInfo!)
.then(() => this._ensureMinimumWorkers())
.then(() => this._ensureEnoughWorkersForTaskQueue())
.then(() => resolve(result))
2 changes: 2 additions & 0 deletions src/worker.ts
@@ -172,6 +172,7 @@ function onMessage(
taskId,
result: result,
error: null,
usedMemory: process.memoryUsage().heapUsed,
}

// If the task used e.g. console.log(), wait for the stream to drain
@@ -191,6 +192,7 @@
// It may be worth taking a look at the error cloning algorithm we
// use in Node.js core here, it's quite a bit more flexible
error,
usedMemory: process.memoryUsage().heapUsed,
}
}
currentTasks--
17 changes: 17 additions & 0 deletions test/fixtures/leak-memory.js
@@ -0,0 +1,17 @@
export let leaks = []

/**
* Leak some memory to test memory limit usage.
* The `bytes` argument is not an exact measure of the leaked bytes, but it is close enough.
*/
export default function run(bytes) {
const before = process.memoryUsage().heapUsed

for (const _ of Array(bytes).fill()) {
leaks.push(new SharedArrayBuffer(1024))
}
const after = process.memoryUsage().heapUsed
const diff = after - before

console.log(`Leaked: ${diff}. Heap used: ${process.memoryUsage().heapUsed}`)
}
40 changes: 39 additions & 1 deletion test/resource-limits.test.ts
@@ -8,7 +8,6 @@ test('resourceLimits causes task to reject', async () => {
const worker = new Tinypool({
filename: resolve(__dirname, 'fixtures/resource-limits.js'),
resourceLimits: {

maxOldGenerationSizeMb: 4,
maxYoungGenerationSizeMb: 2,
codeRangeSizeMb: 4,
@@ -36,3 +35,42 @@ test('resourceLimits causes task to reject', async () => {
/Worker terminated due to reaching memory limit: JS heap out of memory/
)
})

test('worker is recycled after reaching maxMemoryLimitBeforeRecycle', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/leak-memory.js'),
maxMemoryLimitBeforeRecycle: 10_000_000,
isolateWorkers: false,
minThreads: 1,
maxThreads: 1,
})

const originalWorkerId = pool.threads[0]?.threadId
expect(originalWorkerId).toBeGreaterThan(0)

let finalThreadId = originalWorkerId
let rounds = 0

// This is just a rough way of leaking "some" memory - it's not exact.
// Running 100 loops should be enough for the worker to reach the memory limit and be recycled.
// Use `rounds` to make sure the limit is not reached on the first round.
for (const _ of Array(100).fill(0)) {
await pool.run(10_000)

if (pool.threads[0]) {
finalThreadId = pool.threads[0].threadId
}

if (finalThreadId !== originalWorkerId) {
break
}

rounds++
}

// Test setup should not reach max memory on first round
expect(rounds).toBeGreaterThan(1)

// Thread should have been recycled
expect(finalThreadId).not.toBe(originalWorkerId)
})
