How to write a transducer? #519

Open
episage opened this issue May 3, 2018 · 10 comments

Comments

@episage

episage commented May 3, 2018

Hi, I want to implement the BLIP messaging protocol (https://github.com/couchbaselabs/BLIP-Cocoa/blob/master/Docs/BLIP%20Protocol.md; it's like TCP over WebSockets).

The data stream looks like this:
-a-b-c-d-e->
and the output is a stream of messages:
-----M----->
One message consists of many data chunks.
The protocol specifies when the chunks are combined and emitted as a Message.
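
For illustration, the chunk-to-message step described above can be sketched without any stream library. This is only a sketch under stated assumptions: the frame shape `{ id, payload, more }` and the helper name `createAssembler` are hypothetical, and the `more` flag stands in for whatever rule the protocol actually uses to mark the last chunk of a message.

```javascript
// Hypothetical frame shape: { id, payload, more }.
// `more === true` is an assumed marker meaning "further chunks follow".
function createAssembler() {
  const pending = new Map(); // message id -> payloads received so far

  // Feed one frame. Returns a completed message, or null while chunks are still expected.
  return function push(frame) {
    const parts = pending.get(frame.id) || [];
    parts.push(frame.payload);
    if (frame.more) {
      pending.set(frame.id, parts);
      return null;
    }
    pending.delete(frame.id);
    return { id: frame.id, body: parts.join('') };
  };
}

const push = createAssembler();
const frames = [
  { id: 1, payload: 'a', more: true },
  { id: 1, payload: 'b', more: true },
  { id: 2, payload: 'x', more: false },
  { id: 1, payload: 'c', more: false },
];

// Keep only the frames that completed a message.
const messages = frames.map(push).filter((m) => m !== null);
console.log(messages);
```

The state lives in the closure, so each call to `createAssembler()` gives an independent assembler. How such a step is wired into a particular stream library is a separate question.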

How can I implement something like this using most?
I guess it should be a transducer.
I worked with the Node.js Transform stream and it worked perfectly.
What's the equivalent of https://nodejs.org/api/stream.html#stream_class_stream_transform ?

I attempted to write a transducer using the following code, copied from transducer-js, but I have no idea how to proceed:

var most = require('most')
var mns = require('most-node-streams');

function Filter(f, xform) {
    this.xform = xform;
    this.f = f;
}

Filter.prototype['@@transducer/init'] = function () {
    return this.xform['@@transducer/init']();
};

Filter.prototype['@@transducer/result'] = function (v) {
    return this.xform['@@transducer/result'](v);
};

Filter.prototype['@@transducer/step'] = function (res, input) {
    if (this.f(input)) {
        return this.xform['@@transducer/step'](res, input);
    }
    return res;
};

var x = mns
    .fromStream(sourceFile.pipe(csvParser))
    .transduce(Filter(true /* testing... */));

most.forEach(console.log, x);

The code basically throws an error that xf is not defined. It's not.
I want something simple like NodeJS .pipe. Look:

var rawCandlesStream = sourceFile
    .pipe(csvParser)
    .pipe(new StringArrayToCandleTransformer())
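
A note on the transducer attempt above: `Filter(true)` is called without `new`, with a boolean instead of a predicate function, and without a downstream transformer, which is a likely source of the error. The sketch below shows the usual shape, where the transducer is a function that receives the downstream transformer. The names `filtering`, `arrayCollector` and `runTransducer` are hypothetical helpers for this example only; the driver stands in for what a library would do internally and is not most's implementation.

```javascript
// A transformer wraps a downstream transformer (xform) and forwards to it.
function Filter(f, xform) {
  this.f = f;
  this.xform = xform;
}
Filter.prototype['@@transducer/init'] = function () {
  return this.xform['@@transducer/init']();
};
Filter.prototype['@@transducer/result'] = function (v) {
  return this.xform['@@transducer/result'](v);
};
Filter.prototype['@@transducer/step'] = function (res, input) {
  // Forward the input only when the predicate accepts it.
  return this.f(input) ? this.xform['@@transducer/step'](res, input) : res;
};

// The transducer itself: a function from downstream transformer to transformer.
function filtering(f) {
  return function (xform) {
    return new Filter(f, xform);
  };
}

// Hypothetical terminal transformer that collects results into an array.
const arrayCollector = {
  '@@transducer/init': function () { return []; },
  '@@transducer/result': function (acc) { return acc; },
  '@@transducer/step': function (acc, x) { acc.push(x); return acc; },
};

// Hypothetical driver: builds the transformer chain and feeds it the inputs.
function runTransducer(transducer, inputs) {
  const xf = transducer(arrayCollector);
  let acc = xf['@@transducer/init']();
  for (const input of inputs) {
    acc = xf['@@transducer/step'](acc, input);
  }
  return xf['@@transducer/result'](acc);
}

const evens = runTransducer(filtering((n) => n % 2 === 0), [1, 2, 3, 4]);
console.log(evens); // [ 2, 4 ]
```

Assuming most's `transduce` expects a transducer of this shape, the value to pass would be `filtering(predicate)` rather than a bare `Filter(...)` call.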
episage changed the title from "How to write transducer?" to "How to write a transducer?" on May 3, 2018
@davidchase
Collaborator

Hi @episage, the BLIP protocol sounds interesting. A couple of questions for you, to help us out.

Based on your comment above, it seems you are trying to create a duplex-like stream rather than a transducer. A subject (https://github.com/mostjs-community/subject) could be more suitable.

A transducer is a composable, efficient way of transforming data without building intermediate collections, whereas a duplex or transform stream is both readable and writable, which seems to be what you want?

Also, out of personal curiosity: if you got it to work with the Node.js Transform class, why change it in favor of most?
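
To make the "composable, no intermediate step" point concrete, here is a minimal sketch using plain reducer functions rather than any library's protocol. The helper names (`mapT`, `filterT`, `compose`, `append`) are made up for this example. The composed transform runs in a single `reduce` pass, with no array built between the map and filter stages.

```javascript
// A reducer is (acc, x) => acc. A transducer turns one reducer into another.
const mapT = (f) => (next) => (acc, x) => next(acc, f(x));
const filterT = (pred) => (next) => (acc, x) => (pred(x) ? next(acc, x) : acc);

// Compose transducers; data flows through them left to right.
const compose = (...fns) => (x) => fns.reduceRight((v, fn) => fn(v), x);

// Final reducer: append each surviving value to the output array.
const append = (acc, x) => {
  acc.push(x);
  return acc;
};

// Increment every value, then keep only the even results.
const xform = compose(
  mapT((n) => n + 1),
  filterT((n) => n % 2 === 0)
);

const result = [1, 2, 3, 4].reduce(xform(append), []);
console.log(result); // [ 2, 4 ]
```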

Let me know if I’m understanding you correctly :)

@episage
Author

episage commented May 3, 2018

Hi @davidchase
I chose most because

  • it works in the browser (the Node.js Transform stream does not)
  • it outperforms Highland and other stream-related libs.

The BLIP messaging protocol is a two-way stream, so a Duplex stream is appropriate, as you suggest.

However, I would also like to try most in my other project (a simpler use case), which is concerned with transforming OHLC candle data (https://en.wikipedia.org/wiki/Open-high-low-close_chart) in CSV form (example below) into daily, weekly, etc. candles. In this case, I only need to .pipe(..).pipe(..).pipe(..), as you would with Unix | or with a Node.js Transform stream (example below).

Local time,Open,High,Low,Close,Volume
09.04.2018 23:40:01.000 GMT+0310,1.23205,1.23289,1.23184,1.23252,16424.760000000002
10.04.2018 03:00:00.000 GMT+0200,1.23253,1.23268,1.23066,1.23185,33297.33999999999
10.04.2018 07:00:00.000 GMT+0200,1.23185,1.23351,1.23027,1.23248,53227.63000000002
10.04.2018 11:00:00.000 GMT+0200,1.23248,1.23776,1.23176,1.23487,58090.08
10.04.2018 15:00:00.000 GMT+0200,1.23481,1.23679,1.23243,1.23401,65132.11
10.04.2018 19:00:00.000 GMT+0200,1.23403,1.23612,1.23399,1.23555,40928.79999999999
10.04.2018 23:00:00.000 GMT+0200,1.23554,1.23650,1.23503,1.23609,15862.190000000004
11.04.2018 03:00:00.000 GMT+0200,1.23609,1.23642,1.23526,1.23638,23260.639999999996
11.04.2018 07:00:00.000 GMT+0200,1.23638,1.23872,1.23621,1.23772,46406.53999999997
11.04.2018 11:00:00.000 GMT+0200,1.23770,1.23964,1.23622,1.23753,54517.72999999999
11.04.2018 15:00:00.000 GMT+0200,1.23753,1.23929,1.23682,1.23745,119133.14930000002
11.04.2018 19:00:00.000 GMT+0200,1.23745,1.23780,1.23470,1.23666,41480.29000000001
11.04.2018 23:00:00.000 GMT+0200,1.23664,1.23797,1.23636,1.23758,15883.460000000003
12.04.2018 03:00:00.000 GMT+0200,1.23758,1.23782,1.23569,1.23582,28334.51999999999
12.04.2018 07:00:00.000 GMT+0200,1.23583,1.23739,1.23450,1.23457,47950.840000000004
12.04.2018 11:00:00.000 GMT+0200,1.23458,1.23515,1.23120,1.23155,65715.01000000001
12.04.2018 15:00:00.000 GMT+0200,1.23154,1.23359,1.22993,1.23343,57357.40999999999
12.04.2018 19:00:00.000 GMT+0200,1.23343,1.23395,1.23246,1.23264,32618.57000000001
12.04.2018 23:00:00.000 GMT+0200,1.23263,1.23328,1.23246,1.23285,14083.900000000001

Here is an example that works in NodeJS:

var rawCandlesStream = sourceFile
    .pipe(csvParser)
    .pipe(new StringArrayToCandleTransformer())
// rawCandlesStream.pipe(new ObjectStreamToStringStream('CSV ')).pipe(process.stdout);

var ticksStream = rawCandlesStream
    .pipe(new CandleToTicksTransformer())
// ticksStream.pipe(new ObjectStreamToStringStream('    ')).pipe(process.stdout);

var h4Stream = ticksStream
    .pipe(new TicksToCandleTransformer(Timeframes.H4))
// h4Stream.pipe(new ObjectStreamToStringStream('H4: ')).pipe(process.stdout);

var csvStream = h4Stream
    .pipe(new CollectCandlesTransformer())
    .pipe(new CandleToStringArrayTransformer())
    .pipe(CsvStringifier());
csvStream.pipe(process.stdout);

I hope you get the idea.
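
As a rough illustration of the candle-aggregation step in that pipeline, the sketch below rolls smaller candles up into a larger timeframe. It is not the code behind the transformers named above. The candle shape (`time` as epoch milliseconds plus `open`, `high`, `low`, `close`, `volume`) and the function name `aggregateCandles` are assumptions for this example, and the input is assumed to be sorted by time.

```javascript
const HOUR_MS = 60 * 60 * 1000;
const DAY_MS = 24 * HOUR_MS;

// Roll candles up into buckets of `bucketMs` milliseconds.
// Assumes `candles` is sorted by ascending `time` (epoch ms).
function aggregateCandles(candles, bucketMs) {
  const out = [];
  let current = null;
  for (const c of candles) {
    const bucketStart = Math.floor(c.time / bucketMs) * bucketMs;
    if (current === null || current.time !== bucketStart) {
      // A new bucket begins: flush the previous one and start from this candle.
      if (current !== null) out.push(current);
      current = {
        time: bucketStart,
        open: c.open,
        high: c.high,
        low: c.low,
        close: c.close,
        volume: c.volume,
      };
    } else {
      // Same bucket: widen the range, move the close, add the volume.
      current.high = Math.max(current.high, c.high);
      current.low = Math.min(current.low, c.low);
      current.close = c.close;
      current.volume += c.volume;
    }
  }
  if (current !== null) out.push(current);
  return out;
}

// Made-up sample values, chosen as integers to keep the arithmetic obvious.
const h4 = [
  { time: 0, open: 10, high: 12, low: 9, close: 11, volume: 100 },
  { time: 4 * HOUR_MS, open: 11, high: 15, low: 10, close: 14, volume: 50 },
  { time: DAY_MS, open: 14, high: 16, low: 13, close: 15, volume: 70 },
];
const daily = aggregateCandles(h4, DAY_MS);
console.log(daily);
```

Bucketing by `Math.floor(time / bucketMs)` aligns buckets to UTC epoch boundaries. Real data with exchange-specific session times or time zones would need a different bucketing rule.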

I looked at https://github.com/mostjs-community/subject but to be honest I understand nothing from it. I have no FP background but the description: Returns an tuple containing a AttachSink and a Stream tells me literally nothing. In short, I have no idea how to plug it into the .pipe(..).pipe(..) architecture.

@episage
Author

episage commented May 3, 2018

An even simpler approximation would be the answer to the question: "How to write a GZIP compressor with most?"

@episage
Author

episage commented May 3, 2018

@briancavalier
Member

Hi @episage. It's unfortunate that the word stream is quite overloaded. If I had it to do over again, I'd not use it to describe the data structure that most provides. I'm sorry for any confusion it's caused.

Most streams and node streams have different goals and intended use cases. For example, most streams are targeted primarily at discrete events (e.g. mouse clicks), and node streams are targeted primarily at chunked data. Both can be used for other purposes, of course, but you'll start feeling the tension as you get further from each one's intended use case. They also have different APIs: pipe() is a node stream API, and it's not a goal of most to support it, just as until() and awaitPromises() are most APIs, and it's unlikely such things would be commonly useful for chunked data streams.

I'd recommend looking for a chunked data stream abstraction that fits your use case and works in browsers. Perhaps there is a polyfill or other userland implementation of the w3c streams proposal that would be helpful for your use cases.

@episage
Author

episage commented May 4, 2018

Um, I'm sorry to hear that :(

I expected all the streams to be created equal.
I wonder why most doesn't want stream composition via pipe or something similar.
I think it's a very natural way of "streaming" if you come from a Unix background, or any background. I mean, if you have a stream of water and you pipe it through to another stream... It's very visual and easy to understand, unlike a transducer.

Also, I read a topic on GH with an argument about lodash, lodash/fp, most, ramda.
I have no idea what these people use that full-blown FP for. If you can't process stream data like I explained above, then what do you use it for? lol

I think the maintainers should put a note on the front page. I definitely got lost in the Stream/FP/fantasy-land.

@episage
Author

episage commented May 4, 2018

@briancavalier You mention that most is used for discrete events. How about https://github.com/dominictarr/event-stream ?
What is the tension you are talking about?

@davidchase
Collaborator

Dominic Tarr's event-stream is another wrapper over Node.js streams. He has quite a few of them (check out his pull-stream), as a lot of folks wanted a nicer API to deal with when using Node.js streams.

In my experience, the tension comes into play when we use a library like most, which has no way of handling back pressure (nor really should it), in place of Node.js streams, or when we try to use Node.js streams for things other than data processing.

Also, I read a topic on GH with an argument about lodash, lodash/fp, most, ramda.
I have no idea what these people use that full-blown FP for if you can't process Stream data like I explained above

Do you have a link? I've used Ramda with Node.js streams and had no issues.

@josgraha

... How about https://github.com/dominictarr/event-stream ?
What is the tension you are talking about?

@episage what did you end up going with to bucket your tick data?

Regarding transducers, it's nothing crazy; there's a pretty good description and demo by Brian Lonsdorf here:
https://www.youtube.com/watch?v=JZSoPZUoR58

@episage
Author

episage commented Oct 23, 2018

I ended up using https://www.timescale.com/ (a PostgreSQL plugin, really) to bucket the tick data. Look at
https://blog.timescale.com/analyzing-ethereum-bitcoin-and-1200-cryptocurrencies-using-postgresql-3958b3662e51 for examples.
It's much faster than processing ticks in Node.

Thanks for the video link, I watched it.


4 participants