Skip to content

Commit

Permalink
fix: tracked down a plugged a memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Mar 16, 2018
1 parent db5e501 commit e860ff1
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 56 deletions.
28 changes: 17 additions & 11 deletions src/bridge/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,20 @@ registerBridge('fetch', function fetchBridge(ctx: Context, bridge: Bridge, urlSt
}
req = httpFn(reqOptions)

req.once("response", handleResponse)

req.on("response", function (res: http.IncomingMessage) {
req.on("error", handleError)

setImmediate(() =>
req.end(body && Buffer.from(body) || null)
)

return cb

function handleResponse(res: http.IncomingMessage) {
log.silly(`Fetch response: ${res.statusCode} ${urlStr} ${JSON.stringify(res.headers)}`)
req.removeListener('response', handleResponse)
req.removeListener('error', handleError)
try {
res.pause()

Expand All @@ -134,21 +145,16 @@ registerBridge('fetch', function fetchBridge(ctx: Context, bridge: Bridge, urlSt
new ProxyStream(res).ref
])


} catch (err) {
log.error("caught error", err)
ctx.tryCallback(cb, [err.toString()])
}
})
}

req.on("error", function (err) {
function handleError(err: Error) {
log.error("error requesting http resource", err)
ctx.tryCallback(cb, [err.toString()])
})

setImmediate(() =>
req.end(body && Buffer.from(body) || null)
)

return cb
req.removeListener('response', handleResponse)
req.removeListener('error', handleError)
}
})
1 change: 1 addition & 0 deletions src/bridge/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const { Syslog } = require('winston-syslog');
let defaultLogger: winston.LoggerInstance;

registerBridge('log', function (ctx: Context, bridge: Bridge, lvl: string, msg: string, meta: any = {}, callback: ivm.Reference<Function>) {
ctx.addReleasable(callback)
ctx.log(lvl, msg, meta, callback)
})

Expand Down
23 changes: 14 additions & 9 deletions src/bridge/proxy_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ export class ProxyStream {
this.stream = base
this.buffered = []
this.bufferedByteLength = 0
this.stream.on("close", () => this.ended = true)
this.stream.on("end", () => this.ended = true)
this.stream.on("error", () => this.ended = true)
this.stream.once("close", () => this.ended = true)
this.stream.once("end", () => this.ended = true)
this.stream.once("error", () => this.ended = true)
}

read(size?: number): Buffer | null {
Expand Down Expand Up @@ -69,15 +69,20 @@ registerBridge("subscribeProxyStream", function (ctx: Context, bridge: Bridge, r
ctx.tryCallback(cb, ["end"])
return
}
stream.on("close", function () {
stream.once("close", streamClose)
stream.once("end", streamEnd)
stream.on("error", streamError)

function streamClose() {
ctx.tryCallback(cb, ["close"])
})
stream.on("end", function () {
stream.removeAllListeners()
}
function streamEnd() {
ctx.tryCallback(cb, ["end"])
})
stream.on("error", function (err: Error) {
}
function streamError(err: Error) {
ctx.tryCallback(cb, ["error", err.toString()])
})
}
})

registerBridge("readProxyStream", function (ctx: Context, bridge: Bridge, ref: ivm.Reference<ProxyStream>, cb: ivm.Reference<Function>) {
Expand Down
21 changes: 7 additions & 14 deletions src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export class Context extends EventEmitter {
}

addCallback(fn: ivm.Reference<Function>) {
this.addReleasable(fn)
this.callbacks.push(fn)
log.silly("Added callback", fn)
this.emit("callbackAdded", fn)
Expand All @@ -84,9 +85,7 @@ export class Context extends EventEmitter {
if (this.iso.isDisposed)
return
for (const arg of args)
if (arg instanceof ivm.Reference)
this.addReleasable(arg)
else if (arg instanceof ivm.ExternalCopy)
if (arg && typeof arg.release === 'function')
this.addReleasable(arg)
try {
return await fn.apply(null, args, opts)
Expand All @@ -96,7 +95,6 @@ export class Context extends EventEmitter {
throw err
}
} finally {
this.addReleasable(fn)
const i = this.callbacks.indexOf(fn)
if (i >= 0) {
this.callbacks.splice(i, 1)
Expand Down Expand Up @@ -177,15 +175,11 @@ export class Context extends EventEmitter {
if (!this.fireEventFn)
this.fireEventFn = await this.global.get("fireEvent") // bypass releasable
for (const arg of args)
if (arg instanceof ivm.Reference)
this.addReleasable(arg)
else if (arg instanceof ivm.ExternalCopy)
if (arg && typeof arg.release === 'function')
this.addReleasable(arg)
log.silly("Firing event", name)
const ret = await this.fireEventFn.apply(null, [name, ...args], opts)
if (ret instanceof ivm.Reference)
this.addReleasable(ret)
else if (ret instanceof ivm.ExternalCopy)
if (ret && typeof ret.release === 'function')
this.addReleasable(ret)
return ret
} catch (err) {
Expand All @@ -203,10 +197,8 @@ export class Context extends EventEmitter {
async set(name: any, value: any): Promise<boolean> {
if (this.iso.isDisposed)
throw new Error("Isolate is disposed or disposing.")
const ret = await this.global.set(name, value)
if (value instanceof ivm.Reference)
this.addReleasable(value)
else if (value instanceof ivm.ExternalCopy)
const ret = this.global.set(name, value)
if (value && typeof value.release === 'function')
this.addReleasable(value)
return ret
}
Expand Down Expand Up @@ -255,6 +247,7 @@ export class Context extends EventEmitter {
try {
rel.release()
} catch (e) {
console.error("RELEASE ERROR:", e.stack)
// don't really care
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ export class Server extends http.Server {
user_agent: request.headers['user-agent']
})

ctx.on("error", contextErrorHandler)
ctx.once("error", contextErrorHandler)

request.headers['x-request-id'] = randomBytes(12).toString('hex')

Expand All @@ -132,6 +132,7 @@ export class Server extends http.Server {
} catch (err) {
log.error("error handling request:", err.stack)
} finally {
ctx.removeListener('error', contextErrorHandler)
this.contextStore.putContext(ctx)
trace.end()
log.debug(trace.report())
Expand Down
12 changes: 7 additions & 5 deletions v8env/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class FetchEvent {
}
}

const emitter = new EventEmitter()
export const emitter = new EventEmitter()

export function addEventListener(name, fn) {
emitter.addListener(name, fn)
Expand All @@ -82,8 +82,10 @@ export function fireEvent(ivm, name, ...args) {
} catch (err) {
logger.debug(err.message, err.stack)
let cb = args[args.length - 1] // should be the last arg
if (cb instanceof ivm.Reference)
if (cb instanceof ivm.Reference) {
cb.apply(undefined, [err.toString()])
cb.release()
}
}
}

Expand All @@ -99,7 +101,7 @@ function fireFetchEvent(ivm, url, req, body, callback) {
global.session = new SessionStore()

let fetchEvent = new FetchEvent('fetch', {
request: new Request(url, Object.assign(req, { body: fly.streams.refToStream(body) }))
request: new Request(url, Object.assign(req, { body: fly.streams.refToStream(body) }))
}, async function (err, res) {
logger.debug("request event callback called", typeof err, typeof res, res instanceof Response)

Expand All @@ -113,9 +115,9 @@ function fireFetchEvent(ivm, url, req, body, callback) {
}

let b = null
if(res.body && res.body._ref){
if (res.body && res.body._ref) {
b = res.body._ref
}else{
} else {
b = transferInto(ivm, await res.arrayBuffer())
}

Expand Down
7 changes: 3 additions & 4 deletions v8env/fly/dispatcher.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { logger } from '../logger'

export default function dispatcherInit(ivm, dispatch) {
releasables.push(dispatch)
return {
dispatch(name, ...args) {
logger.debug("dispatching", name)
for (const arg of args)
if (arg instanceof ivm.Reference)
global.releasables.push(arg)
else if (arg instanceof ivm.ExternalCopy)
global.releasables.push(arg)
if (arg && typeof arg.release === 'function')
releasables.push(arg)

dispatch.apply(null, [name, ...args])
.then(() => {
Expand Down
16 changes: 11 additions & 5 deletions v8env/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dispatcherInit from './fly/dispatcher'

import { fireEvent, addEventListener, dispatchEvent, FetchEvent } from "./events"
import { fireEvent, addEventListener, dispatchEvent, FetchEvent, emitter } from "./events"
import { Middleware, MiddlewareChain } from "./middleware"
import { FlyBackend } from "./fly-backend"
import { ReadableStream, WritableStream, TransformStream } from 'web-streams-polyfill'
Expand All @@ -10,7 +10,7 @@ import { TextEncoder, TextDecoder } from 'text-encoding'
import consoleInit from './console'
import flyInit from './fly'

import { URL, URLSearchParams } from 'universal-url-lite'
import { URL, URLSearchParams } from 'universal-url-lite'//'whatwg-url'
import Headers from './headers'

import fetchInit from './fetch'
Expand Down Expand Up @@ -40,7 +40,7 @@ global.releasables = []
global.middleware = {}

global.registerMiddleware = function registerMiddleware(type, fn) {
global.middleware[type] = fn
middleware[type] = fn
}

global.bootstrap = function bootstrap() {
Expand All @@ -54,7 +54,6 @@ global.bootstrap = function bootstrap() {
delete global._dispatch

global.fly = flyInit(ivm, dispatcher)
global.releasables.push(global._dispatch)

global.console = consoleInit(ivm, dispatcher)
timersInit(ivm)
Expand Down Expand Up @@ -120,11 +119,18 @@ global.sourceMaps = {}

global.teardown = function teardown() {
let r;
while (r = global.releasables.pop()) {
while (r = releasables.pop()) {
try {
r.release()
} catch (e) {
// fail silently
}
}
emitter.removeAllListeners()
global.sourceMaps = {}
global.teardown = null

// violent
// for (const prop of Object.getOwnPropertyNames(global))
// try { global[prop] = null } catch (e) { }
}
16 changes: 10 additions & 6 deletions v8env/timers.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,34 @@
export default function timersInit(ivm) {
global.setTimeout = (function (st, ivm) {
global.releasables.push(st)
releasables.push(st)
return function (cb, ms) {
return st.apply(null, [new ivm.Reference(cb), ms])
const ref = new ivm.Reference(cb)
releasables.push(ref)
return st.apply(null, [ref, ms])
}
})(global._setTimeout, ivm)
delete global._setTimeout

global.clearTimeout = (function (ct) {
global.releasables.push(ct)
releasables.push(ct)
return function (id) {
return ct.apply(null, [id])
}
})(global._clearTimeout)
delete global._clearTimeout

global.setInterval = (function (si, ivm) {
global.releasables.push(si)
releasables.push(si)
return function (cb, ms) {
return si.apply(null, [new ivm.Reference(cb), ms])
const ref = new ivm.Reference(cb)
releasables.push(ref)
return si.apply(null, [ref, ms])
}
})(global._setInterval, ivm)
delete global._setInterval

global.clearInterval = (function (ci) {
global.releasables.push(ci)
releasables.push(ci)
return function (id) {
return ci.apply(null, [id])
}
Expand Down
1 change: 0 additions & 1 deletion v8env/ts/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* @module fly
* @private
*/
import { TextEncoder } from "text-encoding";

/** @hidden */
export type BlobPart = BufferSource | USVString | Blob
Expand Down

0 comments on commit e860ff1

Please sign in to comment.