Introduces write/read and correct stream type transforming #33

Merged
f1ames merged 6 commits into main from feature/stream-transfroms on Nov 23, 2021

Conversation

f1ames
Contributor

@f1ames f1ames commented Nov 18, 2021

This PR covers two closely related features:

  • Introduce .write() and .read() methods.
  • Make sure stream transforms result in correctly typed stream instances.

This PR is based on the task/simplify-ifca-drain branch, which does a bit of cleanup in the IFCA class, so you may want to look there as well.

Also, please accept my apologies for adding multiple changes at once (in single commits and in this PR), but since the whole PR is logically a single thing, it was really hard to separate it into smaller, reasonable pieces.

Write, read and related methods

Assumptions

The .write() and .read() methods are meant for more manual stream data control; they allow writing and reading directly to/from any given stream. This also means there should be an ability to end the stream manually and control its flow (pause/resume), so the .end(), .pause() and .resume() methods were introduced as well.

When it comes to the public API, one of the most important things to consider is when one can write/read to/from a stream instance. As transform functions (like .map(), .filter(), .batch(), etc.) always return a new instance of a stream, it's important to know which stream instances can be written to and which can be read from.

After some brainstorming the assumptions are as below. Considering the following code:

const stream1 = new DataStream<number>({ maxParallel });
const stream2 = stream1.map(chunk => `foo${ chunk }`);
const stream3 = stream2.batch(chunk => chunk.endsWith("1"));
const stream4 = stream3.map(chunk => chunk.join(""));

We have concluded that:

  • One should be able to write to any stream instance created through transforms (so stream1, stream2, stream3, stream4). This effectively writes to the entire transformation chain, so a value will always go through all the transforms.
  • One should be able to call end, pause and resume on any stream instance created through transforms (so stream1, stream2, stream3, stream4). This will affect the entire stream chain.
  • One should be able to read only from the last instance created (so stream4). Reading from other instances should throw an error (see the sketch below).
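
As a rough usage sketch, continuing the snippet above (the exact signatures are assumptions here, e.g. that .read() resolves to the next chunk):

stream2.write(11);   // accepted - the value enters the chain at stream1 and goes through all transforms
stream4.write(21);   // also accepted - it still enters the chain at the beginning
stream1.end();       // ends the whole chain

stream4.read().then(chunk => console.log(chunk)); // allowed - stream4 is the last instance created
stream2.read();                                   // should throw - only the last instance can be read from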

Implementation

The .write() and .read() methods are just a proxy to the same methods of the stream's internal IFCA instance, so exposing those methods was required. Since each written value should go through all the transforms, .write() always needs to write to the first IFCA in the stream chain (the entry point for all transformations) and .read() should read from the last IFCA (the exit point after all transformations are applied). This required introducing some gluing mechanism between related streams (and their internal IFCAs), and thus IFCAChain was introduced (see below).

Also, since we would like to have full control over each stream instance created through transforms, each transform returns a new stream instance. Before, we would mutate instances when possible, but since it was the same object underneath, it resulted in read/write permission inconsistencies.
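
Roughly, the proxying could be sketched like this (MiniIFCA and MiniStream below are illustrative stand-ins, not the actual classes from this PR):

// Illustrative model only - the real DataStream delegates to IFCA instances managed by IFCAChain.
interface MiniIFCA<IN, OUT> {
    write(chunk: IN): void;
    read(): Promise<OUT | null>;
}

class MiniStream<IN, OUT> {
    constructor(private chain: MiniIFCA<any, any>[]) {}

    write(chunk: IN): void {
        // Every write enters the chain at the first IFCA, so the value passes through all transforms.
        this.chain[0].write(chunk);
    }

    read(): Promise<OUT | null> {
        // Every read leaves the chain at the last IFCA, after all transforms have been applied.
        return this.chain[this.chain.length - 1].read();
    }
}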

Transforming streams and their IN/OUT types

Assumptions

As the basic stream class is a generic one (DataStream<IN, OUT>), and some derived classes will be too, it consumes only values of the IN type and produces only values of the OUT type. This means any chainable transform (which returns an instance of DataStream), like .map() or .batch(), changes the OUT type of the returned stream. This requires returning a new instance.
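
For illustration, the generic types change like this (the type parameters shown in the comments are an assumption, and the maxParallel value is arbitrary):

const numbers = new DataStream<number>({ maxParallel: 4 });   // DataStream<number, number>
const strings = numbers.map(chunk => `foo${chunk}`);          // DataStream<number, string>   - OUT changed by .map()
const batches = strings.batch(chunk => chunk.endsWith("1"));  // DataStream<number, string[]> - OUT changed by .batch()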

Implementation

First of all, all transform methods were changed to always return a new stream instance. This is done through the helper method .createChildStream(), which allows subclasses to override it and have full control over what type of instance is created (see #28).

Each transform creates a new stream instance, but internally the transform chain may stay the same - it may be a single IFCA instance. OTOH, ordering transforms (like .batch() or .flatMap()) require creating a new IFCA instance. And so multiple transforms may result in just a single IFCA internally, or in multiple connected IFCAs. This is again handled by the IFCAChain class, which manages the internal IFCA connections.
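
To illustrate (the chain layout shown in the comments is an assumption based on the description above):

const source = new DataStream<number>({ maxParallel: 4 });  // IFCAChain: [ ifca1 ]
const mapped = source.map(chunk => chunk + 1);              // .map() adds its transform to ifca1   -> [ ifca1 ]
const filtered = mapped.filter(chunk => chunk % 2 === 0);   // .filter() also reuses ifca1          -> [ ifca1 ]
const batched = filtered.batch(chunk => chunk % 10 === 0);  // ordering transform - new IFCA added  -> [ ifca1, ifca2 ]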

Let's get back to transform types for a second. When it comes to input/output of a single transform:

  1. chunk -> chunk (e.g. .map())
  2. chunk -> maybe chunk (e.g. .filter())
  3. chunk -> 0 to many chunks (e.g. .flatMap())

And transforms also may or may not need to receive chunks in input order, so we additionally have:

  4. Regular transforms (e.g. .map())
  5. "Ordering" transforms (e.g. .batch())

In the current implementation, IFCA can internally handle types 1, 2 and 4; types 3 and 5 need to be handled outside of the IFCA instance. This means that streams created by transform methods of types 1, 2 and 4 will internally use the same IFCA instance, while those of types 3 and 5 will create a new IFCA instance. This is important for understanding the new IFCAs that are created and chained by type 3 and 5 transforms. For example, .flatMap():

flatMap<NEW_OUT, ARGS extends any[] = []>(
    callback: TransformFunction<OUT, AnyIterable<NEW_OUT>, ARGS>,
    ...args: ARGS
): DataStream<IN, NEW_OUT> {
    this.ifcaChain.add<NEW_OUT, NEW_OUT>(this.options);
    const newStream = this.createChildStream<NEW_OUT>();
    ...
    return newStream;
}

calls this.ifcaChain.add<NEW_OUT, NEW_OUT>(this.options). Such methods will always create new IFCAs which initially have the same IN and OUT type. This is related to the fact that their transform function is run outside of the IFCA, so it writes its results to the new IFCA instance (which is then used by the newly created stream instance).

IFCA Chain

As mentioned above, IFCAChain is a kind of glue used to solve both things described above. As for how the streams' internal IFCAs are changed through transforms, see 9254854.
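
A simplified sketch of the idea (illustrative names and shape only, not the actual implementation from this PR):

// Stand-in for the real IFCA - only what this sketch needs.
class SketchIFCA {
    constructor(public options: { maxParallel: number }) {}
}

class SketchIFCAChain {
    private chain: SketchIFCA[] = [];

    // Ordering transforms (.batch(), .flatMap()) add a brand new IFCA to the chain.
    add(options: { maxParallel: number }): SketchIFCA {
        const ifca = new SketchIFCA(options);
        this.chain.push(ifca);
        return ifca;
    }

    // .write() on any stream in the chain targets the first IFCA...
    get first(): SketchIFCA {
        return this.chain[0];
    }

    // ...while .read() targets the last one.
    get last(): SketchIFCA {
        return this.chain[this.chain.length - 1];
    }
}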

Other changes

@f1ames f1ames force-pushed the feature/stream-transfroms branch 2 times, most recently from 4aecd56 to 1ca5621 on November 22, 2021 11:24
@f1ames f1ames changed the title from "Introduces write/read and correct stream type transforming (WIP)" to "Introduces write/read and correct stream type transforming" on Nov 22, 2021
@f1ames f1ames marked this pull request as ready for review November 22, 2021 11:30

@jan-warchol
Contributor

Let's get back to transform types for a second. When it comes to input/output of a single transform:
[...]
chunk -> 0 to many chunks (e.g. batch())

I think you meant flatmap()? batch() works the other way round, it takes 1 or more input chunks and produces 1 output chunk.

@f1ames
Contributor Author

f1ames commented Nov 22, 2021

Let's get back to transform types for a second. When it comes to input/output of a single transform:
[...]
chunk -> 0 to many chunks (e.g. batch())

I think you meant flatmap()? batch() works the other way round, it takes 1 or more input chunks and produces 1 output chunk.

Yes, thanks for pointing that out, corrected 👍

@jan-warchol
Contributor

could we extract at least commit "Simplify IFCA drain resolving; extract ProcessingQueue" from this PR? From what I understand it stands on its own and it would make reviewing the rest easier.

@f1ames f1ames changed the base branch from task/ifca-cleanup to task/simplify-ifca-drain November 22, 2021 14:55
@f1ames
Contributor Author

f1ames commented Nov 22, 2021

could we extract at least commit "Simplify IFCA drain resolving; extract ProcessingQueue" from this PR? From what I understand it stands on its own and it would make reviewing the rest easier.

You got me there 😄 🙈 I've extracted the changes related to the mentioned commit into a separate PR.

@f1ames
Contributor Author

f1ames commented Nov 22, 2021

After talking F2F with @jan-warchol, we concluded that the OUT type on IFCAChain doesn't really make sense - it cannot be determined at the beginning and it doesn't change with transforms. So I have moved it to be determined at read time, see 203ff56.

@f1ames
Contributor Author

f1ames commented Nov 22, 2021

There is still one inconsistency with types that I see. Each ifcaInstance.addTransform returns an IFCA with a new OUT type. This means that, for example, calling .map() results in a newly typed IFCA internally (which is ignored for now):

map<NEW_OUT, ARGS extends any[] = []>(
    callback: TransformFunction<OUT, NEW_OUT, ARGS>,
    ...args: ARGS
): DataStream<IN, NEW_OUT> {
    if (args?.length) {
        this.ifca.addTransform(this.injectArgsToCallback<NEW_OUT, typeof args>(callback, args));
    } else {
        this.ifca.addTransform(callback);
    }

    return this.createChildStream<NEW_OUT>();
}

Since we use mutation to change the IFCA type, it simply works - it is the same instance underneath and the types are correct too. But TBH, to make the code really correct (in the context of generic types) it should be something like:

map<NEW_OUT, ARGS extends any[] = []>(
    callback: TransformFunction<OUT, NEW_OUT, ARGS>,
    ...args: ARGS
): DataStream<IN, NEW_OUT> {
    let newIfca: IFCA<IN, NEW_OUT, any>;

    if (args?.length) {
        newIfca = this.ifca.addTransform(this.injectArgsToCallback<NEW_OUT, typeof args>(callback, args));
    } else {
        newIfca = this.ifca.addTransform(callback);
    }

    this.ifcaChain.swap(newIfca); // swap the previous IFCA with the newly typed one

    return this.createChildStream<NEW_OUT>();
}

But after transpiling to JS, it would be swapping this.ifca with the exact same object underneath. So this is an additional function call which does nothing really, and I'm not really happy about that (nor am I about the current state).

Any ideas? 🤔

@f1ames
Contributor Author

f1ames commented Nov 23, 2021

Regarding the latest comment - #33 (comment), I have slightly modified IFCAChain so it now exposes both .create() (renamed from add) and .add(ifca) methods. The latter is used to add a mutated IFCA to the chain (and it internally checks whether such an instance was already added). See b161400.
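
Roughly, the split can be sketched as follows (hypothetical shape, not the actual code from b161400):

// Illustrative only: create() builds and appends a new IFCA, add() appends an existing
// (possibly type-mutated) one, skipping it if the same instance is already in the chain.
class ChainSketch<T> {
    private chain: T[] = [];

    create(factory: () => T): T {
        const ifca = factory();
        this.chain.push(ifca);
        return ifca;
    }

    add(ifca: T): void {
        if (!this.chain.includes(ifca)) {
            this.chain.push(ifca);
        }
    }
}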

I think this irons out the last inconsistency that was there regarding type change handling.

Base automatically changed from task/simplify-ifca-drain to main November 23, 2021 11:59

The 'IFCAChain' class is an additional layer between stream instances
and IFCA instances. It allows related stream instances to operate on
IFCA instances more easily and also to share such IFCA instances
between multiple stream instances.

It was introduced to allow writing to any intermediate stream instance
created during transforms and to allow chaining and sharing IFCAs
between such instances.

Utilize 'IFCAChain' so intermediate streams with a shared IFCA
can be created more easily. It also allows writing/reading on any
intermediate stream to use the correct IFCA instance - the first in
the chain for writing, the last in the chain for reading.

Apart from that, a new stream instance is always created for each
transform, which allows managing readability/writability (and similar)
separately.
@f1ames
Contributor Author

f1ames commented Nov 23, 2021

Rebased onto latest main.

@f1ames f1ames merged commit f3e8d46 into main Nov 23, 2021
@f1ames f1ames deleted the feature/stream-transfroms branch November 23, 2021 12:03