Skip to content

Commit

Permalink
Don't pick tried endpoint & count the latest in ewma balancer
Browse files Browse the repository at this point in the history
fixes #6632
  • Loading branch information
spacewander committed Dec 18, 2020
1 parent f1124aa commit e118ebc
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 7 deletions.
51 changes: 44 additions & 7 deletions rootfs/etc/nginx/lua/balancer/ewma.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ local string = string
local tonumber = tonumber
local setmetatable = setmetatable
local string_format = string.format
local table_insert = table.insert
local ngx_log = ngx.log
local INFO = ngx.INFO

Expand Down Expand Up @@ -105,10 +106,15 @@ local function get_or_update_ewma(upstream, rtt, update)
end


local function get_upstream_name(upstream)
return upstream.address .. ":" .. upstream.port
end


local function score(upstream)
-- Original implementation used names
-- Endpoints don't have names, so passing in IP:Port as key instead
local upstream_name = upstream.address .. ":" .. upstream.port
local upstream_name = get_upstream_name(upstream)
return get_or_update_ewma(upstream_name, 0, false)
end

Expand All @@ -135,6 +141,7 @@ local function pick_and_score(peers, k)
lowest_score_index, lowest_score = i, new_score
end
end

return peers[lowest_score_index], lowest_score
end

Expand All @@ -146,7 +153,7 @@ local function calculate_slow_start_ewma(self)
local endpoints_count = 0

for _, endpoint in pairs(self.peers) do
local endpoint_string = endpoint.address .. ":" .. endpoint.port
local endpoint_string = get_upstream_name(endpoint)
local ewma = ngx.shared.balancer_ewma:get(endpoint_string)

if ewma then
Expand All @@ -170,20 +177,50 @@ function _M.balance(self)
if #peers > 1 then
local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE
local peer_copy = util.deepcopy(peers)
endpoint, ewma_score = pick_and_score(peer_copy, k)

local tried_endpoints
if not ngx.ctx.balancer_ewma_tried_endpoints then
tried_endpoints = {}
ngx.ctx.balancer_ewma_tried_endpoints = tried_endpoints
else
tried_endpoints = ngx.ctx.balancer_ewma_tried_endpoints
end

local filtered_peers
for _, peer in ipairs(peer_copy) do
if not tried_endpoints[get_upstream_name(peer)] then
if not filtered_peers then
filtered_peers = {}
end
table_insert(filtered_peers, peer)
end
end

if not filtered_peers then
ngx.log(ngx.WARN, "all endpoints have been retried")
filtered_peers = peer_copy
end

if #filtered_peers > 1 then
endpoint, ewma_score = pick_and_score(filtered_peers, k)
else
endpoint, ewma_score = filtered_peers[1], score(filtered_peers[1])
end

tried_endpoints[get_upstream_name(endpoint)] = true
end

ngx.var.balancer_ewma_score = ewma_score

-- TODO(elvinefendi) move this processing to _M.sync
return endpoint.address .. ":" .. endpoint.port
return get_upstream_name(endpoint)
end

function _M.after_balance(_)
local response_time = tonumber(split.get_first_value(ngx.var.upstream_response_time)) or 0
local connect_time = tonumber(split.get_first_value(ngx.var.upstream_connect_time)) or 0
local response_time = tonumber(split.get_last_value(ngx.var.upstream_response_time)) or 0
local connect_time = tonumber(split.get_last_value(ngx.var.upstream_connect_time)) or 0
local rtt = connect_time + response_time
local upstream = split.get_first_value(ngx.var.upstream_addr)
local upstream = split.get_last_value(ngx.var.upstream_addr)

if util.is_blank(upstream) then
return
Expand Down
30 changes: 30 additions & 0 deletions rootfs/etc/nginx/lua/test/balancer/ewma_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ describe("Balancer ewma", function()
assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr))
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr))
end)

it("updates EWMA stats with the latest result", function()
ngx.var = { upstream_addr = "10.10.10.1:8080, 10.10.10.2:8080", upstream_connect_time = "0.05, 0.02", upstream_response_time = "0.2, 0.1" }

instance:after_balance()

local weight = math.exp(-5 / 10)
local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight)

assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get("10.10.10.2:8080"))
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get("10.10.10.2:8080"))
end)
end)

describe("balance()", function()
Expand Down Expand Up @@ -96,6 +108,24 @@ describe("Balancer ewma", function()
assert.equal("10.10.10.3:8080", peer)
assert.are.equals(0.16240233988393523723, ngx.var.balancer_ewma_score)
end)

it("doesn't pick the tried endpoint while retry", function()
local two_endpoints_backend = util.deepcopy(backend)
table.remove(two_endpoints_backend.endpoints, 2)
local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend)

local peer = two_endpoints_instance:balance()
assert.equal("10.10.10.1:8080", peer)
end)

it("all the endpoints are tried, pick the one with lowest score", function()
local two_endpoints_backend = util.deepcopy(backend)
table.remove(two_endpoints_backend.endpoints, 2)
local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend)

local peer = two_endpoints_instance:balance()
assert.equal("10.10.10.3:8080", peer)
end)
end)

describe("sync()", function()
Expand Down
15 changes: 15 additions & 0 deletions rootfs/etc/nginx/lua/test/util/split.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
local split = require("util.split")


describe("split", function()
it("get_last_value", function()
for _, case in ipairs({
{"127.0.0.1:26157 : 127.0.0.1:26158", "127.0.0.1:26158"},
{"127.0.0.1:26157, 127.0.0.1:26158", "127.0.0.1:26158"},
{"127.0.0.1:26158", "127.0.0.1:26158"},
}) do
local last = split.get_last_value(case[1])
assert.equal(case[2], last)
end
end)
end)
6 changes: 6 additions & 0 deletions rootfs/etc/nginx/lua/util/split.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ function _M.get_first_value(var)
return t[1]
end

function _M.get_last_value(var)
local t = _M.split_upstream_var(var) or {}
if #t == 0 then return nil end
return t[#t]
end

-- http://nginx.org/en/docs/http/ngx_http_upstream_module.html#example
-- CAVEAT: nginx is giving out : instead of , so the docs are wrong
-- 127.0.0.1:26157 : 127.0.0.1:26157 , ngx.var.upstream_addr
Expand Down

0 comments on commit e118ebc

Please sign in to comment.