-
Notifications
You must be signed in to change notification settings - Fork 1
/
worker-pool.scm
51 lines (42 loc) · 1.5 KB
/
worker-pool.scm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
;; worker-pools example, based on https://gobyexample.com/worker-pools
;; time csi -s worker-pool.scm tells us we spend 1 second doing a
;; 5-second job.
(import gochan
miscmacros
(only (chicken random) pseudo-random-integer)
srfi-18 srfi-1)
(define (info . args) (apply print (cons (current-thread) (cons " " args))))
(define (worker jobs results)
(let loop ()
(gochan-select
((jobs -> job) ;; no fail-flag here means we break (loop) when jobs is closed
(thread-sleep! (/ (pseudo-random-integer 1000) 1000))
(if (= 0 (pseudo-random-integer 100))
(info "crash!")
(gochan-send results 'my-result))
(loop))))
(info "worker exit"))
(define jobs (gochan 100)) ;; allow filling jobs queue
(define res (gochan 0)) ;; block workers until we get their result
;; dispatch worker threads
(repeat 5 (go (worker jobs res)))
;; queue jobs
(repeat* 100 (gochan-send jobs it))
(info "all jobs enqueued")
;; allow workers to exit cleanly (`ok` will be #f)
(gochan-close jobs)
;; show progress every 250ms
(define progress (gochan-tick 250))
;; don't hang forever if something goes wrong
(define timeout (gochan-after (+ 1000 (* (/ 100 5) 1000))))
(let loop ((done 0))
(if (< done 100)
(gochan-select
((res -> msg)
(loop (add1 done)))
((progress -> _)
(print* done " / " 100 " \r")
(loop done))
((timeout -> _)
(info "should have completed all jobs by now, sombody crashed :-(")))
(info "all jobs completed")))