manifold.stream/on-closed never called upstream #82

Closed
stijnopheide opened this issue May 27, 2016 · 5 comments

Comments

@stijnopheide

Manifold's map, filter, ... operators don't call the on-closed handler of the source immediately when the derived stream is closed. The handler is called only once a value is put into the source.

e.g.

(let [s1 (s/stream)
      _ (s/on-closed s1 #(println "closed s1"))
      out (s/map inc s1)]
  (s/close! out))
=> nil

If you put a value in s1 it does call on-closed

(let [s1 (s/stream)
      _ (s/on-closed s1 #(println "closed s1"))
      out (s/map inc s1)]
  (s/close! out)
  (s/put! s1 1))
closed s1
=> << true >>

I'm not sure if this is an issue, but I have 2 questions here:

  1. Why is on-closed not called immediately on the upstream source?
  2. Why is put! returning << true >>?

I came across this behaviour when investigating an issue I ran into with concat:

(let [s1 (s/stream)
      s2 (s/stream)
      _ (s/on-closed s1 #(println "closed s1"))
      _ (s/on-closed s2 #(println "closed s2"))
      out (s/concat (s/->source [s1 s2]))]
  (s/close! out)
  [(s/put! s1 1) (s/put! s2 2) (s/take! out)])
=> [<< true >> << true >> << nil >>]

Here, on-closed on the 2 sources is never called. Is this intended behaviour?

The use case is the following: watch a directory for new files by concat'ing a stream of currently existing files and a stream attached to a hawk watch. If the resulting stream gets closed, the hawk watch should be stopped (by registering a callback with on-closed on the hawk stream), but that callback is never called.

@stijnopheide
Author

Or maybe this should be better phrased as a question: How can I ensure everything upstream is being closed?

@ztellman
Collaborator

This is the result of a bug in concat, which closes the stream-of-streams but not the individual streams themselves. I'm pushing a fix.

@ztellman
Collaborator

To be clear, there is also the issue where on-closed isn't triggered immediately, but only once a put! or take! fails. This is a design decision, as it makes the lifecycle simpler to reason about in the implementation, but I think it is a consistent source of confusion for people using the library. I'll need to give this a bit more thought, but for now I plan to leave things as they are.

@danielcompton
Member

I was planning on using the on-closed event on a stream to bubble back upwards to close a RethinkDB changefeed query (a query that sends down changes to a table). However I think the current behaviour would mean that the changefeed would only be closed after RethinkDB tried to send another change down the line. It's not the end of the world, as you only need one TCP connection to run many queries over, so you're just leaving the changefeed active on the server, but it is a little unclean.

@stijnopheide
Author

Thanks for the concat fix @ztellman. On the issue of when on-closed is triggered, I'm in the same situation as @danielcompton: just some extra resources being used, so not a big deal. However, what might be more troubling is that the first put! immediately after a close! still returns successfully. Only subsequent puts return false.

(let [s1 (s/stream)
      _ (s/on-closed s1 #(println "closed s1"))
      out (s/map inc s1)]
  (s/close! out)
  [(s/put! s1 1) (s/put! s1 2)])
closed s1
=> [<< true >> << false >>]

3 participants