From 6e862cf5a44bb293b45a97595e9602d3f184a3c3 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Fri, 19 Jun 2026 16:39:07 +0530 Subject: [PATCH 01/12] fix: update AI model version and enhance JSON content extraction in workflow operations --- ai.go | 67 +++++------------ shared.go | 214 ++++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 183 insertions(+), 98 deletions(-) diff --git a/ai.go b/ai.go index 4e6b09c9..0d9a9bbf 100644 --- a/ai.go +++ b/ai.go @@ -44,8 +44,8 @@ import ( var standalone bool // var model = "gpt-5-mini" -//var model = "gpt-5-mini" -var model = "gpt-5.4-nano" +var model = "gpt-5-mini" +//var model = "gpt-5.4-nano" //var model = "gpt-5.2-codex" var fallbackModel = "" @@ -1541,53 +1541,20 @@ func FixJSONNewlines(input string) string { } func FixContentOutput(contentOutput string) string { - if strings.Contains(contentOutput, "```json") { - // Handle ```json - start := strings.Index(contentOutput, "```json") - end := strings.Index(contentOutput, "```") - if start != -1 { - end = strings.Index(contentOutput[start+7:], "```") - - // Shift it so the index is at the correct place - end = end + start + 7 - } - - if start != -1 && end != -1 { - newend := end + 7 - newstart := start + 7 - - log.Printf("[INFO] Found ``` in content. Start: %d, end: %d", start, end) - - if newend > len(contentOutput) { - newend = end - } - - if newend > len(contentOutput) { - newend = len(contentOutput) - } - - if newstart > len(contentOutput) { - newstart = start - } - - if newstart > len(contentOutput) { - newstart = len(contentOutput) - } - - contentOutput = contentOutput[start+7 : newend] - } - } - - if strings.Contains(contentOutput, "```") { - start := strings.Index(contentOutput, "```") - end := strings.Index(contentOutput[start+3:], "```") - if start != -1 { - end = strings.Index(contentOutput[start+3:], "```") - end = end + start + 3 + // Safely extract content from ```json or ``` blocks + if start := strings.Index(contentOutput, "```json"); start != -1 { + start += 7 // skip ```json + if end := strings.Index(contentOutput[start:], "```"); end != -1 { + contentOutput = contentOutput[start : start+end] + } else { + contentOutput = contentOutput[start:] // Unmatched, take the rest } - - if start != -1 && end != -1 { - contentOutput = contentOutput[start+3 : end+3] + } else if start := strings.Index(contentOutput, "```"); start != -1 { + start += 3 // skip ``` + if end := strings.Index(contentOutput[start:], "```"); end != -1 { + contentOutput = contentOutput[start : start+end] + } else { + contentOutput = contentOutput[start:] // Unmatched, take the rest } } @@ -8749,7 +8716,7 @@ data_filter: } // Maps OpenAI -> Result struct so we can handle it - resultMapping := ActionResult{} + resultMapping = ActionResult{} err = json.Unmarshal(body, &resultMapping) if err != nil { log.Printf("[ERROR] AI Agent (2): Failed unmarshalling response into decisions. Response from sending AI Agent request to %s: %d - '%s'. Err: %s", fullUrl, llmStatusCode, string(body), err) @@ -8872,7 +8839,7 @@ data_filter: } // Parse the outputMap.Result to OpenAI response - choicesString := "" + // choicesString = "" bodyMap, ok := outputMap.Body.(map[string]interface{}) if !ok { log.Printf("[ERROR][%s] AI Agent: Failed to convert body to MAP in AI Agent response. Raw response: %s", execution.ExecutionId, string(resultMapping.Result)) diff --git a/shared.go b/shared.go index 65aca3cc..f17a9afa 100644 --- a/shared.go +++ b/shared.go @@ -37749,13 +37749,32 @@ func GetWorkflowMinimal(resp http.ResponseWriter, request *http.Request) { return } - // Fetch workflow - workflow, err := GetWorkflow(ctx, workflowId) - if err != nil { - log.Printf("[WARNING] Failed getting workflow %s: %s", workflowId, err) - resp.WriteHeader(404) - resp.Write([]byte(`{"success": false, "reason": "Workflow not found"}`)) - return + // Fetch workflow (try cache first so agent sees its draft) + cacheKey := fmt.Sprintf("workflow_ops_cache_%s", workflowId) + cachedWorkflow, cacheErr := GetCache(ctx, cacheKey) + + var workflow *Workflow + if cacheErr == nil && cachedWorkflow != nil { + if byteData, ok := cachedWorkflow.([]byte); ok { + workflow = &Workflow{} + err := json.Unmarshal(byteData, workflow) + if err != nil { + log.Printf("[WARNING] Failed unmarshaling cached workflow in GetWorkflowMinimal: %s", err) + workflow = nil + } + } + } + + // Fallback to DB if no cache + if workflow == nil { + var err error + workflow, err = GetWorkflow(ctx, workflowId) + if err != nil { + log.Printf("[WARNING] Failed getting workflow %s: %s", workflowId, err) + resp.WriteHeader(404) + resp.Write([]byte(`{"success": false, "reason": "Workflow not found"}`)) + return + } } // Permission check: user owns it OR user is in same org @@ -37985,14 +38004,9 @@ func enrichTriggerFromApp(minTrig *MinimalTrigger, environment string) (Trigger, } } -func broadcastToStream(workflowID string, operation WorkflowOperation, userID string, username string, authHeader string) { - // Convert SetOps operation to StreamOps format - item := "node" // default - switch operation.Op { - case "add_branch", "edit_branch", "delete_branch": - item = "branch" - case "add_condition", "edit_condition", "delete_condition": - item = "condition" +func broadcastBatchToStream(wf *Workflow, operations []WorkflowOperation, tempIDMap map[string]string, userID string, username string, authHeader string) { + if len(operations) == 0 { + return } if len(userID) == 0 { @@ -38002,20 +38016,94 @@ func broadcastToStream(workflowID string, operation WorkflowOperation, userID st username = "agent" } - streamOp := StreamWorkflowOperation{ - Item: item, - Type: operation.Op, - ID: operation.ID, - UserID: userID, - Username: username, - Data: operation.Data, - Timestamp: time.Now().UnixMilli(), + var streamOps []StreamWorkflowOperation + + for _, operation := range operations { + item := "node" // default + opType := "" + switch operation.Op { + case "add_node": + item = "node" + opType = "add" + case "move_node": + item = "node" + opType = "move" + case "edit_node": + item = "node" + opType = "configure" + case "delete_node": + item = "node" + opType = "remove" + case "add_branch": + item = "branch" + opType = "add" + case "edit_branch": + item = "branch" + opType = "configure" + case "delete_branch": + item = "branch" + opType = "remove" + case "save_workflow": + item = "workflow" + opType = "save" + case "set_start_node": + item = "workflow" + opType = "configure" + default: + item = "node" + opType = operation.Op + } + + opID := operation.ID + if realID, exists := tempIDMap[operation.ID]; exists { + opID = realID + } else if realID, exists := tempIDMap[operation.TempID]; exists { + opID = realID + } + + // Extract the ENRICHED node/branch from the workflow instead of using the Minimal payload + // We only need the fully enriched data for CREATING nodes/branches. + // For edits or moves, we just pass the partial payload the agent sent so the UI can patch it locally. + var enrichedData interface{} + if operation.Op == "add_node" { + if operation.NodeType == "action" { + if idx := findActionIndexByID(wf, opID); idx != -1 { + enrichedData = wf.Actions[idx] + } + } else if operation.NodeType == "trigger" { + if idx := findTriggerIndexByID(wf, opID); idx != -1 { + enrichedData = wf.Triggers[idx] + } + } + } else if operation.Op == "add_branch" { + if idx := findBranchIndexByID(wf, opID); idx != -1 { + enrichedData = wf.Branches[idx] + } + } + + var finalData []byte + if enrichedData != nil { + finalData, _ = json.Marshal(enrichedData) + } else { + // Fallback for delete ops where the node is already removed from wf, or if not found + finalData = operation.Data + } + + streamOps = append(streamOps, StreamWorkflowOperation{ + Item: item, + Type: opType, + ID: opID, + UserID: userID, + Username: username, + Data: finalData, + Timestamp: time.Now().UnixMilli(), + }) } - // Marshal to JSON - payload, err := json.Marshal(streamOp) + // Marshal to JSON array + payload, err := json.Marshal(streamOps) if err != nil { - log.Printf("[WARNING] Failed to marshal stream operation for workflow %s: %s", workflowID, err) + log.Printf("[WARNING] Failed to marshal stream operations for workflow %s: %s", wf.ID, err) return } @@ -38032,12 +38120,12 @@ func broadcastToStream(workflowID string, operation WorkflowOperation, userID st } } - streamURL := fmt.Sprintf("%s/api/v1/workflows/%s/stream", baseURL, workflowID) + streamURL := fmt.Sprintf("%s/api/v1/workflows/%s/stream", baseURL, wf.ID) // Create HTTP POST request req, err := http.NewRequest("POST", streamURL, strings.NewReader(string(payload))) if err != nil { - log.Printf("[WARNING] Failed to create stream request for workflow %s: %s", workflowID, err) + log.Printf("[WARNING] Failed to create stream request for workflow %s: %s", wf.ID, err) return } @@ -38051,16 +38139,16 @@ func broadcastToStream(workflowID string, operation WorkflowOperation, userID st client := &http.Client{Timeout: 10 * time.Second} resp, err := client.Do(req) if err != nil { - log.Printf("[WARNING] Failed to broadcast to stream for workflow %s: %s", workflowID, err) + log.Printf("[WARNING] Failed to broadcast to stream for workflow %s: %s", wf.ID, err) return } defer resp.Body.Close() // Log result if resp.StatusCode >= 200 && resp.StatusCode < 300 { - log.Printf("[DEBUG] Streamed operation %s to workflow %s", operation.Op, workflowID) + log.Printf("[DEBUG] Streamed %d operations to workflow %s", len(streamOps), wf.ID) } else { - log.Printf("[WARNING] Stream endpoint returned status %d for workflow %s", resp.StatusCode, workflowID) + log.Printf("[WARNING] Stream endpoint returned status %d for workflow %s", resp.StatusCode, wf.ID) } } @@ -38088,22 +38176,15 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { // Extract workflow ID from URL location := strings.Split(request.URL.String(), "/") - var workflowID string + var urlWorkflowID string if len(location) > 4 && location[1] == "api" { - workflowID = location[4] - if strings.Contains(workflowID, "?") { - workflowID = strings.Split(workflowID, "?")[0] + urlWorkflowID = location[4] + if strings.Contains(urlWorkflowID, "?") { + urlWorkflowID = strings.Split(urlWorkflowID, "?")[0] } } - if len(workflowID) != 36 { - log.Printf("[WARNING] Invalid workflow ID: %s", workflowID) - resp.WriteHeader(400) - resp.Write([]byte(`{"success": false, "reason": "Invalid workflow ID"}`)) - return - } - - // Parse request + // Parse request first so we can fallback to body's WorkflowID body, err := ioutil.ReadAll(request.Body) if err != nil { log.Printf("[WARNING] Failed reading request body: %s", err) @@ -38122,8 +38203,21 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { return } + workflowID := urlWorkflowID + if len(workflowID) != 36 { + // Fallback to body's WorkflowID if URL ID is invalid (e.g., %7Bkey%7D from MCP) + if len(setOpsReq.WorkflowID) == 36 { + workflowID = setOpsReq.WorkflowID + } else { + log.Printf("[WARNING] Invalid workflow ID: %s", urlWorkflowID) + resp.WriteHeader(400) + resp.Write([]byte(`{"success": false, "reason": "Invalid workflow ID"}`)) + return + } + } + // Validate request - if setOpsReq.WorkflowID != workflowID { + if setOpsReq.WorkflowID != "" && setOpsReq.WorkflowID != workflowID { resp.WriteHeader(400) resp.Write([]byte(`{"success": false, "reason": "Workflow ID mismatch"}`)) return @@ -38136,7 +38230,7 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { } // Get workflow (from cache or DB) - cacheKey := fmt.Sprintf("workflow_ops_cache_%s_%s", workflowID, user.Id) + cacheKey := fmt.Sprintf("workflow_ops_cache_%s", workflowID) cachedWorkflow, cacheErr := GetCache(ctx, cacheKey) var workflow *Workflow @@ -38173,7 +38267,14 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { // Apply operations (all-or-nothing) with temp ID mapping tempIDMap := make(map[string]string) // Maps temp_id → real_id + shouldSaveDB := false + for opIndex, operation := range setOpsReq.Operations { + if operation.Op == "save_workflow" { + shouldSaveDB = true + continue + } + err = applyWorkflowOperationWithMapping(ctx, user, workflow, &operation, tempIDMap) if err != nil { errMsg := fmt.Sprintf(`{"success": false, "reason": "Operation %d failed: %s", "failed_at_op": %d}`, opIndex, err.Error(), opIndex) @@ -38199,6 +38300,16 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { // Don't fail the request, cache is best-effort } + if shouldSaveDB { + err = SetWorkflow(ctx, *workflow, workflow.ID) + if err != nil { + log.Printf("[ERROR] Failed saving workflow to DB: %s", err) + resp.WriteHeader(500) + resp.Write([]byte(`{"success": false, "reason": "Failed to save workflow to database"}`)) + return + } + } + // Build response minWf := buildMinimalWorkflow(workflow) response := WorkflowSetOpsResponse{ @@ -38219,9 +38330,7 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { // Broadcast operations to stream endpoint (agent gets response immediately, streaming happens in background) // Extract auth header from incoming request to pass to stream endpoint authHeader := request.Header.Get("Authorization") - for _, operation := range setOpsReq.Operations { - go broadcastToStream(workflowID, operation, user.Id, user.Username, authHeader) - } + go broadcastBatchToStream(workflow, setOpsReq.Operations, tempIDMap, "agent", "Agent", authHeader) if debug{ log.Printf("[INFO] Applied %d operations to workflow %s for user %s", len(setOpsReq.Operations), workflowID, user.Username) @@ -38257,6 +38366,15 @@ func applyWorkflowOperationWithMapping(ctx context.Context, user User, wf *Workf case "delete_condition": return opDeleteCondition(wf, op) + // ====== WORKFLOW OPERATIONS ====== + case "set_start_node": + if realID, exists := tempIDMap[op.ID]; exists { + wf.Start = realID + } else { + wf.Start = op.ID + } + return nil + default: return fmt.Errorf("unknown operation: %s", op.Op) } From eb7ca7f4e9e1ec881643a9a56e1563f9e79b0163 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Fri, 19 Jun 2026 17:07:21 +0530 Subject: [PATCH 02/12] fix: update skipAuthAppnames to include the new Shuffle Apps --- blobs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/blobs.go b/blobs.go index 77216fd0..6ad550a8 100644 --- a/blobs.go +++ b/blobs.go @@ -22,8 +22,8 @@ import ( func IsShuffleApp(app WorkflowApp) bool { parsedAppname := strings.ReplaceAll(strings.ToLower(app.Name), " ", "_") - skipAuthAppnames := []string{"openai", "shuffle_datastore", "shuffle_workflows", "shuffle_detection", "shuffle_sensors", "shuffle_monitors", "shuffle_host_monitors", "shuffles_app_management"} - skipAuthAppIds := []string{"5d19dd82517870c68d40cacad9b5ca91", "b82668d868f6dc7ac1dc14caa92c674b", "b598b078fd5c531699fca803c172ce72", "afda48b8d1f7dc7ac3caae87b2c072e9", "7f12d725c356677d28db042170444448", "48a954b9440b3913b8a2620e57b94a75", "605e31b19889e38f179fab112297eb42"} + skipAuthAppnames := []string{"openai", "shuffle_datastore", "shuffle_workflows", "shuffle_detection", "shuffle_sensors", "shuffle_monitors", "shuffle_host_monitors", "shuffle_apps"} + skipAuthAppIds := []string{"5d19dd82517870c68d40cacad9b5ca91", "b82668d868f6dc7ac1dc14caa92c674b", "b598b078fd5c531699fca803c172ce72", "afda48b8d1f7dc7ac3caae87b2c072e9", "7f12d725c356677d28db042170444448", "48a954b9440b3913b8a2620e57b94a75", "7db43ccd25261967b095cfbd467a75cc"} isShuffleApp := false if project.Environment == "cloud" && len(app.ID) > 0 { From 4e711e5b20642e193dabb9f8990da12781863d1d Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Thu, 25 Jun 2026 17:40:13 +0530 Subject: [PATCH 03/12] fix: improve environment handling and idempotent delete operations --- shared.go | 101 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 95 insertions(+), 6 deletions(-) diff --git a/shared.go b/shared.go index f17a9afa..fe1aa934 100644 --- a/shared.go +++ b/shared.go @@ -26320,6 +26320,11 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h // Handles agentic run continues // This is if shuffler.io/agents => questions are answered + // Only process decision injection for the specific action result we are targeting + if result.Action.ID != start[0] { + continue + } + if agentic { log.Printf("[INFO][%s] Should fix the decision by injecting the values and continuing to the next step! :3", oldExecution.ExecutionId) @@ -38301,6 +38306,51 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { } if shouldSaveDB { + env := workflow.ExecutionEnvironment + if len(env) == 0 { + env = "Shuffle" + } + for i := range workflow.Actions { + if len(workflow.Actions[i].Environment) == 0 { + workflow.Actions[i].Environment = env + } + } + for i := range workflow.Triggers { + if len(workflow.Triggers[i].Environment) == 0 { + workflow.Triggers[i].Environment = env + } + } + + allAuths, authErr := GetAllWorkflowAppAuth(ctx, user.ActiveOrg.Id) + if authErr != nil { + log.Printf("[WARNING] Could not load org auths for auto-hydration during save: %s", authErr) + } else { + for i := range workflow.Actions { + if workflow.Actions[i].AuthenticationId != "" { + continue // already has auth, skip + } + appID := workflow.Actions[i].AppID + appName := workflow.Actions[i].AppName + + var bestAuth *AppAuthenticationStorage + var bestEdited int64 = -1 + for j := range allAuths { + a := &allAuths[j] + if a.App.ID != appID && !strings.EqualFold(a.App.Name, appName) { + continue + } + if a.Edited > bestEdited { + bestEdited = a.Edited + bestAuth = a + } + } + if bestAuth != nil { + workflow.Actions[i].AuthenticationId = bestAuth.Id + //log.Printf("[INFO] Auto-assigned auth %s (%s) to action %s (%s)", bestAuth.Label, bestAuth.Id, workflow.Actions[i].Label, appName) + } + } + } + err = SetWorkflow(ctx, *workflow, workflow.ID) if err != nil { log.Printf("[ERROR] Failed saving workflow to DB: %s", err) @@ -38445,7 +38495,15 @@ func opAddNode(ctx context.Context, user User, wf *Workflow, op *WorkflowOperati // } // } - // Should we let the agent specify the position? If not, can we auto-calculate based on existing nodes ?? + // If the agent didn't specify a position (0,0), auto-layout to prevent stacking + if minAct.X == 0 && minAct.Y == 0 { + startX := -312.0 + y := 190.0 + xSpacing := 437.0 + minAct.X = int64(startX + float64(len(wf.Triggers)+len(wf.Actions))*xSpacing) + minAct.Y = int64(y) + } + newAction.Position = Position{ X: float64(minAct.X), Y: float64(minAct.Y), @@ -38662,7 +38720,7 @@ func opMoveNode(wf *Workflow, op *WorkflowOperation) error { return nil } - return fmt.Errorf("node %s not found in workflow (not an action or trigger)", op.ID) + return fmt.Errorf("node %s not found in workflow (not an action or trigger) for move", op.ID) } func opDeleteNode(wf *Workflow, op *WorkflowOperation) error { @@ -38670,7 +38728,12 @@ func opDeleteNode(wf *Workflow, op *WorkflowOperation) error { case "action": idx := findActionIndexByID(wf, op.ID) if idx == -1 { - return fmt.Errorf("action %s not found", op.ID) + // Already gone, idempotent no-op (e.g. cascade from a prior delete) + if debug { + log.Printf("[DEBUG] delete_node(action): action %s not found, already removed - skipping", op.ID) + } + + return nil } // Remove action @@ -38688,7 +38751,11 @@ func opDeleteNode(wf *Workflow, op *WorkflowOperation) error { case "trigger": idx := findTriggerIndexByID(wf, op.ID) if idx == -1 { - return fmt.Errorf("trigger %s not found", op.ID) + // Already gone idempotent no-op + if debug { + log.Printf("[DEBUG] delete_node(trigger): trigger %s not found, already removed - skipping", op.ID) + } + return nil } wf.Triggers = append(wf.Triggers[:idx], wf.Triggers[idx+1:]...) @@ -38733,7 +38800,23 @@ func opAddBranchWithMapping(wf *Workflow, op *WorkflowOperation, tempIDMap map[s resolvedData, _ := json.Marshal(branchData) op.Data = resolvedData - return opAddBranch(wf, op) + err := opAddBranch(wf, op) + if err != nil { + return err + } + + // Map the newly generated branch ID back to the agent's temp IDs + if len(wf.Branches) > 0 { + realID := wf.Branches[len(wf.Branches)-1].ID + if len(op.TempID) > 0 { + tempIDMap[op.TempID] = realID + } + if len(op.ID) > 0 { + tempIDMap[op.ID] = realID + } + } + + return nil } func opAddBranch(wf *Workflow, op *WorkflowOperation) error { @@ -38803,7 +38886,13 @@ func opDeleteBranch(wf *Workflow, op *WorkflowOperation) error { return nil } } - return fmt.Errorf("branch %s not found", op.ID) + // Branch not found - this is OK if it was already removed as a consequence of + // a previous delete_node op. Treat as a no-op so the agent doesn't retry. + if debug { + log.Printf("[DEBUG] delete_branch: branch %s not found, already removed (likely cascade from delete_node) - skipping", op.ID) + } + + return nil } From b1afcc745a86cf31118394f2d9ee18f7b6e8e097 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Mon, 29 Jun 2026 16:18:26 +0530 Subject: [PATCH 04/12] feat: add parsing functions for AgentDecision structs and refactor decision handling --- ai.go | 183 ++++++++++++++++++++++++++++++++++++++++----------- streaming.go | 36 +++++----- 2 files changed, 163 insertions(+), 56 deletions(-) diff --git a/ai.go b/ai.go index 0d9a9bbf..87749a25 100644 --- a/ai.go +++ b/ai.go @@ -1673,6 +1673,136 @@ func balanceJSONLikeString(s string) string { return string(result) } +// extracts AgentDecision structs from messy LLM output. It tries multiple strategies to handle various output formats. +func parseAgentDecisions(rawOutput string) ([]AgentDecision, error) { + cleaned := FixContentOutput(rawOutput) + + // If its a JSON array + if decisions, err := extractDecisionArray(cleaned); err == nil { + return decisions, nil + } + + // JSONL (one object per occurrence) ?? + if decisions, err := extractDecisionJSONL(cleaned); err == nil { + return decisions, nil + } + + // last resort: unescape, then retry array + unescaped := strings.ReplaceAll(cleaned, `\"`, `"`) + if decisions, err := extractDecisionArray(unescaped); err == nil { + return decisions, nil + } + + return nil, fmt.Errorf("failed to parse agent decisions from LLM output") +} + +// extractDecisionArray finds the first '[' that decodes into a valid decision array. +func extractDecisionArray(s string) ([]AgentDecision, error) { + for i := 0; i < len(s); i++ { + if s[i] != '[' { + continue + } + + // So lets decode into raw maps so we can inspect and mutate it + var raw []map[string]interface{} + if err := json.NewDecoder(strings.NewReader(s[i:])).Decode(&raw); err != nil { + continue + } + + if len(raw) == 0 || raw[0]["action"] == nil { + continue + } + + // Fix the types (convert numbers/bools to strings) + normalizeDecisionFields(raw) + + //Round-trip: Marshal the fixed map back to bytes, then unmarshal into the strict struct + b, err := json.Marshal(raw) + if err != nil { + continue + } + + var decisions []AgentDecision + if err := json.Unmarshal(b, &decisions); err != nil { + continue + } + + return decisions, nil + } + + return nil, fmt.Errorf("no valid JSON array found") +} + +// extractDecisionJSONL collects every top-level '{...}' that looks like a decision. +func extractDecisionJSONL(s string) ([]AgentDecision, error) { + var out []AgentDecision + + for i := 0; i < len(s); { + if s[i] != '{' { + i++ + continue + } + + r := strings.NewReader(s[i:]) + dec := json.NewDecoder(r) + + var raw map[string]interface{} + if err := dec.Decode(&raw); err != nil { + i++ + continue + } + + if raw["action"] != nil { + normalizeDecisionFields([]map[string]interface{}{raw}) + b, err := json.Marshal(raw) + if err == nil { + var one AgentDecision + if err := json.Unmarshal(b, &one); err == nil { + out = append(out, one) + } + } + } + + // Jump past the object we just consumed to avoid parsing nested braces + i += int(dec.InputOffset()) + } + + if len(out) == 0 { + return nil, fmt.Errorf("no valid JSONL objects found") + } + + return out, nil +} + +// normalizeDecisionFields converts non-string field["value"] entries into JSON strings. +// This MUST happen before unmarshalling into AgentDecision (whose Value field is a string). +func normalizeDecisionFields(decisions []map[string]interface{}) { + for _, d := range decisions { + fields, ok := d["fields"].([]interface{}) + if !ok { + continue + } + + for _, f := range fields { + fm, ok := f.(map[string]interface{}) + if !ok { + continue + } + + v, ok := fm["value"] + if !ok || v == nil { + continue + } + + if _, isStr := v.(string); !isStr { + if b, err := json.Marshal(v); err == nil { + fm["value"] = string(b) + } + } + } + } +} + func AutofixAppLabels(ctx context.Context, app WorkflowApp, label string, keys []string) (WorkflowApp, WorkflowAppAction) { standalone := os.Getenv("STANDALONE") == "true" @@ -8967,49 +9097,26 @@ data_filter: // Found random JSON issues with [{} and similar, due to LLM instability. decisionString = FixContentOutput(choicesString) - // Find the first one and remove anything until that point conditionText := "conditions must be correct" - if !strings.HasPrefix(decisionString, `[`) { - firstIndex := strings.Index(decisionString, "[") - if firstIndex != -1 { - decisionString = decisionString[firstIndex:] - } else { - if !strings.Contains(decisionString, conditionText) { - log.Printf("[WARNING][%s] No '[' found in AI Agent response. Using full response: %s", execution.ExecutionId, decisionString) - } - } - } + errorMessage := "" - // LLM is occasionally appending freeform text like (e.g. "Summary: ...") after the closing bracket. Truncate everything past the last ']' so the JSON - // parser doesn't dont break due to that. - if lastBracket := strings.LastIndex(decisionString, "]"); lastBracket != -1 { - decisionString = decisionString[:lastBracket+1] - } + // Parse decisions using the refactored helper function + mappedDecisions, parsingErr := parseAgentDecisions(choicesString) - errorMessage := "" - mappedDecisions := []AgentDecision{} - err = json.Unmarshal([]byte(decisionString), &mappedDecisions) - if err != nil { + if len(mappedDecisions) == 0 { + if parsingErr == nil { + parsingErr = errors.New("no valid AgentDecision array or objects found in output") + } if !strings.Contains(decisionString, conditionText) { - log.Printf("[ERROR][%s] AI Agent (5): Failed unmarshalling decisions in AI Agent response: %s", execution.ExecutionId, err) + log.Printf("[ERROR][%s] AI Agent (6): Failed parsing decisions in AI Agent response: %s. String: %s", execution.ExecutionId, parsingErr, decisionString) } - - if len(mappedDecisions) == 0 { - decisionString = strings.Replace(decisionString, `\"`, `"`, -1) - - err = json.Unmarshal([]byte(decisionString), &mappedDecisions) - if err != nil && !strings.Contains(decisionString, conditionText) { - log.Printf("[ERROR][%s] AI Agent (6): Failed unmarshalling decisions in AI Agent response (2): %s. String: %s", execution.ExecutionId, err, decisionString) - - // Updating the OUTPUT in some way to help the user a bit. - if strings.Contains(decisionString, "conditions must be correct") { - errorMessage = fmt.Sprintf("Condition failed. See decision_string for details") - resultMapping.Status = "SKIPPED" - } else { - resultMapping.Status = "FAILURE" - errorMessage = fmt.Sprintf("The output from the LLM had no decisions. See the raw decisions tring for the response. Contact support@shuffler.io if you think this is wrong.") - } - } + // Updating the OUTPUT in some way to help the user a bit. + if strings.Contains(decisionString, "conditions must be correct") { + errorMessage = fmt.Sprintf("Condition failed. See decision_string for details") + resultMapping.Status = "SKIPPED" + } else { + resultMapping.Status = "FAILURE" + errorMessage = fmt.Sprintf("The output from the LLM had no decisions. See the raw decisions tring for the response. Contact support@shuffler.io if you think this is wrong.") } } diff --git a/streaming.go b/streaming.go index 1e6ce989..ae7bf7e6 100644 --- a/streaming.go +++ b/streaming.go @@ -92,12 +92,12 @@ func HandleStreamWorkflowUpdate(resp http.ResponseWriter, request *http.Request) } } - org, err := GetOrg(ctx, workflow.OrgId) - if err != nil || !org.SyncFeatures.Multiplayer.Active { - resp.WriteHeader(403) - resp.Write([]byte(`{"success": false}`)) - return - } + _, _ = GetOrg(ctx, workflow.OrgId) + // if err != nil || !org.SyncFeatures.Multiplayer.Active { + // resp.WriteHeader(403) + // resp.Write([]byte(`{"success": false}`)) + // return + // } body, err := io.ReadAll(request.Body) if err != nil { @@ -278,12 +278,12 @@ func HandleStreamWorkflow(resp http.ResponseWriter, request *http.Request) { } } - org, err := GetOrg(ctx, workflow.OrgId) - if err != nil || !org.SyncFeatures.Multiplayer.Active { - resp.WriteHeader(403) - resp.Write([]byte(`{"success": false}`)) - return - } + _, _ = GetOrg(ctx, workflow.OrgId) + // if err != nil || !org.SyncFeatures.Multiplayer.Active { + // resp.WriteHeader(403) + // resp.Write([]byte(`{"success": false}`)) + // return + // } resp.Header().Set("Connection", "Keep-Alive") resp.Header().Set("X-Content-Type-Options", "nosniff") @@ -522,12 +522,12 @@ func HandleStreamWorkflowHistory(resp http.ResponseWriter, request *http.Request } } - org, err := GetOrg(ctx, workflow.OrgId) - if err != nil || !org.SyncFeatures.Multiplayer.Active { - resp.WriteHeader(403) - resp.Write([]byte(`{"success": false}`)) - return - } + _, _ = GetOrg(ctx, workflow.OrgId) + // if err != nil || !org.SyncFeatures.Multiplayer.Active { + // resp.WriteHeader(403) + // resp.Write([]byte(`{"success": false}`)) + // return + // } sessionKey := fmt.Sprintf("%s_stream", workflow.ID) var state StreamWorkflowState From e55e18c1a151ec733355f28b2b2dfdfd62e24589 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Mon, 29 Jun 2026 16:23:22 +0530 Subject: [PATCH 05/12] fix: add continuation message handling for AI agent execution --- ai.go | 44 +++++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/ai.go b/ai.go index 87749a25..9bc542f3 100644 --- a/ai.go +++ b/ai.go @@ -7918,6 +7918,7 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, _ = oldActionResult oldAgentOutput := AgentOutput{} previousAnswers := "" + continuationMessage := "" // Tracks user continuation text (new message sent to a finished agent) marshalledDecisions := []byte{} if createNextActions == true { @@ -8104,10 +8105,10 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, for _, field := range mappedDecision.Fields { if field.Key == "continue" && len(field.Answer) > 0 { if debug { - log.Printf("[DEBUG][%s] AI Agent continuation: overriding userMessage with 'continue' answer (length=%d)", execution.ExecutionId, len(field.Answer)) + log.Printf("[DEBUG][%s] AI Agent continuation: found 'continue' answer (length=%d); keeping original userMessage, adding as continuationMessage", execution.ExecutionId, len(field.Answer)) } - userMessage = field.Answer + continuationMessage = field.Answer foundContinuation = true break } @@ -8382,9 +8383,10 @@ You are an Action Execution Agent that performs actions in third-party tools. Yo ### INPUT PROTOCOL 1. **USER CONTEXT:** Available actions/tools. -2. **USER REQUEST:** Task to process. -3. **USER ANSWERS:** Explicit answers already provided by the user to prior agent questions. Treat these as authoritative context. -4. **HISTORY:** JSON list of previous executions (Newest First). +2. **ORIGINAL REQUEST (optional):** The user's prior request from this session, already completed. Visible in HISTORY. Use for context only — do NOT re-execute it. +3. **USER REQUEST:** The current task to complete. PHASE 1 checks THIS against HISTORY. +4. **USER ANSWERS:** Explicit answers already provided by the user to prior agent questions. Treat these as authoritative context. +5. **HISTORY:** JSON list of previous executions (Newest First). ### PHASE 1: COMPLETION CHECK (HIGHEST PRIORITY) **Compare the "USER REQUEST" against the "HISTORY".** @@ -8560,12 +8562,28 @@ data_filter: } } - // Fix e.g. injected JSON and other quote/newline mechanics that aren't compatible - // Problem: The input data itself can be a reference. - completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage{ - Role: openai.ChatMessageRoleUser, - Content: fmt.Sprintf("USER REQUEST: %s", userMessage), - }) + // Build the USER REQUEST message. + // For a normal run: USER REQUEST = the original user input. + // For a continuation (user sent a follow-up to a finished agent): the continuation is the live task that PHASE 1 should check against. The original question goes in as read-only context so the LLM knows the prior topic without re-executing it. + if len(continuationMessage) > 0 { + // Continuation run: new message is the actual task + if len(userMessage) > 0 { + completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage{ + Role: openai.ChatMessageRoleUser, + Content: fmt.Sprintf("ORIGINAL REQUEST (already completed, visible in HISTORY): %s", userMessage), + }) + } + completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage{ + Role: openai.ChatMessageRoleUser, + Content: fmt.Sprintf("USER REQUEST: %s", continuationMessage), + }) + } else { + // Normal run: original input is the task + completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage{ + Role: openai.ChatMessageRoleUser, + Content: fmt.Sprintf("USER REQUEST: %s", userMessage), + }) + } if len(previousAnswers) > 0 { completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage{ @@ -8574,8 +8592,8 @@ data_filter: }) } - if len(marshalledDecisions) > 4 { - completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage { + if len(marshalledDecisions) > 4 { + completionRequest.Messages = append(completionRequest.Messages, openai.ChatCompletionMessage{ Role: openai.ChatMessageRoleUser, Content: fmt.Sprintf("HISTORY:\n%s", string(marshalledDecisions)), }) From 991b6437929ddeae463a2f2c708d8a73ff112edd Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Mon, 29 Jun 2026 16:27:31 +0530 Subject: [PATCH 06/12] fix: update duration calculation and logging for AI agent execution --- ai.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ai.go b/ai.go index 9bc542f3..755e0b6a 100644 --- a/ai.go +++ b/ai.go @@ -9584,14 +9584,14 @@ data_filter: } if !foundResult { - // duration := int64(0) - // if agentOutput.StartedAt > 0 && agentOutput.CompletedAt > 0 { - // duration = agentOutput.CompletedAt - agentOutput.StartedAt - // } else if agentOutput.StartedAt > 0 { - // duration = time.Now().Unix() - agentOutput.StartedAt - // } + duration := int64(0) + if agentOutput.StartedAt > 0 && agentOutput.CompletedAt > 0 { + duration = (agentOutput.CompletedAt - agentOutput.StartedAt) / 1000 + } else if agentOutput.StartedAt > 0 { + duration = (time.Now().UnixMilli() - agentOutput.StartedAt) / 1000 + } - // log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=SUCCESS duration=%ds decisions=%d llm_calls=%d tokens_used=%d", execution.ExecutionId, execution.Workflow.OrgId, duration, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens) + log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=FINISHED duration=%ds tool_calls=%d llm_calls=%d prompt_tokens=%d completion_tokens=%d total_tokens=%d", execution.ExecutionId, execution.Workflow.OrgId, duration, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.PromptTokens, agentOutput.CompletionTokens, agentOutput.TotalTokens) } } From d213856713d59614fc323d2d450cf4039eb40af4 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Mon, 29 Jun 2026 16:30:21 +0530 Subject: [PATCH 07/12] fix: enhance caching and result handling in agent decision processing --- shared.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/shared.go b/shared.go index 0f7eb81e..7487f418 100644 --- a/shared.go +++ b/shared.go @@ -18157,15 +18157,27 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action log.Printf("[INFO][%s] Agent continuation: resetting execution status from '%s' to 'EXECUTING' for continuation", workflowExecution.ExecutionId, workflowExecution.Status) workflowExecution.Status = "EXECUTING" workflowExecution.CompletedAt = 0 + } - executionCacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId) - DeleteCache(ctx, executionCacheKey) - marshalledExec, marshalErr := json.Marshal(workflowExecution) - if marshalErr == nil { - SetCache(ctx, executionCacheKey, marshalledExec, 30) + if foundActionResultIndex >= 0 && foundActionResultIndex < len(workflowExecution.Results) { + if marshalledResult, marshalErr := json.Marshal(mappedResult); marshalErr == nil { + workflowExecution.Results[foundActionResultIndex].Result = string(marshalledResult) + + // push to the action result cache so GetWorkflowExecution inside HandleAiAgentExecutionStart picks up the fresh copy. + actionCacheId := fmt.Sprintf("%s_%s_result", workflowExecution.ExecutionId, actionResult.Action.ID) + go SetCache(ctx, actionCacheId, marshalledResult, 35) + } else { + log.Printf("[WARNING][%s] Failed to marshal updated mappedResult before HandleAiAgentExecutionStart: %s", workflowExecution.ExecutionId, marshalErr) } } + // so the GetWorkflowExecution fetch inside HandleAiAgentExecutionStart gets the fresh copy. + executionCacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId) + DeleteCache(ctx, executionCacheKey) + if marshalledExec, marshalErr := json.Marshal(workflowExecution); marshalErr == nil { + SetCache(ctx, executionCacheKey, marshalledExec, 30) + } + callerName := "handleAgentDecisionStreamResult" returnAction, err := HandleAiAgentExecutionStart(workflowExecution, originalAction, true, callerName) if err != nil { From 1f36590990ddd11ea92d645ea8686154cc8b46a6 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Mon, 29 Jun 2026 16:39:16 +0530 Subject: [PATCH 08/12] disable streaming temporarily --- shared.go | 298 +++++++++++++++++++++++++++--------------------------- 1 file changed, 149 insertions(+), 149 deletions(-) diff --git a/shared.go b/shared.go index 7487f418..ca5150e8 100644 --- a/shared.go +++ b/shared.go @@ -38275,153 +38275,153 @@ func enrichTriggerFromApp(minTrig *MinimalTrigger, environment string) (Trigger, } } -func broadcastBatchToStream(wf *Workflow, operations []WorkflowOperation, tempIDMap map[string]string, userID string, username string, authHeader string) { - if len(operations) == 0 { - return - } - - if len(userID) == 0 { - userID = "agent" - } - if len(username) == 0 { - username = "agent" - } - - var streamOps []StreamWorkflowOperation - - for _, operation := range operations { - item := "node" // default - opType := "" - switch operation.Op { - case "add_node": - item = "node" - opType = "add" - case "move_node": - item = "node" - opType = "move" - case "edit_node": - item = "node" - opType = "configure" - case "delete_node": - item = "node" - opType = "remove" - case "add_branch": - item = "branch" - opType = "add" - case "edit_branch": - item = "branch" - opType = "configure" - case "delete_branch": - item = "branch" - opType = "remove" - case "save_workflow": - item = "workflow" - opType = "save" - case "set_start_node": - item = "workflow" - opType = "configure" - default: - item = "node" - opType = operation.Op - } - - opID := operation.ID - if realID, exists := tempIDMap[operation.ID]; exists { - opID = realID - } else if realID, exists := tempIDMap[operation.TempID]; exists { - opID = realID - } - - // Extract the ENRICHED node/branch from the workflow instead of using the Minimal payload - // We only need the fully enriched data for CREATING nodes/branches. - // For edits or moves, we just pass the partial payload the agent sent so the UI can patch it locally. - var enrichedData interface{} - if operation.Op == "add_node" { - if operation.NodeType == "action" { - if idx := findActionIndexByID(wf, opID); idx != -1 { - enrichedData = wf.Actions[idx] - } - } else if operation.NodeType == "trigger" { - if idx := findTriggerIndexByID(wf, opID); idx != -1 { - enrichedData = wf.Triggers[idx] - } - } - } else if operation.Op == "add_branch" { - if idx := findBranchIndexByID(wf, opID); idx != -1 { - enrichedData = wf.Branches[idx] - } - } - - var finalData []byte - if enrichedData != nil { - finalData, _ = json.Marshal(enrichedData) - } else { - // Fallback for delete ops where the node is already removed from wf, or if not found - finalData = operation.Data - } - - streamOps = append(streamOps, StreamWorkflowOperation{ - Item: item, - Type: opType, - ID: opID, - UserID: userID, - Username: username, - Data: finalData, - Timestamp: time.Now().UnixMilli(), - }) - } - - // Marshal to JSON array - payload, err := json.Marshal(streamOps) - if err != nil { - log.Printf("[WARNING] Failed to marshal stream operations for workflow %s: %s", wf.ID, err) - return - } - - baseURL := os.Getenv("BASE_URL") - if len(baseURL) == 0 { - if len(os.Getenv("SHUFFLE_CLOUDRUN_URL")) > 0 { - baseURL = os.Getenv("SHUFFLE_CLOUDRUN_URL") - } else { - port := os.Getenv("PORT") - if len(port) == 0 { - port = "5001" - } - baseURL = fmt.Sprintf("http://localhost:%s", port) - } - } - - streamURL := fmt.Sprintf("%s/api/v1/workflows/%s/stream", baseURL, wf.ID) - - // Create HTTP POST request - req, err := http.NewRequest("POST", streamURL, strings.NewReader(string(payload))) - if err != nil { - log.Printf("[WARNING] Failed to create stream request for workflow %s: %s", wf.ID, err) - return - } - - // Set headers - req.Header.Set("Content-Type", "application/json") - if len(authHeader) > 0 { - req.Header.Set("Authorization", authHeader) - } - - // Make request with timeout - client := &http.Client{Timeout: 10 * time.Second} - resp, err := client.Do(req) - if err != nil { - log.Printf("[WARNING] Failed to broadcast to stream for workflow %s: %s", wf.ID, err) - return - } - defer resp.Body.Close() - - // Log result - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - log.Printf("[DEBUG] Streamed %d operations to workflow %s", len(streamOps), wf.ID) - } else { - log.Printf("[WARNING] Stream endpoint returned status %d for workflow %s", resp.StatusCode, wf.ID) - } -} +// func broadcastBatchToStream(wf *Workflow, operations []WorkflowOperation, tempIDMap map[string]string, userID string, username string, authHeader string) { +// if len(operations) == 0 { +// return +// } + +// if len(userID) == 0 { +// userID = "agent" +// } +// if len(username) == 0 { +// username = "agent" +// } + +// var streamOps []StreamWorkflowOperation + +// for _, operation := range operations { +// item := "node" // default +// opType := "" +// switch operation.Op { +// case "add_node": +// item = "node" +// opType = "add" +// case "move_node": +// item = "node" +// opType = "move" +// case "edit_node": +// item = "node" +// opType = "configure" +// case "delete_node": +// item = "node" +// opType = "remove" +// case "add_branch": +// item = "branch" +// opType = "add" +// case "edit_branch": +// item = "branch" +// opType = "configure" +// case "delete_branch": +// item = "branch" +// opType = "remove" +// case "save_workflow": +// item = "workflow" +// opType = "save" +// case "set_start_node": +// item = "workflow" +// opType = "configure" +// default: +// item = "node" +// opType = operation.Op +// } + +// opID := operation.ID +// if realID, exists := tempIDMap[operation.ID]; exists { +// opID = realID +// } else if realID, exists := tempIDMap[operation.TempID]; exists { +// opID = realID +// } + +// // Extract the ENRICHED node/branch from the workflow instead of using the Minimal payload +// // We only need the fully enriched data for CREATING nodes/branches. +// // For edits or moves, we just pass the partial payload the agent sent so the UI can patch it locally. +// var enrichedData interface{} +// if operation.Op == "add_node" { +// if operation.NodeType == "action" { +// if idx := findActionIndexByID(wf, opID); idx != -1 { +// enrichedData = wf.Actions[idx] +// } +// } else if operation.NodeType == "trigger" { +// if idx := findTriggerIndexByID(wf, opID); idx != -1 { +// enrichedData = wf.Triggers[idx] +// } +// } +// } else if operation.Op == "add_branch" { +// if idx := findBranchIndexByID(wf, opID); idx != -1 { +// enrichedData = wf.Branches[idx] +// } +// } + +// var finalData []byte +// if enrichedData != nil { +// finalData, _ = json.Marshal(enrichedData) +// } else { +// // Fallback for delete ops where the node is already removed from wf, or if not found +// finalData = operation.Data +// } + +// streamOps = append(streamOps, StreamWorkflowOperation{ +// Item: item, +// Type: opType, +// ID: opID, +// UserID: userID, +// Username: username, +// Data: finalData, +// Timestamp: time.Now().UnixMilli(), +// }) +// } + +// // Marshal to JSON array +// payload, err := json.Marshal(streamOps) +// if err != nil { +// log.Printf("[WARNING] Failed to marshal stream operations for workflow %s: %s", wf.ID, err) +// return +// } + +// baseURL := os.Getenv("BASE_URL") +// if len(baseURL) == 0 { +// if len(os.Getenv("SHUFFLE_CLOUDRUN_URL")) > 0 { +// baseURL = os.Getenv("SHUFFLE_CLOUDRUN_URL") +// } else { +// port := os.Getenv("PORT") +// if len(port) == 0 { +// port = "5001" +// } +// baseURL = fmt.Sprintf("http://localhost:%s", port) +// } +// } + +// streamURL := fmt.Sprintf("%s/api/v1/workflows/%s/stream", baseURL, wf.ID) + +// // Create HTTP POST request +// req, err := http.NewRequest("POST", streamURL, strings.NewReader(string(payload))) +// if err != nil { +// log.Printf("[WARNING] Failed to create stream request for workflow %s: %s", wf.ID, err) +// return +// } + +// // Set headers +// req.Header.Set("Content-Type", "application/json") +// if len(authHeader) > 0 { +// req.Header.Set("Authorization", authHeader) +// } + +// // Make request with timeout +// client := &http.Client{Timeout: 10 * time.Second} +// resp, err := client.Do(req) +// if err != nil { +// log.Printf("[WARNING] Failed to broadcast to stream for workflow %s: %s", wf.ID, err) +// return +// } +// defer resp.Body.Close() + +// // Log result +// if resp.StatusCode >= 200 && resp.StatusCode < 300 { +// log.Printf("[DEBUG] Streamed %d operations to workflow %s", len(streamOps), wf.ID) +// } else { +// log.Printf("[WARNING] Stream endpoint returned status %d for workflow %s", resp.StatusCode, wf.ID) +// } +// } func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { cors := HandleCors(resp, request) @@ -38645,8 +38645,8 @@ func HandleAgentWorkflowSave(resp http.ResponseWriter, request *http.Request) { // Broadcast operations to stream endpoint (agent gets response immediately, streaming happens in background) // Extract auth header from incoming request to pass to stream endpoint - authHeader := request.Header.Get("Authorization") - go broadcastBatchToStream(workflow, setOpsReq.Operations, tempIDMap, "agent", "Agent", authHeader) + // authHeader := request.Header.Get("Authorization") + // go broadcastBatchToStream(workflow, setOpsReq.Operations, tempIDMap, "agent", "Agent", authHeader) if debug{ log.Printf("[INFO] Applied %d operations to workflow %s for user %s", len(setOpsReq.Operations), workflowID, user.Username) From 2b4dbb4100d6c94ff2b3d389942dd961af7f6e4d Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Mon, 29 Jun 2026 17:14:29 +0530 Subject: [PATCH 09/12] undo the changes in streaming.go --- streaming.go | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/streaming.go b/streaming.go index ae7bf7e6..1e6ce989 100644 --- a/streaming.go +++ b/streaming.go @@ -92,12 +92,12 @@ func HandleStreamWorkflowUpdate(resp http.ResponseWriter, request *http.Request) } } - _, _ = GetOrg(ctx, workflow.OrgId) - // if err != nil || !org.SyncFeatures.Multiplayer.Active { - // resp.WriteHeader(403) - // resp.Write([]byte(`{"success": false}`)) - // return - // } + org, err := GetOrg(ctx, workflow.OrgId) + if err != nil || !org.SyncFeatures.Multiplayer.Active { + resp.WriteHeader(403) + resp.Write([]byte(`{"success": false}`)) + return + } body, err := io.ReadAll(request.Body) if err != nil { @@ -278,12 +278,12 @@ func HandleStreamWorkflow(resp http.ResponseWriter, request *http.Request) { } } - _, _ = GetOrg(ctx, workflow.OrgId) - // if err != nil || !org.SyncFeatures.Multiplayer.Active { - // resp.WriteHeader(403) - // resp.Write([]byte(`{"success": false}`)) - // return - // } + org, err := GetOrg(ctx, workflow.OrgId) + if err != nil || !org.SyncFeatures.Multiplayer.Active { + resp.WriteHeader(403) + resp.Write([]byte(`{"success": false}`)) + return + } resp.Header().Set("Connection", "Keep-Alive") resp.Header().Set("X-Content-Type-Options", "nosniff") @@ -522,12 +522,12 @@ func HandleStreamWorkflowHistory(resp http.ResponseWriter, request *http.Request } } - _, _ = GetOrg(ctx, workflow.OrgId) - // if err != nil || !org.SyncFeatures.Multiplayer.Active { - // resp.WriteHeader(403) - // resp.Write([]byte(`{"success": false}`)) - // return - // } + org, err := GetOrg(ctx, workflow.OrgId) + if err != nil || !org.SyncFeatures.Multiplayer.Active { + resp.WriteHeader(403) + resp.Write([]byte(`{"success": false}`)) + return + } sessionKey := fmt.Sprintf("%s_stream", workflow.ID) var state StreamWorkflowState From 8fcd0af46bc7e9d076217a0877ab9db8f293410c Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Tue, 30 Jun 2026 10:48:11 +0530 Subject: [PATCH 10/12] improve the readability of the code --- ai.go | 145 +++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 94 insertions(+), 51 deletions(-) diff --git a/ai.go b/ai.go index 755e0b6a..68a7d3bc 100644 --- a/ai.go +++ b/ai.go @@ -1678,52 +1678,65 @@ func parseAgentDecisions(rawOutput string) ([]AgentDecision, error) { cleaned := FixContentOutput(rawOutput) // If its a JSON array - if decisions, err := extractDecisionArray(cleaned); err == nil { + decisions, err := extractDecisionArray(cleaned) + if err == nil { return decisions, nil } // JSONL (one object per occurrence) ?? - if decisions, err := extractDecisionJSONL(cleaned); err == nil { + decisions, err = extractDecisionJSONL(cleaned) + if err == nil { return decisions, nil } // last resort: unescape, then retry array unescaped := strings.ReplaceAll(cleaned, `\"`, `"`) - if decisions, err := extractDecisionArray(unescaped); err == nil { + decisions, err = extractDecisionArray(unescaped) + if err == nil { return decisions, nil } return nil, fmt.Errorf("failed to parse agent decisions from LLM output") } -// extractDecisionArray finds the first '[' that decodes into a valid decision array. +// extractDecisionArray scans s for the first '[' that decodes into a valid decision array. It uses a raw-map intermediate step so we can fix field types before unmarshalling into the strict AgentDecision struct (whose Value field must be a string). func extractDecisionArray(s string) ([]AgentDecision, error) { for i := 0; i < len(s); i++ { - if s[i] != '[' { + char := s[i] + if char != '[' { continue } - // So lets decode into raw maps so we can inspect and mutate it var raw []map[string]interface{} - if err := json.NewDecoder(strings.NewReader(s[i:])).Decode(&raw); err != nil { + reader := strings.NewReader(s[i:]) + decoder := json.NewDecoder(reader) + decodeErr := decoder.Decode(&raw) + if decodeErr != nil { continue } - if len(raw) == 0 || raw[0]["action"] == nil { + if len(raw) == 0 { continue } - // Fix the types (convert numbers/bools to strings) - normalizeDecisionFields(raw) + firstItem := raw[0] + firstActionVal, firstActionExists := firstItem["action"] + if !firstActionExists || firstActionVal == nil { + continue + } - //Round-trip: Marshal the fixed map back to bytes, then unmarshal into the strict struct - b, err := json.Marshal(raw) - if err != nil { + for idx := range raw { + raw[idx] = normalizeDecisionFields(raw[idx]) + } + + b, marshalErr := json.Marshal(raw) + if marshalErr != nil { continue } var decisions []AgentDecision - if err := json.Unmarshal(b, &decisions); err != nil { + unmarshalErr := json.Unmarshal(b, &decisions) + if unmarshalErr != nil { continue } @@ -1733,38 +1746,54 @@ func extractDecisionArray(s string) ([]AgentDecision, error) { return nil, fmt.Errorf("no valid JSON array found") } -// extractDecisionJSONL collects every top-level '{...}' that looks like a decision. +// extractDecisionJSONL collects every top-level '{...}' object in s that looks like a decision. This handles LLM output where decisions are emitted as separate JSON objects rather than an array. func extractDecisionJSONL(s string) ([]AgentDecision, error) { var out []AgentDecision - for i := 0; i < len(s); { - if s[i] != '{' { + i := 0 + for i < len(s) { + char := s[i] + if char != '{' { i++ continue } - r := strings.NewReader(s[i:]) - dec := json.NewDecoder(r) + reader := strings.NewReader(s[i:]) + dec := json.NewDecoder(reader) var raw map[string]interface{} - if err := dec.Decode(&raw); err != nil { + decodeErr := dec.Decode(&raw) + + consumed := int(dec.InputOffset()) + if consumed <= 0 { i++ + } else { + i += consumed + } + + if decodeErr != nil { continue } - if raw["action"] != nil { - normalizeDecisionFields([]map[string]interface{}{raw}) - b, err := json.Marshal(raw) - if err == nil { - var one AgentDecision - if err := json.Unmarshal(b, &one); err == nil { - out = append(out, one) - } - } + actionVal, actionExists := raw["action"] + if !actionExists || actionVal == nil { + continue + } + + raw = normalizeDecisionFields(raw) + + b, marshalErr := json.Marshal(raw) + if marshalErr != nil { + continue } - // Jump past the object we just consumed to avoid parsing nested braces - i += int(dec.InputOffset()) + var one AgentDecision + unmarshalErr := json.Unmarshal(b, &one) + if unmarshalErr != nil { + continue + } + + out = append(out, one) } if len(out) == 0 { @@ -1774,33 +1803,47 @@ func extractDecisionJSONL(s string) ([]AgentDecision, error) { return out, nil } -// normalizeDecisionFields converts non-string field["value"] entries into JSON strings. -// This MUST happen before unmarshalling into AgentDecision (whose Value field is a string). -func normalizeDecisionFields(decisions []map[string]interface{}) { - for _, d := range decisions { - fields, ok := d["fields"].([]interface{}) - if !ok { +// normalizeDecisionFields converts non-string field["value"] entries into JSON strings. This MUST happen before unmarshalling into AgentDecision (whose Value field is a string). +func normalizeDecisionFields(d map[string]interface{}) map[string]interface{} { + //check "fields" key exists at all. + rawFields, fieldsExist := d["fields"] + if !fieldsExist { + return d + } + + fields, fieldsOk := rawFields.([]interface{}) + if !fieldsOk { + return d + } + + for _, item := range fields { + fm, itemOk := item.(map[string]interface{}) + if !itemOk { continue } - for _, f := range fields { - fm, ok := f.(map[string]interface{}) - if !ok { - continue - } + rawValue, valueExists := fm["value"] + if !valueExists { + continue + } - v, ok := fm["value"] - if !ok || v == nil { - continue - } + if rawValue == nil { + continue + } - if _, isStr := v.(string); !isStr { - if b, err := json.Marshal(v); err == nil { - fm["value"] = string(b) - } - } + _, isString := rawValue.(string) + if isString { + continue } + + b, marshalErr := json.Marshal(rawValue) + if marshalErr != nil { + continue + } + fm["value"] = string(b) } + + return d } func AutofixAppLabels(ctx context.Context, app WorkflowApp, label string, keys []string) (WorkflowApp, WorkflowAppAction) { From 1ee76843f6cc7b16a290fbed82a8f862ed1fe9de Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Tue, 30 Jun 2026 23:01:08 +0530 Subject: [PATCH 11/12] replace map with strict type check --- ai.go | 197 +++++++++++++++++++++++++--------------------------------- 1 file changed, 84 insertions(+), 113 deletions(-) diff --git a/ai.go b/ai.go index 68a7d3bc..5ec9866b 100644 --- a/ai.go +++ b/ai.go @@ -1673,177 +1673,148 @@ func balanceJSONLikeString(s string) string { return string(result) } -// extracts AgentDecision structs from messy LLM output. It tries multiple strategies to handle various output formats. +func normalizeRawDecision(decision *rawDecision) { + for fieldIndex, field := range decision.Fields { + if field.Value == nil { + continue + } + + _, isAlreadyString := field.Value.(string) + if !isAlreadyString { + marshaledValueBytes, marshalErr := json.Marshal(field.Value) + if marshalErr == nil { + decision.Fields[fieldIndex].Value = string(marshaledValueBytes) + } + } + } +} + +// parseAgentDecisions extracts AgentDecision structs from messy LLM output. It tries multiple strategies in order: JSON array, JSONL, then array after unescaping. func parseAgentDecisions(rawOutput string) ([]AgentDecision, error) { - cleaned := FixContentOutput(rawOutput) + cleanedText := FixContentOutput(rawOutput) - // If its a JSON array - decisions, err := extractDecisionArray(cleaned) - if err == nil { - return decisions, nil + //Try to parse as a JSON array + parsedDecisions, extractionErr := extractDecisionArray(cleanedText) + if extractionErr == nil { + return parsedDecisions, nil } - // JSONL (one object per occurrence) ?? - decisions, err = extractDecisionJSONL(cleaned) - if err == nil { - return decisions, nil + // Try to parse as JSONL (one object per occurrence) + parsedDecisions, extractionErr = extractDecisionJSONL(cleanedText) + if extractionErr == nil { + return parsedDecisions, nil } - // last resort: unescape, then retry array - unescaped := strings.ReplaceAll(cleaned, `\"`, `"`) - decisions, err = extractDecisionArray(unescaped) - if err == nil { - return decisions, nil + // Last resort: unescape quotes, then retry the array strategy + unescapedText := strings.ReplaceAll(cleanedText, `\"`, `"`) + parsedDecisions, extractionErr = extractDecisionArray(unescapedText) + if extractionErr == nil { + return parsedDecisions, nil } return nil, fmt.Errorf("failed to parse agent decisions from LLM output") } -// extractDecisionArray scans s for the first '[' that decodes into a valid decision array. It uses a raw-map intermediate step so we can fix field types before unmarshalling into the strict AgentDecision struct (whose Value field must be a string). -func extractDecisionArray(s string) ([]AgentDecision, error) { - for i := 0; i < len(s); i++ { - char := s[i] - if char != '[' { +// extractDecisionArray scans the text for the first '[' that successfully decodes into a valid array of decisions. +func extractDecisionArray(rawText string) ([]AgentDecision, error) { + for byteIndex := 0; byteIndex < len(rawText); byteIndex++ { + currentByte := rawText[byteIndex] + if currentByte != '[' { continue } - var raw []map[string]interface{} - reader := strings.NewReader(s[i:]) - decoder := json.NewDecoder(reader) - decodeErr := decoder.Decode(&raw) + // Decode the JSON starting from this '[' into our raw structs + var decodedRawDecisions []rawDecision + stringReader := strings.NewReader(rawText[byteIndex:]) + jsonDecoder := json.NewDecoder(stringReader) + decodeErr := jsonDecoder.Decode(&decodedRawDecisions) + if decodeErr != nil { - continue - } - - if len(raw) == 0 { - continue + continue } - firstItem := raw[0] - firstActionVal, firstActionExists := firstItem["action"] - if !firstActionExists || firstActionVal == nil { - continue + if len(decodedRawDecisions) == 0 || decodedRawDecisions[0].Action == "" { + continue } - for idx := range raw { - raw[idx] = normalizeDecisionFields(raw[idx]) + // Fix the data types (convert numbers/bools to strings) + for decisionIndex := range decodedRawDecisions { + normalizeRawDecision(&decodedRawDecisions[decisionIndex]) } - b, marshalErr := json.Marshal(raw) + marshaledJSONBytes, marshalErr := json.Marshal(decodedRawDecisions) if marshalErr != nil { continue } - - var decisions []AgentDecision - unmarshalErr := json.Unmarshal(b, &decisions) - if unmarshalErr != nil { + + var finalDecisions []AgentDecision + structUnmarshalErr := json.Unmarshal(marshaledJSONBytes, &finalDecisions) + if structUnmarshalErr != nil { continue } - - return decisions, nil + + return finalDecisions, nil } return nil, fmt.Errorf("no valid JSON array found") } -// extractDecisionJSONL collects every top-level '{...}' object in s that looks like a decision. This handles LLM output where decisions are emitted as separate JSON objects rather than an array. -func extractDecisionJSONL(s string) ([]AgentDecision, error) { - var out []AgentDecision +// extractDecisionJSONL scans the text for top-level '{' characters and extracts every valid JSON object that looks like a decision. +func extractDecisionJSONL(rawText string) ([]AgentDecision, error) { + var collectedDecisions []AgentDecision + byteIndex := 0 - i := 0 - for i < len(s) { - char := s[i] - if char != '{' { - i++ + for byteIndex < len(rawText) { + currentByte := rawText[byteIndex] + if currentByte != '{' { + byteIndex++ continue } - reader := strings.NewReader(s[i:]) - dec := json.NewDecoder(reader) + stringReader := strings.NewReader(rawText[byteIndex:]) + jsonDecoder := json.NewDecoder(stringReader) - var raw map[string]interface{} - decodeErr := dec.Decode(&raw) + var singleRawDecision rawDecision + decodeErr := jsonDecoder.Decode(&singleRawDecision) - consumed := int(dec.InputOffset()) - if consumed <= 0 { - i++ + // Calculate how many bytes the decoder consumed so we can skip past this object + bytesConsumedByDecoder := int(jsonDecoder.InputOffset()) + if bytesConsumedByDecoder <= 0 { + byteIndex++ } else { - i += consumed + byteIndex += bytesConsumedByDecoder } if decodeErr != nil { - continue + continue } - actionVal, actionExists := raw["action"] - if !actionExists || actionVal == nil { - continue + if singleRawDecision.Action == "" { + continue // Missing the required "action" key } - raw = normalizeDecisionFields(raw) + // Fix the data types + normalizeRawDecision(&singleRawDecision) - b, marshalErr := json.Marshal(raw) + marshaledJSONBytes, marshalErr := json.Marshal(singleRawDecision) if marshalErr != nil { continue } - var one AgentDecision - unmarshalErr := json.Unmarshal(b, &one) - if unmarshalErr != nil { + var finalDecision AgentDecision + structUnmarshalErr := json.Unmarshal(marshaledJSONBytes, &finalDecision) + if structUnmarshalErr != nil { continue } - out = append(out, one) + collectedDecisions = append(collectedDecisions, finalDecision) } - if len(out) == 0 { + if len(collectedDecisions) == 0 { return nil, fmt.Errorf("no valid JSONL objects found") } - return out, nil -} - -// normalizeDecisionFields converts non-string field["value"] entries into JSON strings. This MUST happen before unmarshalling into AgentDecision (whose Value field is a string). -func normalizeDecisionFields(d map[string]interface{}) map[string]interface{} { - //check "fields" key exists at all. - rawFields, fieldsExist := d["fields"] - if !fieldsExist { - return d - } - - fields, fieldsOk := rawFields.([]interface{}) - if !fieldsOk { - return d - } - - for _, item := range fields { - fm, itemOk := item.(map[string]interface{}) - if !itemOk { - continue - } - - rawValue, valueExists := fm["value"] - if !valueExists { - continue - } - - if rawValue == nil { - continue - } - - _, isString := rawValue.(string) - if isString { - continue - } - - b, marshalErr := json.Marshal(rawValue) - if marshalErr != nil { - continue - } - fm["value"] = string(b) - } - - return d + return collectedDecisions, nil } func AutofixAppLabels(ctx context.Context, app WorkflowApp, label string, keys []string) (WorkflowApp, WorkflowAppAction) { From 75ed23876ce890317d1e7933a03d34046ffd522b Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Wed, 1 Jul 2026 00:55:59 +0530 Subject: [PATCH 12/12] refactor: normalize raw decision fields and improve parsing logic --- ai.go | 122 +++++++++++++++++++++++++++++++++------------------------- 1 file changed, 69 insertions(+), 53 deletions(-) diff --git a/ai.go b/ai.go index 5ec9866b..a836f144 100644 --- a/ai.go +++ b/ai.go @@ -1673,73 +1673,56 @@ func balanceJSONLikeString(s string) string { return string(result) } -func normalizeRawDecision(decision *rawDecision) { - for fieldIndex, field := range decision.Fields { - if field.Value == nil { +// normalizeRawDecisionFields converts any non-string 'Value' into a JSON-encoded string. +func normalizeRawDecisionFields(fields []rawField) { + for fieldIndex := range fields { + if fields[fieldIndex].Value == nil { continue } - _, isAlreadyString := field.Value.(string) + _, isAlreadyString := fields[fieldIndex].Value.(string) if !isAlreadyString { - marshaledValueBytes, marshalErr := json.Marshal(field.Value) + marshaledValueBytes, marshalErr := json.Marshal(fields[fieldIndex].Value) if marshalErr == nil { - decision.Fields[fieldIndex].Value = string(marshaledValueBytes) + fields[fieldIndex].Value = string(marshaledValueBytes) } } } } -// parseAgentDecisions extracts AgentDecision structs from messy LLM output. It tries multiple strategies in order: JSON array, JSONL, then array after unescaping. -func parseAgentDecisions(rawOutput string) ([]AgentDecision, error) { - cleanedText := FixContentOutput(rawOutput) - - //Try to parse as a JSON array - parsedDecisions, extractionErr := extractDecisionArray(cleanedText) - if extractionErr == nil { - return parsedDecisions, nil - } - - // Try to parse as JSONL (one object per occurrence) - parsedDecisions, extractionErr = extractDecisionJSONL(cleanedText) - if extractionErr == nil { - return parsedDecisions, nil - } - - // Last resort: unescape quotes, then retry the array strategy - unescapedText := strings.ReplaceAll(cleanedText, `\"`, `"`) - parsedDecisions, extractionErr = extractDecisionArray(unescapedText) - if extractionErr == nil { - return parsedDecisions, nil - } - - return nil, fmt.Errorf("failed to parse agent decisions from LLM output") -} - // extractDecisionArray scans the text for the first '[' that successfully decodes into a valid array of decisions. func extractDecisionArray(rawText string) ([]AgentDecision, error) { for byteIndex := 0; byteIndex < len(rawText); byteIndex++ { - currentByte := rawText[byteIndex] - if currentByte != '[' { + if rawText[byteIndex] != '[' { continue } - // Decode the JSON starting from this '[' into our raw structs - var decodedRawDecisions []rawDecision + var decodedRawDecisions []map[string]json.RawMessage stringReader := strings.NewReader(rawText[byteIndex:]) jsonDecoder := json.NewDecoder(stringReader) decodeErr := jsonDecoder.Decode(&decodedRawDecisions) - if decodeErr != nil { + if decodeErr != nil || len(decodedRawDecisions) == 0 { continue } - if len(decodedRawDecisions) == 0 || decodedRawDecisions[0].Action == "" { - continue + // Check if the first item has an "action" key + if _, hasAction := decodedRawDecisions[0]["action"]; !hasAction { + continue } - // Fix the data types (convert numbers/bools to strings) - for decisionIndex := range decodedRawDecisions { - normalizeRawDecision(&decodedRawDecisions[decisionIndex]) + for mapIndex, rawMap := range decodedRawDecisions { + if rawFields, hasFields := rawMap["fields"]; hasFields { + var fields []rawField + if unmarshalErr := json.Unmarshal(rawFields, &fields); unmarshalErr == nil { + normalizeRawDecisionFields(fields) + + fixedFieldsBytes, marshalErr := json.Marshal(fields) + if marshalErr == nil { + decodedRawDecisions[mapIndex]["fields"] = fixedFieldsBytes + } + } + } } marshaledJSONBytes, marshalErr := json.Marshal(decodedRawDecisions) @@ -1759,14 +1742,13 @@ func extractDecisionArray(rawText string) ([]AgentDecision, error) { return nil, fmt.Errorf("no valid JSON array found") } -// extractDecisionJSONL scans the text for top-level '{' characters and extracts every valid JSON object that looks like a decision. +// extractDecisionJSONL scans the text for top-level '{' characters and extracts every valid JSON object. func extractDecisionJSONL(rawText string) ([]AgentDecision, error) { var collectedDecisions []AgentDecision byteIndex := 0 for byteIndex < len(rawText) { - currentByte := rawText[byteIndex] - if currentByte != '{' { + if rawText[byteIndex] != '{' { byteIndex++ continue } @@ -1774,10 +1756,9 @@ func extractDecisionJSONL(rawText string) ([]AgentDecision, error) { stringReader := strings.NewReader(rawText[byteIndex:]) jsonDecoder := json.NewDecoder(stringReader) - var singleRawDecision rawDecision - decodeErr := jsonDecoder.Decode(&singleRawDecision) + var rawMap map[string]json.RawMessage + decodeErr := jsonDecoder.Decode(&rawMap) - // Calculate how many bytes the decoder consumed so we can skip past this object bytesConsumedByDecoder := int(jsonDecoder.InputOffset()) if bytesConsumedByDecoder <= 0 { byteIndex++ @@ -1789,14 +1770,23 @@ func extractDecisionJSONL(rawText string) ([]AgentDecision, error) { continue } - if singleRawDecision.Action == "" { - continue // Missing the required "action" key + if _, hasAction := rawMap["action"]; !hasAction { + continue } - // Fix the data types - normalizeRawDecision(&singleRawDecision) + // Fix the "fields" array if it exists + if rawFields, hasFields := rawMap["fields"]; hasFields { + var fields []rawField + if unmarshalErr := json.Unmarshal(rawFields, &fields); unmarshalErr == nil { + normalizeRawDecisionFields(fields) + fixedFieldsBytes, marshalErr := json.Marshal(fields) + if marshalErr == nil { + rawMap["fields"] = fixedFieldsBytes + } + } + } - marshaledJSONBytes, marshalErr := json.Marshal(singleRawDecision) + marshaledJSONBytes, marshalErr := json.Marshal(rawMap) if marshalErr != nil { continue } @@ -1817,6 +1807,32 @@ func extractDecisionJSONL(rawText string) ([]AgentDecision, error) { return collectedDecisions, nil } +// parseAgentDecisions extracts AgentDecision structs from messy LLM output. It tries multiple strategies in order: JSON array, JSONL, then array after unescaping. +func parseAgentDecisions(rawOutput string) ([]AgentDecision, error) { + cleanedText := FixContentOutput(rawOutput) + + //Try to parse as a JSON array + parsedDecisions, extractionErr := extractDecisionArray(cleanedText) + if extractionErr == nil { + return parsedDecisions, nil + } + + // Try to parse as JSONL (one object per occurrence) + parsedDecisions, extractionErr = extractDecisionJSONL(cleanedText) + if extractionErr == nil { + return parsedDecisions, nil + } + + // unescape quotes, then retry the array strategy + unescapedText := strings.ReplaceAll(cleanedText, `\"`, `"`) + parsedDecisions, extractionErr = extractDecisionArray(unescapedText) + if extractionErr == nil { + return parsedDecisions, nil + } + + return nil, fmt.Errorf("failed to parse agent decisions from LLM output") +} + func AutofixAppLabels(ctx context.Context, app WorkflowApp, label string, keys []string) (WorkflowApp, WorkflowAppAction) { standalone := os.Getenv("STANDALONE") == "true"