Skip to content

Commit

Permalink
feat: adaptors between notifiers and async iterables (#1340)
Browse files Browse the repository at this point in the history
  • Loading branch information
erights authored Jul 30, 2020
1 parent 582ffc7 commit b67d21a
Show file tree
Hide file tree
Showing 11 changed files with 523 additions and 160 deletions.
1 change: 1 addition & 0 deletions packages/notifier/exports.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import './src/types';
4 changes: 3 additions & 1 deletion packages/notifier/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"homepage": "https://github.com/Agoric/agoric-sdk#readme",
"dependencies": {
"@agoric/assert": "^0.0.8",
"@agoric/eventual-send": "^0.9.3",
"@agoric/produce-promise": "^0.1.3"
},
"devDependencies": {
Expand Down Expand Up @@ -77,7 +78,8 @@
"no-unused-expressions": "off",
"no-loop-func": "off",
"no-inner-declarations": "off",
"import/prefer-default-export": "off"
"import/prefer-default-export": "off",
"import/no-extraneous-dependencies": "off"
}
},
"eslintIgnore": [
Expand Down
157 changes: 157 additions & 0 deletions packages/notifier/src/asyncIterableAdaptor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// @ts-check
// eslint-disable-next-line spaced-comment
/// <reference types="ses"/>

import { E } from '@agoric/eventual-send';
// eslint-disable-next-line import/no-cycle
import { makeNotifierKit } from './notifier';

import './types';

/**
* Adaptor from a notifierP to an async iterable.
* The notifierP can be any object that has an eventually invokable
* `getUpdateSince` method that behaves according to the notifier
* spec. This can be a notifier, a promise for a local or remote
* notfier, or a presence of a remote notifier.
*
* It is also used internally by notifier.js so that a notifier itself is an
* async iterable.
*
* An async iterable is an object with a `[Symbol.asyncIterator]()` method
* that returns an async iterator. The async iterator we return here has only
* a `next()` method, without the optional `return` and `throw` methods. The
* omitted methods, if present, would be used by the for/await/of loop to
* inform the iterator of early termination. But this adaptor would not do
* anything useful in reaction to this notification.
*
* An async iterator's `next()` method returns a promise for an iteration
* result. An iteration result is a record with `value` and `done` properties.
*
* The purpose of building on the notifier protocol is to have a lossy
* adaptor, where intermediate results can be missed in favor of more recent
* results which are therefore less stale. See
* https://github.com/Agoric/documentation/blob/master/main/distributed-programming.md#notifiers
*
* @template T
* @param {PromiseOrNot<BaseNotifier<T>>} notifierP
* @returns {AsyncIterable<T>}
*/
export const makeAsyncIterableFromNotifier = notifierP => {
return harden({
[Symbol.asyncIterator]: () => {
/** @type {UpdateCount} */
let localUpdateCount;
/** @type {Promise<{value: T, done: boolean}> | undefined} */
let myIterationResultP;
return harden({
next: () => {
if (!myIterationResultP) {
// In this adaptor, once `next()` is called and returns an
// unresolved promise, `myIterationResultP`, and until
// `myIterationResultP` is fulfilled with an
// iteration result, further `next()` calls will return the same
// `myIterationResultP` promise again without asking the notifier
// for more updates. If there's already an unanswered ask in the
// air, all further asks should just reuse the result of that one.
//
// This reuse behavior is only needed for code that uses the async
// iterator protocol explicitly. When this async iterator is
// consumed by a for/await/of loop, `next()` will only be called
// after the promise for the previous iteration result has
// fulfilled. If it fulfills with `done: true`, the for/await/of
// loop will never call `next()` again.
//
// See
// https://2ality.com/2016/10/asynchronous-iteration.html#queuing-next()-invocations
// for an explicit use that sends `next()` without waiting.
myIterationResultP = E(notifierP)
.getUpdateSince(localUpdateCount)
.then(({ value, updateCount }) => {
localUpdateCount = updateCount;
const done = localUpdateCount === undefined;
if (!done) {
// Once the outstanding question has been answered, stop
// using that answer, so any further `next()` questions
// cause a new `getUpdateSince` request.
//
// But only if more answers are expected. Once the notifier
// is `done`, that was the last answer so reuse it forever.
myIterationResultP = undefined;
}
return harden({ value, done });
});
}
return myIterationResultP;
},
});
},
});
};

/**
* This reads from `asyncIteratable` updating `updater` with each successive
* value. The `updater` the same API as the `updater` of a notifier kit,
* but can simply be an observer to react to these updates. As an observer,
* the `updater` may only be interested in certain occurrences (`updateState`,
* `finish`, `fail`), so for convenience, `updateFromIterable` feature
* tests for those methods before calling them.
*
* @template T
* @param {Partial<Updater<T>>} updater
* @param {AsyncIterable<T>} asyncIterable
* @returns {Promise<undefined>}
*/
// See https://github.com/Agoric/agoric-sdk/issues/1345 for why
// `updateFromIterable` currently needs a local `asyncIterable` rather than
// a possibly remote `asyncIterableP`.
export const updateFromIterable = (updater, asyncIterable) => {
const iterator = asyncIterable[Symbol.asyncIterator]();
return new Promise(ack => {
const recur = () => {
E.when(iterator.next()).then(
({ value, done }) => {
if (done) {
updater.finish && updater.finish(value);
ack();
} else {
updater.updateState && updater.updateState(value);
recur();
}
},
reason => {
updater.fail && updater.fail(reason);
ack();
},
);
};
recur();
});
};

/**
* Adaptor from async iterable to notifier.
*
* @template T
* @param {AsyncIterable<T>} asyncIterable
* @returns {Notifier<T>}
*/
export const makeNotifierFromAsyncIterable = asyncIterable => {
const { notifier, updater } = makeNotifierKit();
updateFromIterable(updater, asyncIterable);
return notifier;
};

/**
* As updates come in from the possibly remote `notifierP`, update
* the local `updater`. Since the updates come from a notifier, they
* are lossy, i.e., once a more recent state can be reported, less recent
* states are assumed irrelevant and dropped.
*
* @template T
* @param {Partial<Updater<T>>} updater
* @param {PromiseOrNot<Notifier<T>>} notifierP
* @returns {Promise<undefined>}
*/
export const updateFromNotifier = (updater, notifierP) =>
updateFromIterable(updater, makeAsyncIterableFromNotifier(notifierP));
Loading

0 comments on commit b67d21a

Please sign in to comment.