Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First working test of stdin of webworker #1

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ MP4_SHARED_DEPS = \
build/x264/dist/lib/libx264.so

all: webm mp4
webm: ffmpeg-webm.js ffmpeg-worker-webm.js
webm: ffmpeg-worker-webm.js ffmpeg-webm.js
mp4: ffmpeg-mp4.js ffmpeg-worker-mp4.js

clean: clean-js \
Expand Down Expand Up @@ -216,7 +216,7 @@ build/ffmpeg-mp4/ffmpeg.bc: $(MP4_SHARED_DEPS)

EMCC_COMMON_ARGS = \
-O3 \
--closure 1 \
--closure 0 \
--memory-init-file 0 \
-s WASM=0 \
-s WASM_ASYNC_COMPILATION=0 \
Expand Down
15 changes: 4 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ You can send the following messages to the worker:
* `{type: "run", ...opts}` - Start new job with provided options.

```js
var stdout = "";
var stderr = "";
var worker = new Worker("ffmpeg-worker-webm.js");
worker.onmessage = function(e) {
var msg = e.data;
Expand All @@ -74,15 +72,13 @@ worker.onmessage = function(e) {
worker.postMessage({type: "run", arguments: ["-version"]});
break;
case "stdout":
stdout += msg.data + "\n";
console.log(msg.data);
break;
case "stderr":
stderr += msg.data + "\n";
console.log(msg.data);
break;
case "exit":
console.log("Process exited with code " + msg.data);
console.log(stdout);
worker.terminate();
case "done":
console.log(msg.data);
break;
}
};
Expand All @@ -106,8 +102,6 @@ var testData = new Uint8Array(fs.readFileSync("test.webm"));
var result = ffmpeg({
MEMFS: [{name: "test.webm", data: testData}],
arguments: ["-i", "test.webm", "-c:v", "libvpx", "-an", "out.webm"],
// Ignore stdin read requests.
stdin: function() {},
});
// Write out.webm to disk.
var out = result.MEMFS[0];
Expand All @@ -127,7 +121,6 @@ ffmpeg({
// Mount /data inside application to the current directory.
mounts: [{type: "NODEFS", opts: {root: "."}, mountpoint: "/data"}],
arguments: ["-i", "/data/test.webm", "-c:v", "libvpx", "-an", "/data/out.webm"],
stdin: function() {},
});
// out.webm was written to the current directory.
```
Expand Down
11 changes: 11 additions & 0 deletions build-with-docker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# /usr/bin/sh

if [ $# -eq 0 ]
then
TARGET="all"
else
TARGET=$1
fi

# Builds in-place.
echo "source /root/emsdk/emsdk_env.sh && make $TARGET" | docker run --rm -i -v "$(pwd):/mnt" -w /mnt kagamihi/ffmpeg.js
6 changes: 6 additions & 0 deletions build/post-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

var __ffmpegjs_running = false;

let stdinRingBuffer;

// Shim for nodejs
if (typeof self === "undefined") {
self = require("worker_threads")["parentPort"];
Expand Down Expand Up @@ -40,6 +42,10 @@ self.onmessage = function(e) {
self.postMessage({"type": "done", "data": result}, transfer);
__ffmpegjs_running = false;
}
} else if (msg["type"] == "init") {
const buffer = msg["stdin"];
stdinRingBuffer = RingBuffer.from(buffer);
//self.postMessage({"type": "ready"}); // Should really only do this when stdin ready
} else {
self.postMessage({"type": "error", "data": "unknown command"});
}
Expand Down
212 changes: 212 additions & 0 deletions build/pre.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,206 @@
/*
Copyright 2020 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
class RingBuffer {
static from(sab) {
return new RingBuffer(sab);
}

get buffer() {
return this._sab;
}

get remaining() {
return this._size - this.length;
}

get size() {
return this._size;
}

get length() {
let readIndex = Atomics.load(this._header, HEADER.READ);
let writeIndex = Atomics.load(this._header, HEADER.WRITE);

const delta = writeIndex - readIndex;
return readIndex <= writeIndex ? delta : delta + this._size;
}

get eof() {
return (this.length === 0 && Atomics.load(this._state, READER_STATE.EOF) === 0) ? true : false;
}

set eof(val) {
let eofVal = !!val ? 0 : 1;
if (this.length === 0 && val) {
Atomics.notify(this._state, READER_STATE.DATA_AVAILABLE);
}
Atomics.store(this._state, READER_STATE.EOF, eofVal);
}

/*
Create's a Ring Buffer backed by a correctly sized SAB.

There can only be one writer and one reader.
*/
static create(length) {
const buffer = new SharedArrayBuffer(
length
+ Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH
+ Int32Array.BYTES_PER_ELEMENT * READER_STATE_LENGTH
);

return new RingBuffer(buffer);
}

constructor(sab) {
if (!!sab == false) throw new Error("Shared Array Buffer is undefined");
if (sab instanceof SharedArrayBuffer == false)
throw new Error("Parameter 0 is not a Shared Array Buffer");

this._size = sab.byteLength
- Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH
- Int32Array.BYTES_PER_ELEMENT * READER_STATE_LENGTH;
this._sab = sab;
this._header = new Uint32Array(sab, 0, HEADER_LENGTH);
this._state = new Int32Array(sab, Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH, READER_STATE_LENGTH)
this._body = new Uint8Array(
sab,
Uint32Array.BYTES_PER_ELEMENT * HEADER_LENGTH
+ Int32Array.BYTES_PER_ELEMENT * READER_STATE_LENGTH,
this._size
);
}

/*
data: An array of Uint8
attemptToFill (deafault: false): if true, will fill as much of the array as possible
returning the items that couldn't be added.

*/
append(data, attemptToFill = false) {
const { remaining, length, size } = this;

if (data.length > remaining && attemptToFill == false) {
throw new Error("Data being appeneded will overflow the buffer");
}

if (data instanceof Array == false && data instanceof Uint8Array == false) {
throw new Error(
"data is not an array that can be converted to Uint8array"
);
}

let writeIndex = Atomics.load(this._header, HEADER.WRITE);
let writeStart = writeIndex % size;

// We need at most two write operations.
// If the data will go past the end of the buffer, we need
// to write a 2nd batch from the start of the buffer.
// 9, 15
// batch1, pos [9] = val [0]
// batch2, pos [0] = val [1,2,3,4,5]

const batch1 = data.slice(0, size - writeStart);
this._body.set(batch1, writeStart);
let writeLength = batch1.length;
let slice = undefined;

if (writeLength < data.length) {
// We are wrapping around because there was more data.
const batch2 = data.slice(writeLength, remaining - writeLength);
this._body.set(batch2, 0);
writeLength += batch2.length;

Atomics.add(this._header, HEADER.WRITE, writeLength);

if (attemptToFill && (writeLength < data.length)) {
slice = data.slice(writeLength);
}
}
else {
Atomics.add(this._header, HEADER.WRITE, writeLength);
}

Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 1);
Atomics.notify(this._state, READER_STATE.DATA_AVAILABLE);

return slice;
}

// Reads the next byte of data. Note: Assuming 4GB of addressable buffer.
read() {
let readIndex = Atomics.load(this._header, HEADER.READ);
let writeIndex = Atomics.load(this._header, HEADER.WRITE);

if (readIndex == writeIndex - 1) {
// The next blocking read, should wait.
console.log('next block')
Atomics.store(this._state, READER_STATE.DATA_AVAILABLE, 0);
Atomics.notify(this._state, READER_STATE.DATA_AVAILABLE)
}

if (readIndex == writeIndex) {
return undefined;
}

const value = Atomics.load(this._body, readIndex % this._size);

readIndex = Atomics.add(this._header, HEADER.READ, 1);

return value;
}

blockingRead() {
if (this.eof) return undefined;

Atomics.wait(this._state, READER_STATE.DATA_AVAILABLE, 0);
return this.read();
}

*readToHead() {
// Feels odd to have to create a buffer the same size as the buffer. Just iterate.
let data;
while ((data = this.read()) != undefined) {
yield data;
}
}

clear() {
Atomics.store(this._header, HEADER.READ, 0);
Atomics.store(this._header, HEADER.WRITE, 0);
}
}

const HEADER = {
READ: 0, // 4GB buffer
WRITE: 1, // 4GB buffer
};

const HEADER_LENGTH = Object.keys(HEADER).length;

const READER_STATE = {
DATA_AVAILABLE: 0,
WAITING: 1,
EOF: 2
};

const READER_STATE_LENGTH = Object.keys(READER_STATE).length;

var __ffmpegjs_utf8ToStr;

function __ffmpegjs(__ffmpegjs_opts) {
__ffmpegjs_opts = __ffmpegjs_opts || {};
var __ffmpegjs_abort = abort;
Expand Down Expand Up @@ -62,6 +265,15 @@ function __ffmpegjs(__ffmpegjs_opts) {
};

Module["preRun"] = function() {

FS.init(function() {
if (!!stdinRingBuffer && stdinRingBuffer.length > 0) {
const charRaw = stdinRingBuffer.blockingRead(); // only reads on char.
return charRaw;
}
return null;
});

(__ffmpegjs_opts["mounts"] || []).forEach(function(mount) {
var fs = FS.filesystems[mount["type"]];
if (!fs) {
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{
"name": "ffmpeg.js",
"version": "4.2.9000",
"version": "4.2.9001",
"description": "Port of FFmpeg with Emscripten",
"main": "ffmpeg-webm.js",
"scripts": {
"test": "mocha test/test.js"
"test": "mocha test/test.js",
"test:debug": "mocha --inspect-brk test/test.js"
},
"repository": {
"type": "git",
Expand Down
Loading