This repository has been archived by the owner on Apr 16, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
async_events.rb
93 lines (81 loc) · 2.65 KB
/
async_events.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
require 'faye/websocket'
require 'thread'
require 'json'
require 'erb'
# https://github.com/heroku-examples/ruby-websockets-chat-demo/blob/master/middlewares/chat_backend.rb
# https://github.com/curoverse/arvados/blob/master/services/api/lib/eventbus.rb
class AsyncEvents
KEEPALIVE_TIME = 1 # in seconds
CHANNEL = 'chatdemo'
RESET_CHANNEL = 'pg_restart'
def initialize(app)
@app = app
@clients = []
@mutex = Mutex.new
@bgthread = false
end
def call(env)
return @app.call(env) unless Faye::WebSocket.websocket?(env)
setup_listener
ws = Faye::WebSocket.new(env, nil, { ping: KEEPALIVE_TIME })
ws.on :open do |event|
p [:open, ws.object_id]
@clients << ws
end
ws.on :message do |event|
p [:message, event.data]
data = JSON.parse event.data
message = Message.create author: data['author'], body: data['body']
p data
end
ws.on :close do |event|
p [:close, ws.object_id, event.code, event.reason]
@clients.delete(ws)
ws = nil
end
# Return async Rack response
ws.rack_response
end
def setup_listener
# Start up thread to monitor the Postgres database, if none exists already.
@mutex.synchronize do
unless @bgthread
@bgthread = true
Thread.new do
# from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
ActiveRecord::Base.connection_pool.with_connection do |connection|
conn = connection.instance_variable_get(:@connection)
begin
conn.async_exec "LISTEN #{RESET_CHANNEL}"
conn.async_exec "LISTEN #{CHANNEL}"
catch(:break_loop) do
loop do
conn.wait_for_notify do |channel, pid, payload|
p [:send, channel, payload]
# Break out of the loop if receive a restart (like on Rails reload)
throw :break_loop if channel == RESET_CHANNEL
@clients.each {|ws| ws.send(sanitize payload) }
end
end
end
rescue => error
p [:error, error]
ensure
# Don't want the connection to still be listening once we return
# it to the pool - could result in weird behavior for the next
# thread to check it out.
conn.async_exec "UNLISTEN *"
end
end
@bgthread = false
end
end
end
end
private
def sanitize(message)
json = JSON.parse(message)
json.each {|key, value| json[key] = ERB::Util.html_escape(value) }
JSON.generate(json)
end
end