diff --git a/ai.go b/ai.go index 4e6b09c9..0d9a9bbf 100644 --- a/ai.go +++ b/ai.go @@ -44,8 +44,8 @@ import ( var standalone bool // var model = "gpt-5-mini" -//var model = "gpt-5-mini" -var model = "gpt-5.4-nano" +var model = "gpt-5-mini" +//var model = "gpt-5.4-nano" //var model = "gpt-5.2-codex" var fallbackModel = "" @@ -1541,53 +1541,20 @@ func FixJSONNewlines(input string) string { } func FixContentOutput(contentOutput string) string { - if strings.Contains(contentOutput, "```json") { - // Handle ```json - start := strings.Index(contentOutput, "```json") - end := strings.Index(contentOutput, "```") - if start != -1 { - end = strings.Index(contentOutput[start+7:], "```") - - // Shift it so the index is at the correct place - end = end + start + 7 - } - - if start != -1 && end != -1 { - newend := end + 7 - newstart := start + 7 - - log.Printf("[INFO] Found ``` in content. Start: %d, end: %d", start, end) - - if newend > len(contentOutput) { - newend = end - } - - if newend > len(contentOutput) { - newend = len(contentOutput) - } - - if newstart > len(contentOutput) { - newstart = start - } - - if newstart > len(contentOutput) { - newstart = len(contentOutput) - } - - contentOutput = contentOutput[start+7 : newend] - } - } - - if strings.Contains(contentOutput, "```") { - start := strings.Index(contentOutput, "```") - end := strings.Index(contentOutput[start+3:], "```") - if start != -1 { - end = strings.Index(contentOutput[start+3:], "```") - end = end + start + 3 + // Safely extract content from ```json or ``` blocks + if start := strings.Index(contentOutput, "```json"); start != -1 { + start += 7 // skip ```json + if end := strings.Index(contentOutput[start:], "```"); end != -1 { + contentOutput = contentOutput[start : start+end] + } else { + contentOutput = contentOutput[start:] // Unmatched, take the rest } - - if start != -1 && end != -1 { - contentOutput = contentOutput[start+3 : end+3] + } else if start := strings.Index(contentOutput, "```"); start != -1 { + start += 3 // skip ``` + if end := strings.Index(contentOutput[start:], "```"); end != -1 { + contentOutput = contentOutput[start : start+end] + } else { + contentOutput = contentOutput[start:] // Unmatched, take the rest } } @@ -8749,7 +8716,7 @@ data_filter: } // Maps OpenAI -> Result struct so we can handle it - resultMapping := ActionResult{} + resultMapping = ActionResult{} err = json.Unmarshal(body, &resultMapping) if err != nil { log.Printf("[ERROR] AI Agent (2): Failed unmarshalling response into decisions. Response from sending AI Agent request to %s: %d - '%s'. Err: %s", fullUrl, llmStatusCode, string(body), err) @@ -8872,7 +8839,7 @@ data_filter: } // Parse the outputMap.Result to OpenAI response - choicesString := "" + // choicesString = "" bodyMap, ok := outputMap.Body.(map[string]interface{}) if !ok { log.Printf("[ERROR][%s] AI Agent: Failed to convert body to MAP in AI Agent response. Raw response: %s", execution.ExecutionId, string(resultMapping.Result)) diff --git a/blobs.go b/blobs.go index 77216fd0..6ad550a8 100644 --- a/blobs.go +++ b/blobs.go @@ -22,8 +22,8 @@ import ( func IsShuffleApp(app WorkflowApp) bool { parsedAppname := strings.ReplaceAll(strings.ToLower(app.Name), " ", "_") - skipAuthAppnames := []string{"openai", "shuffle_datastore", "shuffle_workflows", "shuffle_detection", "shuffle_sensors", "shuffle_monitors", "shuffle_host_monitors", "shuffles_app_management"} - skipAuthAppIds := []string{"5d19dd82517870c68d40cacad9b5ca91", "b82668d868f6dc7ac1dc14caa92c674b", "b598b078fd5c531699fca803c172ce72", "afda48b8d1f7dc7ac3caae87b2c072e9", "7f12d725c356677d28db042170444448", "48a954b9440b3913b8a2620e57b94a75", "605e31b19889e38f179fab112297eb42"} + skipAuthAppnames := []string{"openai", "shuffle_datastore", "shuffle_workflows", "shuffle_detection", "shuffle_sensors", "shuffle_monitors", "shuffle_host_monitors", "shuffle_apps"} + skipAuthAppIds := []string{"5d19dd82517870c68d40cacad9b5ca91", "b82668d868f6dc7ac1dc14caa92c674b", "b598b078fd5c531699fca803c172ce72", "afda48b8d1f7dc7ac3caae87b2c072e9", "7f12d725c356677d28db042170444448", "48a954b9440b3913b8a2620e57b94a75", "7db43ccd25261967b095cfbd467a75cc"} isShuffleApp := false if project.Environment == "cloud" && len(app.ID) > 0 { diff --git a/shared.go b/shared.go index 1ad2670e..f25418d3 100644 --- a/shared.go +++ b/shared.go @@ -37769,13 +37769,32 @@ func GetWorkflowMinimal(resp http.ResponseWriter, request *http.Request) { return } - // Fetch workflow - workflow, err := GetWorkflow(ctx, workflowId) - if err != nil { - log.Printf("[WARNING] Failed getting workflow %s: %s", workflowId, err) - resp.WriteHeader(404) - resp.Write([]byte(`{"success": false, "reason": "Workflow not found"}`)) - return + // Fetch workflow (try cache first so agent sees its draft) + cacheKey := fmt.Sprintf("workflow_ops_cache_%s", workflowId) + cachedWorkflow, cacheErr := GetCache(ctx, cacheKey) + + var workflow *Workflow + if cacheErr == nil && cachedWorkflow != nil { + if byteData, ok := cachedWorkflow.([]byte); ok { + workflow = &Workflow{} + err := json.Unmarshal(byteData, workflow) + if err != nil { + log.Printf("[WARNING] Failed unmarshaling cached workflow in GetWorkflowMinimal: %s", err) + workflow = nil + } + } + } + + // Fallback to DB if no cache + if workflow == nil { + var err error + workflow, err = GetWorkflow(ctx, workflowId) + if err != nil { + log.Printf("[WARNING] Failed getting workflow %s: %s", workflowId, err) + resp.WriteHeader(404) + resp.Write([]byte(`{"success": false, "reason": "Workflow not found"}`)) + return + } } // Permission check: user owns it OR user is in same org @@ -38239,14 +38258,9 @@ func enrichTriggerFromApp(minTrig *MinimalTrigger, environment string) (Trigger, } } -func broadcastToStream(workflowID string, operation WorkflowOperation, userID string, username string, authHeader string) { - // Convert SetOps operation to StreamOps format - item := "node" // default - switch operation.Op { - case "add_branch", "edit_branch", "delete_branch": - item = "branch" - case "add_condition", "edit_condition", "delete_condition": - item = "condition" +func broadcastBatchToStream(wf *Workflow, operations []WorkflowOperation, tempIDMap map[string]string, userID string, username string, authHeader string) { + if len(operations) == 0 { + return } if len(userID) == 0 { @@ -38256,20 +38270,94 @@ func broadcastToStream(workflowID string, operation WorkflowOperation, userID st username = "agent" } - streamOp := StreamWorkflowOperation{ - Item: item, - Type: operation.Op, - ID: operation.ID, - UserID: userID, - Username: username, - Data: operation.Data, - Timestamp: time.Now().UnixMilli(), + var streamOps []StreamWorkflowOperation + + for _, operation := range operations { + item := "node" // default + opType := "" + switch operation.Op { + case "add_node": + item = "node" + opType = "add" + case "move_node": + item = "node" + opType = "move" + case "edit_node": + item = "node" + opType = "configure" + case "delete_node": + item = "node" + opType = "remove" + case "add_branch": + item = "branch" + opType = "add" + case "edit_branch": + item = "branch" + opType = "configure" + case "delete_branch": + item = "branch" + opType = "remove" + case "save_workflow": + item = "workflow" + opType = "save" + case "set_start_node": + item = "workflow" + opType = "configure" + default: + item = "node" + opType = operation.Op + } + + opID := operation.ID + if realID, exists := tempIDMap[operation.ID]; exists { + opID = realID + } else if realID, exists := tempIDMap[operation.TempID]; exists { + opID = realID + } + + // Extract the ENRICHED node/branch from the workflow instead of using the Minimal payload + // We only need the fully enriched data for CREATING nodes/branches. + // For edits or moves, we just pass the partial payload the agent sent so the UI can patch it locally. + var enrichedData interface{} + if operation.Op == "add_node" { + if operation.NodeType == "action" { + if idx := findActionIndexByID(wf, opID); idx != -1 { + enrichedData = wf.Actions[idx] + } + } else if operation.NodeType == "trigger" { + if idx := findTriggerIndexByID(wf, opID); idx != -1 { + enrichedData = wf.Triggers[idx] + } + } + } else if operation.Op == "add_branch" { + if idx := findBranchIndexByID(wf, opID); idx != -1 { + enrichedData = wf.Branches[idx] + } + } + + var finalData []byte + if enrichedData != nil { + finalData, _ = json.Marshal(enrichedData) + } else { + // Fallback for delete ops where the node is already removed from wf, or if not found + finalData = operation.Data + } + + streamOps = append(streamOps, StreamWorkflowOperation{ + Item: item, + Type: opType, + ID: opID, + UserID: userID, + Username: username, + Data: finalData, + Timestamp: time.Now().UnixMilli(), + }) } - // Marshal to JSON - payload, err := json.Marshal(streamOp) + // Marshal to JSON array + payload, err := json.Marshal(streamOps) if err != nil { - log.Printf("[WARNING] Failed to marshal stream operation for workflow %s: %s", workflowID, err) + log.Printf("[WARNING] Failed to marshal stream operations for workflow %s: %s", wf.ID, err) return } @@ -38286,12 +38374,12 @@ func broadcastToStream(workflowID string, operation WorkflowOperation, userID st } } - streamURL := fmt.Sprintf("%s/api/v1/workflows/%s/stream", baseURL, workflowID) + streamURL := fmt.Sprintf("%s/api/v1/workflows/%s/stream", baseURL, wf.ID) // Create HTTP POST request req, err := http.NewRequest("POST", streamURL, strings.NewReader(string(payload))) if err != nil { - log.Printf("[WARNING] Failed to create stream request for workflow %s: %s", workflowID, err) + log.Printf("[WARNING] Failed to create stream request for workflow %s: %s", wf.ID, err) return } @@ -38305,16 +38393,16 @@ func broadcastToStream(workflowID string, operation WorkflowOperation, userID st client := &http.Client{Timeout: 10 * time.Second} resp, err := client.Do(req) if err != nil { - log.Printf("[WARNING] Failed to broadcast to stream for workflow %s: %s", workflowID, err) + log.Printf("[WARNING] Failed to broadcast to stream for workflow %s: %s", wf.ID, err) return } defer resp.Body.Close() // Log result if resp.StatusCode >= 200 && resp.StatusCode < 300 { - log.Printf("[DEBUG] Streamed operation %s to workflow %s", operation.Op, workflowID) + log.Printf("[DEBUG] Streamed %d operations to workflow %s", len(streamOps), wf.ID) } else { - log.Printf("[WARNING] Stream endpoint returned status %d for workflow %s", resp.StatusCode, workflowID) + log.Printf("[WARNING] Stream endpoint returned status %d for workflow %s", resp.StatusCode, wf.ID) } } @@ -38342,22 +38430,15 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { // Extract workflow ID from URL location := strings.Split(request.URL.String(), "/") - var workflowID string + var urlWorkflowID string if len(location) > 4 && location[1] == "api" { - workflowID = location[4] - if strings.Contains(workflowID, "?") { - workflowID = strings.Split(workflowID, "?")[0] + urlWorkflowID = location[4] + if strings.Contains(urlWorkflowID, "?") { + urlWorkflowID = strings.Split(urlWorkflowID, "?")[0] } } - if len(workflowID) != 36 { - log.Printf("[WARNING] Invalid workflow ID: %s", workflowID) - resp.WriteHeader(400) - resp.Write([]byte(`{"success": false, "reason": "Invalid workflow ID"}`)) - return - } - - // Parse request + // Parse request first so we can fallback to body's WorkflowID body, err := ioutil.ReadAll(request.Body) if err != nil { log.Printf("[WARNING] Failed reading request body: %s", err) @@ -38376,8 +38457,21 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { return } + workflowID := urlWorkflowID + if len(workflowID) != 36 { + // Fallback to body's WorkflowID if URL ID is invalid (e.g., %7Bkey%7D from MCP) + if len(setOpsReq.WorkflowID) == 36 { + workflowID = setOpsReq.WorkflowID + } else { + log.Printf("[WARNING] Invalid workflow ID: %s", urlWorkflowID) + resp.WriteHeader(400) + resp.Write([]byte(`{"success": false, "reason": "Invalid workflow ID"}`)) + return + } + } + // Validate request - if setOpsReq.WorkflowID != workflowID { + if setOpsReq.WorkflowID != "" && setOpsReq.WorkflowID != workflowID { resp.WriteHeader(400) resp.Write([]byte(`{"success": false, "reason": "Workflow ID mismatch"}`)) return @@ -38390,7 +38484,7 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { } // Get workflow (from cache or DB) - cacheKey := fmt.Sprintf("workflow_ops_cache_%s_%s", workflowID, user.Id) + cacheKey := fmt.Sprintf("workflow_ops_cache_%s", workflowID) cachedWorkflow, cacheErr := GetCache(ctx, cacheKey) var workflow *Workflow @@ -38427,7 +38521,14 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { // Apply operations (all-or-nothing) with temp ID mapping tempIDMap := make(map[string]string) // Maps temp_id → real_id + shouldSaveDB := false + for opIndex, operation := range setOpsReq.Operations { + if operation.Op == "save_workflow" { + shouldSaveDB = true + continue + } + err = applyWorkflowOperationWithMapping(ctx, user, workflow, &operation, tempIDMap) if err != nil { errMsg := fmt.Sprintf(`{"success": false, "reason": "Operation %d failed: %s", "failed_at_op": %d}`, opIndex, err.Error(), opIndex) @@ -38453,6 +38554,16 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { // Don't fail the request, cache is best-effort } + if shouldSaveDB { + err = SetWorkflow(ctx, *workflow, workflow.ID) + if err != nil { + log.Printf("[ERROR] Failed saving workflow to DB: %s", err) + resp.WriteHeader(500) + resp.Write([]byte(`{"success": false, "reason": "Failed to save workflow to database"}`)) + return + } + } + // Build response minWf := buildMinimalWorkflow(workflow) response := WorkflowSetOpsResponse{ @@ -38473,9 +38584,7 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { // Broadcast operations to stream endpoint (agent gets response immediately, streaming happens in background) // Extract auth header from incoming request to pass to stream endpoint authHeader := request.Header.Get("Authorization") - for _, operation := range setOpsReq.Operations { - go broadcastToStream(workflowID, operation, user.Id, user.Username, authHeader) - } + go broadcastBatchToStream(workflow, setOpsReq.Operations, tempIDMap, "agent", "Agent", authHeader) if debug{ log.Printf("[INFO] Applied %d operations to workflow %s for user %s", len(setOpsReq.Operations), workflowID, user.Username) @@ -38511,6 +38620,15 @@ func applyWorkflowOperationWithMapping(ctx context.Context, user User, wf *Workf case "delete_condition": return opDeleteCondition(wf, op) + // ====== WORKFLOW OPERATIONS ====== + case "set_start_node": + if realID, exists := tempIDMap[op.ID]; exists { + wf.Start = realID + } else { + wf.Start = op.ID + } + return nil + default: return fmt.Errorf("unknown operation: %s", op.Op) }