-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
index.js
122 lines (100 loc) · 2.97 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
'use strict'
const level = require('level')
const { pipeline: pump } = require('readable-stream')
const fs = require('fs')
const net = require('net')
const path = require('path')
const multileveldown = require('multileveldown')
module.exports = function (dir, opts = {}) {
const sockPath = process.platform === 'win32'
? '\\\\.\\pipe\\level-party\\' + path.resolve(dir)
: path.join(dir, 'level-party.sock')
opts = { retry: true, ...opts }
const client = multileveldown.client(opts)
client.open(tryConnect)
function tryConnect () {
if (!client.isOpen()) {
return
}
const socket = net.connect(sockPath)
let connected = false
socket.on('connect', function () {
connected = true
})
// Pass socket as the ref option so we dont hang the event loop.
pump(socket, client.createRpcStream({ ref: socket }), socket, function () {
// TODO: err?
if (!client.isOpen()) {
return
}
const db = level(dir, opts, onopen)
function onopen (err) {
if (err) {
// TODO: This can cause an invisible retry loop that never completes
// and leads to memory leaks.
// TODO: What errors should be retried?
if (connected) {
tryConnect()
} else {
setTimeout(tryConnect, 100)
}
return
}
fs.unlink(sockPath, function (err) {
if (err && err.code !== 'ENOENT') {
// TODO: Is this how to forward errors?
db.emit('error', err)
return
}
if (!client.isOpen()) {
return
}
const sockets = new Set()
const server = net.createServer(function (sock) {
if (sock.unref) {
sock.unref()
}
sockets.add(sock)
pump(sock, multileveldown.server(db), sock, function () {
// TODO: err?
sockets.delete(sock)
})
})
client.close = shutdown
client.emit('leader')
client.forward(db)
server.listen(sockPath, onlistening)
.on('error', function () {
// TODO: Is this how to forward errors?
// TODO: tryConnect()?
db.emit('error', err)
})
function shutdown (cb) {
for (const sock of sockets) {
sock.destroy()
}
server.close(() => {
db.close(cb)
})
}
function onlistening () {
if (server.unref) {
server.unref()
}
if (client.isFlushed()) {
return
}
const sock = net.connect(sockPath)
pump(sock, client.createRpcStream(), sock, function () {
// TODO: err?
})
client.once('flush', function () {
sock.destroy()
})
}
})
}
})
}
return client
}