diff --git a/include/fluent-bit/flb_input_chunk.h b/include/fluent-bit/flb_input_chunk.h index 9f37c0a0bd5..f05ee96556a 100644 --- a/include/fluent-bit/flb_input_chunk.h +++ b/include/fluent-bit/flb_input_chunk.h @@ -119,16 +119,35 @@ int flb_input_chunk_write_at(void *data, off_t offset, int flb_input_chunk_append_obj(struct flb_input_instance *in, const char *tag, int tag_len, msgpack_object data); +/* + * Skip input-level metrics (records/bytes total) on this append. Used for + * internal route copies so conditional routing does not inflate the totals + * by the route count. + */ +#define FLB_INPUT_CHUNK_SKIP_INPUT_METRICS (1 << 1) + int flb_input_chunk_append_raw(struct flb_input_instance *in, int event_type, size_t records, const char *tag, size_t tag_len, const void *buf, size_t buf_size); +int flb_input_chunk_append_raw_flags(struct flb_input_instance *in, + int event_type, + int flags, + size_t records, + const char *tag, size_t tag_len, + const void *buf, size_t buf_size); int flb_input_chunk_append_raw_local(struct flb_input_instance *in, int event_type, size_t records, const char *tag, size_t tag_len, const void *buf, size_t buf_size); +int flb_input_chunk_append_raw_local_flags(struct flb_input_instance *in, + int event_type, + int flags, + size_t records, + const char *tag, size_t tag_len, + const void *buf, size_t buf_size); int flb_input_chunk_ring_buffer_enqueue(struct flb_input_instance *in, int event_type, size_t records, diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 0a5f83ea7dc..54a5e793935 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -2651,6 +2651,7 @@ static int memrb_input_chunk_release_space(struct flb_input_instance *ins, /* Append a RAW MessagPack buffer to the input instance */ static int input_chunk_append_raw(struct flb_input_instance *in, int event_type, + int flags, size_t n_records, const char *tag, size_t tag_len, const void *buf, size_t buf_size) @@ -2793,7 +2794,8 @@ static int input_chunk_append_raw(struct flb_input_instance *in, /* Update 'input' metrics */ #ifdef FLB_HAVE_METRICS - if (ic->total_records > 0) { + /* skip for internal route copies; the batch is counted once elsewhere */ + if (ic->total_records > 0 && !(flags & FLB_INPUT_CHUNK_SKIP_INPUT_METRICS)) { /* timestamp */ ts = cfl_time_now(); @@ -3136,7 +3138,8 @@ void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data) cr->buf_data, cr->buf_size); } else { - input_chunk_append_raw(cr->ins, cr->event_type, cr->records, + input_chunk_append_raw(cr->ins, cr->event_type, cr->flags, + cr->records, cr->tag, tag_len, cr->buf_data, cr->buf_size); } @@ -3154,6 +3157,17 @@ int flb_input_chunk_append_raw(struct flb_input_instance *in, size_t records, const char *tag, size_t tag_len, const void *buf, size_t buf_size) +{ + return flb_input_chunk_append_raw_flags(in, event_type, 0, records, + tag, tag_len, buf, buf_size); +} + +int flb_input_chunk_append_raw_flags(struct flb_input_instance *in, + int event_type, + int flags, + size_t records, + const char *tag, size_t tag_len, + const void *buf, size_t buf_size) { int ret; @@ -3162,12 +3176,12 @@ int flb_input_chunk_append_raw(struct flb_input_instance *in, * add the data reference to the ring buffer. */ if (flb_input_is_threaded(in)) { - ret = append_to_ring_buffer(in, event_type, 0, records, + ret = append_to_ring_buffer(in, event_type, flags, records, tag, tag_len, buf, buf_size); } else { - ret = input_chunk_append_raw(in, event_type, records, + ret = input_chunk_append_raw(in, event_type, flags, records, tag, tag_len, buf, buf_size); } @@ -3180,7 +3194,18 @@ int flb_input_chunk_append_raw_local(struct flb_input_instance *in, const char *tag, size_t tag_len, const void *buf, size_t buf_size) { - return input_chunk_append_raw(in, event_type, records, + return input_chunk_append_raw(in, event_type, 0, records, + tag, tag_len, buf, buf_size); +} + +int flb_input_chunk_append_raw_local_flags(struct flb_input_instance *in, + int event_type, + int flags, + size_t records, + const char *tag, size_t tag_len, + const void *buf, size_t buf_size) +{ + return input_chunk_append_raw(in, event_type, flags, records, tag, tag_len, buf, buf_size); } diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 0919fca3cd4..8fa7377cee2 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -1135,7 +1135,8 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, size_t tag_len, const void *buf, size_t buf_size, - int local_append) + int local_append, + int *any_appended) { int ret; int appended; @@ -1397,9 +1398,11 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, continue; } + /* internal route copy: skip metrics, the batch is counted once */ if (local_append == FLB_TRUE) { - ret = flb_input_chunk_append_raw_local(ins, + ret = flb_input_chunk_append_raw_local_flags(ins, FLB_INPUT_LOGS, + FLB_INPUT_CHUNK_SKIP_INPUT_METRICS, payload->total_records, payload->tag, flb_sds_len(payload->tag), @@ -1407,8 +1410,9 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, payload->size); } else { - ret = flb_input_chunk_append_raw(ins, + ret = flb_input_chunk_append_raw_flags(ins, FLB_INPUT_LOGS, + FLB_INPUT_CHUNK_SKIP_INPUT_METRICS, payload->total_records, payload->tag, flb_sds_len(payload->tag), @@ -1445,6 +1449,14 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, return -1; } + /* + * Report acceptance so the caller counts the batch once, even if a + * later route copy in this loop fails. + */ + if (any_appended != NULL) { + *any_appended = FLB_TRUE; + } + appended++; } @@ -1473,6 +1485,9 @@ static int input_log_append_processed_internal(struct flb_input_instance *ins, int ret; int conditional_result; int conditional_handled = FLB_FALSE; + int conditional_active = FLB_FALSE; + int base_flags = 0; + int accepted = FLB_FALSE; size_t dummy = 0; const char *base_tag = tag; size_t base_tag_len = tag_len; @@ -1492,29 +1507,76 @@ static int input_log_append_processed_internal(struct flb_input_instance *ins, base_tag_len = strlen(base_tag); } - conditional_result = split_and_append_route_payloads(ins, records, tag, tag_len, - buf, buf_size, - local_append); - if (conditional_result < 0) { - return -1; + /* + * With conditional routing the batch is fanned out into per-route copies + * plus a base append, all skipping input metrics. It is accounted once + * below, after at least one append is accepted, so a fully rejected batch + * is not counted. The non-routed path keeps counting in + * flb_input_chunk_append_raw(). + */ + conditional_active = (cfl_list_size(&ins->routes_direct) > 0 && + input_has_conditional_routes(ins) == FLB_TRUE); + if (conditional_active == FLB_TRUE) { + base_flags = FLB_INPUT_CHUNK_SKIP_INPUT_METRICS; } + conditional_result = split_and_append_route_payloads(ins, records, tag, tag_len, + buf, buf_size, + local_append, &accepted); if (conditional_result > 0) { conditional_handled = FLB_TRUE; } /* - * Always call flb_input_chunk_append_raw to ensure non-conditional routes - * receive data even when conditional routes exist. The conditional routing - * should be additive, not exclusive. + * Base append carries the full buffer for non-conditional routes (routing + * is additive). It is skipped on split failure. */ - if (local_append == FLB_TRUE) { - ret = flb_input_chunk_append_raw_local(ins, FLB_INPUT_LOGS, records, - tag, tag_len, buf, buf_size); + if (conditional_result >= 0) { + if (local_append == FLB_TRUE) { + ret = flb_input_chunk_append_raw_local_flags(ins, FLB_INPUT_LOGS, + base_flags, records, + tag, tag_len, buf, buf_size); + } + else { + ret = flb_input_chunk_append_raw_flags(ins, FLB_INPUT_LOGS, + base_flags, records, + tag, tag_len, buf, buf_size); + } + if (ret == 0) { + accepted = FLB_TRUE; + } } else { - ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, - tag, tag_len, buf, buf_size); + ret = -1; + } + +#ifdef FLB_HAVE_METRICS + /* count the batch once, now that at least one append was accepted */ + if (conditional_active == FLB_TRUE && accepted == FLB_TRUE && buf_size > 0) { + size_t effective_records = records; + + /* mirror the zero-record recovery in input_chunk_append_raw() */ + if (effective_records == 0) { + effective_records = flb_mp_count_log_records(buf, buf_size); + } + + if (effective_records > 0) { + uint64_t ts = cfl_time_now(); + char *name = (char *) flb_input_name(ins); + + cmt_counter_add(ins->cmt_records, ts, effective_records, 1, + (char *[]) {name}); + cmt_counter_add(ins->cmt_bytes, ts, buf_size, 1, + (char *[]) {name}); + + flb_metrics_sum(FLB_METRIC_N_RECORDS, effective_records, ins->metrics); + flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, ins->metrics); + } + } +#endif + + if (conditional_result < 0) { + return -1; } if (ret == 0 && conditional_handled == FLB_TRUE && base_tag) {