Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 130 additions & 22 deletions apisix/healthcheck_manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ local jp = require("jsonpath")
local config_util = require("apisix.core.config_util")

local _M = {}
local working_pool = {} -- resource_path -> {version = ver, checker = checker}
-- resource_path -> {version = ver, checker = checker, checks = checks}
local working_pool = {}
local waiting_pool = {} -- resource_path -> resource_ver

local DELAYED_CLEAR_TIMEOUT = 10
Expand All @@ -44,6 +45,33 @@ end
_M.get_healthchecker_name = get_healthchecker_name


-- Compute the desired set of health-check targets for an upstream config.
-- Returns an ordered array preserving up_conf.nodes order so that targets are
-- always added to a checker deterministically; each entry also carries a
-- "host:port:hostheader" key so the working set can be diffed cheaply against
-- a checker's current targets.
local function compute_targets(up_conf)
local host = up_conf.checks and up_conf.checks.active and up_conf.checks.active.host
local port = up_conf.checks and up_conf.checks.active and up_conf.checks.active.port
local up_hdr = up_conf.pass_host == "rewrite" and up_conf.upstream_host
local use_node_hdr = up_conf.pass_host == "node" or nil

local targets = {}
for _, node in ipairs(up_conf.nodes) do
local host_hdr = up_hdr or (use_node_hdr and node.domain) or nil
local target_port = port or node.port
targets[#targets + 1] = {
host = node.host,
port = target_port,
check_host = host,
host_hdr = host_hdr,
key = node.host .. ":" .. tostring(target_port) .. ":" .. tostring(host_hdr or ""),
}
end
return targets
end


local function create_checker(up_conf)
if not up_conf.checks then
return nil
Expand Down Expand Up @@ -71,25 +99,70 @@ local function create_checker(up_conf)
end

-- Add target nodes
local host = up_conf.checks and up_conf.checks.active and up_conf.checks.active.host
local port = up_conf.checks and up_conf.checks.active and up_conf.checks.active.port
local up_hdr = up_conf.pass_host == "rewrite" and up_conf.upstream_host
local use_node_hdr = up_conf.pass_host == "node" or nil

for _, node in ipairs(up_conf.nodes) do
local host_hdr = up_hdr or (use_node_hdr and node.domain)
local ok, err = checker:add_target(node.host, port or node.port, host,
true, host_hdr)
for _, target in ipairs(compute_targets(up_conf)) do
local ok, err = checker:add_target(target.host, target.port, target.check_host,
true, target.host_hdr)
if not ok then
core.log.error("failed to add healthcheck target: ", node.host, ":",
port or node.port, " err: ", err)
core.log.error("failed to add healthcheck target: ", target.host, ":",
target.port, " err: ", err)
end
end

return checker
end


-- Incrementally reconcile an existing checker's targets to match up_conf.
-- Used when only the upstream nodes changed but the `checks` config did not,
-- so the checker can keep running (and keep its accumulated health state)
-- instead of being destroyed and rebuilt.
local function sync_checker_targets(checker, up_conf)
-- index the desired targets by key so they can be diffed against current
local desired = {}
for _, target in ipairs(compute_targets(up_conf)) do
desired[target.key] = target
end

-- index current targets the same way as desired. Read the authoritative
-- shm target list (the per-worker checker.targets array can lag behind a
-- recent add/remove event).
if not healthcheck then
healthcheck = require("resty.healthcheck")
end
local current = {}
local target_list = healthcheck.get_target_list(get_healthchecker_name(up_conf),
healthcheck_shdict_name) or {}
for _, t in ipairs(target_list) do
-- target_list entries carry hostheader; map it back to our key shape
local key = t.ip .. ":" .. tostring(t.port) .. ":" .. tostring(t.hostheader or "")
current[key] = t
end

-- add targets that are desired but not present
for key, target in pairs(desired) do
if not current[key] then
local ok, err = checker:add_target(target.host, target.port, target.check_host,
true, target.host_hdr)
if not ok then
core.log.error("failed to add healthcheck target: ", target.host, ":",
target.port, " err: ", err)
end
end
end

-- remove targets that are present but no longer desired
for key, t in pairs(current) do
if not desired[key] then
local ok, err = checker:remove_target(t.ip, t.port, t.hostname)
if not ok then
core.log.error("failed to remove healthcheck target: ", t.ip, ":",
t.port, " err: ", err)
end
end
end
end


function _M.fetch_checker(resource_path, resource_ver)
local working_item = working_pool[resource_path]
if working_item and working_item.version == resource_ver then
Expand Down Expand Up @@ -130,10 +203,11 @@ function _M.fetch_node_status(checker, ip, port, hostname)
end


local function add_working_pool(resource_path, resource_ver, checker)
local function add_working_pool(resource_path, resource_ver, checker, checks)
working_pool[resource_path] = {
version = resource_ver,
checker = checker
checker = checker,
checks = checks,
}
end

Expand Down Expand Up @@ -202,22 +276,47 @@ local function timer_create_checker()
goto continue
end

-- if a checker exists then delete it before creating a new one
-- If a checker already exists and the `checks` config is unchanged
-- (only the upstream nodes changed), reconcile its targets in place
-- instead of destroying and rebuilding it. A destroy-and-rebuild
-- leaves `up_checker == nil` for the rebuild window, during which
-- traffic is routed to nodes already known to be unhealthy, and it
-- throws away the checker's accumulated health state.
local existing_checker = working_pool[resource_path]
if existing_checker then
existing_checker.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT)
existing_checker.checker:stop()
core.log.info("releasing existing checker: ", tostring(existing_checker.checker),
" for resource: ", resource_path, " and version: ",
existing_checker.version)
if existing_checker and existing_checker.checker
and not existing_checker.checker.dead
and upstream.checks
and upstream.nodes and #upstream.nodes > 0
and core.table.deep_eq(existing_checker.checks, upstream.checks) then
sync_checker_targets(existing_checker.checker, upstream)
add_working_pool(resource_path, resource_ver, existing_checker.checker,
upstream.checks)
core.log.info("reused checker with incremental targets: ",
tostring(existing_checker.checker), " for resource: ",
resource_path, " and version: ", resource_ver)
goto continue
end

-- The checks config changed (or no checker exists): build a fresh
-- checker first, install it into the working pool, and only then
-- release the old one. Publishing the new checker before stopping
-- the old one ensures fetch_checker never observes a nil gap nor a
-- stopped checker for this resource.
local checker = create_checker(upstream)
if not checker then
goto continue
end
core.log.info("create new checker: ", tostring(checker), " for resource: ",
resource_path, " and version: ", resource_ver)
add_working_pool(resource_path, resource_ver, checker)
add_working_pool(resource_path, resource_ver, checker, upstream.checks)
if existing_checker then
existing_checker.checker.dead = true
existing_checker.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT)
existing_checker.checker:stop()
core.log.info("releasing existing checker: ", tostring(existing_checker.checker),
" for resource: ", resource_path, " and version: ",
existing_checker.version)
end
end

::continue::
Expand Down Expand Up @@ -258,6 +357,15 @@ local function timer_working_pool_check()
" current version: ", current_ver, " item version: ", item.version)
if item.version == current_ver then
need_destroy = false
elseif upstream.checks and upstream.nodes and #upstream.nodes > 0
and core.table.deep_eq(item.checks, upstream.checks) then
-- Version changed but only because of the upstream nodes (and at
-- least one node remains); the `checks` config is identical. Keep
-- the checker alive so timer_create_checker can reconcile its
-- targets incrementally (avoids a destroy-and-rebuild nil window).
-- When the node count drops to 0 we deliberately fall through to
-- destroy the checker, matching the original behaviour.
need_destroy = false
end
end

Expand Down
7 changes: 3 additions & 4 deletions t/node/healthcheck-discovery.t
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ unhealthy TCP increment (1/2) for '127.0.0.1(0.0.0.0:1988)'



=== TEST 2: create new checker when nodes change
=== TEST 2: reuse checker incrementally when nodes change
--- apisix_yaml
routes:
-
Expand Down Expand Up @@ -150,11 +150,10 @@ routes:
}
}
--- grep_error_log eval
qr/(create new checker|releasing existing checker): table/
qr/(create new checker|reused checker with incremental targets): table/
--- grep_error_log_out
create new checker: table
releasing existing checker: table
create new checker: table
reused checker with incremental targets: table
--- timeout: 30


Expand Down
3 changes: 1 addition & 2 deletions t/node/healthcheck-dns.t
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,5 @@ First request status: 200
Second request status: 200
--- error_log
create new checker
releasing existing checker
create new checker
reused checker with incremental targets
--- timeout: 10
149 changes: 149 additions & 0 deletions t/node/healthcheck-incremental-update.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX 'no_plan';

repeat_each(1);
log_level('warn');
no_root_location();
no_shuffle();

run_tests();

__DATA__

=== TEST 1: node-only change reuses the checker (no destroy-and-rebuild)
--- extra_init_worker_by_lua
local healthcheck = require("resty.healthcheck")
local new = healthcheck.new
healthcheck.new = function(...)
ngx.log(ngx.WARN, "create new checker")
local obj = new(...)
local clear = obj.delayed_clear
obj.delayed_clear = function(...)
ngx.log(ngx.WARN, "clear checker")
return clear(...)
end
return obj
end

--- config
location /t {
content_by_lua_block {
local checks = [[{
"active":{
"http_path":"/hello",
"timeout":1,
"type":"http",
"healthy":{ "interval":1, "successes":1 },
"unhealthy":{ "interval":1, "http_failures":2 }
}
}]]
local function cfg(nodes)
return [[{
"upstream": {
"nodes": ]] .. nodes .. [[,
"type": "roundrobin",
"checks": ]] .. checks .. [[
},
"uri": "/hello"
}]]
end

local t = require("lib.test_admin").test
-- initial config: one node -> creates the checker
assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT,
cfg('{"127.0.0.1:1980": 1}')) < 300)
t('/hello', ngx.HTTP_GET)
ngx.sleep(2)

-- node-only change (checks unchanged): should reconcile in place,
-- NOT create a new checker nor delayed_clear the old one
assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT,
cfg('{"127.0.0.1:1980": 1, "127.0.0.1:1981": 1}')) < 300)
t('/hello', ngx.HTTP_GET)
ngx.sleep(2)
ngx.say("done")
}
}

--- request
GET /t
--- response_body
done
--- no_error_log
clear checker
--- error_log
create new checker
--- timeout: 8



=== TEST 2: checks-config change still rebuilds the checker
--- extra_init_worker_by_lua
local healthcheck = require("resty.healthcheck")
local new = healthcheck.new
healthcheck.new = function(...)
local obj = new(...)
local clear = obj.delayed_clear
obj.delayed_clear = function(...)
ngx.log(ngx.WARN, "clear checker")
return clear(...)
end
return obj
end

--- config
location /t {
content_by_lua_block {
local function cfg(interval)
return [[{
"upstream": {
"nodes": {"127.0.0.1:1980": 1},
"type": "roundrobin",
"checks": {
"active":{
"http_path":"/hello",
"timeout":1,
"type":"http",
"healthy":{ "interval":]] .. interval .. [[, "successes":1 },
"unhealthy":{ "interval":1, "http_failures":2 }
}
}
},
"uri": "/hello"
}]]
end

local t = require("lib.test_admin").test
assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, cfg(1)) < 300)
t('/hello', ngx.HTTP_GET)
ngx.sleep(2)
-- change the checks config -> must rebuild (delayed_clear old checker)
assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, cfg(2)) < 300)
t('/hello', ngx.HTTP_GET)
ngx.sleep(2)
ngx.say("done")
}
}

--- request
GET /t
--- response_body
done
--- error_log
clear checker
--- timeout: 8
Loading
Loading