Skip to content

Commit

Permalink
fix: don't clear context.meta on finalize, production needs that
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Feb 28, 2018
1 parent 296eac8 commit 1c2ef6d
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 54 deletions.
34 changes: 17 additions & 17 deletions src/bridge/proxy_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,29 @@ export class ProxyStream {
this.stream = base
this.buffered = []
this.bufferedByteLength = 0
this.stream.on("close", ()=> this.ended = true)
this.stream.on("end", ()=> this.ended = true)
this.stream.on("error", ()=> this.ended = true)
this.stream.on("close", () => this.ended = true)
this.stream.on("end", () => this.ended = true)
this.stream.on("error", () => this.ended = true)
}

read(size?: number): Buffer | null{
read(size?: number): Buffer | null {
let chunk = this.stream.read(size)
if(chunk && !this.tainted) this.bufferChunk(chunk)
if (chunk && !this.tainted) this.bufferChunk(chunk)
return chunk
}

bufferChunk(chunk: Buffer) : boolean{
if(this.tainted) return false
bufferChunk(chunk: Buffer): boolean {
if (this.tainted) return false
this.bufferedByteLength += chunk.byteLength
if(this.bufferedByteLength > 10 * 1024 * 1024){
if (this.bufferedByteLength > 10 * 1024 * 1024) {
// no longer buffering, can't be used
this.tainted = true
this.buffered = []
this.bufferedByteLength = 0
}else{
} else {
this.buffered.push(chunk)
}
return !this.tainted
return !this.tainted
}

get ref() {
Expand Down Expand Up @@ -85,25 +85,25 @@ registerBridge("subscribeProxyStream", function (ctx: Context, config: Config, r
})*/
//setImmediate(() => stream.resume())
})
registerBridge("readProxyStream", function(ctx: Context, config: Config, ref: ivm.Reference<ProxyStream>, cb: ivm.Reference<Function>){
ctx.addCallback(cb)
registerBridge("readProxyStream", function (ctx: Context, config: Config, ref: ivm.Reference<ProxyStream>, cb: ivm.Reference<Function>) {
const proxyable = ref.deref({ release: true })
const stream = proxyable.stream

let attempts = 0
const tryRead = function(){
const tryRead = function () {
attempts += 1
let chunk = proxyable.read(1024 * 1024)
let data: ivm.Copy<ArrayBuffer> | null = null
if(chunk){
if (chunk) {
data = transferInto(chunk)
}
if(data || attempts >= 10 || proxyable.ended){
ctx.addCallback(cb)
if (data || attempts >= 10 || proxyable.ended) {
ctx.applyCallback(cb, [null, data, proxyable.tainted])
}else if(attempts >= 10 && !proxyable.ended){
} else if (attempts >= 10 && !proxyable.ended) {
// wait a bit, with a backoff
setTimeout(tryRead, 20 * attempts)
}else{
} else {
ctx.applyCallback(cb, [null, null, proxyable.tainted])
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export class Context extends EventEmitter {

addCallback(fn: ivm.Reference<Function>) {
this.callbacks.push(fn)
log.silly("Added callback", fn)
this.emit("callbackAdded", fn)
}

Expand All @@ -81,6 +82,7 @@ export class Context extends EventEmitter {
}

async _applyCallback(fn: ivm.Reference<Function>, args: any[], opts?: any) {
log.silly("Applying callback", fn, args)
try {
if (this.iso.isDisposed)
return
Expand Down Expand Up @@ -237,12 +239,13 @@ export class Context extends EventEmitter {
}
log.silly("Callbacks present initially, waiting.")
const cbFn = () => {
log.silly("Callback applied handler in finalize.", this.callbacks.length)
if (this.callbacks.length === 0) {
this.removeListener("callbackApplied", cbFn)
resolve()
return
}
log.silly("Callbacks still present, waiting.")
log.silly("Callbacks still present, waiting.", this.callbacks.length)
}
this.on("callbackApplied", cbFn)
})
Expand All @@ -260,7 +263,6 @@ export class Context extends EventEmitter {
// don't really care
}
}
this.meta = {} // reset own meta
this.logMetadata = {} // reset log meta data!
}
}
Expand Down
1 change: 0 additions & 1 deletion src/default_context_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ export class DefaultContextStore implements ContextStore {
try {
t2 = t.start("createContext")
const ctx = await createContext(config, iso, { inspector: !!this.options.inspect })
ctx.meta.iso = iso
t2.end()

ctx.set('app', app.forV8())
Expand Down
69 changes: 35 additions & 34 deletions v8env/fly/streams.js
Original file line number Diff line number Diff line change
@@ -1,64 +1,65 @@
export default function streamsInit(ivm, dispatcher){
import { logger } from '../logger'

export default function streamsInit(ivm, dispatcher) {
return {
refToStream(ref){
refToStream(ref) {
let closed = false
let resumed = false
const r = new ReadableStream({
start(controller) {
dispatcher.dispatch("subscribeProxyStream",
ref,
new ivm.Reference((name, ...args) => {
new ivm.Reference(function (name, ...args) {
logger.debug("GOT EVENT:", name)
if (name === "close" || name === "end") {
if (!closed) {
closed = true
controller.close()
}
} else if (name === "error") {
controller.error(new Error(args[0]))
/*} else if (name === "data") {
controller.enqueue(args[0])
if(controller.desiredSize <= 0 && resumed){
resumed = false
dispatcher.dispatch("controlProxyStream", "pause", ref)
}//*/ //not using events, calling read manually with pull for now
/*} else if (name === "data") {
controller.enqueue(args[0])
if(controller.desiredSize <= 0 && resumed){
resumed = false
dispatcher.dispatch("controlProxyStream", "pause", ref)
}//*/ //not using events, calling read manually with pull for now
} else
logger.debug("unhandled event", name)
})
)
},
pull(controller){
pull(controller) {
//if(r.locked && !resumed){
if(closed){
return Promise.resolve(null)
}
return new Promise((resolve, reject) => {
resumed = true
dispatcher.dispatch("readProxyStream", ref, new ivm.Reference((err, data, tainted) => {
if(err){
controller.error(new Error(err))
reject(err)
return
}
// if data is blank the stream will keep pulling
// readProxyStream tries a few times to minimize bridge calls
controller.enqueue(data)
if(data && r._ref && tainted){
// once underlying ref is tainted, we can't pass it around anymore
// but we can still use it internally
r._ref = undefined
}
resolve()
}))
if (closed) {
return Promise.resolve(null)
}
return new Promise((resolve, reject) => {
dispatcher.dispatch("readProxyStream", ref, new ivm.Reference((err, data, tainted) => {
if (err) {
controller.error(new Error(err))
reject(err)
return
}
// if data is blank the stream will keep pulling
// readProxyStream tries a few times to minimize bridge calls
controller.enqueue(data)
if (data && r._ref && tainted) {
// once underlying ref is tainted, we can't pass it around anymore
// but we can still use it internally
r._ref = undefined
}
resolve()
}))

})
})
//}
},
cancel() {
logger.debug("readable stream was cancelled")
}
})
r._ref = ref
return r
return r
}
}
}

0 comments on commit 1c2ef6d

Please sign in to comment.