Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: development of Loggly logging plugin #6113

Merged
merged 14 commits into from
Jan 26, 2022
243 changes: 243 additions & 0 deletions apisix/plugins/loggly.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local log_util = require("apisix.utils.log-util")
local service_fetch = require("apisix.http.service").get
local ngx = ngx
local tostring = tostring
local tab_concat = table.concat
local udp = ngx.socket.udp

-- Plugin identity; also used as the batch-processor namespace and the
-- key under which plugin metadata is looked up.
local plugin_name = "loggly"
local batch_processor_manager = bp_manager_mod.new(plugin_name)

-- RFC 5424 numerical severity codes, keyed by the upper-cased value of
-- conf.severity.  NOTE(review): "EMEGR" is a misspelling of the standard
-- "EMERG" keyword; it matches the schema enum below, so renaming it would
-- break existing configurations — keep as-is.
local severity = {
    EMEGR = 0, -- system is unusable
    ALERT = 1, -- action must be taken immediately
    CRIT = 2, -- critical conditions
    ERR = 3, -- error conditions
    WARNING = 4, -- warning conditions
    NOTICE = 5, -- normal but significant condition
    INFO = 6, -- informational
    DEBUG = 7, -- debug-level messages
}

-- Per-route plugin configuration schema.  The only mandatory field is the
-- Loggly customer token; everything else has a sensible default.
local schema = {
    type = "object",
    properties = {
        customer_token = {type = "string"},
        severity = {
            type = "string",
            default = "INFO",
            -- NOTE: "EMEGR"/"emegr" mirror the (misspelled) key in the
            -- `severity` table above and must stay in sync with it.
            enum = {"DEBUG", "INFO", "NOTICE", "WARNING", "ERR", "CRIT", "ALERT", "EMEGR",
                    "debug", "info", "notice", "warning", "err", "crit", "alert", "emegr"},
            description = "base severity log level",
        },
        include_req_body = {type = "boolean", default = false},
        include_resp_body = {type = "boolean", default = false},
        -- lua-resty-expr rules controlling when the response body is logged
        include_resp_body_expr = {
            type = "array",
            minItems = 1,
            items = {
                type = "array"
            }
        },
        -- extra structured-data tags attached to each syslog event
        tags = {
            type = "array",
            minItems = 1,
            items = {
                type = "string"
            }
        },
        -- substitute service/route names for their ids in the log entry
        prefer_name = {type = "boolean", default = true}
    },
    required = {"customer_token"}
}


-- Fallback connection settings, used whenever no plugin metadata has been
-- configured by the admin.
local defaults = {
    host     = "logs-01.loggly.com", -- Loggly shared syslog endpoint
    port     = 514,                  -- conventional syslog port
    protocol = "syslog",
    timeout  = 5000,                 -- socket timeout, milliseconds
}

-- Cluster-wide plugin metadata schema; every default is sourced from the
-- `defaults` table so the two never drift apart.
local metadata_schema = {
    type = "object",
    properties = {
        host = {type = "string", default = defaults.host},
        port = {type = "integer", default = defaults.port},
        protocol = {
            type = "string",
            default = defaults.protocol,
            -- more methods coming soon
            enum = {"syslog"},
        },
        timeout = {
            type = "integer",
            minimum = 1,
            default = defaults.timeout,
        },
        -- custom log format; when non-empty it overrides get_full_log
        log_format = {type = "object"},
    },
}

-- Plugin module table consumed by the APISIX plugin loader.
local _M = {
    version = 0.1,
    -- must agree with the "# priority: 411" entry in conf/config-default.yaml
    priority = 411,
    name = plugin_name,
    -- wrap_schema() merges the shared batch-processor fields into our schema
    schema = batch_processor_manager:wrap_schema(schema),
    metadata_schema = metadata_schema
}

-- Validate either the plugin metadata or a per-route configuration,
-- depending on schema_type.  Returns true, or false plus an error string.
function _M.check_schema(conf, schema_type)
    if schema_type == core.schema.TYPE_METADATA then
        return core.schema.check(metadata_schema, conf)
    end
    return core.schema.check(schema, conf)
end

-- Build one RFC 5424-formatted syslog line for the current request.
-- Returns the string, or nil when the log entry cannot be JSON-encoded
-- (the error is logged here).
local function generate_log_message(conf, ctx)
    local metadata = plugin.plugin_metadata(plugin_name)
    local entry

    -- Prefer the admin-supplied custom log format when one is configured.
    if metadata and metadata.value.log_format
       and core.table.nkeys(metadata.value.log_format) > 0
    then
        entry = log_util.get_custom_format_log(ctx, metadata.value.log_format)
    else
        entry = log_util.get_full_log(ngx, conf)
    end

    if conf.prefer_name then
        -- Replace opaque ids with human-readable names where available.
        if entry.service_id and entry.service_id ~= "" then
            local svc = service_fetch(entry.service_id)

            if svc and svc.value.name ~= "" then
                entry.service_id = svc.value.name
            end
        end

        if ctx.route_name and ctx.route_name ~= "" then
            entry.route_id = ctx.route_name
        end
    end

    -- generate rfc5424 compliant syslog event
    local json_str, err = core.json.encode(entry)
    if not json_str then
        core.log.error('error occurred while encoding the data: ', err)
        return nil
    end

    local timestamp = log_util.get_rfc3339_zulu_timestamp()
    local taglist = {}
    if conf.tags then
        for i = 1, #conf.tags do
            -- BUGFIX: the original condition `not conf.tags[i]:sub(1, 4) ~= "tag="`
            -- compared a boolean with a string and was therefore always true,
            -- so user tags already carrying the "tag=" key were double-wrapped.
            -- Wrap only bare tags; pass pre-keyed ones through unchanged.
            if conf.tags[i]:sub(1, 4) ~= "tag=" then
                core.table.insert(taglist, "tag=\"" .. conf.tags[i] .. "\"")
            else
                core.table.insert(taglist, conf.tags[i])
            end
        end
    end
    local message = {
        -- facility LOG_USER - random user level message
        "<".. tostring(8 + severity[conf.severity:upper()]) .. ">1", -- <PRIVAL>1
        timestamp, -- timestamp
        ctx.var.host or "-", -- hostname
        "apisix", -- appname
        ctx.var.pid, -- proc-id
        "-", -- msgid
        -- SD-ID "<token>@41058" is Loggly's registered enterprise number
        "[" .. conf.customer_token .. "@41058 " .. tab_concat(taglist, " ") .. "]",
        json_str
    }

    return tab_concat(message, " ")
end

-- Ship one syslog line to the Loggly endpoint over UDP.
-- Endpoint details come from plugin metadata, falling back to `defaults`
-- when none is configured.  Returns true, or false plus an error message;
-- a failure to close the socket is logged but not treated as a send error.
local function send_data_over_udp(message)
    local metadata = plugin.plugin_metadata(plugin_name)
    core.log.info("metadata: ", core.json.delay_encode(metadata))

    if not metadata then
        core.log.info("received nil metadata: using metadata defaults: ",
                      core.json.delay_encode(defaults, true))
        metadata = {value = defaults}
    end

    local host = metadata.value.host
    local port = metadata.value.port

    local sock = udp()
    sock:settimeout(metadata.value.timeout)

    core.log.info("sending a batch logs to ", host, ":", port)

    local ok, err = sock:setpeername(host, port)
    if not ok then
        return false, "failed to connect to UDP server: host[" .. host
                      .. "] port[" .. tostring(port) .. "] err: " .. err
    end

    local succeeded = true
    local failure_msg

    ok, err = sock:send(message)
    if not ok then
        succeeded = false
        failure_msg = "failed to send data to UDP server: host[" .. host
                      .. "] port[" .. tostring(port) .. "] err:" .. err
    end

    ok, err = sock:close()
    if not ok then
        core.log.error("failed to close the UDP connection, host[",
                       host, "] port[", port, "] ", err)
    end

    return succeeded, failure_msg
end

-- Batch-processor consumer: sends every entry of the batch over UDP.
-- BUGFIX: the original loop kept only the LAST entry's (ok, err), so a
-- batch whose final send succeeded was reported as fully delivered even
-- when earlier sends failed; those entries were silently dropped.  Fail
-- fast on the first error instead, so the batch processor's retry policy
-- kicks in for the whole batch.
local function handle_log(entries)
    for i = 1, #entries do
        local ok, err = send_data_over_udp(entries[i])
        if not ok then
            return false, err
        end
    end
    return true
end

-- Log-phase handler: format the request as a syslog event and queue it.
function _M.log(conf, ctx)
    -- A nil result means JSON encoding failed; the error was already logged.
    local entry = generate_log_message(conf, ctx)
    if not entry then
        return
    end

    -- Reuse the route's existing batch processor when one is running;
    -- otherwise create a fresh processor bound to handle_log.
    if not batch_processor_manager:add_entry(conf, entry) then
        batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, handle_log)
    end
end

return _M
1 change: 1 addition & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ plugins: # plugin list (sorted by priority)
- prometheus # priority: 500
- datadog # priority: 495
- echo # priority: 412
- loggly # priority: 411
- http-logger # priority: 410
- splunk-hec-logging # priority: 409
- skywalking-logger # priority: 408
Expand Down
Binary file added docs/assets/images/plugin/loggly-dashboard.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion docs/en/latest/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@
"plugins/error-log-logger",
"plugins/sls-logger",
"plugins/google-cloud-logging",
"plugins/splunk-hec-logging"
"plugins/splunk-hec-logging",
"plugins/loggly"
]
},
{
Expand Down
Loading