From c5744b8870d1239149b0d195e5dfa64caf35a7d0 Mon Sep 17 00:00:00 2001 From: Paul Kinlan Date: Sun, 19 Apr 2020 17:45:54 +0000 Subject: [PATCH 1/7] First working test of stdin of webworker --- Makefile | 6 +- build/post-worker.js | 6 + build/pre.js | 121 +++++++++++++++++ test/output.txt | 54 ++++++++ test/ringbuffer.js | 130 +++++++++++++++++++ test/test.js | 300 ++++++++++++++++++++++++------------------- 6 files changed, 484 insertions(+), 133 deletions(-) create mode 100644 test/output.txt create mode 100644 test/ringbuffer.js diff --git a/Makefile b/Makefile index ec713ed..441b46a 100644 --- a/Makefile +++ b/Makefile @@ -26,8 +26,8 @@ MP4_SHARED_DEPS = \ build/lame/dist/lib/libmp3lame.so \ build/x264/dist/lib/libx264.so -all: webm mp4 -webm: ffmpeg-webm.js ffmpeg-worker-webm.js +all: webm #mp4 +webm: ffmpeg-worker-webm.js #ffmpeg-webm.js mp4: ffmpeg-mp4.js ffmpeg-worker-mp4.js clean: clean-js \ @@ -216,7 +216,7 @@ build/ffmpeg-mp4/ffmpeg.bc: $(MP4_SHARED_DEPS) EMCC_COMMON_ARGS = \ -O3 \ - --closure 1 \ + --closure 0 \ --memory-init-file 0 \ -s WASM=0 \ -s WASM_ASYNC_COMPILATION=0 \ diff --git a/build/post-worker.js b/build/post-worker.js index be8cdcf..4f035a0 100644 --- a/build/post-worker.js +++ b/build/post-worker.js @@ -3,6 +3,8 @@ var __ffmpegjs_running = false; +let stdinRingBuffer; + // Shim for nodejs if (typeof self === "undefined") { self = require("worker_threads")["parentPort"]; @@ -62,6 +64,10 @@ self.onmessage = function(e) { self.postMessage({"type": "done", "data": result}, transfer); __ffmpegjs_running = false; } + } else if (msg["type"] == "init") { + const buffer = msg["stdin"]; + stdinRingBuffer = RingBuffer.from(buffer); + //self.postMessage({"type": "ready"}); // Should really only do this when stdin ready } else { self.postMessage({"type": "error", "data": "unknown command"}); } diff --git a/build/pre.js b/build/pre.js index 01e4fe0..c910dd4 100644 --- a/build/pre.js +++ b/build/pre.js @@ -1,3 +1,115 @@ +class RingBuffer { + /* + Create's a Ring Buffer backed by a correctly sized SAB. + + There can only be one writer and one reader. + */ + static create(length) { + const buffer = new SharedArrayBuffer( + length + Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH + ); + + return new RingBuffer(buffer); + } + + static from(sab) { + return new RingBuffer(sab); + } + + get buffer() { + return this._sab; + } + + get length() { + let readIndex = Atomics.load(this._header, HEADER.READ); + let writeIndex = Atomics.load(this._header, HEADER.WRITE); + + const delta = writeIndex - readIndex + return (readIndex <= writeIndex) ? delta : delta + this._size; + } + + constructor(sab) { + if (!!sab == false) throw new Error("Shared Array Buffer is undefined"); + if (sab instanceof SharedArrayBuffer == false) + throw new Error("Parameter 0 is not a Shared Array Buffer"); + + this._size = + sab.byteLength - Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH; + this._sab = sab; + this._header = new Uint32Array(sab, 0, HEADER_LENGTH); + this._body = new Uint8Array( + sab, + Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH, + this._size + ); + } + + append(data) { + for (const byte of data) { + let writeIndex = Atomics.load(this._header, HEADER.WRITE); + + Atomics.store(this._body, writeIndex, byte); + + writeIndex = Atomics.add(this._header, HEADER.WRITE, 1); + + if (writeIndex == this._size - 1) { + Atomics.store( + this._header, + HEADER.WRITE, + 0 + ); + } + } + } + + // Reads the next byte of data + read() { + let readIndex = Atomics.load(this._header, HEADER.READ); + let writeIndex = Atomics.load(this._header, HEADER.WRITE); + + if (readIndex == writeIndex) return undefined; + + const value = Atomics.load(this._body, readIndex); + + readIndex = Atomics.add(this._header, HEADER.READ, 1); + + if (readIndex == this._size - 1) { + readIndex = Atomics.store(this._header, HEADER.READ, 0); + } + + return value; + } + + *readToHead() { + // Feels odd to have to create a buffer the same size as the buffer. Just iterate. + let data; + while ((data = this.read()) != undefined) { + yield data; + } + } + + clear() { + Atomics.store(this._header, HEADER.READ, 0); + Atomics.store(this._header, HEADER.WRITE, 0); + } + + debug() { + console.log(this._sab); + console.log(this._header); + console.log(this._body); + } +} + +const HEADER = { + READ: 0, + WRITE: 1, + READING: 2, + WRITING: 4 +}; + +const HEADER_LENGTH = Object.keys(HEADER).length; + + var __ffmpegjs_utf8ToStr; function __ffmpegjs(__ffmpegjs_opts) { @@ -41,6 +153,15 @@ function __ffmpegjs(__ffmpegjs_opts) { }; Module["preRun"] = function() { + + FS.init(function() { + if (!!stdinRingBuffer && stdinRingBuffer.length > 0) { + const charRaw = stdinRingBuffer.read(); // only reads on char. + return charRaw; + } + return null; + }); + (__ffmpegjs_opts["mounts"] || []).forEach(function(mount) { var fs = FS.filesystems[mount["type"]]; if (!fs) { diff --git a/test/output.txt b/test/output.txt new file mode 100644 index 0000000..bb10854 --- /dev/null +++ b/test/output.txt @@ -0,0 +1,54 @@ + +> ffmpeg.js@4.2.9000 test /usr/local/google/home/paulkinlan/ffmpeg.js +> mocha test/test.js "-g" "stdin" + + + + WebM + Worker +SharedArrayBuffer { + [Uint8Contents]: <00 00 00 00 36 03 06 00 00 00 00 00 00 00 00 00 1a 45 df a3 01 00 00 00 00 00 00 1f 42 86 81 01 42 f7 81 01 42 f2 81 04 42 f3 81 08 42 82 84 77 65 62 6d 42 87 81 02 42 85 81 02 18 53 80 67 01 00 00 00 00 06 02 ff 11 4d 9b 74 40 2d 4d bb 8b 53 ab 84 15 49 a9 66 53 ac 81 df 4d bb 8c 53 ab 84 16 54 ae ... 10485676 more bytes>, + byteLength: 10485776 +} +stderr ffmpeg version n4.2.2 Copyright (c) 2000-2019 the FFmpeg developers +stderr built with emcc (Emscripten gcc/clang-like replacement) 1.39.13 +stderr configuration: --cc=emcc --ranlib=emranlib --enable-cross-compile --target-os=none --arch=x86 --disable-runtime-cpudetect --disable-asm --disable-fast-unaligned --disable-pthreads --disable-w32threads --disable-os2threads --disable-debug --disable-stripping --disable-safe-bitstream-reader --disable-all --enable-ffmpeg --enable-avcodec --enable-avformat --enable-avfilter --enable-swresample --enable-swscale --disable-network --disable-d3d11va --disable-dxva2 --disable-vaapi --disable-vdpau --enable-decoder=vp8 --enable-decoder=h264 --enable-decoder=vorbis --enable-decoder=opus --enable-decoder=mp3 --enable-decoder=aac --enable-decoder=pcm_s16le --enable-demuxer=matroska --enable-demuxer=ogg --enable-demuxer=mov --enable-demuxer=mp3 --enable-demuxer=wav --enable-demuxer=concat --enable-protocol=file --enable-filter=aresample --enable-filter=scale --enable-filter=crop --enable-filter=overlay --enable-filter=hstack --enable-filter=vstack --disable-bzlib --disable-iconv --disable-libxcb --disable-lzma --disable-sdl2 --disable-securetransport --disable-xlib --disable-zlib --enable-encoder=libvpx_vp8 --enable-encoder=libopus --enable-muxer=webm --enable-muxer=ogg --enable-muxer=null --enable-libopus --enable-libvpx --extra-cflags=-I../libvpx/dist/include --extra-ldflags=-L../libvpx/dist/lib +stderr libavutil 56. 31.100 / 56. 31.100 +stderr libavcodec 58. 54.100 / 58. 54.100 +stderr libavformat 58. 29.100 / 58. 29.100 +stderr libavfilter 7. 57.100 / 7. 57.100 +stderr libswscale 5. 5.100 / 5. 5.100 +stderr libswresample 3. 5.100 / 3. 5.100 +stderr [vp8 @ 0x6ac758] Warning: not compiled with thread support, using thread emulation +stderr [vorbis @ 0x6af528] Warning: not compiled with thread support, using thread emulation +stderr Input #0, matroska,webm, from '/dev/stdin': +stderr Metadata: +stderr encoder : Lavf56.40.100 +stderr Duration: 00:00:03.95, start: 0.000000, bitrate: N/A +stderr Stream #0:0: Video: vp8, yuv420p, 854x480, SAR 1:1 DAR 427:240, 24 fps, 24 tbr, 1k tbn, 1k tbc (default) +stderr Stream #0:1: Audio: vorbis, 48000 Hz, stereo, fltp (default) +stderr Stream #0:2(eng): Subtitle: webvtt +stderr [vp8 @ 0x6b32e0] Warning: not compiled with thread support, using thread emulation +stderr Stream mapping: +stderr Stream #0:0 -> #0:0 (vp8 (native) -> vp8 (libvpx)) +stderr [libvpx @ 0x6c0548] Warning: not compiled with thread support, using thread emulation +stderr [libvpx @ 0x6c0548] v1.8.2 +stderr Output #0, webm, to 'out.webm': +stderr Metadata: +stderr encoder : Lavf58.29.100 +stderr Stream #0:0: Video: vp8 (libvpx), yuv420p, 854x480 [SAR 1:1 DAR 427:240], q=-1--1, 200 kb/s, 24 fps, 1k tbn, 24 tbc (default) +stderr Metadata: +stderr encoder : Lavc58.54.100 libvpx +stderr Side data: +stderr cpb: bitrate max/min/avg: 0/0/0 buffer size: 0 vbv_delay: -1 +stderr frame= 1 fps=0.3 q=0.0 size= 1kB time=00:00:00.00 bitrate=4760.0kbits/s speed=0.000349x +stderr frame= 3 fps=0.8 q=0.0 size= 1kB time=00:00:00.08 bitrate= 56.7kbits/s speed=0.0217x +stderr frame= 4 fps=0.9 q=0.0 size= 1kB time=00:00:00.12 bitrate= 37.8kbits/s speed=0.0273x +stderr frame= 5 fps=0.9 q=0.0 size= 1kB time=00:00:00.16 bitrate= 28.3kbits/s speed=0.0314x +stderr frame= 5 fps=0.8 q=0.0 Lsize= 37kB time=00:00:00.16 bitrate=1786.0kbits/s speed=0.0277x +stderr video:36kB audio:0kB subtitle:0kB other streams:0kB global headers:0kB muxing overhead: 1.799479% + ✓ should encode test file to WebM/VP8 with stdin (6755ms) + + + 1 passing (7s) + diff --git a/test/ringbuffer.js b/test/ringbuffer.js new file mode 100644 index 0000000..df39e24 --- /dev/null +++ b/test/ringbuffer.js @@ -0,0 +1,130 @@ +/* + Copyright 2020 Google Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +class RingBuffer { + /* + Create's a Ring Buffer backed by a correctly sized SAB. + + There can only be one writer and one reader. + */ + static create(length) { + const buffer = new SharedArrayBuffer( + length + Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH + ); + return new RingBuffer(buffer); + } + + static from(sab) { + return new RingBuffer(sab); + } + + get buffer() { + return this._sab; + } + + get length() { + const readIndex = Atomics.load(this._header, HEADER.READ); + const writeIndex = Atomics.load(this._header, HEADER.WRITE); + + const delta = writeIndex - readIndex; + return (readIndex <= writeIndex) ? delta : delta + this._size; + } + + constructor(sab) { + if (!!sab == false) throw new Error("Shared Array Buffer is undefined"); + if (sab instanceof SharedArrayBuffer == false) + throw new Error("Parameter 0 is not a Shared Array Buffer"); + + this._size = + sab.byteLength - Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH; + this._sab = sab; + this._header = new Uint32Array(sab, 0, HEADER_LENGTH); + this._body = new Uint8Array( + sab, + Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH, + this._size + ); + } + + append(data) { + for (const byte of data) { + let writeIndex = Atomics.load(this._header, HEADER.WRITE); + + Atomics.store(this._body, writeIndex, byte); + + writeIndex = Atomics.add(this._header, HEADER.WRITE, 1); + + if (writeIndex == this._size - 1) { + Atomics.store( + this._header, + HEADER.WRITE, + 0 + ); + } + } + } + + // Reads the next byte of data + read() { + let readIndex = Atomics.load(this._header, HEADER.READ); + let writeIndex = Atomics.load(this._header, HEADER.WRITE); + + if (readIndex == writeIndex) return undefined; + + const value = Atomics.load(this._body, readIndex); + + readIndex = Atomics.add(this._header, HEADER.READ, 1); + + if (readIndex == this._size - 1) { + readIndex = Atomics.store(this._header, HEADER.READ, 0); + } + + return value; + } + + *readToHead() { + // Feels odd to have to create a buffer the same size as the buffer. Just iterate. + let data; + while ((data = this.read()) != undefined) { + yield data; + } + } + + clear() { + Atomics.store(this._header, HEADER.READ, 0); + Atomics.store(this._header, HEADER.WRITE, 0); + } + + debug() { + console.log(this._sab); + console.log(this._header); + console.log(this._body); + } + } + + const HEADER = { + READ: 0, + WRITE: 1, + READING: 2, + WRITING: 4, + }; + + const HEADER_LENGTH = Object.keys(HEADER).length; + + if (typeof window === 'undefined') { + module.exports = RingBuffer; + } + \ No newline at end of file diff --git a/test/test.js b/test/test.js index 77180f6..3c7dffe 100644 --- a/test/test.js +++ b/test/test.js @@ -4,57 +4,58 @@ var path = require("path"); var Worker = require("worker_threads").Worker; var ffmpeg_webm = require("../ffmpeg-webm"); var ffmpeg_mp4 = require("../ffmpeg-mp4"); +var RingBuffer = require("./ringbuffer"); -function noop() {} +function noop() { } var testDataPath = path.join(__dirname, "test.webm"); var testData = new Uint8Array(fs.readFileSync(testDataPath)); -describe("WebM", function() { +describe("WebM", function () { this.timeout(20000); - describe("Sync", function() { - it("should print version to stdout", function() { + describe("Sync", function () { + it("should print version to stdout", function () { var code; var stdout = ""; var stderr = ""; ffmpeg_webm({ arguments: ["-version"], - print: function(data) { stdout += data + "\n"; }, - printErr: function(data) { stderr += data + "\n"; }, - onExit: function(v) {code = v}, + print: function (data) { stdout += data + "\n"; }, + printErr: function (data) { stderr += data + "\n"; }, + onExit: function (v) { code = v }, }); expect(code).to.equal(0); expect(stderr).to.be.empty; expect(stdout).to.match(/^ffmpeg version /); }); - it("shouldn't return input files at MEMFS", function() { + it("shouldn't return input files at MEMFS", function () { var res = ffmpeg_webm({ arguments: [], print: noop, printErr: noop, MEMFS: [ - {name: "test.mkv", data: new Uint8Array(1)}, - {name: "222.webm", data: new Uint8Array(10)}, + { name: "test.mkv", data: new Uint8Array(1) }, + { name: "222.webm", data: new Uint8Array(10) }, ], }); expect(res.MEMFS).to.be.empty; }); - it("should show metadata of test file at NODEFS", function() { + it("should show metadata of test file at NODEFS", function () { var stderr = ""; ffmpeg_webm({ arguments: ["-i", "/data/test.webm"], print: noop, - printErr: function(data) { stderr += data + "\n"; }, - mounts: [{type: "NODEFS", opts: {root: "test"}, mountpoint: "/data"}], + printErr: function (data) { stderr += data + "\n"; }, + mounts: [{ type: "NODEFS", opts: { root: "test" }, mountpoint: "/data" }], }); expect(stderr).to.match(/^Input.*matroska,webm/m); expect(stderr).to.match(/^\s+Stream.*Video: vp8/m); expect(stderr).to.match(/^\s+Stream.*Audio: vorbis/m); }); - it("should encode test file to WebM/VP8 at MEMFS", function() { + it("should encode test file to WebM/VP8 at MEMFS", function () { var code; var res = ffmpeg_webm({ arguments: [ @@ -66,8 +67,8 @@ describe("WebM", function() { stdin: noop, print: noop, printErr: noop, - onExit: function(v) {code = v}, - MEMFS: [{name: "test.webm", data: testData}], + onExit: function (v) { code = v }, + MEMFS: [{ name: "test.webm", data: testData }], }); expect(code).to.equal(0); expect(res.MEMFS).to.have.length(1); @@ -77,7 +78,7 @@ describe("WebM", function() { expect(file.data).to.be.an.instanceof(Uint8Array); }); - it("should encode test file to WebM/Opus at MEMFS", function() { + it("should encode test file to WebM/Opus at MEMFS", function () { var code; var res = ffmpeg_webm({ arguments: [ @@ -89,8 +90,8 @@ describe("WebM", function() { stdin: noop, print: noop, printErr: noop, - onExit: function(v) {code = v}, - MEMFS: [{name: "test.webm", data: testData}], + onExit: function (v) { code = v }, + MEMFS: [{ name: "test.webm", data: testData }], }); expect(code).to.equal(0); expect(res.MEMFS).to.have.length(1); @@ -100,7 +101,7 @@ describe("WebM", function() { expect(file.data).to.be.an.instanceof(Uint8Array); }); - it("should accept ArrayBuffer in MEMFS input", function() { + it("should accept ArrayBuffer in MEMFS input", function () { var code; ffmpeg_webm({ arguments: [ @@ -112,13 +113,13 @@ describe("WebM", function() { stdin: noop, print: noop, printErr: noop, - onExit: function(v) {code = v}, - MEMFS: [{name: "test.webm", data: testData.buffer}], + onExit: function (v) { code = v }, + MEMFS: [{ name: "test.webm", data: testData.buffer }], }); expect(code).to.equal(0); }); - it("should accept Array in MEMFS input", function() { + it("should accept Array in MEMFS input", function () { var data = Array.prototype.slice.call(testData); var code; ffmpeg_webm({ @@ -131,13 +132,13 @@ describe("WebM", function() { stdin: noop, print: noop, printErr: noop, - onExit: function(v) {code = v}, - MEMFS: [{name: "test.webm", data: data}], + onExit: function (v) { code = v }, + MEMFS: [{ name: "test.webm", data: data }], }); expect(code).to.equal(0); }); - it("should accept Uint16Array in MEMFS input", function() { + it("should accept Uint16Array in MEMFS input", function () { var data = new Uint16Array(testData.buffer); var code; ffmpeg_webm({ @@ -150,13 +151,13 @@ describe("WebM", function() { stdin: noop, print: noop, printErr: noop, - onExit: function(v) {code = v}, - MEMFS: [{name: "test.webm", data: data}], + onExit: function (v) { code = v }, + MEMFS: [{ name: "test.webm", data: data }], }); expect(code).to.equal(0); }); - it("should work with crazy output name", function() { + it("should work with crazy output name", function () { var code; var res = ffmpeg_webm({ arguments: [ @@ -168,8 +169,8 @@ describe("WebM", function() { stdin: noop, print: noop, printErr: noop, - onExit: function(v) {code = v}, - MEMFS: [{name: "test.webm", data: testData}], + onExit: function (v) { code = v }, + MEMFS: [{ name: "test.webm", data: testData }], }); expect(code).to.equal(0); expect(res.MEMFS).to.have.length(1); @@ -179,7 +180,7 @@ describe("WebM", function() { expect(file.data).to.be.an.instanceof(Uint8Array); }); - it("should work with other crazy output name", function() { + it("should work with other crazy output name", function () { var res = ffmpeg_webm({ arguments: [ "-i", "test.webm", @@ -190,14 +191,14 @@ describe("WebM", function() { stdin: noop, print: noop, printErr: noop, - MEMFS: [{name: "test.webm", data: testData}], + MEMFS: [{ name: "test.webm", data: testData }], }); expect(res.MEMFS).to.have.length(1); expect(res.MEMFS[0].name).to.equal("__proto__"); expect(res.MEMFS[0].data.length).to.be.above(0); }); - it("should return empty array for empty output", function() { + it("should return empty array for empty output", function () { var res = ffmpeg_webm({ arguments: [ "-i", "test.webm", @@ -207,7 +208,7 @@ describe("WebM", function() { stdin: noop, print: noop, printErr: noop, - MEMFS: [{name: "test.webm", data: testData}], + MEMFS: [{ name: "test.webm", data: testData }], }); expect(res.MEMFS).to.have.length(1); expect(res.MEMFS[0].name).to.equal("out.webm"); @@ -232,7 +233,7 @@ describe("WebM", function() { expect(code).to.equal(0); });*/ - it("should have Ogg muxer", function() { + it("should have Ogg muxer", function () { var res = ffmpeg_webm({ arguments: [ "-i", "test.webm", @@ -243,7 +244,7 @@ describe("WebM", function() { stdin: noop, print: noop, printErr: noop, - MEMFS: [{name: "test.webm", data: testData}], + MEMFS: [{ name: "test.webm", data: testData }], }); expect(res.MEMFS).to.have.length(1); var file = res.MEMFS[0]; @@ -272,86 +273,125 @@ describe("WebM", function() { });*/ }); - describe("Worker", function() { - it("should print version to stdout", function(done) { + describe("Worker", function () { + it("should print version to stdout", function (done) { var stdout = ""; var stderr = ""; var worker = new Worker("./ffmpeg-worker-webm.js"); worker.on("error", done); - worker.on("message", function(msg) { + worker.on("message", function (msg) { switch (msg.type) { - case "ready": - worker.postMessage({type: "run", arguments: ["-version"]}); - break; - case "stdout": - stdout += msg.data + "\n"; - break; - case "stderr": - stderr += msg.data + "\n"; - break; - case "exit": - worker.terminate(); - expect(stderr).to.be.empty; - expect(msg.data).to.equal(0); - expect(stdout).to.match(/^ffmpeg version /); - done(); - break; + case "ready": + worker.postMessage({ type: "run", arguments: ["-version"] }); + break; + case "stdout": + stdout += msg.data + "\n"; + break; + case "stderr": + stderr += msg.data + "\n"; + break; + case "exit": + worker.terminate(); + expect(stderr).to.be.empty; + expect(msg.data).to.equal(0); + expect(stdout).to.match(/^ffmpeg version /); + done(); + break; } }); }); - it("should encode test file to WebM/VP8 at MEMFS", function(done) { + it("should encode test file to WebM/VP8 at MEMFS", function (done) { var worker = new Worker("./ffmpeg-worker-webm.js"); worker.onerror = done; worker.on("error", done); - worker.on("message", function(msg) { + worker.on("message", function (msg) { switch (msg.type) { - case "ready": - worker.postMessage({ - type: "run", - arguments: [ - "-i", "test.webm", - "-frames:v", "5", "-c:v", "libvpx", - "-an", - "out.webm", - ], - MEMFS: [{name: "test.webm", data: testData}], - }); - break; - case "done": - worker.terminate(); - var mem = msg.data.MEMFS; - expect(mem).to.have.length(1); - expect(mem[0].name).to.equal("out.webm"); - expect(mem[0].data.length).to.be.above(0); - done(); - break; + case "ready": + worker.postMessage({ + type: "run", + arguments: [ + "-i", "test.webm", + "-frames:v", "5", "-c:v", "libvpx", + "-an", + "out.webm", + ], + MEMFS: [{ name: "test.webm", data: testData }], + }); + break; + case "done": + worker.terminate(); + var mem = msg.data.MEMFS; + expect(mem).to.have.length(1); + expect(mem[0].name).to.equal("out.webm"); + expect(mem[0].data.length).to.be.above(0); + done(); + break; + } + }); + }); + + it("should encode test file to WebM/VP8 with stdin", function (done) { + const stdinBuffer = RingBuffer.create(10 * 1024 * 1024); + stdinBuffer.append(testData); + + var worker = new Worker("./ffmpeg-worker-webm.js"); + worker.postMessage({ + type: "init", + stdin: stdinBuffer.buffer + }); // Init the worker with the stdin buffer. + worker.onerror = done; + worker.on("error", done); + worker.on("message", function (msg) { + switch (msg.type) { + case "ready": + worker.postMessage({ + type: "run", + arguments: [ + "-f", "webm", + "-i", "/dev/stdin", + "-vcodec", "webm", + "-frames:v", "5", "-c:v", "libvpx", + "-an", + "out.webm", + ], + MEMFS: [{ name: "test.webm", data: testData }], + }); + break; + case "done": + worker.terminate(); + var mem = msg.data.MEMFS; + expect(mem).to.have.length(1); + expect(mem[0].name).to.equal("out.webm"); + expect(mem[0].data.length).to.be.above(0); + done(); + break; } }); }); }); }); -describe("MP4", function() { +describe("MP4", function () { this.timeout(20000); - describe("Sync", function() { - it("should print version to stdout", function() { + describe("Sync", function () { + it("should print version to stdout", function () { var code; var stdout = ""; var stderr = ""; ffmpeg_mp4({ arguments: ["-version"], - print: function(data) { stdout += data + "\n"; }, - printErr: function(data) { stderr += data + "\n"; }, - onExit: function(v) {code = v}, + print: function (data) { stdout += data + "\n"; }, + printErr: function (data) { stderr += data + "\n"; }, + onExit: function (v) { code = v }, }); expect(code).to.equal(0); expect(stderr).to.be.empty; expect(stdout).to.match(/^ffmpeg version /); }); - it("should encode test file to MP4/H.264/MP3 at MEMFS", function() { + it("should encode test file to MP4/H.264/MP3 at MEMFS", function () { var code; var res = ffmpeg_mp4({ arguments: [ @@ -363,8 +403,8 @@ describe("MP4", function() { stdin: noop, print: noop, printErr: noop, - onExit: function(v) {code = v}, - MEMFS: [{name: "test.webm", data: testData}], + onExit: function (v) { code = v }, + MEMFS: [{ name: "test.webm", data: testData }], }); expect(code).to.equal(0); expect(res.MEMFS).to.have.length(1); @@ -374,7 +414,7 @@ describe("MP4", function() { expect(file.data).to.be.an.instanceof(Uint8Array); }); - it("should encode test file to MP4/AAC at MEMFS", function() { + it("should encode test file to MP4/AAC at MEMFS", function () { var res = ffmpeg_mp4({ arguments: [ "-i", "test.webm", @@ -385,7 +425,7 @@ describe("MP4", function() { stdin: noop, print: noop, printErr: noop, - MEMFS: [{name: "test.webm", data: testData}], + MEMFS: [{ name: "test.webm", data: testData }], }); expect(res.MEMFS).to.have.length(1); var file = res.MEMFS[0]; @@ -394,60 +434,60 @@ describe("MP4", function() { }); }); - describe("Worker", function() { - it("should print version to stdout", function(done) { + describe("Worker", function () { + it("should print version to stdout", function (done) { var stdout = ""; var stderr = ""; var worker = new Worker("./ffmpeg-worker-mp4.js"); worker.on("error", done); - worker.on("message", function(msg) { + worker.on("message", function (msg) { switch (msg.type) { - case "ready": - worker.postMessage({type: "run", arguments: ["-version"]}); - break; - case "stdout": - stdout += msg.data + "\n"; - break; - case "stderr": - stderr += msg.data + "\n"; - break; - case "exit": - worker.terminate(); - expect(stderr).to.be.empty; - expect(msg.data).to.equal(0); - expect(stdout).to.match(/^ffmpeg version /); - done(); - break; + case "ready": + worker.postMessage({ type: "run", arguments: ["-version"] }); + break; + case "stdout": + stdout += msg.data + "\n"; + break; + case "stderr": + stderr += msg.data + "\n"; + break; + case "exit": + worker.terminate(); + expect(stderr).to.be.empty; + expect(msg.data).to.equal(0); + expect(stdout).to.match(/^ffmpeg version /); + done(); + break; } }); }); - it("should encode test file to MP4/H.264 at MEMFS", function(done) { + it("should encode test file to MP4/H.264 at MEMFS", function (done) { var worker = new Worker("./ffmpeg-worker-mp4.js"); worker.onerror = done; worker.on("error", done); - worker.on("message", function(msg) { + worker.on("message", function (msg) { switch (msg.type) { - case "ready": - worker.postMessage({ - type: "run", - arguments: [ - "-i", "test.webm", - "-frames:v", "5", "-c:v", "libx264", - "-an", - "out.mp4", - ], - MEMFS: [{name: "test.webm", data: testData}], - }); - break; - case "done": - worker.terminate(); - var mem = msg.data.MEMFS; - expect(mem).to.have.length(1); - expect(mem[0].name).to.equal("out.mp4"); - expect(mem[0].data.length).to.be.above(0); - done(); - break; + case "ready": + worker.postMessage({ + type: "run", + arguments: [ + "-i", "test.webm", + "-frames:v", "5", "-c:v", "libx264", + "-an", + "out.mp4", + ], + MEMFS: [{ name: "test.webm", data: testData }], + }); + break; + case "done": + worker.terminate(); + var mem = msg.data.MEMFS; + expect(mem).to.have.length(1); + expect(mem[0].name).to.equal("out.mp4"); + expect(mem[0].data.length).to.be.above(0); + done(); + break; } }); }); From db3db68ff0776da4d41b7ca40dae94d881ea2722 Mon Sep 17 00:00:00 2001 From: Kagami Hiiragi Date: Sun, 19 Apr 2020 20:59:13 +0300 Subject: [PATCH 2/7] Clarify readme a bit Fixes #46 Fixes #51 --- README.md | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 419fce3..c75cfd6 100644 --- a/README.md +++ b/README.md @@ -64,8 +64,6 @@ You can send the following messages to the worker: * `{type: "run", ...opts}` - Start new job with provided options. ```js -var stdout = ""; -var stderr = ""; var worker = new Worker("ffmpeg-worker-webm.js"); worker.onmessage = function(e) { var msg = e.data; @@ -74,15 +72,13 @@ worker.onmessage = function(e) { worker.postMessage({type: "run", arguments: ["-version"]}); break; case "stdout": - stdout += msg.data + "\n"; + console.log(msg.data); break; case "stderr": - stderr += msg.data + "\n"; + console.log(msg.data); break; - case "exit": - console.log("Process exited with code " + msg.data); - console.log(stdout); - worker.terminate(); + case "done": + console.log(msg.data); break; } }; @@ -106,8 +102,6 @@ var testData = new Uint8Array(fs.readFileSync("test.webm")); var result = ffmpeg({ MEMFS: [{name: "test.webm", data: testData}], arguments: ["-i", "test.webm", "-c:v", "libvpx", "-an", "out.webm"], - // Ignore stdin read requests. - stdin: function() {}, }); // Write out.webm to disk. var out = result.MEMFS[0]; @@ -127,7 +121,6 @@ ffmpeg({ // Mount /data inside application to the current directory. mounts: [{type: "NODEFS", opts: {root: "."}, mountpoint: "/data"}], arguments: ["-i", "/data/test.webm", "-c:v", "libvpx", "-an", "/data/out.webm"], - stdin: function() {}, }); // out.webm was written to the current directory. ``` From 50383e6f22d018e279581f0a3b04d878a40f1f54 Mon Sep 17 00:00:00 2001 From: Kagami Hiiragi Date: Sun, 19 Apr 2020 21:47:44 +0300 Subject: [PATCH 3/7] 4.2.9001 --- package-lock.json | 2 +- package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package-lock.json b/package-lock.json index a57fb3a..d9d6126 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "ffmpeg.js", - "version": "4.2.9000", + "version": "4.2.9001", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index fb65be5..cf59503 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ffmpeg.js", - "version": "4.2.9000", + "version": "4.2.9001", "description": "Port of FFmpeg with Emscripten", "main": "ffmpeg-webm.js", "scripts": { From 1e88d31df2c274f4fb51b5c3e1f746a1a4866726 Mon Sep 17 00:00:00 2001 From: Paul Kinlan Date: Sun, 19 Apr 2020 19:25:44 +0000 Subject: [PATCH 4/7] Reverting the Makefile back - didn't need to change this :) --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 441b46a..1febddf 100644 --- a/Makefile +++ b/Makefile @@ -26,8 +26,8 @@ MP4_SHARED_DEPS = \ build/lame/dist/lib/libmp3lame.so \ build/x264/dist/lib/libx264.so -all: webm #mp4 -webm: ffmpeg-worker-webm.js #ffmpeg-webm.js +all: webm mp4 +webm: ffmpeg-worker-webm.js ffmpeg-webm.js mp4: ffmpeg-mp4.js ffmpeg-worker-mp4.js clean: clean-js \ From 3b277903538e63efc9b67bb7cf2b5cd1cbe43551 Mon Sep 17 00:00:00 2001 From: Paul Kinlan Date: Sun, 19 Apr 2020 19:49:21 +0000 Subject: [PATCH 5/7] Creating a simple build script because I keep forgetting the cmds --- build-with-docker.sh | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 build-with-docker.sh diff --git a/build-with-docker.sh b/build-with-docker.sh new file mode 100644 index 0000000..1c6e3fe --- /dev/null +++ b/build-with-docker.sh @@ -0,0 +1,3 @@ +# /usr/bin/sh + +echo "cp -a /mnt/{.git,build,Makefile} . && source /root/emsdk/emsdk_env.sh && make && cp ffmpeg*js /mnt" | docker run --rm -i -v `pwd`:/mnt -w /opt kagamihi/ffmpeg.js From 69fb64de0be545338a81456d4518617e6d202a50 Mon Sep 17 00:00:00 2001 From: Paul Kinlan Date: Mon, 20 Apr 2020 13:05:37 +0000 Subject: [PATCH 6/7] Adding Blocking read + Added a test (which only works when debugging :\) + RingBuffer now blocks on read + stdin handler uses blocking Read --- build-with-docker.sh | 10 +- build/pre.js | 169 +++++++++++++++++------- package.json | 3 +- test/output.txt | 151 +++++++++++++++------- test/ringbuffer.js | 296 +++++++++++++++++++++++++++---------------- test/test.js | 87 ++++++++++++- 6 files changed, 515 insertions(+), 201 deletions(-) diff --git a/build-with-docker.sh b/build-with-docker.sh index 1c6e3fe..ba17a41 100644 --- a/build-with-docker.sh +++ b/build-with-docker.sh @@ -1,3 +1,11 @@ # /usr/bin/sh -echo "cp -a /mnt/{.git,build,Makefile} . && source /root/emsdk/emsdk_env.sh && make && cp ffmpeg*js /mnt" | docker run --rm -i -v `pwd`:/mnt -w /opt kagamihi/ffmpeg.js +if [ $# -eq 0 ] +then + TARGET="all" +else + TARGET=$1 +fi + +# Builds in-place. +echo "source /root/emsdk/emsdk_env.sh && make $TARGET" | docker run --rm -i -v "$(pwd):/mnt" -w /mnt kagamihi/ffmpeg.js diff --git a/build/pre.js b/build/pre.js index f9e4a49..3b4af28 100644 --- a/build/pre.js +++ b/build/pre.js @@ -1,31 +1,53 @@ class RingBuffer { - /* - Create's a Ring Buffer backed by a correctly sized SAB. - - There can only be one writer and one reader. - */ - static create(length) { - const buffer = new SharedArrayBuffer( - length + Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH - ); - - return new RingBuffer(buffer); - } - static from(sab) { return new RingBuffer(sab); } - + get buffer() { return this._sab; } - + + get remaining() { + return this._size - this.length; + } + + get size() { + return this._size; + } + get length() { let readIndex = Atomics.load(this._header, HEADER.READ); let writeIndex = Atomics.load(this._header, HEADER.WRITE); + + const delta = writeIndex - readIndex; + return readIndex <= writeIndex ? delta : delta + this._size; + } + + get eof() { + return (this.length === 0 && Atomics.load(this._state, READER_STATE.EOF) === 0) ? true : false; + } + + set eof(val) { + let eofVal = !!val ? 0 : 1; + if (this.length === 0 && val) { + Atomics.notify(this._state, READER_STATE.DATA_AVAILABLE); + } + Atomics.store(this._state, READER_STATE.EOF, eofVal); + } + + /* + Create's a Ring Buffer backed by a correctly sized SAB. + + There can only be one writer and one reader. + */ + static create(length) { + const buffer = new SharedArrayBuffer( + length + + Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH + + Int32Array.BYTES_PER_ELEMENT * READER_STATE_LENGTH + ); - const delta = writeIndex - readIndex - return (readIndex <= writeIndex) ? delta : delta + this._size; + return new RingBuffer(buffer); } constructor(sab) { @@ -33,52 +55,104 @@ class RingBuffer { if (sab instanceof SharedArrayBuffer == false) throw new Error("Parameter 0 is not a Shared Array Buffer"); - this._size = - sab.byteLength - Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH; + this._size = sab.byteLength + - Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH + - Int32Array.BYTES_PER_ELEMENT * READER_STATE_LENGTH; this._sab = sab; this._header = new Uint32Array(sab, 0, HEADER_LENGTH); + this._state = new Int32Array(sab, Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH, READER_STATE_LENGTH) this._body = new Uint8Array( sab, - Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH, + Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH + + Int32Array.BYTES_PER_ELEMENT * READER_STATE_LENGTH, this._size ); + + Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 0); + Atomics.store(this._state, READER_STATE.EOF, 1); } - append(data) { - for (const byte of data) { - let writeIndex = Atomics.load(this._header, HEADER.WRITE); - - Atomics.store(this._body, writeIndex, byte); + /* + data: An array of Uint8 + attemptToFill (deafault: false): if true, will fill as much of the array as possible + returning the items that couldn't be added. + + */ + append(data, attemptToFill = false) { + const { remaining, length, size } = this; - writeIndex = Atomics.add(this._header, HEADER.WRITE, 1); + if (data.length > remaining && attemptToFill == false) { + throw new Error("Data being appeneded will overflow the buffer"); + } - if (writeIndex == this._size - 1) { - Atomics.store( - this._header, - HEADER.WRITE, - 0 - ); - } + if (data instanceof Array == false && data instanceof Uint8Array == false) { + throw new Error( + "data is not an array that can be converted to Uint8array" + ); } + + let readIndex = Atomics.load(this._header, HEADER.READ); + let writeIndex = Atomics.load(this._header, HEADER.WRITE); + let writeStart = writeIndex % size; + + // We need at most two write operations. + // If the data will go past the end of the buffer, we need + // to write a 2nd batch from the start of the buffer. + // 9, 15 + // batch1, pos [9] = val [0] + // batch2, pos [0] = val [1,2,3,4,5] + + const batch1 = data.slice(0, size - writeStart); + this._body.set(batch1, writeStart); + let writeLength = batch1.length; + + if (writeLength < data.length) { + // We are wrapping around because there was more data. + const batch2 = data.slice(writeLength, remaining - writeLength); + this._body.set(batch2, 0); + writeLength += batch2.length; + + Atomics.add(this._header, HEADER.WRITE, writeLength); + + if (attemptToFill && writeLength < data.length) { + return data.slice(writeLength); + } + } + else { + Atomics.add(this._header, HEADER.WRITE, writeLength); + } + + Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 1); + Atomics.notify(this._state, READER_STATE.DATA_AVAILABLE); } - // Reads the next byte of data + // Reads the next byte of data. Note: Assuming 4GB of addressable buffer. read() { let readIndex = Atomics.load(this._header, HEADER.READ); let writeIndex = Atomics.load(this._header, HEADER.WRITE); + + if (readIndex == writeIndex - 1) { + // The next blocking read, should wait. + Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 0); + } - if (readIndex == writeIndex) return undefined; + if (readIndex == writeIndex) { + return undefined; + } - const value = Atomics.load(this._body, readIndex); + const value = Atomics.load(this._body, readIndex % this._size); readIndex = Atomics.add(this._header, HEADER.READ, 1); - if (readIndex == this._size - 1) { - readIndex = Atomics.store(this._header, HEADER.READ, 0); - } - return value; } + + blockingRead() { + if (this.eof) return undefined; + + Atomics.wait(this._state, READER_STATE.DATA_AVAILABLE, 0); + return this.read(); + } *readToHead() { // Feels odd to have to create a buffer the same size as the buffer. Just iterate. @@ -101,14 +175,19 @@ class RingBuffer { } const HEADER = { - READ: 0, - WRITE: 1, - READING: 2, - WRITING: 4 + READ: 0, // 4GB buffer + WRITE: 1, // 4GB buffer }; const HEADER_LENGTH = Object.keys(HEADER).length; +const READER_STATE = { + DATA_AVAILABLE: 0, + WAITING: 1, + EOF: 2 +}; + +const READER_STATE_LENGTH = Object.keys(READER_STATE).length; var __ffmpegjs_utf8ToStr; @@ -179,7 +258,7 @@ function __ffmpegjs(__ffmpegjs_opts) { FS.init(function() { if (!!stdinRingBuffer && stdinRingBuffer.length > 0) { - const charRaw = stdinRingBuffer.read(); // only reads on char. + const charRaw = stdinRingBuffer.blockingRead(); // only reads on char. return charRaw; } return null; diff --git a/package.json b/package.json index cf59503..3810848 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,8 @@ "description": "Port of FFmpeg with Emscripten", "main": "ffmpeg-webm.js", "scripts": { - "test": "mocha test/test.js" + "test": "mocha test/test.js", + "test:debug": "mocha --inspect-brk test/test.js" }, "repository": { "type": "git", diff --git a/test/output.txt b/test/output.txt index bb10854..07d3dc4 100644 --- a/test/output.txt +++ b/test/output.txt @@ -1,53 +1,118 @@ -> ffmpeg.js@4.2.9000 test /usr/local/google/home/paulkinlan/ffmpeg.js -> mocha test/test.js "-g" "stdin" +> ffmpeg.js@4.2.9001 test /usr/local/google/home/paulkinlan/ffmpeg.js +> mocha test/test.js "-g" "large" WebM Worker -SharedArrayBuffer { - [Uint8Contents]: <00 00 00 00 36 03 06 00 00 00 00 00 00 00 00 00 1a 45 df a3 01 00 00 00 00 00 00 1f 42 86 81 01 42 f7 81 01 42 f2 81 04 42 f3 81 08 42 82 84 77 65 62 6d 42 87 81 02 42 85 81 02 18 53 80 67 01 00 00 00 00 06 02 ff 11 4d 9b 74 40 2d 4d bb 8b 53 ab 84 15 49 a9 66 53 ac 81 df 4d bb 8c 53 ab 84 16 54 ae ... 10485676 more bytes>, - byteLength: 10485776 +RingBuffer { + _size: 10485760, + _sab: SharedArrayBuffer { + [Uint8Contents]: <00 00 00 00 36 03 06 00 01 00 00 00 00 00 00 00 01 00 00 00 1a 45 df a3 01 00 00 00 00 00 00 1f 42 86 81 01 42 f7 81 01 42 f2 81 04 42 f3 81 08 42 82 84 77 65 62 6d 42 87 81 02 42 85 81 02 18 53 80 67 01 00 00 00 00 06 02 ff 11 4d 9b 74 40 2d 4d bb 8b 53 ab 84 15 49 a9 66 53 ac 81 df 4d bb 8c 53 ab ... 10485680 more bytes>, + byteLength: 10485780 + }, + _header: Uint32Array(2) [ 0, 394038 ], + _state: Int32Array(3) [ 1, 0, 1 ], + _body: Uint8Array(10485760) [ + 26, 69, 223, 163, 1, 0, 0, 0, 0, 0, 0, 31, + 66, 134, 129, 1, 66, 247, 129, 1, 66, 242, 129, 4, + 66, 243, 129, 8, 66, 130, 132, 119, 101, 98, 109, 66, + 135, 129, 2, 66, 133, 129, 2, 24, 83, 128, 103, 1, + 0, 0, 0, 0, 6, 2, 255, 17, 77, 155, 116, 64, + 45, 77, 187, 139, 83, 171, 132, 21, 73, 169, 102, 83, + 172, 129, 223, 77, 187, 140, 83, 171, 132, 22, 84, 174, + 107, 83, 172, 130, 1, 48, 77, 187, 141, 83, 171, 132, + 28, 83, 187, 107, + ... 10485660 more items + ] } -stderr ffmpeg version n4.2.2 Copyright (c) 2000-2019 the FFmpeg developers -stderr built with emcc (Emscripten gcc/clang-like replacement) 1.39.13 -stderr configuration: --cc=emcc --ranlib=emranlib --enable-cross-compile --target-os=none --arch=x86 --disable-runtime-cpudetect --disable-asm --disable-fast-unaligned --disable-pthreads --disable-w32threads --disable-os2threads --disable-debug --disable-stripping --disable-safe-bitstream-reader --disable-all --enable-ffmpeg --enable-avcodec --enable-avformat --enable-avfilter --enable-swresample --enable-swscale --disable-network --disable-d3d11va --disable-dxva2 --disable-vaapi --disable-vdpau --enable-decoder=vp8 --enable-decoder=h264 --enable-decoder=vorbis --enable-decoder=opus --enable-decoder=mp3 --enable-decoder=aac --enable-decoder=pcm_s16le --enable-demuxer=matroska --enable-demuxer=ogg --enable-demuxer=mov --enable-demuxer=mp3 --enable-demuxer=wav --enable-demuxer=concat --enable-protocol=file --enable-filter=aresample --enable-filter=scale --enable-filter=crop --enable-filter=overlay --enable-filter=hstack --enable-filter=vstack --disable-bzlib --disable-iconv --disable-libxcb --disable-lzma --disable-sdl2 --disable-securetransport --disable-xlib --disable-zlib --enable-encoder=libvpx_vp8 --enable-encoder=libopus --enable-muxer=webm --enable-muxer=ogg --enable-muxer=null --enable-libopus --enable-libvpx --extra-cflags=-I../libvpx/dist/include --extra-ldflags=-L../libvpx/dist/lib -stderr libavutil 56. 31.100 / 56. 31.100 -stderr libavcodec 58. 54.100 / 58. 54.100 -stderr libavformat 58. 29.100 / 58. 29.100 -stderr libavfilter 7. 57.100 / 7. 57.100 -stderr libswscale 5. 5.100 / 5. 5.100 -stderr libswresample 3. 5.100 / 3. 5.100 -stderr [vp8 @ 0x6ac758] Warning: not compiled with thread support, using thread emulation -stderr [vorbis @ 0x6af528] Warning: not compiled with thread support, using thread emulation -stderr Input #0, matroska,webm, from '/dev/stdin': -stderr Metadata: -stderr encoder : Lavf56.40.100 -stderr Duration: 00:00:03.95, start: 0.000000, bitrate: N/A -stderr Stream #0:0: Video: vp8, yuv420p, 854x480, SAR 1:1 DAR 427:240, 24 fps, 24 tbr, 1k tbn, 1k tbc (default) -stderr Stream #0:1: Audio: vorbis, 48000 Hz, stereo, fltp (default) -stderr Stream #0:2(eng): Subtitle: webvtt -stderr [vp8 @ 0x6b32e0] Warning: not compiled with thread support, using thread emulation -stderr Stream mapping: -stderr Stream #0:0 -> #0:0 (vp8 (native) -> vp8 (libvpx)) -stderr [libvpx @ 0x6c0548] Warning: not compiled with thread support, using thread emulation -stderr [libvpx @ 0x6c0548] v1.8.2 -stderr Output #0, webm, to 'out.webm': -stderr Metadata: -stderr encoder : Lavf58.29.100 -stderr Stream #0:0: Video: vp8 (libvpx), yuv420p, 854x480 [SAR 1:1 DAR 427:240], q=-1--1, 200 kb/s, 24 fps, 1k tbn, 24 tbc (default) -stderr Metadata: -stderr encoder : Lavc58.54.100 libvpx -stderr Side data: -stderr cpb: bitrate max/min/avg: 0/0/0 buffer size: 0 vbv_delay: -1 -stderr frame= 1 fps=0.3 q=0.0 size= 1kB time=00:00:00.00 bitrate=4760.0kbits/s speed=0.000349x -stderr frame= 3 fps=0.8 q=0.0 size= 1kB time=00:00:00.08 bitrate= 56.7kbits/s speed=0.0217x -stderr frame= 4 fps=0.9 q=0.0 size= 1kB time=00:00:00.12 bitrate= 37.8kbits/s speed=0.0273x -stderr frame= 5 fps=0.9 q=0.0 size= 1kB time=00:00:00.16 bitrate= 28.3kbits/s speed=0.0314x -stderr frame= 5 fps=0.8 q=0.0 Lsize= 37kB time=00:00:00.16 bitrate=1786.0kbits/s speed=0.0277x -stderr video:36kB audio:0kB subtitle:0kB other streams:0kB global headers:0kB muxing overhead: 1.799479% - ✓ should encode test file to WebM/VP8 with stdin (6755ms) +ffmpeg version n4.2.2 Copyright (c) 2000-2019 the FFmpeg developers + + built with emcc (Emscripten gcc/clang-like replacement) 1.39.13 + + configuration: --cc=emcc --ranlib=emranlib --enable-cross-compile --target-os=none --arch=x86 --disable-runtime-cpudetect --disable-asm --disable-fast-unaligned --disable-pthreads --disable-w32threads --disable-os2threads --disable-debug --disable-stripping --disable-safe-bitstream-reader --disable-all --enable-ffmpeg --enable-avcodec --enable-avformat --enable-avfilter --enable-swresample --enable-swscale --disable-network --disable-d3d11va --disable-dxva2 --disable-vaapi --disable-vdpau --enable-decoder=vp8 --enable-decoder=h264 --enable-decoder=vorbis --enable-decoder=opus --enable-decoder=mp3 --enable-decoder=aac --enable-decoder=pcm_s16le --enable-demuxer=matroska --enable-demuxer=ogg --enable-demuxer=mov --enable-demuxer=mp3 --enable-demuxer=wav --enable-demuxer=concat --enable-protocol=file --enable-filter=aresample --enable-filter=scale --enable-filter=crop --enable-filter=overlay --enable-filter=hstack --enable-filter=vstack --disable-bzlib --disable-iconv --disable-libxcb --disable-lzma --disable-sdl2 --disable-securetransport --disable-xlib --disable-zlib --enable-encoder=libvpx_vp8 --enable-encoder=libopus --enable-muxer=webm --enable-muxer=ogg --enable-muxer=null --enable-libopus --enable-libvpx --extra-cflags=-I../libvpx/dist/include --extra-ldflags=-L../libvpx/dist/lib + + libavutil 56. 31.100 / 56. 31.100 + + libavcodec 58. 54.100 / 58. 54.100 + + libavformat 58. 29.100 / 58. 29.100 + + libavfilter 7. 57.100 / 7. 57.100 + + libswscale 5. 5.100 / 5. 5.100 + + libswresample 3. 5.100 / 3. 5.100 + +[vp8 @ 0x6ac758] Warning: not compiled with thread support, using thread emulation + +[vorbis @ 0x6af528] Warning: not compiled with thread support, using thread emulation + +Input #0, matroska,webm, from '/dev/stdin': + + Metadata: + + encoder : Lavf56.40.100 + + Duration: 00:00:03.95, start: 0.000000, bitrate: N/A + + Stream #0:0: Video: vp8, yuv420p, 854x480, SAR 1:1 DAR 427:240, 24 fps, 24 tbr, 1k tbn, 1k tbc (default) + + Stream #0:1: Audio: vorbis, 48000 Hz, stereo, fltp (default) + + Stream #0:2(eng): Subtitle: webvtt + +[vp8 @ 0x6b32e0] Warning: not compiled with thread support, using thread emulation + +Stream mapping: + + Stream #0:0 -> #0:0 (vp8 (native) -> vp8 (libvpx)) + +[libvpx @ 0x6c0548] Warning: not compiled with thread support, using thread emulation + +[libvpx @ 0x6c0548] v1.8.2 + +Output #0, webm, to 'out.webm': + + Metadata: + + encoder : Lavf58.29.100 + + Stream #0:0: Video: vp8 (libvpx), yuv420p, 854x480 [SAR 1:1 DAR 427:240], q=-1--1, 200 kb/s, 24 fps, 1k tbn, 24 tbc (default) + + Metadata: + + encoder : Lavc58.54.100 libvpx + + Side data: + + cpb: bitrate max/min/avg: 0/0/0 buffer size: 0 vbv_delay: -1 + +frame= 1 fps=0.3 q=0.0 size= 1kB time=00:00:00.00 bitrate=4760.0kbits/s speed=0.000348x +frame= 3 fps=0.8 q=0.0 size= 1kB time=00:00:00.08 bitrate= 56.7kbits/s speed=0.0214x +frame= 4 fps=0.9 q=0.0 size= 1kB time=00:00:00.12 bitrate= 37.8kbits/s speed=0.027x +frame= 5 fps=0.9 q=0.0 size= 1kB time=00:00:00.16 bitrate= 28.3kbits/s speed=0.0312x +frame= 5 fps=0.8 q=0.0 Lsize= 37kB time=00:00:00.16 bitrate=1786.0kbits/s speed=0.0275x + +video:36kB audio:0kB subtitle:0kB other streams:0kB global headers:0kB muxing overhead: 1.799479% + +{ + name: 'out.webm', + data: Uint8Array(37507) [ + 26, 69, 223, 163, 159, 66, 134, 129, 1, 66, 247, 129, + 1, 66, 242, 129, 4, 66, 243, 129, 8, 66, 130, 132, + 119, 101, 98, 109, 66, 135, 129, 2, 66, 133, 129, 2, + 24, 83, 128, 103, 1, 0, 0, 0, 0, 0, 146, 83, + 17, 77, 155, 116, 187, 77, 187, 139, 83, 171, 132, 21, + 73, 169, 102, 83, 172, 129, 229, 77, 187, 140, 83, 171, + 132, 22, 84, 174, 107, 83, 172, 130, 1, 28, 77, 187, + 140, 83, 171, 132, 18, 84, 195, 103, 83, 172, 130, 1, + 94, 77, 187, 140, + ... 37407 more items + ] +} + ✓ should encode test file to WebM/VP8 with stdin - large buffer (6734ms) 1 passing (7s) diff --git a/test/ringbuffer.js b/test/ringbuffer.js index df39e24..a75fe9c 100644 --- a/test/ringbuffer.js +++ b/test/ringbuffer.js @@ -1,130 +1,210 @@ /* - Copyright 2020 Google Inc. +Copyright 2020 Google Inc. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ class RingBuffer { - /* - Create's a Ring Buffer backed by a correctly sized SAB. - - There can only be one writer and one reader. - */ - static create(length) { - const buffer = new SharedArrayBuffer( - length + Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH - ); - return new RingBuffer(buffer); - } + static from(sab) { + return new RingBuffer(sab); + } + + get buffer() { + return this._sab; + } + + get remaining() { + return this._size - this.length; + } + + get size() { + return this._size; + } + + get length() { + let readIndex = Atomics.load(this._header, HEADER.READ); + let writeIndex = Atomics.load(this._header, HEADER.WRITE); + + const delta = writeIndex - readIndex; + return readIndex <= writeIndex ? delta : delta + this._size; + } - static from(sab) { - return new RingBuffer(sab); - } + get eof() { + return (this.length === 0 && Atomics.load(this._state, READER_STATE.EOF) === 0) ? true : false; + } - get buffer() { - return this._sab; + set eof(val) { + let eofVal = !!val ? 0 : 1; + if (this.length === 0 && val) { + Atomics.notify(this._state, READER_STATE.DATA_AVAILABLE); } + Atomics.store(this._state, READER_STATE.EOF, eofVal); + } - get length() { - const readIndex = Atomics.load(this._header, HEADER.READ); - const writeIndex = Atomics.load(this._header, HEADER.WRITE); - - const delta = writeIndex - readIndex; - return (readIndex <= writeIndex) ? delta : delta + this._size; + /* + Create's a Ring Buffer backed by a correctly sized SAB. + + There can only be one writer and one reader. + */ + static create(length) { + const buffer = new SharedArrayBuffer( + length + + Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH + + Int32Array.BYTES_PER_ELEMENT * READER_STATE_LENGTH + ); + + return new RingBuffer(buffer); + } + + constructor(sab) { + if (!!sab == false) throw new Error("Shared Array Buffer is undefined"); + if (sab instanceof SharedArrayBuffer == false) + throw new Error("Parameter 0 is not a Shared Array Buffer"); + + this._size = sab.byteLength + - Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH + - Int32Array.BYTES_PER_ELEMENT * READER_STATE_LENGTH; + this._sab = sab; + this._header = new Uint32Array(sab, 0, HEADER_LENGTH); + this._state = new Int32Array(sab, Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH, READER_STATE_LENGTH) + this._body = new Uint8Array( + sab, + Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH + + Int32Array.BYTES_PER_ELEMENT * READER_STATE_LENGTH, + this._size + ); + + Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 0); + Atomics.store(this._state, READER_STATE.EOF, 1); + } + + /* + data: An array of Uint8 + attemptToFill (deafault: false): if true, will fill as much of the array as possible + returning the items that couldn't be added. + + */ + append(data, attemptToFill = false) { + const { remaining, length, size } = this; + + if (data.length > remaining && attemptToFill == false) { + throw new Error("Data being appeneded will overflow the buffer"); } - - constructor(sab) { - if (!!sab == false) throw new Error("Shared Array Buffer is undefined"); - if (sab instanceof SharedArrayBuffer == false) - throw new Error("Parameter 0 is not a Shared Array Buffer"); - - this._size = - sab.byteLength - Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH; - this._sab = sab; - this._header = new Uint32Array(sab, 0, HEADER_LENGTH); - this._body = new Uint8Array( - sab, - Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH, - this._size + + if (data instanceof Array == false && data instanceof Uint8Array == false) { + throw new Error( + "data is not an array that can be converted to Uint8array" ); } - - append(data) { - for (const byte of data) { - let writeIndex = Atomics.load(this._header, HEADER.WRITE); - - Atomics.store(this._body, writeIndex, byte); - - writeIndex = Atomics.add(this._header, HEADER.WRITE, 1); - - if (writeIndex == this._size - 1) { - Atomics.store( - this._header, - HEADER.WRITE, - 0 - ); - } - } + + let readIndex = Atomics.load(this._header, HEADER.READ); + let writeIndex = Atomics.load(this._header, HEADER.WRITE); + let writeStart = writeIndex % size; + + // We need at most two write operations. + // If the data will go past the end of the buffer, we need + // to write a 2nd batch from the start of the buffer. + // 9, 15 + // batch1, pos [9] = val [0] + // batch2, pos [0] = val [1,2,3,4,5] + + const batch1 = data.slice(0, size - writeStart); + this._body.set(batch1, writeStart); + let writeLength = batch1.length; + + if (writeLength < data.length) { + // We are wrapping around because there was more data. + const batch2 = data.slice(writeLength, remaining - writeLength); + this._body.set(batch2, 0); + writeLength += batch2.length; + + Atomics.add(this._header, HEADER.WRITE, writeLength); + + if (attemptToFill && writeLength < data.length) { + return data.slice(writeLength); + } } - - // Reads the next byte of data - read() { - let readIndex = Atomics.load(this._header, HEADER.READ); - let writeIndex = Atomics.load(this._header, HEADER.WRITE); - - if (readIndex == writeIndex) return undefined; - - const value = Atomics.load(this._body, readIndex); - - readIndex = Atomics.add(this._header, HEADER.READ, 1); - - if (readIndex == this._size - 1) { - readIndex = Atomics.store(this._header, HEADER.READ, 0); - } - - return value; + else { + Atomics.add(this._header, HEADER.WRITE, writeLength); } - - *readToHead() { - // Feels odd to have to create a buffer the same size as the buffer. Just iterate. - let data; - while ((data = this.read()) != undefined) { - yield data; - } + + Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 1); + Atomics.notify(this._state, READER_STATE.DATA_AVAILABLE); + } + + // Reads the next byte of data. Note: Assuming 4GB of addressable buffer. + read() { + let readIndex = Atomics.load(this._header, HEADER.READ); + let writeIndex = Atomics.load(this._header, HEADER.WRITE); + + if (readIndex == writeIndex - 1) { + // The next blocking read, should wait. + Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 0); } - - clear() { - Atomics.store(this._header, HEADER.READ, 0); - Atomics.store(this._header, HEADER.WRITE, 0); + + if (readIndex == writeIndex) { + return undefined; } + + const value = Atomics.load(this._body, readIndex % this._size); + + readIndex = Atomics.add(this._header, HEADER.READ, 1); + + return value; + } - debug() { - console.log(this._sab); - console.log(this._header); - console.log(this._body); + blockingRead() { + if (this.eof) return undefined; + + Atomics.wait(this._state, READER_STATE.DATA_AVAILABLE, 0); + return this.read(); + } + + *readToHead() { + // Feels odd to have to create a buffer the same size as the buffer. Just iterate. + let data; + while ((data = this.read()) != undefined) { + yield data; } } - - const HEADER = { - READ: 0, - WRITE: 1, - READING: 2, - WRITING: 4, - }; - - const HEADER_LENGTH = Object.keys(HEADER).length; - - if (typeof window === 'undefined') { - module.exports = RingBuffer; + + clear() { + Atomics.store(this._header, HEADER.READ, 0); + Atomics.store(this._header, HEADER.WRITE, 0); } - \ No newline at end of file + + debug() { + console.log(this._sab); + console.log(this._header); + console.log(this._body); + } +} + +const HEADER = { + READ: 0, // 4GB buffer + WRITE: 1, // 4GB buffer +}; + +const HEADER_LENGTH = Object.keys(HEADER).length; + +const READER_STATE = { + DATA_AVAILABLE: 0, + WAITING: 1, + EOF: 2 +}; + +const READER_STATE_LENGTH = Object.keys(READER_STATE).length; + +if (typeof window === 'undefined') { + module.exports = RingBuffer; +} diff --git a/test/test.js b/test/test.js index 3c7dffe..06ee7e4 100644 --- a/test/test.js +++ b/test/test.js @@ -319,22 +319,30 @@ describe("WebM", function () { MEMFS: [{ name: "test.webm", data: testData }], }); break; + case "stdout": + console.log(msg.data); + break; + case "stderr": + console.log(msg.data); + break; case "done": worker.terminate(); var mem = msg.data.MEMFS; expect(mem).to.have.length(1); expect(mem[0].name).to.equal("out.webm"); - expect(mem[0].data.length).to.be.above(0); + expect(mem[0].data.length).to.be.above(49 * 1024); done(); break; } }); }); - it("should encode test file to WebM/VP8 with stdin", function (done) { + it("should encode test file to WebM/VP8 with stdin - large buffer", function (done) { const stdinBuffer = RingBuffer.create(10 * 1024 * 1024); stdinBuffer.append(testData); + console.log(stdinBuffer) + var worker = new Worker("./ffmpeg-worker-webm.js"); worker.postMessage({ type: "init", @@ -344,6 +352,12 @@ describe("WebM", function () { worker.on("error", done); worker.on("message", function (msg) { switch (msg.type) { + case "stdout": + console.log(msg.data); + break; + case "stderr": + console.log(msg.data); + break; case "ready": worker.postMessage({ type: "run", @@ -363,13 +377,80 @@ describe("WebM", function () { var mem = msg.data.MEMFS; expect(mem).to.have.length(1); expect(mem[0].name).to.equal("out.webm"); - expect(mem[0].data.length).to.be.above(0); + expect(mem[0].data.length).to.be.equal(37507); + console.log(mem[0]) done(); break; } }); }); }); + + it("should encode test file to WebM/VP8 with stdin and a tiny buffer", function (done) { + const stdinBuffer = RingBuffer.create(100 * 1024); + + // Init the worker with the stdin buffer. + var worker = new Worker("./ffmpeg-worker-webm.js"); + worker.postMessage({ + type: "init", + stdin: stdinBuffer.buffer + }); + + worker.onerror = done; + worker.on("error", done); + worker.on("message", function (msg) { + switch (msg.type) { + case "stdout": + console.log(msg.data); + break; + case "stderr": + console.log(msg.data); + break; + case "ready": + worker.postMessage({ + type: "run", + arguments: [ + "-f", "webm", + "-i", "/dev/stdin", + "-vcodec", "webm", + "-frames:v", "5", "-c:v", "libvpx", + "-an", + "out.webm", + ], + MEMFS: [{ name: "test.webm", data: testData }], + }); + break; + case "done": + worker.terminate(); + var mem = msg.data.MEMFS; + expect(mem).to.have.length(1); + expect(mem[0].name).to.equal("out.webm"); + expect(mem[0].data.length).to.be.equal(37507); + console.log(mem[0]) + done(); + break; + } + }); + + let offset = 0; + let len = 0; + + processData = () => { + // Pump the data in to the buffer. + len = stdinBuffer.remaining; + + if (len !== 0) { + // Buffer is full. Spin. + const data = testData.slice(offset, offset + len); + stdinBuffer.append(data); + offset = offset + len; + } + + if (offset < testData.length) setTimeout(processData, 100); + } + + setTimeout(processData, 1000); + }) }); describe("MP4", function () { From 1e5783ca26b7f6e87609afba356bd5c837e8c3ca Mon Sep 17 00:00:00 2001 From: Paul Kinlan Date: Mon, 20 Apr 2020 21:09:52 +0000 Subject: [PATCH 7/7] Fixes so that blocking Reads work --- build/pre.js | 34 +++++++++++++++-------- test/index.html | 69 ++++++++++++++++++++++++++++++++++++++++++++++ test/ringbuffer.js | 41 ++++++++++++--------------- test/test.js | 12 +++----- test/worker.js | 0 5 files changed, 113 insertions(+), 43 deletions(-) create mode 100644 test/index.html create mode 100644 test/worker.js diff --git a/build/pre.js b/build/pre.js index 3b4af28..4456541 100644 --- a/build/pre.js +++ b/build/pre.js @@ -1,3 +1,18 @@ +/* + Copyright 2020 Google Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ class RingBuffer { static from(sab) { return new RingBuffer(sab); @@ -67,9 +82,6 @@ class RingBuffer { + Int32Array.BYTES_PER_ELEMENT * READER_STATE_LENGTH, this._size ); - - Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 0); - Atomics.store(this._state, READER_STATE.EOF, 1); } /* @@ -91,7 +103,6 @@ class RingBuffer { ); } - let readIndex = Atomics.load(this._header, HEADER.READ); let writeIndex = Atomics.load(this._header, HEADER.WRITE); let writeStart = writeIndex % size; @@ -105,6 +116,7 @@ class RingBuffer { const batch1 = data.slice(0, size - writeStart); this._body.set(batch1, writeStart); let writeLength = batch1.length; + let slice = undefined; if (writeLength < data.length) { // We are wrapping around because there was more data. @@ -114,8 +126,8 @@ class RingBuffer { Atomics.add(this._header, HEADER.WRITE, writeLength); - if (attemptToFill && writeLength < data.length) { - return data.slice(writeLength); + if (attemptToFill && (writeLength < data.length)) { + slice = data.slice(writeLength); } } else { @@ -124,6 +136,8 @@ class RingBuffer { Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 1); Atomics.notify(this._state, READER_STATE.DATA_AVAILABLE); + + return slice; } // Reads the next byte of data. Note: Assuming 4GB of addressable buffer. @@ -133,7 +147,9 @@ class RingBuffer { if (readIndex == writeIndex - 1) { // The next blocking read, should wait. + console.log('next block') Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 0); + Atomics.notify(this._state, READER_STATE.DATA_AVAILABLE) } if (readIndex == writeIndex) { @@ -166,12 +182,6 @@ class RingBuffer { Atomics.store(this._header, HEADER.READ, 0); Atomics.store(this._header, HEADER.WRITE, 0); } - - debug() { - console.log(this._sab); - console.log(this._header); - console.log(this._body); - } } const HEADER = { diff --git a/test/index.html b/test/index.html new file mode 100644 index 0000000..9ee1d13 --- /dev/null +++ b/test/index.html @@ -0,0 +1,69 @@ + + + + + + \ No newline at end of file diff --git a/test/ringbuffer.js b/test/ringbuffer.js index a75fe9c..a52779a 100644 --- a/test/ringbuffer.js +++ b/test/ringbuffer.js @@ -1,17 +1,17 @@ /* -Copyright 2020 Google Inc. + Copyright 2020 Google Inc. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ class RingBuffer { @@ -83,14 +83,11 @@ class RingBuffer { + Int32Array.BYTES_PER_ELEMENT * READER_STATE_LENGTH, this._size ); - - Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 0); - Atomics.store(this._state, READER_STATE.EOF, 1); } /* data: An array of Uint8 - attemptToFill (deafault: false): if true, will fill as much of the array as possible + attemptToFill (default: false): if true, will fill as much of the array as possible returning the items that couldn't be added. */ @@ -107,7 +104,6 @@ class RingBuffer { ); } - let readIndex = Atomics.load(this._header, HEADER.READ); let writeIndex = Atomics.load(this._header, HEADER.WRITE); let writeStart = writeIndex % size; @@ -121,6 +117,7 @@ class RingBuffer { const batch1 = data.slice(0, size - writeStart); this._body.set(batch1, writeStart); let writeLength = batch1.length; + let slice = undefined; if (writeLength < data.length) { // We are wrapping around because there was more data. @@ -130,8 +127,8 @@ class RingBuffer { Atomics.add(this._header, HEADER.WRITE, writeLength); - if (attemptToFill && writeLength < data.length) { - return data.slice(writeLength); + if (attemptToFill && (writeLength < data.length)) { + slice = data.slice(writeLength); } } else { @@ -140,6 +137,8 @@ class RingBuffer { Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 1); Atomics.notify(this._state, READER_STATE.DATA_AVAILABLE); + + return slice; } // Reads the next byte of data. Note: Assuming 4GB of addressable buffer. @@ -149,7 +148,9 @@ class RingBuffer { if (readIndex == writeIndex - 1) { // The next blocking read, should wait. + console.log('next block') Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 0); + Atomics.notify(this._state, READER_STATE.DATA_AVAILABLE) } if (readIndex == writeIndex) { @@ -182,12 +183,6 @@ class RingBuffer { Atomics.store(this._header, HEADER.READ, 0); Atomics.store(this._header, HEADER.WRITE, 0); } - - debug() { - console.log(this._sab); - console.log(this._header); - console.log(this._body); - } } const HEADER = { diff --git a/test/test.js b/test/test.js index 06ee7e4..268c7f8 100644 --- a/test/test.js +++ b/test/test.js @@ -341,8 +341,6 @@ describe("WebM", function () { const stdinBuffer = RingBuffer.create(10 * 1024 * 1024); stdinBuffer.append(testData); - console.log(stdinBuffer) - var worker = new Worker("./ffmpeg-worker-webm.js"); worker.postMessage({ type: "init", @@ -378,7 +376,6 @@ describe("WebM", function () { expect(mem).to.have.length(1); expect(mem[0].name).to.equal("out.webm"); expect(mem[0].data.length).to.be.equal(37507); - console.log(mem[0]) done(); break; } @@ -387,8 +384,8 @@ describe("WebM", function () { }); it("should encode test file to WebM/VP8 with stdin and a tiny buffer", function (done) { - const stdinBuffer = RingBuffer.create(100 * 1024); - + const stdinBuffer = RingBuffer.create(1 * 1024 * 1024); + // Init the worker with the stdin buffer. var worker = new Worker("./ffmpeg-worker-webm.js"); worker.postMessage({ @@ -426,7 +423,6 @@ describe("WebM", function () { expect(mem).to.have.length(1); expect(mem[0].name).to.equal("out.webm"); expect(mem[0].data.length).to.be.equal(37507); - console.log(mem[0]) done(); break; } @@ -446,10 +442,10 @@ describe("WebM", function () { offset = offset + len; } - if (offset < testData.length) setTimeout(processData, 100); + if (offset < testData.length) setTimeout(processData, 1000); } - setTimeout(processData, 1000); + setImmediate(processData); }) }); diff --git a/test/worker.js b/test/worker.js new file mode 100644 index 0000000..e69de29