This repository has been archived by the owner on Jul 26, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 404
/
external-secret.js
98 lines (81 loc) · 2.25 KB
/
external-secret.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
'use strict'
const JSONStream = require('json-stream')
/**
 * Builds a FIFO queue whose consumers can wait for messages asynchronously.
 * `take` always returns a promise: it is already resolved with the oldest
 * queued message when one is available, otherwise it stays pending until a
 * later `put` hands a message to it. Pending takers are served in the order
 * in which they asked.
 * @returns {Object} Queue instance with put and take methods
 */
function createEventQueue () {
  // Messages that arrived while nobody was waiting for them.
  const backlog = []
  // Resolve callbacks of takers that arrived while the backlog was empty.
  const pendingTakers = []

  const take = () => {
    if (backlog.length === 0) {
      return new Promise(resolve => {
        pendingTakers.push(resolve)
      })
    }
    return Promise.resolve(backlog.shift())
  }

  const put = (msg) => {
    if (pendingTakers.length === 0) {
      return backlog.push(msg)
    }
    // Hand the message straight to the longest-waiting taker.
    const deliver = pendingTakers.shift()
    return deliver(msg)
  }

  return { take, put }
}
/**
 * Keeps a watch stream on the custom resource open and forwards every parsed
 * watch event to the event queue. Whenever the current stream errors or ends,
 * a `DELETED_ALL` event is queued, the stream is aborted and a new watch
 * stream is opened.
 *
 * The loop only stops when something inside it throws; that error is logged
 * as 'Watcher crashed' and swallowed.
 * @param {Object} kubeClient - Client for interacting with kubernetes cluster.
 * @param {Object} customResourceManifest - Custom resource manifest.
 * @param {Object} logger - Logger providing debug, warn and error methods.
 * @param {Object} eventQueue - Queue whose put method receives the events.
 * @returns {Promise} Settles once the watcher has crashed and logged the error.
 */
async function startWatcher ({
  kubeClient,
  customResourceManifest,
  logger,
  eventQueue
}) {
  try {
    while (true) {
      logger.debug('Starting watch stream')

      const stream = kubeClient
        .apis[customResourceManifest.spec.group]
        .v1.watch[customResourceManifest.spec.names.plural]
        .getStream()

      const jsonStream = new JSONStream()
      stream.pipe(jsonStream)

      jsonStream.on('data', eventQueue.put)

      // The death signal is a one-shot promise owned by this iteration.
      // A stream may report more than once (e.g. 'error' followed by 'end',
      // or 'end' after abort); with a queue shared across iterations the
      // surplus signal would be picked up by the next iteration and kill
      // the freshly opened stream. Extra resolve calls here are no-ops.
      await new Promise(resolve => {
        jsonStream.on('error', (err) => {
          logger.warn(err, 'Got error on stream')
          resolve('ERROR')
        })

        jsonStream.on('end', () => {
          resolve('END')
        })
      })

      logger.debug('Stopping watch stream')
      // Tell the consumer that everything seen on the dead stream is stale
      // before the next watch starts delivering events.
      eventQueue.put({ type: 'DELETED_ALL' })

      stream.abort()
    }
  } catch (err) {
    logger.error(err, 'Watcher crashed')
  }
}
/**
 * Exposes external secret events as an async generator. The watcher that
 * feeds the generator is started lazily, on the first request for an event,
 * and the generator never completes on its own: each iteration waits for the
 * next event put on the internal queue.
 * @param {Object} kubeClient - Client for interacting with kubernetes cluster.
 * @param {Object} customResourceManifest - Custom resource manifest.
 * @param {Object} logger - Logger handed through to the watcher.
 * @returns {Object} An async generator that yields externalsecret events.
 */
function getExternalSecretEvents ({
  kubeClient,
  customResourceManifest,
  logger
}) {
  async function * streamEvents () {
    const eventQueue = createEventQueue()
    const watcherOptions = {
      kubeClient,
      customResourceManifest,
      logger,
      eventQueue
    }

    // Runs in the background; it reports to the consumer only via the queue.
    startWatcher(watcherOptions)

    for (;;) {
      const nextEvent = await eventQueue.take()
      yield nextEvent
    }
  }

  return streamEvents()
}
// Public API of this module: only the event generator factory is exported;
// createEventQueue and startWatcher stay module-private.
module.exports = {
getExternalSecretEvents
}