diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index ba9eb76aa95..29c0c6c9d54 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -64,6 +64,10 @@ #define DEFAULT_MAX_HTTP_BUFFER_SIZE "10485760" static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx); +static flb_sds_t fleet_gendir(struct flb_in_calyptia_fleet_config *ctx, time_t timestamp); +static int fleet_mkdir(struct flb_in_calyptia_fleet_config *ctx, time_t timestamp); +static time_t fleet_config_path_timestamp(struct flb_in_calyptia_fleet_config *ctx, + const char *path); static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx, time_t timestamp); static void in_calyptia_fleet_destroy(struct flb_in_calyptia_fleet_config *ctx); @@ -333,6 +337,76 @@ static int rename_file(const char *old_path, const char *new_path) #endif } +static int copy_file_atomic(struct flb_in_calyptia_fleet_config *ctx, + const char *source_path, + const char *target_path) +{ + FILE *target_file; + flb_sds_t temp_path; + char *buf; + size_t size; + size_t written; + int ret; + int close_ret; + + ret = flb_utils_read_file((char *) source_path, &buf, &size); + if (ret != 0) { + flb_plg_warn(ctx->ins, "unable to read file for migration: %s", source_path); + return FLB_FALSE; + } + + temp_path = flb_sds_create_size(strlen(target_path) + 5); + if (temp_path == NULL) { + flb_free(buf); + return FLB_FALSE; + } + + if (flb_sds_printf(&temp_path, "%s.tmp", target_path) == NULL) { + flb_sds_destroy(temp_path); + flb_free(buf); + return FLB_FALSE; + } + + target_file = fopen(temp_path, "wb"); + if (target_file == NULL) { + flb_errno(); + flb_plg_warn(ctx->ins, "unable to open migration temp file: %s", temp_path); + flb_sds_destroy(temp_path); + flb_free(buf); + return FLB_FALSE; + } + + written = fwrite(buf, 1, size, target_file); + flb_free(buf); + + if (written != size) { + flb_plg_warn(ctx->ins, "truncated write during migration: %s", temp_path); + fclose(target_file); + unlink(temp_path); + flb_sds_destroy(temp_path); + return FLB_FALSE; + } + + close_ret = fclose(target_file); + if (close_ret != 0) { + flb_errno(); + flb_plg_warn(ctx->ins, "unable to close migration temp file: %s", temp_path); + unlink(temp_path); + flb_sds_destroy(temp_path); + return FLB_FALSE; + } + + if (rename_file(temp_path, target_path) != FLB_TRUE) { + flb_plg_warn(ctx->ins, "unable to move migrated config into place: %s", target_path); + unlink(temp_path); + flb_sds_destroy(temp_path); + return FLB_FALSE; + } + + flb_sds_destroy(temp_path); + return FLB_TRUE; +} + /** * Update ref_name's ref file to contain config_path. * If the ref file does not exist it is created, otherwise its contents are @@ -414,12 +488,103 @@ static int fleet_config_set_ref(struct flb_in_calyptia_fleet_config *ctx, return FLB_TRUE; } -static flb_sds_t time_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, time_t t) +flb_sds_t time_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, time_t t) { char s_last_modified[32]; + flb_sds_t cfgname; snprintf(s_last_modified, sizeof(s_last_modified), "%lld", (long long)t); - return fleet_config_filename(ctx, s_last_modified); + + if (ctx->fleet_config_legacy_format) { + return fleet_config_filename(ctx, s_last_modified); + } + + cfgname = fleet_gendir(ctx, t); + if (cfgname == NULL) { + return NULL; + } + + if (flb_sds_printf(&cfgname, PATH_SEPARATOR "config.yaml") == NULL) { + flb_sds_destroy(cfgname); + return NULL; + } + + return cfgname; +} + +static char *fleet_config_timestamp_name(struct flb_in_calyptia_fleet_config *ctx, char *path) +{ + char *fname; + char *sep; + + fname = basename(path); + + if (!ctx->fleet_config_legacy_format && strcmp(fname, "config.yaml") == 0) { + sep = strrchr(path, PATH_SEPARATOR[0]); + if (sep == NULL) { + return NULL; + } + + *sep = '\0'; + fname = basename(path); + } + + return fname; +} + +static time_t fleet_config_timestamp_value(struct flb_in_calyptia_fleet_config *ctx, + char *fname, + int allow_bare) +{ + char *end; + long long val; + + if (ctx == NULL || fname == NULL) { + return 0; + } + + errno = 0; + val = strtoll(fname, &end, 10); + if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) || + (errno != 0 && val == 0) || + val <= 0) { + return 0; + } + + if (ctx->fleet_config_legacy_format) { + if (strcmp(end, ".conf") == 0) { + return (time_t) val; + } + } + else if (strcmp(end, ".yaml") == 0 || (allow_bare == FLB_TRUE && *end == '\0')) { + return (time_t) val; + } + + return 0; +} + +static int is_owned_nested_fleet_config(struct flb_in_calyptia_fleet_config *ctx, + const char *path) +{ + char realname[CALYPTIA_MAX_DIR_SIZE] = {0}; + char *fname; + int ret; + + if (ctx == NULL || path == NULL || ctx->fleet_config_legacy_format) { + return FLB_FALSE; + } + + ret = snprintf(realname, sizeof(realname), "%s", path); + if (ret < 0 || (size_t) ret >= sizeof(realname)) { + return FLB_FALSE; + } + + fname = basename(realname); + if (strcmp(fname, "config.yaml") != 0) { + return FLB_FALSE; + } + + return fleet_config_path_timestamp(ctx, path) > 0 ? FLB_TRUE : FLB_FALSE; } static int is_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg) @@ -510,38 +675,170 @@ static int is_old_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct */ static time_t fleet_config_path_timestamp(struct flb_in_calyptia_fleet_config *ctx, const char *path) { + char realname[CALYPTIA_MAX_DIR_SIZE] = {0}; + flb_sds_t base_dir; char *fname; - char *end; - long long val; + char *sep; + int ret; + time_t timestamp; if (path == NULL || ctx == NULL) { return 0; } - fname = strrchr(path, PATH_SEPARATOR[0]); + ret = snprintf(realname, sizeof(realname), "%s", path); + if (ret < 0 || (size_t) ret >= sizeof(realname)) { + return 0; + } + + base_dir = generate_base_fleet_directory(ctx); + if (base_dir == NULL) { + return 0; + } - if (fname == NULL) { + timestamp = 0; + fname = basename(realname); + if (!ctx->fleet_config_legacy_format && strcmp(fname, "config.yaml") == 0) { + sep = strrchr(realname, PATH_SEPARATOR[0]); + if (sep == NULL) { + goto out; + } + *sep = '\0'; + + sep = strrchr(realname, PATH_SEPARATOR[0]); + if (sep == NULL) { + goto out; + } + fname = sep + 1; + *sep = '\0'; + } + else { + sep = strrchr(realname, PATH_SEPARATOR[0]); + if (sep == NULL) { + goto out; + } + fname = sep + 1; + *sep = '\0'; + } + + if (strcmp(realname, base_dir) == 0) { + timestamp = fleet_config_timestamp_value(ctx, fname, FLB_TRUE); + } + +out: + flb_sds_destroy(base_dir); + return timestamp; +} + +static time_t flat_yaml_fleet_config_timestamp(struct flb_in_calyptia_fleet_config *ctx, + const char *path) +{ + char realname[CALYPTIA_MAX_DIR_SIZE] = {0}; + flb_sds_t base_dir; + char *fname; + char *sep; + char *end; + long long val; + int ret; + + if (path == NULL || ctx == NULL || ctx->fleet_config_legacy_format) { return 0; } - fname++; + ret = snprintf(realname, sizeof(realname), "%s", path); + if (ret < 0 || (size_t) ret >= sizeof(realname)) { + return 0; + } + fname = basename(realname); errno = 0; val = strtoll(fname, &end, 10); - if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) || (errno != 0 && val == 0)) { + if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) || + (errno != 0 && val == 0) || + val <= 0 || + strcmp(end, ".yaml") != 0) { return 0; } - if (ctx->fleet_config_legacy_format) { - if (strcmp(end, ".conf") == 0) { - return (time_t)val; + sep = strrchr(realname, PATH_SEPARATOR[0]); + if (sep == NULL) { + return 0; + } + *sep = '\0'; + + base_dir = generate_base_fleet_directory(ctx); + if (base_dir == NULL) { + return 0; + } + + ret = strcmp(realname, base_dir) == 0 ? FLB_TRUE : FLB_FALSE; + flb_sds_destroy(base_dir); + + return ret == FLB_TRUE ? (time_t) val : 0; +} + +static int migrate_flat_yaml_fleet_config_ref(struct flb_in_calyptia_fleet_config *ctx, + const char *ref_name, + flb_sds_t *config_path) +{ + flb_sds_t migrated_path; + time_t timestamp; + struct stat st; + int copied; + int ref_updated; + + if (ctx == NULL || ref_name == NULL || config_path == NULL || *config_path == NULL) { + return FLB_TRUE; + } + + timestamp = flat_yaml_fleet_config_timestamp(ctx, *config_path); + if (timestamp <= 0) { + return FLB_TRUE; + } + + migrated_path = time_fleet_config_filename(ctx, timestamp); + if (migrated_path == NULL) { + return FLB_FALSE; + } + + copied = FLB_FALSE; + if (stat(migrated_path, &st) == 0) { + if (!S_ISREG(st.st_mode)) { + flb_plg_warn(ctx->ins, "migrated config path is not a regular file: %s", + migrated_path); + flb_sds_destroy(migrated_path); + return FLB_TRUE; } } - else if (strcmp(end, ".yaml") == 0) { - return (time_t)val; + else { + if (fleet_mkdir(ctx, timestamp) != 0) { + flb_plg_warn(ctx->ins, "unable to create migrated config directory"); + flb_sds_destroy(migrated_path); + return FLB_TRUE; + } + + if (copy_file_atomic(ctx, *config_path, migrated_path) == FLB_FALSE) { + flb_sds_destroy(migrated_path); + return FLB_TRUE; + } + copied = FLB_TRUE; } - return 0; + ref_updated = fleet_config_set_ref(ctx, ref_name, migrated_path); + if (ref_updated == FLB_FALSE) { + flb_plg_warn(ctx->ins, "unable to update migrated config reference: %s", ref_name); + } + else if (copied == FLB_TRUE) { + if (unlink(*config_path) != 0) { + flb_plg_debug(ctx->ins, "unable to delete migrated flat config: %s", *config_path); + } + } + + flb_plg_info(ctx->ins, "using migrated fleet config path: %s", migrated_path); + flb_sds_destroy(*config_path); + *config_path = migrated_path; + + return FLB_TRUE; } static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg) @@ -643,13 +940,22 @@ static int parse_config_name_timestamp(struct flb_in_calyptia_fleet_config *ctx, long timestamp; char realname[CALYPTIA_MAX_DIR_SIZE] = {0}; char *fname; + int ret; if (ctx == NULL || config_timestamp == NULL || cfgpath == NULL) { return FLB_FALSE; } - snprintf(realname, sizeof(realname), "%s", cfgpath); - fname = basename(realname); + ret = snprintf(realname, sizeof(realname), "%s", cfgpath); + if (ret < 0 || (size_t) ret >= sizeof(realname)) { + return FLB_FALSE; + } + + fname = fleet_config_timestamp_name(ctx, realname); + if (fname == NULL) { + return FLB_FALSE; + } + flb_plg_debug(ctx->ins, "parsing configuration timestamp from path: %s", fname); errno = 0; @@ -1117,7 +1423,7 @@ static int check_timestamp_is_newer(struct flb_in_calyptia_fleet_config *ctx, ti return FLB_FALSE; } - file_extension = ctx->fleet_config_legacy_format ? "*.conf" : "*.yaml"; + file_extension = ctx->fleet_config_legacy_format ? "*.conf" : "*"; if (flb_sds_printf(&glob_pattern, "%s" PATH_SEPARATOR "%s", base_dir, file_extension) == NULL) { flb_sds_destroy(glob_pattern); flb_sds_destroy(base_dir); @@ -1218,9 +1524,16 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, } else { flb_plg_info(ctx->ins, "creating config file with timestamp %lld", - (long long)last_modified); + (long long)last_modified); } } + + if (!ctx->fleet_config_legacy_format && fleet_mkdir(ctx, last_modified) != 0) { + flb_plg_error(ctx->ins, "unable to create fleet config directory"); + ret = -1; + goto client_error; + } + fname = time_fleet_config_filename(ctx, last_modified); } else { @@ -1430,7 +1743,8 @@ static struct cfl_array *read_glob_win(const char *path, struct cfl_array *list) ret = stat(buf, &st); - if (ret == 0 && (st.st_mode & S_IFMT) == S_IFREG) { + if (ret == 0 && ((st.st_mode & S_IFMT) == S_IFREG || + (st.st_mode & S_IFMT) == S_IFDIR)) { cfl_array_append_string(list, buf); } } while (FindNextFileA(hnd, &data) != 0); @@ -1507,7 +1821,9 @@ static int calyptia_config_delete_by_ref(struct flb_in_calyptia_fleet_config *ct { struct cfl_array *confs; flb_sds_t config_path; + flb_sds_t ref_filename; char *ext; + char *fname; int idx; struct stat entry_stat; const char *entry_path; @@ -1521,6 +1837,18 @@ static int calyptia_config_delete_by_ref(struct flb_in_calyptia_fleet_config *ct return FLB_FALSE; } + fname = strrchr(config_path, PATH_SEPARATOR[0]); + if (fname != NULL && is_owned_nested_fleet_config(ctx, config_path) == FLB_TRUE) { + *fname = '\0'; + flb_plg_info(ctx->ins, "deleting config directory: %s", config_path); + + if (delete_dir(config_path) == FLB_FALSE) { + flb_plg_warn(ctx->ins, "unable to delete config directory: %s", config_path); + } + + goto delete_ref; + } + /* Replace the extension with a glob (e.g. "/a/b.yaml" -> "/a/b*") */ ext = strrchr(config_path, '.'); if (ext == NULL) { @@ -1562,8 +1890,11 @@ static int calyptia_config_delete_by_ref(struct flb_in_calyptia_fleet_config *ct } } + cfl_array_destroy(confs); + +delete_ref: /* Delete the reference file itself */ - flb_sds_t ref_filename = fleet_config_ref_filename(ctx, ref_name); + ref_filename = fleet_config_ref_filename(ctx, ref_name); if (ref_filename != NULL) { flb_plg_info(ctx->ins, "deleting config ref file: %s", ref_filename); if (unlink(ref_filename) != 0) { @@ -1572,7 +1903,6 @@ static int calyptia_config_delete_by_ref(struct flb_in_calyptia_fleet_config *ct flb_sds_destroy(ref_filename); } - cfl_array_destroy(confs); flb_sds_destroy(config_path); return FLB_TRUE; } @@ -2178,11 +2508,15 @@ static int fleet_mkdir(struct flb_in_calyptia_fleet_config *ctx, time_t timestam { int ret = -1; flb_sds_t fleetcurdir; + struct stat st; fleetcurdir = fleet_gendir(ctx, timestamp); if (fleetcurdir != NULL) { - if (flb_utils_mkdir(fleetcurdir, 0700) == 0) { + if (stat(fleetcurdir, &st) == 0 && S_ISDIR(st.st_mode)) { + ret = 0; + } + else if (flb_utils_mkdir(fleetcurdir, 0700) == 0) { ret = 0; } flb_sds_destroy(fleetcurdir); @@ -2221,6 +2555,16 @@ static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx) config_path = fleet_config_deref(ctx, "cur"); if (config_path == NULL) { config_path = fleet_config_deref(ctx, "old"); + if (migrate_flat_yaml_fleet_config_ref(ctx, "old", &config_path) == FLB_FALSE) { + flb_sds_destroy(config_path); + return FLB_FALSE; + } + } + else { + if (migrate_flat_yaml_fleet_config_ref(ctx, "cur", &config_path) == FLB_FALSE) { + flb_sds_destroy(config_path); + return FLB_FALSE; + } } if (config_path != NULL) { diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.h b/plugins/in_calyptia_fleet/in_calyptia_fleet.h index 45a017a907e..44439b8f15f 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.h +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.h @@ -79,6 +79,7 @@ struct reload_ctx { }; flb_sds_t fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, char *fname); +flb_sds_t time_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, time_t t); #define legacy_new_fleet_config_filename(a) fleet_config_filename((a), "new") #define legacy_cur_fleet_config_filename(a) fleet_config_filename((a), "cur") diff --git a/tests/integration/scenarios/in_calyptia_fleet/tests/test_in_calyptia_fleet_001.py b/tests/integration/scenarios/in_calyptia_fleet/tests/test_in_calyptia_fleet_001.py new file mode 100644 index 00000000000..f8e16026664 --- /dev/null +++ b/tests/integration/scenarios/in_calyptia_fleet/tests/test_in_calyptia_fleet_001.py @@ -0,0 +1,666 @@ +import base64 +import json +import threading +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from urllib.parse import urlparse + +from utils.data_utils import read_file +from utils.network import find_available_port, wait_for_port_to_be_free +from utils.test_service import FluentBitTestService + + +FLEET_ID = "test-fleet" +PROJECT_ID = "test-project" +MACHINE_ID = "test-machine" +PARSER_FILE = "parser--linux.yaml" +LAST_MODIFIED = "Thu, 18 June 2026 17:00:00 GMT" +API_KEY = ( + base64.b64encode(json.dumps({"ProjectID": PROJECT_ID}).encode("utf-8")) + .decode("ascii") + .rstrip("=") + + ".signature" +) + +PARSER_CONFIG = """ +parsers: + - name: fleet_json + format: json +""".lstrip() + + +class FleetAPIHandler(BaseHTTPRequestHandler): + def do_GET(self): + parsed = urlparse(self.path) + self.server.requests.append( + { + "path": parsed.path, + "query": parsed.query, + "headers": dict(self.headers), + } + ) + + if parsed.path == "/v1/search": + self._send_response( + 200, + json.dumps([{"id": FLEET_ID}]), + "application/json", + ) + return + + if parsed.path == f"/v1/fleets/{FLEET_ID}/config": + self._send_response( + 200, + self.server.fleet_config, + "application/x-yaml", + {"Last-modified": self.server.last_modified}, + ) + return + + if parsed.path == f"/v1/fleets/{FLEET_ID}/files": + self._send_response( + 200, + json.dumps(self.server.fleet_files), + "application/json", + ) + return + + self._send_response(404, "not found", "text/plain") + + def log_message(self, format, *args): + return + + def _send_response(self, status, body, content_type, extra_headers=None): + body_bytes = body.encode("utf-8") + + self.send_response(status) + self.send_header("Content-Type", content_type) + self.send_header("Content-Length", str(len(body_bytes))) + + for key, value in (extra_headers or {}).items(): + self.send_header(key, value) + + self.end_headers() + self.wfile.write(body_bytes) + + +class FleetAPIServer: + def __init__(self, port, fleet_config, fleet_files=None): + self.port = port + self.httpd = ThreadingHTTPServer(("127.0.0.1", port), FleetAPIHandler) + self.httpd.fleet_config = fleet_config + self.httpd.fleet_files = fleet_files or [] + self.httpd.last_modified = LAST_MODIFIED + self.httpd.requests = [] + self.thread = threading.Thread(target=self.httpd.serve_forever, daemon=True) + + def __enter__(self): + self.thread.start() + return self + + def __exit__(self, exc_type, exc, tb): + self.httpd.shutdown() + self.httpd.server_close() + self.thread.join(timeout=5) + wait_for_port_to_be_free(self.port, timeout=5) + + @property + def requests(self): + return self.httpd.requests + + +def _yaml_string(value): + return json.dumps(str(value)) + + +def _fleet_base_dir(cache_dir): + return Path(cache_dir) / MACHINE_ID / FLEET_ID + + +def _fleet_files_payload(): + encoded = base64.b64encode(PARSER_CONFIG.encode("utf-8")).decode("ascii") + return [{"name": PARSER_FILE, "contents": encoded}] + + +def _bootstrap_config(cache_dir, api_port, *, legacy=False, interval_sec="3600"): + legacy_value = "on" if legacy else "off" + + return f""" +service: + flush: 1 + grace: 1 + log_level: info + http_server: on + http_port: ${{FLUENT_BIT_HTTP_MONITORING_PORT}} + +customs: + - name: calyptia + api_key: {API_KEY} + calyptia_host: 127.0.0.1 + calyptia_port: "{api_port}" + calyptia_tls: off + calyptia_tls.verify: off + fleet_name: {FLEET_ID} + machine_id: {MACHINE_ID} + fleet.config_dir: {_yaml_string(cache_dir)} + fleet_config_legacy_format: {legacy_value} + fleet.interval_sec: "{interval_sec}" + fleet.interval_nsec: "0" +""".lstrip() + + +def _custom_config(cache_dir, api_port, *, legacy=False, interval_sec="3600"): + legacy_value = "on" if legacy else "off" + + return f""" +customs: + - name: calyptia + api_key: {API_KEY} + calyptia_host: 127.0.0.1 + calyptia_port: "{api_port}" + calyptia_tls: off + calyptia_tls.verify: off + fleet_name: {FLEET_ID} + machine_id: {MACHINE_ID} + fleet.config_dir: {_yaml_string(cache_dir)} + fleet_config_legacy_format: {legacy_value} + fleet.interval_sec: "{interval_sec}" + fleet.interval_nsec: "0" +""".lstrip() + + +def _fleet_config( + cache_dir, + api_port, + marker, + *, + include_custom=False, + include_path=PARSER_FILE, + interval_sec="3600", +): + custom = ( + _custom_config(cache_dir, api_port, interval_sec=interval_sec) + "\n" + if include_custom else "" + ) + + return f""" +{custom} +service: + flush: 1 + grace: 1 + log_level: info + http_server: on + http_port: ${{FLUENT_BIT_HTTP_MONITORING_PORT}} + +includes: + - {_yaml_string(include_path)} + +pipeline: + inputs: + - name: dummy + tag: fleet.test + dummy: '{{"message":"{marker}"}}' + samples: 1 + rate: 1 + + outputs: + - name: stdout + match: '*' + format: json_lines +""".lstrip() + + +def _legacy_fleet_config(marker): + dummy = json.dumps({"message": marker}) + + return f""" +[INPUT] + Name dummy + Tag fleet.test + Dummy {dummy} + Samples 1 + Rate 1 + +[OUTPUT] + Name stdout + Match * + Format json_lines +""".lstrip() + + +def _write_bootstrap_config(tmp_path, cache_dir, api_port, *, legacy=False, interval_sec="3600"): + config_path = tmp_path / "bootstrap.yaml" + config_path.write_text( + _bootstrap_config(cache_dir, api_port, legacy=legacy, interval_sec=interval_sec), + encoding="utf-8", + ) + return config_path + + +def _write_old_cache(cache_dir, api_port, ref_name, marker, *, target_marker=None): + timestamp = 1234567890 + base_dir = _fleet_base_dir(cache_dir) + timestamp_dir = base_dir / str(timestamp) + flat_config = base_dir / f"{timestamp}.yaml" + nested_config = timestamp_dir / "config.yaml" + parser_config = timestamp_dir / PARSER_FILE + + timestamp_dir.mkdir(parents=True) + flat_config.write_text( + _fleet_config(cache_dir, api_port, marker, include_custom=True), + encoding="utf-8", + ) + parser_config.write_text(PARSER_CONFIG, encoding="utf-8") + + if target_marker is not None: + nested_config.write_text( + _fleet_config(cache_dir, api_port, target_marker, include_custom=True), + encoding="utf-8", + ) + + (base_dir / f"{ref_name}.ref").write_text(f"{flat_config}\n", encoding="utf-8") + + return { + "base_dir": base_dir, + "flat_config": flat_config, + "nested_config": nested_config, + "parser_config": parser_config, + "ref_file": base_dir / f"{ref_name}.ref", + } + + +def _write_nested_cache(cache_dir, api_port, ref_name, marker): + timestamp = 1234567890 + base_dir = _fleet_base_dir(cache_dir) + timestamp_dir = base_dir / str(timestamp) + nested_config = timestamp_dir / "config.yaml" + parser_config = timestamp_dir / PARSER_FILE + + timestamp_dir.mkdir(parents=True) + nested_config.write_text( + _fleet_config(cache_dir, api_port, marker, include_custom=True), + encoding="utf-8", + ) + parser_config.write_text(PARSER_CONFIG, encoding="utf-8") + (base_dir / f"{ref_name}.ref").write_text(f"{nested_config}\n", encoding="utf-8") + + return { + "base_dir": base_dir, + "nested_config": nested_config, + "parser_config": parser_config, + "ref_file": base_dir / f"{ref_name}.ref", + } + + +def _write_base_config_ref(cache_dir, api_port, ref_name, config_name, marker): + base_dir = _fleet_base_dir(cache_dir) + config_path = base_dir / config_name + parser_config = base_dir / PARSER_FILE + + base_dir.mkdir(parents=True) + config_path.write_text( + _fleet_config(cache_dir, api_port, marker, include_custom=True), + encoding="utf-8", + ) + parser_config.write_text(PARSER_CONFIG, encoding="utf-8") + (base_dir / f"{ref_name}.ref").write_text(f"{config_path}\n", encoding="utf-8") + + return { + "base_dir": base_dir, + "config_path": config_path, + "parser_config": parser_config, + "ref_file": base_dir / f"{ref_name}.ref", + } + + +def _write_external_config_ref(tmp_path, cache_dir, api_port, ref_name, marker): + base_dir = _fleet_base_dir(cache_dir) + external_dir = tmp_path / "external-config" + config_path = external_dir / "1234567890.yaml" + parser_config = external_dir / PARSER_FILE + + base_dir.mkdir(parents=True) + external_dir.mkdir() + config_path.write_text( + _fleet_config(cache_dir, api_port, marker, include_custom=True), + encoding="utf-8", + ) + parser_config.write_text(PARSER_CONFIG, encoding="utf-8") + (base_dir / f"{ref_name}.ref").write_text(f"{config_path}\n", encoding="utf-8") + + return { + "base_dir": base_dir, + "config_path": config_path, + "parser_config": parser_config, + "ref_file": base_dir / f"{ref_name}.ref", + } + + +def _write_external_nested_config_ref( + tmp_path, + cache_dir, + api_port, + ref_name, + marker, + *, + interval_sec="3600", +): + base_dir = _fleet_base_dir(cache_dir) + external_dir = tmp_path / "external-config" + config_path = external_dir / "config.yaml" + parser_config = external_dir / PARSER_FILE + sentinel = external_dir / "keep.txt" + + base_dir.mkdir(parents=True) + external_dir.mkdir() + config_path.write_text( + _fleet_config(cache_dir, api_port, marker, include_custom=True, interval_sec=interval_sec), + encoding="utf-8", + ) + parser_config.write_text(PARSER_CONFIG, encoding="utf-8") + sentinel.write_text("keep this file\n", encoding="utf-8") + (base_dir / f"{ref_name}.ref").write_text(f"{config_path}\n", encoding="utf-8") + + return { + "base_dir": base_dir, + "config_path": config_path, + "external_dir": external_dir, + "parser_config": parser_config, + "sentinel": sentinel, + "ref_file": base_dir / f"{ref_name}.ref", + } + + +def _write_blocked_migration_cache(cache_dir, api_port, ref_name, marker): + timestamp = 1234567890 + base_dir = _fleet_base_dir(cache_dir) + flat_config = base_dir / f"{timestamp}.yaml" + timestamp_blocker = base_dir / str(timestamp) + parser_config = base_dir / PARSER_FILE + + base_dir.mkdir(parents=True) + flat_config.write_text( + _fleet_config(cache_dir, api_port, marker, include_custom=True), + encoding="utf-8", + ) + parser_config.write_text(PARSER_CONFIG, encoding="utf-8") + timestamp_blocker.write_text("not a directory\n", encoding="utf-8") + (base_dir / f"{ref_name}.ref").write_text(f"{flat_config}\n", encoding="utf-8") + + return { + "base_dir": base_dir, + "flat_config": flat_config, + "timestamp_blocker": timestamp_blocker, + "parser_config": parser_config, + "ref_file": base_dir / f"{ref_name}.ref", + } + + +def _wait_for_log_contains(service, text, timeout=30): + def _read_matching_log(): + log_text = read_file(service.flb.log_file) + if text in log_text: + return log_text + return None + + return service.wait_for_condition( + _read_matching_log, + timeout=timeout, + interval=0.5, + description=f"log text {text!r}", + ) + + +def _assert_no_include_error(log_text): + assert "yaml error" not in log_text + assert f"including file '{PARSER_FILE}'" not in log_text + + +def _run_service(config_path, marker, timeout=30): + service = FluentBitTestService(str(config_path)) + + try: + service.start() + log_text = _wait_for_log_contains(service, marker, timeout=timeout) + finally: + service.stop() + + _assert_no_include_error(log_text) + return log_text + + +def _read_ref_path(ref_file): + return Path(ref_file.read_text(encoding="utf-8").strip()) + + +def test_fleet_bootstrap_numeric_config_yaml_path_fetches_immediately(tmp_path): + api_port = find_available_port() + cache_dir = tmp_path / "fleet-cache" + marker = "numeric-bootstrap-path-fetch-ok" + bootstrap_dir = tmp_path / "2026" + bootstrap_dir.mkdir() + config_path = bootstrap_dir / "config.yaml" + config_path.write_text(_bootstrap_config(cache_dir, api_port), encoding="utf-8") + fleet_config = _fleet_config(cache_dir, api_port, marker) + + with FleetAPIServer(api_port, fleet_config, _fleet_files_payload()): + log_text = _run_service(config_path, marker) + + nested_configs = list(_fleet_base_dir(cache_dir).glob("*/config.yaml")) + assert len(nested_configs) == 1 + assert _read_ref_path(_fleet_base_dir(cache_dir) / "cur.ref") == nested_configs[0] + assert marker in log_text + + +def test_fleet_yaml_download_places_config_next_to_fleet_files(tmp_path): + api_port = find_available_port() + cache_dir = tmp_path / "fleet-cache" + marker = "fresh-fleet-relative-include-ok" + config_path = _write_bootstrap_config(tmp_path, cache_dir, api_port) + fleet_config = _fleet_config(cache_dir, api_port, marker) + + with FleetAPIServer(api_port, fleet_config, _fleet_files_payload()) as api: + log_text = _run_service(config_path, marker) + + nested_configs = list(_fleet_base_dir(cache_dir).glob("*/config.yaml")) + assert len(nested_configs) == 1 + assert (nested_configs[0].parent / PARSER_FILE).is_file() + assert _read_ref_path(_fleet_base_dir(cache_dir) / "cur.ref") == nested_configs[0] + assert any(request["path"] == f"/v1/fleets/{FLEET_ID}/config" for request in api.requests) + assert any(request["path"] == f"/v1/fleets/{FLEET_ID}/files" for request in api.requests) + assert marker in log_text + + +def test_fleet_yaml_download_preserves_absolute_includes(tmp_path): + api_port = find_available_port() + cache_dir = tmp_path / "fleet-cache" + marker = "fresh-fleet-absolute-include-ok" + absolute_parser = tmp_path / PARSER_FILE + absolute_parser.write_text(PARSER_CONFIG, encoding="utf-8") + config_path = _write_bootstrap_config(tmp_path, cache_dir, api_port) + fleet_config = _fleet_config( + cache_dir, + api_port, + marker, + include_path=absolute_parser, + ) + + with FleetAPIServer(api_port, fleet_config) as api: + log_text = _run_service(config_path, marker) + + nested_configs = list(_fleet_base_dir(cache_dir).glob("*/config.yaml")) + assert len(nested_configs) == 1 + assert not (nested_configs[0].parent / PARSER_FILE).exists() + assert _read_ref_path(_fleet_base_dir(cache_dir) / "cur.ref") == nested_configs[0] + assert any(request["path"] == f"/v1/fleets/{FLEET_ID}/config" for request in api.requests) + assert marker in log_text + + +def test_fleet_legacy_download_keeps_flat_conf_ref(tmp_path): + api_port = find_available_port() + cache_dir = tmp_path / "fleet-cache" + marker = "legacy-fleet-flat-conf-ok" + config_path = _write_bootstrap_config(tmp_path, cache_dir, api_port, legacy=True) + + with FleetAPIServer(api_port, _legacy_fleet_config(marker)) as api: + log_text = _run_service(config_path, marker) + + base_dir = _fleet_base_dir(cache_dir) + flat_configs = [path for path in base_dir.glob("*.conf") if path.name != "header.conf"] + assert len(flat_configs) == 1 + assert not list(base_dir.glob("*/config.yaml")) + assert _read_ref_path(base_dir / "cur.ref") == flat_configs[0] + assert any(request["query"] == "format=ini&config_format=ini" for request in api.requests) + assert marker in log_text + + +def test_fleet_startup_migrates_flat_cur_ref_for_relative_includes(tmp_path): + api_port = find_available_port() + cache_dir = tmp_path / "fleet-cache" + marker = "cur-ref-migrated-relative-include-ok" + paths = _write_old_cache(cache_dir, api_port, "cur", marker) + config_path = _write_bootstrap_config(tmp_path, cache_dir, api_port) + + log_text = _run_service(config_path, marker) + + assert paths["nested_config"].is_file() + assert not paths["flat_config"].exists() + assert _read_ref_path(paths["ref_file"]) == paths["nested_config"] + assert marker in log_text + + +def test_fleet_startup_keeps_nested_cur_ref_for_relative_includes(tmp_path): + api_port = find_available_port() + cache_dir = tmp_path / "fleet-cache" + marker = "nested-cur-ref-relative-include-ok" + paths = _write_nested_cache(cache_dir, api_port, "cur", marker) + config_path = _write_bootstrap_config(tmp_path, cache_dir, api_port) + + log_text = _run_service(config_path, marker) + + assert paths["nested_config"].is_file() + assert _read_ref_path(paths["ref_file"]) == paths["nested_config"] + assert marker in log_text + + +def test_fleet_startup_migrates_flat_old_ref_fallback(tmp_path): + api_port = find_available_port() + cache_dir = tmp_path / "fleet-cache" + marker = "old-ref-migrated-relative-include-ok" + paths = _write_old_cache(cache_dir, api_port, "old", marker) + config_path = _write_bootstrap_config(tmp_path, cache_dir, api_port) + + log_text = _run_service(config_path, marker) + + assert paths["nested_config"].is_file() + assert not (paths["base_dir"] / "cur.ref").exists() + assert not paths["flat_config"].exists() + assert _read_ref_path(paths["ref_file"]) == paths["nested_config"] + assert marker in log_text + + +def test_fleet_startup_uses_existing_nested_config_when_ref_is_flat(tmp_path): + api_port = find_available_port() + cache_dir = tmp_path / "fleet-cache" + flat_marker = "flat-config-should-not-load" + nested_marker = "existing-nested-config-relative-include-ok" + paths = _write_old_cache( + cache_dir, + api_port, + "cur", + flat_marker, + target_marker=nested_marker, + ) + config_path = _write_bootstrap_config(tmp_path, cache_dir, api_port) + + log_text = _run_service(config_path, nested_marker) + + assert paths["nested_config"].is_file() + assert paths["flat_config"].exists() + assert _read_ref_path(paths["ref_file"]) == paths["nested_config"] + assert nested_marker in log_text + assert flat_marker not in log_text + + +def test_fleet_startup_keeps_external_flat_yaml_ref(tmp_path): + api_port = find_available_port() + cache_dir = tmp_path / "fleet-cache" + marker = "external-flat-ref-relative-include-ok" + paths = _write_external_config_ref(tmp_path, cache_dir, api_port, "cur", marker) + config_path = _write_bootstrap_config(tmp_path, cache_dir, api_port) + + log_text = _run_service(config_path, marker) + + assert paths["config_path"].is_file() + assert paths["parser_config"].is_file() + assert not (paths["base_dir"] / "1234567890" / "config.yaml").exists() + assert _read_ref_path(paths["ref_file"]) == paths["config_path"] + assert marker in log_text + + +def test_fleet_update_keeps_external_config_yaml_parent(tmp_path): + api_port = find_available_port() + cache_dir = tmp_path / "fleet-cache" + old_marker = "external-nested-config-initial-ok" + new_marker = "external-nested-config-update-ok" + paths = _write_external_nested_config_ref( + tmp_path, + cache_dir, + api_port, + "cur", + old_marker, + interval_sec="1", + ) + config_path = _write_bootstrap_config(tmp_path, cache_dir, api_port, interval_sec="1") + fleet_config = _fleet_config(cache_dir, api_port, new_marker) + service = FluentBitTestService(str(config_path)) + + with FleetAPIServer(api_port, fleet_config, _fleet_files_payload()): + try: + service.start() + log_text = _wait_for_log_contains(service, new_marker, timeout=45) + service.wait_for_condition( + lambda: not (paths["base_dir"] / "old.ref").exists(), + timeout=45, + interval=0.5, + description="old fleet ref cleanup", + ) + finally: + service.stop() + + _assert_no_include_error(log_text) + assert paths["external_dir"].is_dir() + assert paths["sentinel"].is_file() + assert paths["parser_config"].is_file() + assert new_marker in log_text + + +def test_fleet_startup_keeps_non_timestamp_yaml_ref(tmp_path): + api_port = find_available_port() + cache_dir = tmp_path / "fleet-cache" + marker = "manual-yaml-ref-relative-include-ok" + paths = _write_base_config_ref(cache_dir, api_port, "cur", "manual.yaml", marker) + config_path = _write_bootstrap_config(tmp_path, cache_dir, api_port) + + log_text = _run_service(config_path, marker) + + assert paths["config_path"].is_file() + assert paths["parser_config"].is_file() + assert not (paths["base_dir"] / "manual" / "config.yaml").exists() + assert _read_ref_path(paths["ref_file"]) == paths["config_path"] + assert marker in log_text + + +def test_fleet_startup_keeps_flat_ref_when_migration_target_is_blocked(tmp_path): + api_port = find_available_port() + cache_dir = tmp_path / "fleet-cache" + marker = "blocked-migration-flat-ref-relative-include-ok" + paths = _write_blocked_migration_cache(cache_dir, api_port, "cur", marker) + config_path = _write_bootstrap_config(tmp_path, cache_dir, api_port) + + log_text = _run_service(config_path, marker) + + assert paths["flat_config"].is_file() + assert paths["timestamp_blocker"].is_file() + assert _read_ref_path(paths["ref_file"]) == paths["flat_config"] + assert marker in log_text diff --git a/tests/runtime/in_calyptia_fleet_test.c b/tests/runtime/in_calyptia_fleet_test.c index c9cd8cb1da2..56c03abf521 100644 --- a/tests/runtime/in_calyptia_fleet_test.c +++ b/tests/runtime/in_calyptia_fleet_test.c @@ -6,6 +6,7 @@ #include "../../plugins/in_calyptia_fleet/in_calyptia_fleet.h" flb_sds_t fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, char *fname); +flb_sds_t time_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, time_t t); int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx); /* Test context structure */ @@ -114,8 +115,45 @@ static void test_in_fleet_format() { cleanup_test_context(t_ctx); } +static void test_in_fleet_timestamped_config_format(void) +{ + struct test_context *t_ctx; + char expectedValue[CALYPTIA_MAX_DIR_SIZE]; + flb_sds_t value; + int ret; + + t_ctx = init_test_context(); + TEST_CHECK(t_ctx != NULL); + + ret = sprintf(expectedValue, "%s/%s/%s/123.conf", + FLEET_DEFAULT_CONFIG_DIR, t_ctx->ctx->machine_id, t_ctx->ctx->fleet_name); + TEST_CHECK(ret > 0); + + value = time_fleet_config_filename(t_ctx->ctx, 123); + TEST_CHECK(value != NULL); + TEST_MSG("time_fleet_config_filename expected=%s got=%s", expectedValue, value); + TEST_CHECK(value && strcmp(value, expectedValue) == 0); + flb_sds_destroy(value); + value = NULL; + + t_ctx->ctx->fleet_config_legacy_format = FLB_FALSE; + + ret = sprintf(expectedValue, "%s/%s/%s/123/config.yaml", + FLEET_DEFAULT_CONFIG_DIR, t_ctx->ctx->machine_id, t_ctx->ctx->fleet_name); + TEST_CHECK(ret > 0); + + value = time_fleet_config_filename(t_ctx->ctx, 123); + TEST_CHECK(value != NULL); + TEST_MSG("time_fleet_config_filename expected=%s got=%s", expectedValue, value); + TEST_CHECK(value && strcmp(value, expectedValue) == 0); + flb_sds_destroy(value); + + cleanup_test_context(t_ctx); +} + /* Define test list */ TEST_LIST = { {"in_calyptia_fleet_format", test_in_fleet_format}, + {"in_calyptia_fleet_timestamped_config_format", test_in_fleet_timestamped_config_format}, {NULL, NULL} -}; \ No newline at end of file +};