Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 90 additions & 14 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,21 @@ static flb_sds_t es_get_id_value(struct flb_elasticsearch *ctx,
return tmp_str;
}

static int es_action_line_value_is_safe(const char *value, size_t len)
{
size_t i;
unsigned char c;

for (i = 0; i < len; i++) {
c = (unsigned char) value[i];
if (c == '\n' || c == '\r' || c == '"' || c == '\\' || c < 0x20) {
return FLB_FALSE;
}
}

return FLB_TRUE;
}

static int compose_index_header(struct flb_elasticsearch *ctx,
int es_index_custom_len,
char *logstash_index, size_t logstash_index_size,
Expand Down Expand Up @@ -293,6 +308,10 @@ static int elasticsearch_format(struct flb_config *config,
int len;
int map_size;
int index_len = 0;
int write_op_update = FLB_FALSE;
int write_op_upsert = FLB_FALSE;
int id_key_required = FLB_FALSE;
int id_key_safe;
size_t s = 0;
size_t off = 0;
size_t off_prev = 0;
Expand Down Expand Up @@ -345,10 +364,16 @@ static int elasticsearch_format(struct flb_config *config,
return -1;
}

/* Copy logstash prefix if logstash format is enabled */
if (ctx->logstash_format == FLB_TRUE) {
strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index));
logstash_index[sizeof(logstash_index) - 1] = '\0';
if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) {
write_op_update = FLB_TRUE;
}
else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
write_op_upsert = FLB_TRUE;
}

if (ctx->ra_id_key && ctx->generate_id == FLB_FALSE &&
(write_op_update == FLB_TRUE || write_op_upsert == FLB_TRUE)) {
id_key_required = FLB_TRUE;
}

/*
Expand Down Expand Up @@ -403,21 +428,29 @@ static int elasticsearch_format(struct flb_config *config,
map = *log_event.body;
map_size = map.via.map.size;

/* Copy logstash prefix for the per-record fallback path. */
if (ctx->logstash_format == FLB_TRUE) {
strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index));
logstash_index[sizeof(logstash_index) - 1] = '\0';
}

es_index_custom_len = 0;
if (ctx->logstash_prefix_key) {
flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key,
(char *) tag, tag_len,
map, NULL);
if (v) {
len = flb_sds_len(v);
if (len > 128) {
len = 128;
memcpy(logstash_index, v, 128);
}
else {
memcpy(logstash_index, v, len);
if (es_action_line_value_is_safe(v, len) == FLB_TRUE) {
if (len > 128) {
len = 128;
memcpy(logstash_index, v, 128);
}
else {
memcpy(logstash_index, v, len);
}
es_index_custom_len = len;
}
es_index_custom_len = len;
flb_sds_destroy(v);
}
}
Expand Down Expand Up @@ -538,7 +571,15 @@ static int elasticsearch_format(struct flb_config *config,
}
if (ctx->ra_id_key) {
id_key_str = es_get_id_value(ctx ,&map);
if (id_key_str) {
id_key_safe = FLB_FALSE;

if (id_key_str &&
es_action_line_value_is_safe(id_key_str,
flb_sds_len(id_key_str)) == FLB_TRUE) {
id_key_safe = FLB_TRUE;
}

if (id_key_safe == FLB_TRUE) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
Expand All @@ -553,6 +594,35 @@ static int elasticsearch_format(struct flb_config *config,
ctx->es_action,
es_index, ctx->type, id_key_str);
}
}
else if (id_key_required == FLB_TRUE) {
flb_plg_warn(ctx->ins,
"skipping record with missing or unsafe Id_Key value");
if (id_key_str) {
flb_sds_destroy(id_key_str);
id_key_str = NULL;
}
msgpack_sbuffer_destroy(&tmp_sbuf);
continue;
}
else if (ctx->generate_id == FLB_FALSE && id_key_str) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
ctx->es_action,
es_index);
}
else {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT,
ctx->es_action,
es_index, ctx->type);
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if (id_key_str) {
flb_sds_destroy(id_key_str);
id_key_str = NULL;
}
Expand All @@ -570,13 +640,13 @@ static int elasticsearch_format(struct flb_config *config,
}

out_buf_len = flb_sds_len(out_buf);
if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) {
if (write_op_update == FLB_TRUE) {
tmp_buf = out_buf;
out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPDATE_OP_BODY) - 2);
out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPDATE_OP_BODY, tmp_buf);
flb_sds_destroy(tmp_buf);
}
else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
else if (write_op_upsert == FLB_TRUE) {
tmp_buf = out_buf;
out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPSERT_OP_BODY) - 2);
out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPSERT_OP_BODY, tmp_buf);
Expand Down Expand Up @@ -845,6 +915,12 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_ERROR);
}

if (out_size == 0) {
flb_free(out_buf);
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_OK);
}

pack = (char *) out_buf;
pack_size = out_size;

Expand Down
112 changes: 92 additions & 20 deletions plugins/out_opensearch/opensearch.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,21 @@ static flb_sds_t os_get_id_value(struct flb_opensearch *ctx,
return tmp_str;
}

static int os_action_line_value_is_safe(const char *value, size_t len)
{
size_t i;
unsigned char c;

for (i = 0; i < len; i++) {
c = (unsigned char) value[i];
if (c == '\n' || c == '\r' || c == '"' || c == '\\' || c < 0x20) {
return FLB_FALSE;
}
}

return FLB_TRUE;
}

static int compose_index_header(struct flb_opensearch *ctx,
int index_custom_len,
char *logstash_index, size_t logstash_index_size,
Expand Down Expand Up @@ -283,6 +298,8 @@ static int opensearch_format(struct flb_config *config,
int index_len = 0;
int write_op_update = FLB_FALSE;
int write_op_upsert = FLB_FALSE;
int id_key_required = FLB_FALSE;
int id_key_safe;
flb_sds_t ra_index = NULL;
size_t s = 0;
char *index = NULL;
Expand Down Expand Up @@ -330,10 +347,16 @@ static int opensearch_format(struct flb_config *config,
return -1;
}

/* Copy logstash prefix if logstash format is enabled */
if (ctx->logstash_format == FLB_TRUE) {
strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index));
logstash_index[sizeof(logstash_index) - 1] = '\0';
if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPDATE) == 0) {
write_op_update = FLB_TRUE;
}
else if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPSERT) == 0) {
write_op_upsert = FLB_TRUE;
}

if (ctx->ra_id_key && ctx->generate_id == FLB_FALSE &&
(write_op_update == FLB_TRUE || write_op_upsert == FLB_TRUE)) {
id_key_required = FLB_TRUE;
}

/*
Expand Down Expand Up @@ -393,22 +416,30 @@ static int opensearch_format(struct flb_config *config,
map = *log_event.body;
map_size = map.via.map.size;

/* Copy logstash prefix for the per-record fallback path. */
if (ctx->logstash_format == FLB_TRUE) {
strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index));
logstash_index[sizeof(logstash_index) - 1] = '\0';
}

index_custom_len = 0;
if (ctx->logstash_prefix_key) {
flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key,
(char *) tag, tag_len,
map, NULL);
if (v) {
len = flb_sds_len(v);
if (len > 128) {
len = 128;
memcpy(logstash_index, v, 128);
}
else {
memcpy(logstash_index, v, len);
}
if (os_action_line_value_is_safe(v, len) == FLB_TRUE) {
if (len > 128) {
len = 128;
memcpy(logstash_index, v, 128);
}
else {
memcpy(logstash_index, v, len);
}

index_custom_len = len;
index_custom_len = len;
}
flb_sds_destroy(v);
}
}
Expand Down Expand Up @@ -492,6 +523,11 @@ static int opensearch_format(struct flb_config *config,
if (!ra_index) {
flb_plg_warn(ctx->ins, "invalid index translation from record accessor pattern, default to static index");
}
else if (os_action_line_value_is_safe(ra_index,
flb_sds_len(ra_index)) == FLB_FALSE) {
flb_sds_destroy(ra_index);
ra_index = NULL;
}
else {
index = ra_index;
}
Expand Down Expand Up @@ -566,7 +602,15 @@ static int opensearch_format(struct flb_config *config,
}
if (ctx->ra_id_key) {
id_key_str = os_get_id_value(ctx ,&map);
if (id_key_str) {
id_key_safe = FLB_FALSE;

if (id_key_str &&
os_action_line_value_is_safe(id_key_str,
flb_sds_len(id_key_str)) == FLB_TRUE) {
id_key_safe = FLB_TRUE;
}

if (id_key_safe == FLB_TRUE) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
Expand All @@ -581,6 +625,35 @@ static int opensearch_format(struct flb_config *config,
ctx->action,
index, ctx->type, id_key_str);
}
}
else if (id_key_required == FLB_TRUE) {
flb_plg_warn(ctx->ins,
"skipping record with missing or unsafe Id_Key value");
if (id_key_str) {
flb_sds_destroy(id_key_str);
id_key_str = NULL;
}
msgpack_sbuffer_destroy(&tmp_sbuf);
continue;
}
else if (ctx->generate_id == FLB_FALSE && id_key_str) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
OS_BULK_INDEX_FMT_NO_TYPE,
ctx->action,
index);
}
else {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
OS_BULK_INDEX_FMT,
ctx->action,
index, ctx->type);
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if (id_key_str) {
flb_sds_destroy(id_key_str);
id_key_str = NULL;
}
Expand Down Expand Up @@ -613,13 +686,6 @@ static int opensearch_format(struct flb_config *config,
return -1;
}

if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPDATE) == 0) {
write_op_update = FLB_TRUE;
}
else if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPSERT) == 0) {
write_op_upsert = FLB_TRUE;
}

/* UPDATE | UPSERT */
if (write_op_update) {
flb_sds_cat_safe(&bulk,
Expand Down Expand Up @@ -913,6 +979,12 @@ static void cb_opensearch_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_ERROR);
}

if (out_size == 0) {
flb_sds_destroy(out_buf);
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_OK);
}

pack = (char *) out_buf;
pack_size = out_size;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
service:
flush: 1
grace: 1
log_level: info
http_server: on
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}

pipeline:
inputs:
- name: dummy
tag: out_es_id_key_ndjson
dummy: '{"doc_id":"legit\"\n{\"delete\":{\"_index\":\"audit-logs\",\"_id\":\"critical-audit-record-12345\"}}\n{\"create\":{\"_index\":\"pad\",\"_id\":\"x","message":"id-key injection"}'
samples: 1

outputs:
- name: es
match: out_es_id_key_ndjson
host: 127.0.0.1
port: ${TEST_SUITE_HTTP_PORT}
index: fluent-bit
suppress_type_name: on
id_key: doc_id
retry_limit: 0
Loading
Loading