From 55c2d19c30f7fa8211da9b25eccb813a7be894a1 Mon Sep 17 00:00:00 2001 From: VishalDalwadi Date: Mon, 6 Apr 2026 11:16:23 +0530 Subject: [PATCH 1/3] fix(go): add tenant id to flow logs; --- clickhouse/clickhouse.go | 10 +++++++++- clickhouse/initdb.d/02_create_flows_table.sql | 1 + .../initdb.d/03_alter_flows_table_tenant_id.sql | 2 ++ controllers/server.go | 2 +- grpc/flow/flow.pb.go | 15 ++++++++++++--- grpc/flow/flow.proto | 2 ++ pro/controllers/flows.go | 6 ++++++ pro/initialize.go | 2 +- 8 files changed, 34 insertions(+), 6 deletions(-) create mode 100644 clickhouse/initdb.d/03_alter_flows_table_tenant_id.sql diff --git a/clickhouse/clickhouse.go b/clickhouse/clickhouse.go index a2562a3ec..73660d6f7 100644 --- a/clickhouse/clickhouse.go +++ b/clickhouse/clickhouse.go @@ -23,7 +23,10 @@ var ErrConnNotFound = errors.New("no connection in context") //go:embed initdb.d/02_create_flows_table.sql var createFlowsTableScript string -func Initialize() error { +//go:embed initdb.d/03_alter_flows_table_tenant_id.sql +var alterFlowsTableTenantIDScript string + +func Initialize(defaultTenantID string) error { if ch != nil { return nil } @@ -49,6 +52,11 @@ func Initialize() error { return err } + err = chConn.Exec(ctx, fmt.Sprintf(alterFlowsTableTenantIDScript, defaultTenantID)) + if err != nil { + return err + } + ch = chConn return nil } diff --git a/clickhouse/initdb.d/02_create_flows_table.sql b/clickhouse/initdb.d/02_create_flows_table.sql index adc9616c3..038da4007 100644 --- a/clickhouse/initdb.d/02_create_flows_table.sql +++ b/clickhouse/initdb.d/02_create_flows_table.sql @@ -1,6 +1,7 @@ CREATE TABLE IF NOT EXISTS flows ( -- Identity flow_id String, + tenant_id String, host_id String, host_name String, network_id String, diff --git a/clickhouse/initdb.d/03_alter_flows_table_tenant_id.sql b/clickhouse/initdb.d/03_alter_flows_table_tenant_id.sql new file mode 100644 index 000000000..bbd267784 --- /dev/null +++ b/clickhouse/initdb.d/03_alter_flows_table_tenant_id.sql @@ -0,0 +1,2 @@ +ALTER TABLE flows +ADD COLUMN IF NOT EXISTS tenant_id String DEFAULT '%s'; \ No newline at end of file diff --git a/controllers/server.go b/controllers/server.go index 649f370b6..e3373c683 100644 --- a/controllers/server.go +++ b/controllers/server.go @@ -272,7 +272,7 @@ func updateSettings(w http.ResponseWriter, r *http.Request) { if currSettings.EnableFlowLogs != req.EnableFlowLogs { if req.EnableFlowLogs { - err := ch.Initialize() + err := ch.Initialize(servercfg.GetNetmakerTenantID()) if err != nil { err = fmt.Errorf("failed to enable flow logs: %v", err) logic.ReturnErrorResponse(w, r, logic.FormatError(err, logic.Internal)) diff --git a/grpc/flow/flow.pb.go b/grpc/flow/flow.pb.go index 5b2fe542e..e0f6795bf 100644 --- a/grpc/flow/flow.pb.go +++ b/grpc/flow/flow.pb.go @@ -291,7 +291,8 @@ type FlowEvent struct { // Version used by ClickHouse for merging. // Must be strictly increasing for START → DESTROY. // Usually equal to the netclient event timestamp (ms). - Version int64 `protobuf:"varint,21,opt,name=version,proto3" json:"version,omitempty"` + Version int64 `protobuf:"varint,21,opt,name=version,proto3" json:"version,omitempty"` + TenantId string `protobuf:"bytes,22,opt,name=tenant_id,json=tenantId,proto3" json:"tenant_id,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -473,6 +474,13 @@ func (x *FlowEvent) GetVersion() int64 { return 0 } +func (x *FlowEvent) GetTenantId() string { + if x != nil { + return x.TenantId + } + return "" +} + // * // Envelope sent by netclients containing multiple FlowEvents. type FlowEnvelope struct { @@ -582,7 +590,7 @@ const file_grpc_flow_flow_proto_rawDesc = "" + "\x02ip\x18\x01 \x01(\tR\x02ip\x122\n" + "\x04type\x18\x02 \x01(\x0e2\x1e.netmaker.flow.ParticipantTypeR\x04type\x12\x0e\n" + "\x02id\x18\x03 \x01(\tR\x02id\x12\x12\n" + - "\x04name\x18\x04 \x01(\tR\x04name\"\xc1\x05\n" + + "\x04name\x18\x04 \x01(\tR\x04name\"\xde\x05\n" + "\tFlowEvent\x12,\n" + "\x04type\x18\x01 \x01(\x0e2\x18.netmaker.flow.EventTypeR\x04type\x12\x17\n" + "\aflow_id\x18\x02 \x01(\tR\x06flowId\x12\x17\n" + @@ -608,7 +616,8 @@ const file_grpc_flow_flow_proto_rawDesc = "" + "\fpackets_sent\x18\x12 \x01(\x04R\vpacketsSent\x12!\n" + "\fpackets_recv\x18\x13 \x01(\x04R\vpacketsRecv\x12\x16\n" + "\x06status\x18\x14 \x01(\rR\x06status\x12\x18\n" + - "\aversion\x18\x15 \x01(\x03R\aversion\"@\n" + + "\aversion\x18\x15 \x01(\x03R\aversion\x12\x1b\n" + + "\ttenant_id\x18\x16 \x01(\tR\btenantId\"@\n" + "\fFlowEnvelope\x120\n" + "\x06events\x18\x01 \x03(\v2\x18.netmaker.flow.FlowEventR\x06events\">\n" + "\fFlowResponse\x12\x18\n" + diff --git a/grpc/flow/flow.proto b/grpc/flow/flow.proto index 6e3874b84..08df11011 100644 --- a/grpc/flow/flow.proto +++ b/grpc/flow/flow.proto @@ -111,6 +111,8 @@ message FlowEvent { * Usually equal to the netclient event timestamp (ms). */ int64 version = 21; + + string tenant_id = 22; } // ============================================================ diff --git a/pro/controllers/flows.go b/pro/controllers/flows.go index 7d8f06370..3e94d2dcc 100644 --- a/pro/controllers/flows.go +++ b/pro/controllers/flows.go @@ -13,6 +13,7 @@ import ( "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/logic" proLogic "github.com/gravitl/netmaker/pro/logic" + "github.com/gravitl/netmaker/servercfg" ) func FlowHandlers(r *mux.Router) { @@ -217,6 +218,11 @@ func handleListFlows(w http.ResponseWriter, r *http.Request) { args = append(args, srcTypeStr, srcEntity, dstTypeStr, dstEntity) } + if servercfg.GetNetmakerTenantID() != "" { + whereParts = append(whereParts, "tenant_id = ?") + args = append(args, servercfg.GetNetmakerTenantID()) + } + // Pagination page := parseIntOrDefault(q.Get("page"), 1) perPage := parseIntOrDefault(q.Get("per_page"), 100) diff --git a/pro/initialize.go b/pro/initialize.go index 9228637f3..3e04fc588 100644 --- a/pro/initialize.go +++ b/pro/initialize.go @@ -118,7 +118,7 @@ func InitPro() { addJitExpiryHookWithEmail() if proLogic.GetFeatureFlags().EnableFlowLogs && logic.GetServerSettings().EnableFlowLogs { - err := ch.Initialize() + err := ch.Initialize(servercfg.GetNetmakerTenantID()) if err != nil { logger.Log(0, "error connecting to clickhouse:", err.Error()) } From 0eb19cf8d1b5861242a5895a5ea60ec664edbe6c Mon Sep 17 00:00:00 2001 From: VishalDalwadi Date: Mon, 6 Apr 2026 11:22:19 +0530 Subject: [PATCH 2/3] fix(go): remove tenant id from flow events; --- grpc/flow/flow.pb.go | 15 +++------------ grpc/flow/flow.proto | 2 -- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/grpc/flow/flow.pb.go b/grpc/flow/flow.pb.go index e0f6795bf..5b2fe542e 100644 --- a/grpc/flow/flow.pb.go +++ b/grpc/flow/flow.pb.go @@ -291,8 +291,7 @@ type FlowEvent struct { // Version used by ClickHouse for merging. // Must be strictly increasing for START → DESTROY. // Usually equal to the netclient event timestamp (ms). - Version int64 `protobuf:"varint,21,opt,name=version,proto3" json:"version,omitempty"` - TenantId string `protobuf:"bytes,22,opt,name=tenant_id,json=tenantId,proto3" json:"tenant_id,omitempty"` + Version int64 `protobuf:"varint,21,opt,name=version,proto3" json:"version,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -474,13 +473,6 @@ func (x *FlowEvent) GetVersion() int64 { return 0 } -func (x *FlowEvent) GetTenantId() string { - if x != nil { - return x.TenantId - } - return "" -} - // * // Envelope sent by netclients containing multiple FlowEvents. type FlowEnvelope struct { @@ -590,7 +582,7 @@ const file_grpc_flow_flow_proto_rawDesc = "" + "\x02ip\x18\x01 \x01(\tR\x02ip\x122\n" + "\x04type\x18\x02 \x01(\x0e2\x1e.netmaker.flow.ParticipantTypeR\x04type\x12\x0e\n" + "\x02id\x18\x03 \x01(\tR\x02id\x12\x12\n" + - "\x04name\x18\x04 \x01(\tR\x04name\"\xde\x05\n" + + "\x04name\x18\x04 \x01(\tR\x04name\"\xc1\x05\n" + "\tFlowEvent\x12,\n" + "\x04type\x18\x01 \x01(\x0e2\x18.netmaker.flow.EventTypeR\x04type\x12\x17\n" + "\aflow_id\x18\x02 \x01(\tR\x06flowId\x12\x17\n" + @@ -616,8 +608,7 @@ const file_grpc_flow_flow_proto_rawDesc = "" + "\fpackets_sent\x18\x12 \x01(\x04R\vpacketsSent\x12!\n" + "\fpackets_recv\x18\x13 \x01(\x04R\vpacketsRecv\x12\x16\n" + "\x06status\x18\x14 \x01(\rR\x06status\x12\x18\n" + - "\aversion\x18\x15 \x01(\x03R\aversion\x12\x1b\n" + - "\ttenant_id\x18\x16 \x01(\tR\btenantId\"@\n" + + "\aversion\x18\x15 \x01(\x03R\aversion\"@\n" + "\fFlowEnvelope\x120\n" + "\x06events\x18\x01 \x03(\v2\x18.netmaker.flow.FlowEventR\x06events\">\n" + "\fFlowResponse\x12\x18\n" + diff --git a/grpc/flow/flow.proto b/grpc/flow/flow.proto index 08df11011..6e3874b84 100644 --- a/grpc/flow/flow.proto +++ b/grpc/flow/flow.proto @@ -111,8 +111,6 @@ message FlowEvent { * Usually equal to the netclient event timestamp (ms). */ int64 version = 21; - - string tenant_id = 22; } // ============================================================ From f998a160e081398dc4b266b6962f0dc6d9c42dee Mon Sep 17 00:00:00 2001 From: VishalDalwadi Date: Mon, 6 Apr 2026 15:28:21 +0530 Subject: [PATCH 3/3] feat(go): add tenant id to list flow records query; --- pro/controllers/flows.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pro/controllers/flows.go b/pro/controllers/flows.go index 3e94d2dcc..40bf32a03 100644 --- a/pro/controllers/flows.go +++ b/pro/controllers/flows.go @@ -23,7 +23,7 @@ func FlowHandlers(r *mux.Router) { const ( querySelect = ` SELECT - flow_id, host_id, host_name, network_id, + flow_id, tenant_id, host_id, host_name, network_id, protocol, src_port, dst_port, icmp_type, icmp_code, direction, src_ip, src_type, src_entity_id, src_entity_name, @@ -41,6 +41,7 @@ LIMIT ? OFFSET ?` // FlowRow represents a single flow log entry type FlowRow struct { FlowID string `ch:"flow_id" json:"flow_id"` + TenantID string `ch:"tenant_id" json:"tenant_id"` HostID string `ch:"host_id" json:"host_id"` HostName string `ch:"host_name" json:"host_name"` NetworkID string `ch:"network_id" json:"network_id"`