-
Notifications
You must be signed in to change notification settings - Fork 1
/
buffer.lua
86 lines (66 loc) · 1.46 KB
/
buffer.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
local _M = {}
local skynet = require "skynet"
local lp = require "influxdb.lineproto"
local util = require "influxdb.util"
local str_gsub = string.gsub
local str_rep = string.rep
local str_sub = string.sub
local str_find = string.find
local str_fmt = string.format
local tbl_cat = table.concat
local floor = math.floor
local my_opts
local initted = false
local msg_cnt = 0
local msg_buf = {}
_M.version = "0.2"
local function _do_write(msg)
local proto = my_opts.proto
if proto == 'http' then
return util.write_http(msg, my_opts)
elseif proto == 'udp' then
return util.write_udp(msg)
else
return false, 'unknown proto'
end
end
function _M.clear()
msg_cnt = 0
msg_buf = {}
return true
end
function _M.buffer(data)
local influx_data = {
_measurement = lp.quote_measurement(data.measurement),
_tag_set = lp.build_tag_set(data.tags),
_field_set = lp.build_field_set(data.fields),
_stamp = os.time() * 1000
}
local msg = lp.build_line_proto_stmt(influx_data)
msg_cnt = msg_cnt + 1
msg_buf[msg_cnt] = msg
return true
end
function _M.flush()
local msg = tbl_cat(msg_buf, "\n")
_M.clear()
skynet.fork(_do_write, msg)
end
function _M.init(opts)
if (initted) then
return false, 'already initted'
end
local ok, err = util.validate_options(opts)
if not ok then
ERROR(err)
return false
end
my_opts = opts
initted = true
local proto = my_opts.proto
if proto == 'udp' then
util.init_udp(my_opts)
end
return true
end
return _M