-
Notifications
You must be signed in to change notification settings - Fork 0
/
concurrency.clj
131 lines (101 loc) · 3.59 KB
/
concurrency.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
(ns concurrency
(:require [clojure.core.async
:as a
:refer [>! <! >!! <!! go chan buffer close! thread
alt! alts! alts!! timeout go-loop]]))
;; Future
;; Long running tasks can be wrapped into a future
(defn long-process
  "Announces the task `name`, sleeps on the calling thread for `ms`
  milliseconds, then reports completion. Returns nil."
  [name ms]
  ;; Each vector holds the arguments of one announcement line.
  (doseq [line-args [["Starting long process: " name]
                     ["Sleeping for " ms "ms"]]]
    (apply println line-args))
  (Thread/sleep ms)
  (println "Done sleeping"))
(defn infinite-process
  "Announces the task `name`, then prints a message every 10 seconds forever.
  Never returns normally; it only ends if the running thread is interrupted
  during the sleep."
  [name]
  (println "Starting infinite process: " name)
  (while true
    (println "To infinity and beyond")
    (Thread/sleep 10000)))
;; Both demo tasks start running on background threads as soon as these
;; forms are evaluated; the vars hold the resulting futures.
(def some-long-process-a
  (future-call #(long-process "long-process-A" 10000)))

(def some-infinite-process
  (future-call #(infinite-process "Infinity")))
;; In the case we want to stop the future prematurely we can use future-cancel
;; Using future-cancel to stop thread
;; Cancel both background tasks started above.
(doseq [running [some-long-process-a some-infinite-process]]
  (future-cancel running))
;; Using promises to handle start / stop
(defn play-music!
  "Starts a background future that prints \"I am playing music\" repeatedly
  until stopped. Returns a zero-argument function; calling it delivers ::stop
  to an internal promise, which ends the loop."
  []
  (let [p (promise)]
    (future
      ;; Poll the promise with a 0 ms timeout: `deref` yields ::timeout while
      ;; nothing has been delivered, and the delivered value afterwards.
      ;; (`(@p 0 ::timeout)` would instead block on `@p` and then invoke the
      ;; delivered keyword as a function, which always yields ::timeout, so
      ;; the loop could never stop.)
      (while (= (deref p 0 ::timeout) ::timeout) ;; "::" qualifies the keyword with this namespace
        (println "I am playing music")))
    ;; #(...) is the reader macro for an anonymous function.
    #(deliver p ::stop)))
;; Start the music, narrate the trip, then stop the music on arrival.
(let [stop-playing (play-music!)]
  (run! println ["Imagine you are on the train"
                 "Reached destination"])
  (stop-playing))
;; core.async way
;; chan - Channels, used to communicate messages
;; messages can be put (>!!) onto a channel or taken (<!!) from a channel
;; go - creates a new lightweight process whose body runs concurrently on a thread from core.async's pool.
;; 1) when trying to put a message on a channel or take a message off of it, wait and do nothing until the put or take succeeds
;; 2) when the put or take succeeds, continue executing.
;; An unbuffered channel named simple-channel.
(def simple-channel (chan))

;; Consumer: parks until a message arrives, then prints it.
(go
  (let [message (<! simple-channel)]
    (println message)))

;; Producer: blocks the calling thread until the consumer takes the message.
(>!! simple-channel "Welcome to my simple channel")
;; buffers
;; A buffered channel accepts up to n puts without a waiting taker; further puts wait until space frees up
;; Channel backed by a fixed buffer that holds two pending messages.
(def simple-buffer-channel
  (chan (buffer 2)))
(defn set-interval
  "Calls `f` (a zero-argument function) after every `time-in-ms` milliseconds
  until the returned channel is closed or receives a value. Each call of `f`
  runs on its own `thread`, and the next interval starts only after that call
  has finished. Returns the stop channel."
  [f time-in-ms]
  (let [stop (chan)] ;; closing or putting to this channel ends the loop
    (go-loop []
      (alt!
        ;; Park with <! while waiting for `f`: a blocking <!! inside a go
        ;; block would tie up one of the go pool's threads.
        (timeout time-in-ms) (do (<! (thread (f)))
                                 (recur))
        stop (println "Stopped synchronizing")))
    stop))
(defn infinity-and-beyond
  "Prints the catchphrase on its own line and returns nil."
  []
  (println "To infinity and beyond"))
;; Schedule a greeting every 2000 ms; `job` holds the stop channel.
(def job
  (set-interval (fn [] (println "Howdy")) 2000))

;; Closing the stop channel ends the scheduled job.
(close! job)
;; thread
;; acts like a future; use it when your process does long-running or blocking work before putting or taking
;; ^ creates a new thread and executes the body on it
;; Consumer on a dedicated thread: blocks until a message arrives, then prints it.
(thread
  (let [message (<!! simple-channel)]
    (println message)))

;; Producer: blocks until a taker accepts the message.
(>!! simple-channel "Hello my simple channel")
;; Prints one greeting per element of `seconds`, pausing one second after each.
;; The pause must be taken with <! (a bare `(timeout 1000)` only creates a
;; channel and waits for nothing), and the loop must `recur` to advance.
(go-loop [seconds [1 2 3]]
  (when-let [[sec & remaining] (seq seconds)]
    (println "Hello world: " sec)
    (<! (timeout 1000))
    (recur remaining)))
;; Go process that keeps looping while it receives `false` from
;; simple-channel, then prints "stop go". It ends on any truthy message, and
;; also when the channel is closed (a take from a closed channel yields nil).
(def simple-go
  (go
    ;; Parking <! is used here: blocking <!! inside a go block would tie up
    ;; one of the go pool's threads.
    (while (false? (<! simple-channel))
      (println "Inside channel")
      ;; Take from the timeout channel to actually pause for 5 seconds.
      (<! (timeout 5000)))
    (println "stop go")))
;; Dedicated thread that keeps looping while it receives `false` from
;; simple-channel, then prints "stop thread". It ends on any truthy message,
;; and also when the channel is closed (a take from a closed channel yields nil).
(def simple-thread
  (thread
    (while (false? (<!! simple-channel))
      (println "Inside thread")
      ;; Take from the timeout channel to actually pause for 5 seconds.
      (<!! (timeout 5000)))
    (println "stop thread")))
;; Send `true` and then `false` to whichever consumers are taking from simple-channel.
(doseq [flag [true false]]
  (>!! simple-channel flag))
(defn start
  "Starts a go process that prints \"tick\" every 2000 ms until the control
  channel receives a value or is closed, at which point it prints \"Exit \"
  and finishes. Returns the control channel."
  []
  (let [control (chan 0)]
    (go-loop []
      ;; Wait for whichever is ready first: the 2 s timer or the control channel.
      (let [[_ source] (alts! [(timeout 2000) control])]
        (if (identical? source control)
          (prn "Exit ")
          (do (prn "tick")
              (recur)))))
    control))
;; Let the ticker run for seven seconds, then tell it to stop.
(let [ticker (start)]
  (Thread/sleep 7000)
  (>!! ticker "stop"))
;; Channel with room for one buffered message.
(def c (chan 1))

;; Consumer: prints every value taken from `c` and stops once `c` is closed.
;; A take from a closed, drained channel returns nil immediately, so recurring
;; unconditionally would spin forever printing nil. `when-some` ends the loop
;; on nil while still passing through a legitimate `false`.
(go-loop []
  (when-some [x (<! c)]
    (println "Got a value in this loop:" x)
    (recur)))

(>!! c "hello")
;; Closing the channel lets the consumer loop above finish.
(close! c)