diff --git a/rootfs/etc/nginx/lua/balancer/ewma.lua b/rootfs/etc/nginx/lua/balancer/ewma.lua index 8ab5923639..f7cbadf3ae 100644 --- a/rootfs/etc/nginx/lua/balancer/ewma.lua +++ b/rootfs/etc/nginx/lua/balancer/ewma.lua @@ -18,6 +18,7 @@ local string = string local tonumber = tonumber local setmetatable = setmetatable local string_format = string.format +local table_insert = table.insert local ngx_log = ngx.log local INFO = ngx.INFO @@ -105,10 +106,15 @@ local function get_or_update_ewma(upstream, rtt, update) end +local function get_upstream_name(upstream) + return upstream.address .. ":" .. upstream.port +end + + local function score(upstream) -- Original implementation used names -- Endpoints don't have names, so passing in IP:Port as key instead - local upstream_name = upstream.address .. ":" .. upstream.port + local upstream_name = get_upstream_name(upstream) return get_or_update_ewma(upstream_name, 0, false) end @@ -135,6 +141,7 @@ local function pick_and_score(peers, k) lowest_score_index, lowest_score = i, new_score end end + return peers[lowest_score_index], lowest_score end @@ -146,7 +153,7 @@ local function calculate_slow_start_ewma(self) local endpoints_count = 0 for _, endpoint in pairs(self.peers) do - local endpoint_string = endpoint.address .. ":" .. endpoint.port + local endpoint_string = get_upstream_name(endpoint) local ewma = ngx.shared.balancer_ewma:get(endpoint_string) if ewma then @@ -170,20 +177,50 @@ function _M.balance(self) if #peers > 1 then local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE local peer_copy = util.deepcopy(peers) - endpoint, ewma_score = pick_and_score(peer_copy, k) + + local tried_endpoints + if not ngx.ctx.balancer_ewma_tried_endpoints then + tried_endpoints = {} + ngx.ctx.balancer_ewma_tried_endpoints = tried_endpoints + else + tried_endpoints = ngx.ctx.balancer_ewma_tried_endpoints + end + + local filtered_peers + for _, peer in ipairs(peer_copy) do + if not tried_endpoints[get_upstream_name(peer)] then + if not filtered_peers then + filtered_peers = {} + end + table_insert(filtered_peers, peer) + end + end + + if not filtered_peers then + ngx.log(ngx.WARN, "all endpoints have been retried") + filtered_peers = peer_copy + end + + if #filtered_peers > 1 then + endpoint, ewma_score = pick_and_score(filtered_peers, k) + else + endpoint, ewma_score = filtered_peers[1], score(filtered_peers[1]) + end + + tried_endpoints[get_upstream_name(endpoint)] = true end ngx.var.balancer_ewma_score = ewma_score -- TODO(elvinefendi) move this processing to _M.sync - return endpoint.address .. ":" .. endpoint.port + return get_upstream_name(endpoint) end function _M.after_balance(_) - local response_time = tonumber(split.get_first_value(ngx.var.upstream_response_time)) or 0 - local connect_time = tonumber(split.get_first_value(ngx.var.upstream_connect_time)) or 0 + local response_time = tonumber(split.get_last_value(ngx.var.upstream_response_time)) or 0 + local connect_time = tonumber(split.get_last_value(ngx.var.upstream_connect_time)) or 0 local rtt = connect_time + response_time - local upstream = split.get_first_value(ngx.var.upstream_addr) + local upstream = split.get_last_value(ngx.var.upstream_addr) if util.is_blank(upstream) then return diff --git a/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua b/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua index 47f4fa24ce..a54f6214ad 100644 --- a/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua +++ b/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua @@ -69,6 +69,18 @@ describe("Balancer ewma", function() assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr)) assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr)) end) + + it("updates EWMA stats with the latest result", function() + ngx.var = { upstream_addr = "10.10.10.1:8080, 10.10.10.2:8080", upstream_connect_time = "0.05, 0.02", upstream_response_time = "0.2, 0.1" } + + instance:after_balance() + + local weight = math.exp(-5 / 10) + local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight) + + assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get("10.10.10.2:8080")) + assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get("10.10.10.2:8080")) + end) end) describe("balance()", function() @@ -96,6 +108,24 @@ describe("Balancer ewma", function() assert.equal("10.10.10.3:8080", peer) assert.are.equals(0.16240233988393523723, ngx.var.balancer_ewma_score) end) + + it("doesn't pick the tried endpoint while retry", function() + local two_endpoints_backend = util.deepcopy(backend) + table.remove(two_endpoints_backend.endpoints, 2) + local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend) + + local peer = two_endpoints_instance:balance() + assert.equal("10.10.10.1:8080", peer) + end) + + it("all the endpoints are tried, pick the one with lowest score", function() + local two_endpoints_backend = util.deepcopy(backend) + table.remove(two_endpoints_backend.endpoints, 2) + local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend) + + local peer = two_endpoints_instance:balance() + assert.equal("10.10.10.3:8080", peer) + end) end) describe("sync()", function() diff --git a/rootfs/etc/nginx/lua/test/util/split.lua b/rootfs/etc/nginx/lua/test/util/split.lua new file mode 100644 index 0000000000..3d3a6d7e9a --- /dev/null +++ b/rootfs/etc/nginx/lua/test/util/split.lua @@ -0,0 +1,15 @@ +local split = require("util.split") + + +describe("split", function() + it("get_last_value", function() + for _, case in ipairs({ + {"127.0.0.1:26157 : 127.0.0.1:26158", "127.0.0.1:26158"}, + {"127.0.0.1:26157, 127.0.0.1:26158", "127.0.0.1:26158"}, + {"127.0.0.1:26158", "127.0.0.1:26158"}, + }) do + local last = split.get_last_value(case[1]) + assert.equal(case[2], last) + end + end) +end) diff --git a/rootfs/etc/nginx/lua/util/split.lua b/rootfs/etc/nginx/lua/util/split.lua index 090a7cf7f9..d5400ab577 100644 --- a/rootfs/etc/nginx/lua/util/split.lua +++ b/rootfs/etc/nginx/lua/util/split.lua @@ -18,6 +18,12 @@ function _M.get_first_value(var) return t[1] end +function _M.get_last_value(var) + local t = _M.split_upstream_var(var) or {} + if #t == 0 then return nil end + return t[#t] +end + -- http://nginx.org/en/docs/http/ngx_http_upstream_module.html#example -- CAVEAT: nginx is giving out : instead of , so the docs are wrong -- 127.0.0.1:26157 : 127.0.0.1:26157 , ngx.var.upstream_addr