Skip to content

Commit

Permalink
Inter-Process Communication (#498)
Browse files Browse the repository at this point in the history
  • Loading branch information
gustavohenke committed Aug 18, 2024
1 parent 5cf302d commit b91d0c3
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 28 deletions.
26 changes: 24 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ For more details, visit https://github.com/open-cli-tools/concurrently
### `concurrently(commands[, options])`

- `commands`: an array of either strings (containing the commands to run) or objects
with the shape `{ command, name, prefixColor, env, cwd }`.
with the shape `{ command, name, prefixColor, env, cwd, ipc }`.

- `options` (optional): an object containing any of the below:
- `cwd`: the working directory to be used by all commands. Can be overriden per command.
Expand Down Expand Up @@ -405,11 +405,33 @@ It has the following properties:
- `stderr`: an RxJS observable to the command's `stderr`.
- `error`: an RxJS observable to the command's error events (e.g. when it fails to spawn).
- `timer`: an RxJS observable to the command's timing events (e.g. starting, stopping).
- `messages`: an object with the following properties:

- `incoming`: an RxJS observable for the IPC messages received from the underlying process.
- `outgoing`: an RxJS observable for the IPC messages sent to the underlying process.

Both observables emit [`MessageEvent`](#messageevent)s.<br>
Note that if the command wasn't spawned with IPC support, these won't emit any values.

- `close`: an RxJS observable to the command's close events.
See [`CloseEvent`](#CloseEvent) for more information.
- `start()`: starts the command, setting up all
- `start()`: starts the command and sets up all of the above streams
- `send(message[, handle, options])`: sends a message to the underlying process via IPC channels,
returning a promise that resolves once the message has been sent.
See [Node.js docs](https://nodejs.org/docs/latest/api/child_process.html#subprocesssendmessage-sendhandle-options-callback).
- `kill([signal])`: kills the command, optionally specifying a signal (e.g. `SIGTERM`, `SIGKILL`, etc).

### `MessageEvent`

An object that represents a message that was received from/sent to the underlying command process.<br>
It has the following properties:

- `message`: the message itself.
- `handle`: a [`net.Socket`](https://nodejs.org/docs/latest/api/net.html#class-netsocket),
[`net.Server`](https://nodejs.org/docs/latest/api/net.html#class-netserver) or
[`dgram.Socket`](https://nodejs.org/docs/latest/api/dgram.html#class-dgramsocket),
if one was sent, or `undefined`.

### `CloseEvent`

An object with information about a command's closing event.<br>
Expand Down
165 changes: 164 additions & 1 deletion src/command.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { autoUnsubscribe, subscribeSpyTo } from '@hirez_io/observer-spy';
import { SpawnOptions } from 'child_process';
import { SendHandle, SpawnOptions } from 'child_process';
import { EventEmitter } from 'events';
import * as Rx from 'rxjs';
import { Readable, Writable } from 'stream';
Expand All @@ -16,14 +16,19 @@ import {
type CommandValues = { error: unknown; close: CloseEvent; timer: unknown[] };

let process: ChildProcess;
let sendMessage: jest.Mock;
let spawn: jest.Mocked<SpawnCommand>;
let killProcess: KillProcess;

const IPC_FD = 3;

autoUnsubscribe();

beforeEach(() => {
sendMessage = jest.fn();
process = new (class extends EventEmitter {
readonly pid = 1;
send = sendMessage;
readonly stdout = new Readable({
read() {
// do nothing
Expand Down Expand Up @@ -248,6 +253,164 @@ describe('#start()', () => {

expect((await stderr).toString()).toBe('dang');
});

describe('on incoming messages', () => {
it('does not share to the incoming messages stream, if IPC is disabled', () => {
const { command } = createCommand();
const spy = subscribeSpyTo(command.messages.incoming);
command.start();

process.emit('message', {});
expect(spy.getValuesLength()).toBe(0);
});

it('shares to the incoming messages stream, if IPC is enabled', () => {
const { command } = createCommand({ ipc: IPC_FD });
const spy = subscribeSpyTo(command.messages.incoming);
command.start();

const message1 = {};
process.emit('message', message1, undefined);

const message2 = {};
const handle = {} as SendHandle;
process.emit('message', message2, handle);

expect(spy.getValuesLength()).toBe(2);
expect(spy.getValueAt(0)).toEqual({ message: message1, handle: undefined });
expect(spy.getValueAt(1)).toEqual({ message: message2, handle });
});
});

describe('on outgoing messages', () => {
it('calls onSent with an error if the process does not have IPC enabled', () => {
const { command } = createCommand({ ipc: IPC_FD });
command.start();

Object.assign(process, {
// The TS types don't assume `send` can be undefined,
// despite the Node docs saying so
send: undefined,
});

const onSent = jest.fn();
command.messages.outgoing.next({ message: {}, onSent });
expect(onSent).toHaveBeenCalledWith(expect.any(Error));
});

it('sends the message to the process', () => {
const { command } = createCommand({ ipc: IPC_FD });
command.start();

const message1 = {};
command.messages.outgoing.next({ message: message1, onSent() {} });

const message2 = {};
const handle = {} as SendHandle;
command.messages.outgoing.next({ message: message2, handle, onSent() {} });

const message3 = {};
const options = {};
command.messages.outgoing.next({ message: message3, options, onSent() {} });

expect(process.send).toHaveBeenCalledTimes(3);
expect(process.send).toHaveBeenNthCalledWith(
1,
message1,
undefined,
undefined,
expect.any(Function),
);
expect(process.send).toHaveBeenNthCalledWith(
2,
message2,
handle,
undefined,
expect.any(Function),
);
expect(process.send).toHaveBeenNthCalledWith(
3,
message3,
undefined,
options,
expect.any(Function),
);
});

it('sends the message to the process, if it starts late', () => {
const { command } = createCommand({ ipc: IPC_FD });
command.messages.outgoing.next({ message: {}, onSent() {} });
expect(process.send).not.toHaveBeenCalled();

command.start();
expect(process.send).toHaveBeenCalled();
});

it('calls onSent with the result of sending the message', () => {
const { command } = createCommand({ ipc: IPC_FD });
command.start();

const onSent = jest.fn();
command.messages.outgoing.next({ message: {}, onSent });
expect(onSent).not.toHaveBeenCalled();

sendMessage.mock.calls[0][3]();
expect(onSent).toHaveBeenCalledWith(undefined);

const error = new Error();
sendMessage.mock.calls[0][3](error);
expect(onSent).toHaveBeenCalledWith(error);
});
});
});

describe('#send()', () => {
it('throws if IPC is not set up', () => {
const { command } = createCommand();
const fn = () => command.send({});
expect(fn).toThrow();
});

it('pushes the message on the outgoing messages stream', () => {
const { command } = createCommand({ ipc: IPC_FD });
const spy = subscribeSpyTo(command.messages.outgoing);

const message1 = { foo: true };
command.send(message1);

const message2 = { bar: 123 };
const handle = {} as SendHandle;
command.send(message2, handle);

const message3 = { baz: 'yes' };
const options = {};
command.send(message3, undefined, options);

expect(spy.getValuesLength()).toBe(3);
expect(spy.getValueAt(0)).toMatchObject({
message: message1,
handle: undefined,
options: undefined,
});
expect(spy.getValueAt(1)).toMatchObject({ message: message2, handle, options: undefined });
expect(spy.getValueAt(2)).toMatchObject({ message: message3, handle: undefined, options });
});

it('resolves when onSent callback is called with no arguments', async () => {
const { command } = createCommand({ ipc: IPC_FD });
const spy = subscribeSpyTo(command.messages.outgoing);
const promise = command.send({});
spy.getFirstValue().onSent();
await expect(promise).resolves.toBeUndefined();
});

it('rejects when onSent callback is called with an argument', async () => {
const { command } = createCommand({ ipc: IPC_FD });
const spy = subscribeSpyTo(command.messages.outgoing);
const promise = command.send({});
spy.getFirstValue().onSent('foo');
await expect(promise).rejects.toBe('foo');
});
});

describe('#kill()', () => {
Expand Down
Loading

0 comments on commit b91d0c3

Please sign in to comment.