Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(loggly): support http/s bulk sending for batch logs #6212

Merged
merged 5 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 85 additions & 19 deletions apisix/plugins/loggly.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ local core = require("apisix.core")
local plugin = require("apisix.plugin")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local log_util = require("apisix.utils.log-util")
local path = require("pl.path")
local http = require("resty.http")
local ngx = ngx
local tostring = tostring
local pairs = pairs
Expand Down Expand Up @@ -76,6 +78,12 @@ local schema = {
-- we prevent of having `tag=` prefix
pattern = "^(?!tag=)[ -~]*",
},
default = {"apisix"}
},
ssl_verify = {
-- applicable for https protocol
type = "boolean",
default = true
},
},
required = {"customer_token"}
Expand Down Expand Up @@ -104,8 +112,8 @@ local metadata_schema = {
protocol = {
type = "string",
default = defaults.protocol,
-- more methods coming soon
enum = {"syslog"}
-- in case of http and https, we use bulk endpoints
enum = {"syslog", "http", "https"}
},
timeout = {
type = "integer",
Expand Down Expand Up @@ -158,13 +166,17 @@ local function generate_log_message(conf, ctx)
entry = log_util.get_full_log(ngx, conf)
end

-- generate rfc5424 compliant syslog event
local json_str, err = core.json.encode(entry)
if not json_str then
core.log.error('error occurred while encoding the data: ', err)
return nil
end

if metadata.value.protocol ~= "syslog" then
return json_str
end

-- generate rfc5424 compliant syslog event
local timestamp = log_util.get_rfc3339_zulu_timestamp()
local taglist = {}
if conf.tags then
Expand All @@ -188,24 +200,13 @@ local function generate_log_message(conf, ctx)
end


local function send_data_over_udp(message)
local metadata = plugin.plugin_metadata(plugin_name)
core.log.info("metadata: ", core.json.delay_encode(metadata))

if not metadata then
core.log.info("received nil metadata: using metadata defaults: ",
core.json.delay_encode(defaults, true))
metadata = {}
metadata.value = defaults
end
local function send_data_over_udp(message, metadata)
local err_msg
local res = true
local sock = udp()
local host, port = metadata.value.host, metadata.value.port
sock:settimeout(metadata.value.timeout)

core.log.info("sending a batch logs to ", host, ":", port)

local ok, err = sock:setpeername(host, port)

if not ok then
Expand All @@ -232,13 +233,72 @@ local function send_data_over_udp(message)
end


local function send_bulk_over_http(message, metadata, conf)
local endpoint = path.join(metadata.value.host, "bulk", conf.customer_token, "tag", "bulk")
local has_prefix = core.string.has_prefix(metadata.value.host, "http")
if not has_prefix then
if metadata.value.protocol == "http" then
endpoint = "http://" .. endpoint
else
endpoint = "https://" .. endpoint
end
end

local httpc = http.new()
httpc:set_timeout(metadata.value.timeout)
local res, err = httpc:request_uri(endpoint, {
ssl_verify = conf.ssl_verify,
method = "POST",
body = message,
headers = {
["Content-Type"] = "application/json",
["X-LOGGLY-TAG"] = conf.tags
},
})

if not res then
return false, "failed to write log to loggly, " .. err
end

if res.status ~= 200 then
local body = core.json.decode(res.body)
if not body then
return false, "failed to send log to loggly, http status code: " .. res.status
else
return false, "failed to send log to loggly, http status code: " .. res.status
.. " response body: ".. res.body
end
end

return true
end


local handle_http_payload

local function handle_log(entries)
for i = 1, #entries do
local ok, err = send_data_over_udp(entries[i])
if not ok then
return false, err
local metadata = plugin.plugin_metadata(plugin_name)
core.log.info("metadata: ", core.json.delay_encode(metadata))

if not metadata then
core.log.info("received nil metadata: using metadata defaults: ",
core.json.delay_encode(defaults, true))
metadata = {}
metadata.value = defaults
end
core.log.info("sending a batch logs to ", metadata.value.host)

if metadata.value.protocol == "syslog" then
for i = 1, #entries do
local ok, err = send_data_over_udp(entries[i], metadata)
if not ok then
return false, err
end
end
else
return handle_http_payload(entries, metadata)
end

return true
end

Expand All @@ -249,6 +309,12 @@ function _M.log(conf, ctx)
return
end

handle_http_payload = function (entries, metadata)
-- loggly bulk endpoint expects entries concatenated in newline("\n")
local message = tab_concat(entries, "\n")
return send_bulk_over_http(message, metadata, conf)
end

if batch_processor_manager:add_entry(conf, log_data) then
return
end
Expand Down
13 changes: 11 additions & 2 deletions docs/en/latest/plugins/loggly.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ To generate a Customer Token, head over to `<your assigned subdomain>/loggly.com
| Name | Type | Requirement | Default | Valid | Description |
| ----------- | ------ | ----------- | ------- | ----- | ---------------------------------------------------------------------- |
| host | string | optional | "logs-01.loggly.com" | | The host address endpoint where logs are being sent. |
| port | integer | optional | 514 | | Loggly host port to make a connection request. |
| port | integer | optional | 514 | | Loggly port (for "syslog" protocol only) to make a connection request. |
| timeout | integer | optional | 5000 | | Loggly send data request timeout in milliseconds. |
| protocol | string | optional | "syslog" | | Protocol through which the logs are sent to Loggly from APISIX (currently supported protocol : "syslog") |
| protocol | string | optional | "syslog" | [ "syslog" , "http", "https" ] | Protocol through which the logs are sent to Loggly from APISIX (currently supported protocol : "syslog", "http", "https") |
| log_format | object | optional | nil | | Log format declared as key value pair in JSON format. Only string is supported in the `value` part. If the value starts with `$`, it means to get [`APISIX` variables](../apisix-variable.md) or [Nginx variable](http://nginx.org/en/docs/varindex.html). If it is nil or empty object, APISIX generates full log info. |

## How To Enable
Expand Down Expand Up @@ -100,6 +100,15 @@ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f13
}'
```

We support [Syslog](https://documentation.solarwinds.com/en/success_center/loggly/content/admin/streaming-syslog-without-using-files.htm), [HTTP/S](https://documentation.solarwinds.com/en/success_center/loggly/content/admin/http-bulk-endpoint.htm) (bulk endpoint) protocols to send log events to Loggly. By default, in APISIX side, the protocol is set to "syslog". It lets you send RFC5424 compliant syslog events with some fine-grained control (log severity mapping based on upstream HTTP response code). But HTTP/S bulk endpoint is great to send larger batches of log events with faster transmission speed. If you wish to update it, just update the metadata

```shell
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/loggly -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"protocol": "http"
}'
```

### Minimal configuration

```shell
Expand Down
78 changes: 70 additions & 8 deletions t/plugin/loggly.t
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,26 @@ _EOC_
$block->set_value("extra_stream_config", $stream_config);
}

my $http_config = $block->http_config // <<_EOC_;

server {
listen 10420;

location /loggly/bulk/tok/tag/bulk {
content_by_lua_block {
ngx.req.read_body()
local data = ngx.req.get_body_data()
local headers = ngx.req.get_headers()
ngx.log(ngx.ERR, "loggly body: ", data)
ngx.log(ngx.ERR, "loggly tags: " .. require("toolkit.json").encode(headers["X-LOGGLY-TAG"]))
ngx.say("ok")
}
}
}
_EOC_

$block->set_value("http_config", $http_config);

if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
$block->set_value("no_error_log", "[error]");
}
Expand Down Expand Up @@ -214,8 +234,8 @@ opentracing
--- grep_error_log eval
qr/message received: .+?(?= \{)/
--- grep_error_log_out eval
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[test-token\@41058 ]
message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[test-token\@41058 ]/
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[test-token\@41058 tag="apisix"]
message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[test-token\@41058 tag="apisix"]/



Expand Down Expand Up @@ -318,7 +338,7 @@ opentracing
--- grep_error_log eval
qr/message received: .+?(?= \{)/
--- grep_error_log_out eval
qr/message received: <10>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[token-1\@41058 ]/
qr/message received: <10>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[token-1\@41058 tag="apisix"]/



Expand All @@ -341,7 +361,7 @@ opentracing
--- grep_error_log eval
qr/message received: [ -~]+/
--- grep_error_log_out eval
qr/message received: <10>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[token-1\@41058 ] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{\},"size":[\d]+,"uri":"\/opentracing","url":"http:\/\/127\.0\.0\.1:1984\/opentracing"\},"response":\{"headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/
qr/message received: <10>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[token-1\@41058 tag="apisix"] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{\},"size":[\d]+,"uri":"\/opentracing","url":"http:\/\/127\.0\.0\.1:1984\/opentracing"\},"response":\{"headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/



Expand Down Expand Up @@ -389,7 +409,7 @@ opentracing
--- grep_error_log eval
qr/message received: [ -~]+/
--- grep_error_log_out eval
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 ] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{\},"size":[\d]+,"uri":"\/opentracing","url":"http:\/\/127\.0\.0\.1:1984\/opentracing"\},"response":\{"body":"opentracing\\n","headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 tag="apisix"] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{\},"size":[\d]+,"uri":"\/opentracing","url":"http:\/\/127\.0\.0\.1:1984\/opentracing"\},"response":\{"body":"opentracing\\n","headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/



Expand Down Expand Up @@ -442,7 +462,7 @@ opentracing
--- grep_error_log eval
qr/message received: [ -~]+/
--- grep_error_log_out eval
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 ] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{"bar":"bar"\},"size":[\d]+,"uri":"\/opentracing\?bar=bar","url":"http:\/\/127\.0\.0\.1:1984\/opentracing\?bar=bar"\},"response":\{"body":"opentracing\\n","headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 tag="apisix"] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{"bar":"bar"\},"size":[\d]+,"uri":"\/opentracing\?bar=bar","url":"http:\/\/127\.0\.0\.1:1984\/opentracing\?bar=bar"\},"response":\{"body":"opentracing\\n","headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/



Expand All @@ -466,7 +486,7 @@ opentracing
--- grep_error_log eval
qr/message received: [ -~]+/
--- grep_error_log_out eval
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 ] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{"foo":"bar"\},"size":[\d]+,"uri":"\/opentracing\?foo=bar","url":"http:\/\/127\.0\.0\.1:1984\/opentracing\?foo=bar"\},"response":\{"headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 tag="apisix"] \{"apisix_latency":[\d.]*,"client_ip":"127\.0\.0\.1","latency":[\d.]*,"request":\{"headers":\{"content-type":"application\/x-www-form-urlencoded","host":"127\.0\.0\.1:1984","user-agent":"lua-resty-http\/[\d.]* \(Lua\) ngx_lua\/[\d]*"\},"method":"GET","querystring":\{"foo":"bar"\},"size":[\d]+,"uri":"\/opentracing\?foo=bar","url":"http:\/\/127\.0\.0\.1:1984\/opentracing\?foo=bar"\},"response":\{"headers":\{"connection":"close","content-type":"text\/plain","server":"APISIX\/[\d.]+","transfer-encoding":"chunked"\},"size":[\d]*,"status":200\},"route_id":"1","server":\{"hostname":"[ -~]*","version":"[\d.]+"\},"service_id":"","start_time":[\d]*,"upstream":"127\.0\.0\.1:1982","upstream_latency":[\d]*\}/



Expand Down Expand Up @@ -509,4 +529,46 @@ opentracing
--- grep_error_log eval
qr/message received: [ -~]+/
--- grep_error_log_out eval
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 ] \{"client":"[\d.]+","host":"[\d.]+","route_id":"1"\}/
qr/message received: <14>1 [\d\-T:.]+Z [\d.]+ apisix [\d]+ - \[tok\@41058 tag="apisix"] \{"client":"[\d.]+","host":"[\d.]+","route_id":"1"\}/



=== TEST 12: loggly http protocol
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/plugin_metadata/loggly',
ngx.HTTP_PUT,
{
host = ngx.var.server_addr .. ":10420/loggly",
protocol = "http",
log_format = {
["route_id"] = "$route_id",
}
}
)

if code >= 300 then
ngx.status = code
ngx.say("fail")
return
end
ngx.say(body)

local code, _, body = t("/opentracing", "GET")
if code >= 300 then
ngx.status = code
ngx.say("fail")
return
end
ngx.print(body)
}
}
--- wait: 2
--- response_body
passed
opentracing
--- error_log
loggly body: {"route_id":"1"}
loggly tags: "apisix"