Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ var ErrConnNotFound = errors.New("no connection in context")
//go:embed initdb.d/02_create_flows_table.sql
var createFlowsTableScript string

func Initialize() error {
//go:embed initdb.d/03_alter_flows_table_tenant_id.sql
var alterFlowsTableTenantIDScript string

func Initialize(defaultTenantID string) error {
if ch != nil {
return nil
}
Expand All @@ -49,6 +52,11 @@ func Initialize() error {
return err
}

err = chConn.Exec(ctx, fmt.Sprintf(alterFlowsTableTenantIDScript, defaultTenantID))
if err != nil {
return err
}

ch = chConn
return nil
}
Expand Down
1 change: 1 addition & 0 deletions clickhouse/initdb.d/02_create_flows_table.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CREATE TABLE IF NOT EXISTS flows (
-- Identity
flow_id String,
tenant_id String,
host_id String,
host_name String,
network_id String,
Expand Down
2 changes: 2 additions & 0 deletions clickhouse/initdb.d/03_alter_flows_table_tenant_id.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE flows
ADD COLUMN IF NOT EXISTS tenant_id String DEFAULT '%s';
2 changes: 1 addition & 1 deletion controllers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func updateSettings(w http.ResponseWriter, r *http.Request) {

if currSettings.EnableFlowLogs != req.EnableFlowLogs {
if req.EnableFlowLogs {
err := ch.Initialize()
err := ch.Initialize(servercfg.GetNetmakerTenantID())
if err != nil {
err = fmt.Errorf("failed to enable flow logs: %v", err)
logic.ReturnErrorResponse(w, r, logic.FormatError(err, logic.Internal))
Expand Down
9 changes: 8 additions & 1 deletion pro/controllers/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/gravitl/netmaker/database"
"github.com/gravitl/netmaker/logic"
proLogic "github.com/gravitl/netmaker/pro/logic"
"github.com/gravitl/netmaker/servercfg"
)

func FlowHandlers(r *mux.Router) {
Expand All @@ -22,7 +23,7 @@ func FlowHandlers(r *mux.Router) {
const (
querySelect = `
SELECT
flow_id, host_id, host_name, network_id,
flow_id, tenant_id, host_id, host_name, network_id,
protocol, src_port, dst_port,
icmp_type, icmp_code, direction,
src_ip, src_type, src_entity_id, src_entity_name,
Expand All @@ -40,6 +41,7 @@ LIMIT ? OFFSET ?`
// FlowRow represents a single flow log entry
type FlowRow struct {
FlowID string `ch:"flow_id" json:"flow_id"`
TenantID string `ch:"tenant_id" json:"tenant_id"`
HostID string `ch:"host_id" json:"host_id"`
HostName string `ch:"host_name" json:"host_name"`
NetworkID string `ch:"network_id" json:"network_id"`
Expand Down Expand Up @@ -217,6 +219,11 @@ func handleListFlows(w http.ResponseWriter, r *http.Request) {
args = append(args, srcTypeStr, srcEntity, dstTypeStr, dstEntity)
}

if servercfg.GetNetmakerTenantID() != "" {
whereParts = append(whereParts, "tenant_id = ?")
args = append(args, servercfg.GetNetmakerTenantID())
}

// Pagination
page := parseIntOrDefault(q.Get("page"), 1)
perPage := parseIntOrDefault(q.Get("per_page"), 100)
Expand Down
2 changes: 1 addition & 1 deletion pro/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func InitPro() {
addJitExpiryHookWithEmail()

if proLogic.GetFeatureFlags().EnableFlowLogs && logic.GetServerSettings().EnableFlowLogs {
err := ch.Initialize()
err := ch.Initialize(servercfg.GetNetmakerTenantID())
if err != nil {
logger.Log(0, "error connecting to clickhouse:", err.Error())
}
Expand Down