diff --git a/.changeset/workflows-terminate-rollback.md b/.changeset/workflows-terminate-rollback.md new file mode 100644 index 0000000000..509f634d91 --- /dev/null +++ b/.changeset/workflows-terminate-rollback.md @@ -0,0 +1,11 @@ +--- +"@cloudflare/workflows-shared": minor +"wrangler": minor +"miniflare": minor +--- + +Add rollback support when terminating Workflow instances + +`WorkflowInstance.terminate({ rollback: true })` now runs registered rollback handlers before marking a local Workflow instance as terminated. Wrangler also supports this via `wrangler workflows instances terminate --rollback`, including local mode. + +The rollback option is only sent for terminate operations and is rejected by the Local Explorer API for pause, resume, and restart actions. diff --git a/packages/miniflare/scripts/openapi-filter-config.ts b/packages/miniflare/scripts/openapi-filter-config.ts index 2b2341c212..9854f5bed4 100644 --- a/packages/miniflare/scripts/openapi-filter-config.ts +++ b/packages/miniflare/scripts/openapi-filter-config.ts @@ -1169,6 +1169,11 @@ const config = { }, }, }, + rollback: { + type: "boolean", + description: + "The option to trigger rollbacks when terminating the workflow instance.", + }, }, }, }, diff --git a/packages/miniflare/src/workers/local-explorer/generated/types.gen.ts b/packages/miniflare/src/workers/local-explorer/generated/types.gen.ts index 6ae0ff6ace..215f2aab52 100644 --- a/packages/miniflare/src/workers/local-explorer/generated/types.gen.ts +++ b/packages/miniflare/src/workers/local-explorer/generated/types.gen.ts @@ -1747,6 +1747,10 @@ export type WorkflowsChangeInstanceStatusData = { */ type?: "do" | "sleep" | "waitForEvent"; }; + /** + * The option to trigger rollbacks when terminating the workflow instance. + */ + rollback?: boolean; }; path: { workflow_name: WorkflowsWorkflowName; diff --git a/packages/miniflare/src/workers/local-explorer/generated/zod.gen.ts b/packages/miniflare/src/workers/local-explorer/generated/zod.gen.ts index d1372a39d3..a9a0769ff6 100644 --- a/packages/miniflare/src/workers/local-explorer/generated/zod.gen.ts +++ b/packages/miniflare/src/workers/local-explorer/generated/zod.gen.ts @@ -1090,6 +1090,7 @@ export const zWorkflowsChangeInstanceStatusData = z.object({ type: z.enum(["do", "sleep", "waitForEvent"]).optional(), }) .optional(), + rollback: z.boolean().optional(), }), path: z.object({ workflow_name: zWorkflowsWorkflowName, diff --git a/packages/miniflare/src/workers/local-explorer/openapi.local.json b/packages/miniflare/src/workers/local-explorer/openapi.local.json index a0b52a86db..47357fcdbb 100644 --- a/packages/miniflare/src/workers/local-explorer/openapi.local.json +++ b/packages/miniflare/src/workers/local-explorer/openapi.local.json @@ -1810,6 +1810,10 @@ "description": "The step type. Defaults to do." } } + }, + "rollback": { + "type": "boolean", + "description": "The option to trigger rollbacks when terminating the workflow instance." } } } diff --git a/packages/miniflare/src/workers/local-explorer/resources/workflows.ts b/packages/miniflare/src/workers/local-explorer/resources/workflows.ts index 620123740b..f626a48d5a 100644 --- a/packages/miniflare/src/workers/local-explorer/resources/workflows.ts +++ b/packages/miniflare/src/workers/local-explorer/resources/workflows.ts @@ -11,7 +11,10 @@ import type { WorkflowsWorkflow, } from "../generated"; import type { zWorkflowsListInstancesData } from "../generated/zod.gen"; -import type { RestartFromStep } from "@cloudflare/workflows-shared/src/binding"; +import type { + RestartFromStep, + WorkflowInstanceTerminateOptions, +} from "@cloudflare/workflows-shared/src/binding"; import type { z } from "zod"; // ============================================================================ @@ -35,7 +38,7 @@ interface WorkflowHandle { pause(): Promise; resume(): Promise; restart(options?: { from?: RestartFromStep }): Promise; - terminate(): Promise; + terminate(options?: WorkflowInstanceTerminateOptions): Promise; sendEvent(args: { payload: unknown; type: string }): Promise; status(): Promise<{ status: string; output?: unknown; error?: unknown }>; } @@ -886,6 +889,14 @@ export async function changeWorkflowInstanceStatus( ); } + if (body.rollback !== undefined && action !== "terminate") { + return errorResponse( + 400, + 10001, + "'rollback' is only valid when terminating." + ); + } + const handle = await workflow.get(instanceId); switch (action) { @@ -909,7 +920,10 @@ export async function changeWorkflowInstanceStatus( break; } case "terminate": - await handle.terminate(); + // TODO(vaish): remove cast once @cloudflare/workers-types ships terminate options + await (handle as unknown as WorkflowHandle).terminate( + body.rollback === true ? { rollback: true } : undefined + ); break; } diff --git a/packages/miniflare/src/workers/workflows/wrapped-binding.worker.ts b/packages/miniflare/src/workers/workflows/wrapped-binding.worker.ts index a725df4852..fdaa952106 100644 --- a/packages/miniflare/src/workers/workflows/wrapped-binding.worker.ts +++ b/packages/miniflare/src/workers/workflows/wrapped-binding.worker.ts @@ -1,6 +1,7 @@ import type { WorkflowBinding, WorkflowInstanceRestartOptions, + WorkflowInstanceTerminateOptions, } from "@cloudflare/workflows-shared/src/binding"; import type { WorkflowIntrospectionOperation } from "@cloudflare/workflows-shared/src/types"; @@ -104,9 +105,16 @@ class InstanceImpl implements WorkflowInstance { await instance.resume(); } - public async terminate(): Promise { + public async terminate( + options?: WorkflowInstanceTerminateOptions + ): Promise { using instance = await this.getInstance(); - await instance.terminate(); + // TODO(vaish): remove cast once @cloudflare/workers-types ships terminate options + await ( + instance.terminate as ( + options?: WorkflowInstanceTerminateOptions + ) => Promise + )(options); } public async restart( diff --git a/packages/workflows-shared/src/binding.ts b/packages/workflows-shared/src/binding.ts index dbaf53cd3d..9574eeb4c4 100644 --- a/packages/workflows-shared/src/binding.ts +++ b/packages/workflows-shared/src/binding.ts @@ -124,6 +124,13 @@ export interface RestartFromStep { type?: "do" | "sleep" | "waitForEvent"; } +export interface WorkflowInstanceTerminateOptions { + /** + * If true, run registered rollback handlers before terminating the instance. + */ + rollback?: boolean; +} + export interface WorkflowInstanceRestartOptions { from?: RestartFromStep; } @@ -357,9 +364,11 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance { await this.stub.changeInstanceStatus("resume"); } - public async terminate(): Promise { + public async terminate( + options?: WorkflowInstanceTerminateOptions + ): Promise { try { - await this.stub.changeInstanceStatus("terminate"); + await this.stub.changeInstanceStatus("terminate", undefined, options); } catch (e) { // terminate causes instance abortion if (!isUserTriggeredTerminate(e)) { diff --git a/packages/workflows-shared/src/context.ts b/packages/workflows-shared/src/context.ts index 3d20639e99..cd88b6b423 100644 --- a/packages/workflows-shared/src/context.ts +++ b/packages/workflows-shared/src/context.ts @@ -17,7 +17,6 @@ import { import { calcRetryDuration } from "./lib/retries"; import { parseRollbackOptions, - registerRollbackFn, ROLLBACK_CACHE_KEY_PREFIX, } from "./lib/rollback"; import { @@ -247,7 +246,7 @@ export class Context extends RpcTarget { const { cacheKey, rollbackFn, stepContext, output, rollbackConfig } = options; if (rollbackFn && this.#rollbackStep === undefined) { - registerRollbackFn(this.#engine.rollbackRegistry, { + this.#engine.registerRollbackFn({ cacheKey, fn: rollbackFn, stepContext, @@ -297,6 +296,12 @@ export class Context extends RpcTarget { const { rollback: rollbackFn, rollbackConfig } = rollbackOptions ?? {}; const isRollback = this.#rollbackStep !== undefined; + if (this.#engine.rollbackPhase === "rollback" && !isRollback) { + throw new WorkflowFatalError( + "Cannot execute steps during rollback phase" + ); + } + const events = isRollback ? { start: InstanceEvent.ROLLBACK_STEP_START, @@ -937,7 +942,7 @@ export class Context extends RpcTarget { events.failure, cacheKey, stepNameWithCounter, - {} + !isRollback && rollbackFn ? { hasRollback: true } : {} ); this.#registerRollback({ cacheKey, @@ -1039,7 +1044,7 @@ export class Context extends RpcTarget { events.failure, cacheKey, stepNameWithCounter, - {} + !isRollback && rollbackFn ? { hasRollback: true } : {} ); this.#registerRollback({ cacheKey, @@ -1059,6 +1064,7 @@ export class Context extends RpcTarget { ...(lastStreamMeta && { streamOutput: { cacheKey, meta: lastStreamMeta }, }), + ...(!isRollback && rollbackFn ? { hasRollback: true } : {}), }); this.#registerRollback({ cacheKey, @@ -1073,13 +1079,21 @@ export class Context extends RpcTarget { const result = await doWrapper(closure); - // Check if a pause was requested while this step was running - await this.#checkForPendingPause(); + // Check if a pause was requested while this step was running. + // Rollback steps may run while the instance is paused; don't let stale pause + // state abort rollback cleanup after the rollback step itself succeeds. + if (!isRollback) { + await this.#checkForPendingPause(); + } return result; } async sleep(name: string, duration: WorkflowSleepDuration): Promise { + if (this.#engine.rollbackPhase === "replay") { + return; + } + if (typeof duration == "string") { duration = ms(duration); } @@ -1201,6 +1215,10 @@ export class Context extends RpcTarget { } async sleepUntil(name: string, timestamp: Date | number): Promise { + if (this.#engine.rollbackPhase === "replay") { + return; + } + if (timestamp instanceof Date) { timestamp = timestamp.valueOf(); } @@ -1223,6 +1241,12 @@ export class Context extends RpcTarget { timeout?: string | number; } ): Promise> { + if (this.#engine.rollbackPhase === "rollback") { + throw new WorkflowFatalError( + "Cannot execute steps during rollback phase" + ); + } + if (!options.timeout) { options.timeout = "24 hours"; } @@ -1242,6 +1266,10 @@ export class Context extends RpcTarget { const maybeResult = await this.#state.storage.get(waitForEventKey); + if (this.#engine.rollbackPhase === "replay") { + return maybeResult as WorkflowStepEvent; + } + if (maybeResult) { const shouldWriteLog = (await this.#state.storage.get(waitForEventKey)) == undefined; diff --git a/packages/workflows-shared/src/engine.ts b/packages/workflows-shared/src/engine.ts index 5d024c7289..c825beae0c 100644 --- a/packages/workflows-shared/src/engine.ts +++ b/packages/workflows-shared/src/engine.ts @@ -29,7 +29,13 @@ import { storeRestartFromStep, wipeRestartState, } from "./lib/restart"; -import { clearRollbackRegistry, executeRollbacks } from "./lib/rollback"; +import { + clearRollbackRegistry, + disposeRollbackStub, + executeRollbacks, + registerRollbackFn, + ROLLBACK_CACHE_KEY_PREFIX, +} from "./lib/rollback"; import { createReplayReadableStream, getInvalidStoredStreamOutputError, @@ -38,10 +44,16 @@ import { } from "./lib/streams"; import { TimePriorityQueue } from "./lib/timePriorityQueue"; import { MODIFIER_KEYS, WorkflowInstanceModifier } from "./modifier"; -import type { RestartFromStep } from "./binding"; +import type { + RestartFromStep, + WorkflowInstanceTerminateOptions, +} from "./binding"; import type { Event } from "./context"; import type { InstanceMetadata, RawInstanceLog } from "./instance"; -import type { RollbackRegistryEntry } from "./lib/rollback"; +import type { + RollbackRegistration, + RollbackRegistryEntry, +} from "./lib/rollback"; import type { StreamOutputMeta } from "./lib/streams"; import type { WorkflowEntrypoint, @@ -108,6 +120,8 @@ export const DEFAULT_STEP_LIMIT = 10_000; const PAUSE_DATETIME = "PAUSE_DATETIME"; +export type RollbackPhase = "replay" | "rollback"; + /** * JSON.stringify replacer that converts TypedArrays and ArrayBuffers to a * human-readable description. Without this, JSON.stringify(Uint8Array) encodes @@ -145,6 +159,8 @@ export class Engine extends DurableObject { stepLimit: number; engineAbortController: AbortController = new AbortController(); pauseController: AbortController = new AbortController(); + rollbackPhase: RollbackPhase | undefined = undefined; + rollbackEligibleCacheKeys: Set | undefined = undefined; waiters: Map< string, @@ -237,6 +253,90 @@ export class Engine extends DurableObject { return rows.map(({ groupKey }) => groupKey); } + private getEligibleRollbackSteps(limit?: number): string[] { + const rollbackTerminalGroups = new Set(); + const rollbackEligibleGroups = new Set(); + const stepStartGroupKeysDesc: string[] = []; + const rows = [ + ...this.ctx.storage.sql.exec<{ + event: InstanceEvent; + groupKey: string; + metadata: string; + }>( + "SELECT event, groupKey, metadata FROM states WHERE groupKey IS NOT NULL ORDER BY id DESC" + ), + ]; + + for (const row of rows) { + if (row.event === InstanceEvent.STEP_START) { + stepStartGroupKeysDesc.push(row.groupKey); + continue; + } + + if ( + row.event === InstanceEvent.ROLLBACK_STEP_SUCCESS || + row.event === InstanceEvent.ROLLBACK_STEP_FAILURE + ) { + rollbackTerminalGroups.add( + row.groupKey.startsWith(ROLLBACK_CACHE_KEY_PREFIX) + ? row.groupKey.slice(ROLLBACK_CACHE_KEY_PREFIX.length) + : row.groupKey + ); + continue; + } + + if ( + row.event !== InstanceEvent.STEP_SUCCESS && + row.event !== InstanceEvent.STEP_FAILURE + ) { + continue; + } + + try { + if ( + (JSON.parse(row.metadata) as { hasRollback?: boolean }) + .hasRollback === true + ) { + rollbackEligibleGroups.add(row.groupKey); + } + } catch { + // Ignore malformed metadata in local persisted logs. + } + } + + const eligible: string[] = []; + for (const groupKey of stepStartGroupKeysDesc) { + if ( + rollbackEligibleGroups.has(groupKey) && + !rollbackTerminalGroups.has(groupKey) + ) { + eligible.push(groupKey); + if (limit !== undefined && eligible.length >= limit) { + break; + } + } + } + + return eligible; + } + + registerRollbackFn(registration: RollbackRegistration): void { + if ( + this.rollbackPhase === "replay" && + this.rollbackEligibleCacheKeys !== undefined && + !this.rollbackEligibleCacheKeys.has(registration.cacheKey) + ) { + disposeRollbackStub(registration.fn); + return; + } + + registerRollbackFn(this.rollbackRegistry, registration); + } + + setRollbackPhase(phase: RollbackPhase | undefined): void { + this.rollbackPhase = phase; + } + // Lives here for access to the protected DurableObject `ctx`. createRollbackContext(rollbackStep?: { cacheKey: string }): Context { return new Context(this, this.ctx, rollbackStep); @@ -804,8 +904,9 @@ export class Engine extends DurableObject { async changeInstanceStatus( newStatus: "resume" | "pause" | "terminate" | "restart", - from?: RestartFromStep - ) { + from?: RestartFromStep, + terminateOptions?: WorkflowInstanceTerminateOptions + ): Promise { const metadata = await this.ctx.storage.get(INSTANCE_METADATA); @@ -849,7 +950,7 @@ export class Engine extends DurableObject { "instance.cannot_terminate" ); } - await this.userTriggeredTerminate(); + await this.userTriggeredTerminate(terminateOptions); break; } case "restart": @@ -864,7 +965,36 @@ export class Engine extends DurableObject { } } - async userTriggeredTerminate() { + private async replayRollbackRegistry( + metadata: InstanceMetadata + ): Promise { + if (this.rollbackRegistry.size > 0) { + return; + } + + const eligible = this.getEligibleRollbackSteps(); + if (eligible.length === 0) { + return; + } + + this.rollbackEligibleCacheKeys = new Set(eligible); + const stubStep = this.createRollbackContext(); + this.setRollbackPhase("replay"); + try { + await this.env.USER_WORKFLOW.run( + metadata.event, + stubStep as unknown as WorkflowStep + ); + } catch { + // Match the production engine: replay may stop on normal workflow control + // flow; rollback execution uses whatever handlers replay registered. + } finally { + this.setRollbackPhase(undefined); + this.rollbackEligibleCacheKeys = undefined; + } + } + + async userTriggeredTerminate(options?: WorkflowInstanceTerminateOptions) { const metadata = await this.ctx.storage.get(INSTANCE_METADATA); @@ -875,6 +1005,21 @@ export class Engine extends DurableObject { ); } + if (options?.rollback === true) { + await this.replayRollbackRegistry(metadata); + + const error = new Error("Instance terminated during rollback"); + error.name = "Terminated"; + this.setRollbackPhase("rollback"); + try { + await executeRollbacks(this, error); + } catch (rollbackErr) { + console.error("Rollback execution failed:", rollbackErr); + } finally { + this.setRollbackPhase(undefined); + } + } + this.writeLog(InstanceEvent.WORKFLOW_TERMINATED, null, null, { trigger: { source: InstanceTrigger.API, diff --git a/packages/workflows-shared/tests/engine.test.ts b/packages/workflows-shared/tests/engine.test.ts index 648dab2f14..5f6bf4317d 100644 --- a/packages/workflows-shared/tests/engine.test.ts +++ b/packages/workflows-shared/tests/engine.test.ts @@ -1673,6 +1673,177 @@ describe("Rollback", () => { expect(countOf(logs, InstanceEvent.ROLLBACK_COMPLETE)).toBe(0); }); + it("runs rollback when terminate requests it", async ({ expect }) => { + const instanceId = "RB-TERMINATE"; + const engineStub = await runWorkflow(instanceId, async (_e, step) => { + await doWithRollback( + step, + "setup-resource", + async () => "resource-id", + rollbackOptions() + ); + await step.sleep("wait-forever", "1 hour"); + }); + + await readLogsAfter(engineStub, (currentLogs) => + currentLogs.logs.some( + (log) => + log.event === InstanceEvent.STEP_SUCCESS && + log.target === "setup-resource-1" + ) + ); + + try { + await runInDurableObject(engineStub, async (engine) => { + await engine.changeInstanceStatus("terminate", undefined, { + rollback: true, + }); + }); + } catch (error) { + if ( + !(error instanceof Error) || + !error.message.startsWith("Aborting engine:") + ) { + throw error; + } + } + + const logs = await readLogsAfter( + env.ENGINE.get(env.ENGINE.idFromName(instanceId)), + (currentLogs) => + currentLogs.logs.some( + (log) => log.event === InstanceEvent.WORKFLOW_TERMINATED + ) + ); + expect(targetsOf(logs, InstanceEvent.ROLLBACK_STEP_SUCCESS)).toEqual([ + "setup-resource-1", + ]); + expect(countOf(logs, InstanceEvent.ROLLBACK_COMPLETE)).toBe(1); + }); + + it("replays cached steps before terminate rollback when registry is empty", async ({ + expect, + }) => { + const instanceId = "RB-TERMINATE-REPLAY"; + const engineStub = await runWorkflow(instanceId, async (_e, step) => { + await doWithRollback( + step, + "setup-resource", + async () => "resource-id", + rollbackOptions() + ); + await step.sleep("wait-forever", "1 hour"); + }); + + await readLogsAfter(engineStub, (currentLogs) => + currentLogs.logs.some( + (log) => + log.event === InstanceEvent.STEP_SUCCESS && + log.target === "setup-resource-1" + ) + ); + + await runInDurableObject(engineStub, async (engine) => { + engine.rollbackRegistry.clear(); + }); + + try { + await runInDurableObject(engineStub, async (engine) => { + await engine.changeInstanceStatus("terminate", undefined, { + rollback: true, + }); + }); + } catch (error) { + if ( + !(error instanceof Error) || + !error.message.startsWith("Aborting engine:") + ) { + throw error; + } + } + + const logs = await readLogsAfter( + env.ENGINE.get(env.ENGINE.idFromName(instanceId)), + (currentLogs) => + currentLogs.logs.some( + (log) => log.event === InstanceEvent.WORKFLOW_TERMINATED + ) + ); + expect(targetsOf(logs, InstanceEvent.ROLLBACK_STEP_SUCCESS)).toEqual([ + "setup-resource-1", + ]); + expect(countOf(logs, InstanceEvent.ROLLBACK_COMPLETE)).toBe(1); + }); + + it("runs terminate rollback while paused with an empty registry", async ({ + expect, + }) => { + const instanceId = "RB-TERMINATE-PAUSED"; + const engineId = env.ENGINE.idFromName(instanceId); + const engineStub = await runWorkflow(instanceId, async (_e, step) => { + await doWithRollback( + step, + "setup-resource", + async () => "resource-id", + rollbackOptions() + ); + await step.sleep("wait-forever", "1 hour"); + }); + + await readLogsAfter(engineStub, (currentLogs) => + currentLogs.logs.some( + (log) => + log.event === InstanceEvent.STEP_SUCCESS && + log.target === "setup-resource-1" + ) + ); + + try { + await runInDurableObject(engineStub, async (engine) => { + await engine.changeInstanceStatus("pause"); + }); + } catch (error) { + if (!isAbortError(error)) { + throw error; + } + } + + await vi.waitUntil( + async () => + runInDurableObject( + env.ENGINE.get(engineId), + async (engine) => (await engine.getStatus()) === InstanceStatus.Paused + ), + { timeout: 5000 } + ); + + await runInDurableObject(env.ENGINE.get(engineId), async (engine) => { + engine.rollbackRegistry.clear(); + }); + + try { + await runInDurableObject(env.ENGINE.get(engineId), async (engine) => { + await engine.changeInstanceStatus("terminate", undefined, { + rollback: true, + }); + }); + } catch (error) { + if (!isAbortError(error)) { + throw error; + } + } + + const logs = await readLogsAfter(env.ENGINE.get(engineId), (currentLogs) => + currentLogs.logs.some( + (log) => log.event === InstanceEvent.WORKFLOW_TERMINATED + ) + ); + expect(targetsOf(logs, InstanceEvent.ROLLBACK_STEP_SUCCESS)).toEqual([ + "setup-resource-1", + ]); + expect(countOf(logs, InstanceEvent.ROLLBACK_COMPLETE)).toBe(1); + }); + it("does not run rollback when workflow succeeds", async ({ expect }) => { const stub = await runWorkflowAndAwait("RB-NOOP", async (_e, step) => { await doWithRollback(step, "a", async () => "ok", rollbackOptions()); diff --git a/packages/wrangler/src/__tests__/workflows.test.ts b/packages/wrangler/src/__tests__/workflows.test.ts index fe8c1d26e6..148dbc75c5 100644 --- a/packages/wrangler/src/__tests__/workflows.test.ts +++ b/packages/wrangler/src/__tests__/workflows.test.ts @@ -49,13 +49,17 @@ describe("wrangler workflows", () => { const mockChangeStatusRequest = async ( expect: ExpectStatic, - expectedInstance: string + expectedInstance: string, + expectedBody?: Record ) => { msw.use( http.patch( `*/accounts/:accountId/workflows/some-workflow/instances/:instanceId/status`, - async ({ params }) => { + async ({ params, request }) => { expect(params.instanceId).toEqual(expectedInstance); + if (expectedBody) { + expect(await request.json()).toEqual(expectedBody); + } return HttpResponse.json({ success: true, errors: [], @@ -656,13 +660,31 @@ describe("wrangler workflows", () => { }) => { writeWranglerConfig(); await mockGetInstances(mockInstances); - await mockChangeStatusRequest(expect, "bar"); + await mockChangeStatusRequest(expect, "bar", { + status: "terminate", + }); await runWrangler(`workflows instances terminate some-workflow bar`); expect(std.info).toMatchInlineSnapshot( `"🥷 The instance "bar" from some-workflow was terminated successfully"` ); }); + + it("should pass rollback when requested", async ({ expect }) => { + writeWranglerConfig(); + await mockGetInstances(mockInstances); + await mockChangeStatusRequest(expect, "bar", { + status: "terminate", + rollback: true, + }); + + await runWrangler( + `workflows instances terminate some-workflow bar --rollback` + ); + expect(std.info).toMatchInlineSnapshot( + `"🥷 The instance "bar" from some-workflow was terminated successfully"` + ); + }); }); describe("instances restart", () => { @@ -1733,8 +1755,9 @@ describe("wrangler workflows", () => { async ({ params, request }) => { expect(params.workflowName).toEqual("my-workflow"); expect(params.instanceId).toEqual("instance-123"); - const body = (await request.json()) as Record; - expect(body.action).toEqual("terminate"); + expect(await request.json()).toEqual({ + action: "terminate", + }); return HttpResponse.json({ success: true, errors: [], @@ -1752,6 +1775,37 @@ describe("wrangler workflows", () => { `"🥷 The instance "instance-123" from my-workflow was terminated successfully"` ); }); + + it("should pass rollback in local dev session", async ({ expect }) => { + writeWranglerConfig(); + + msw.use( + http.patch( + `${LOCAL_BASE}/workflows/:workflowName/instances/:instanceId/status`, + async ({ params, request }) => { + expect(params.workflowName).toEqual("my-workflow"); + expect(params.instanceId).toEqual("instance-123"); + expect(await request.json()).toEqual({ + action: "terminate", + rollback: true, + }); + return HttpResponse.json({ + success: true, + errors: [], + messages: [], + result: { success: true }, + }); + } + ) + ); + + await runWrangler( + "workflows instances terminate my-workflow instance-123 --local --rollback" + ); + expect(std.info).toMatchInlineSnapshot( + `"🥷 The instance "instance-123" from my-workflow was terminated successfully"` + ); + }); }); describe("workflows instances restart --local", () => { diff --git a/packages/wrangler/src/workflows/commands/instances/terminate.ts b/packages/wrangler/src/workflows/commands/instances/terminate.ts index 1cce610965..471b76bf14 100644 --- a/packages/wrangler/src/workflows/commands/instances/terminate.ts +++ b/packages/wrangler/src/workflows/commands/instances/terminate.ts @@ -28,6 +28,11 @@ export const workflowsInstancesTerminateCommand = createCommand({ type: "string", demandOption: true, }, + rollback: { + describe: "Run registered rollback handlers before terminating", + type: "boolean", + default: false, + }, }, async handler(args, { config }) { @@ -35,11 +40,26 @@ export const workflowsInstancesTerminateCommand = createCommand({ if (args.local) { id = await getLocalInstanceIdFromArgs(args.port, args); - await updateLocalInstanceStatus(args.port, args.name, id, "terminate"); + await updateLocalInstanceStatus( + args.port, + args.name, + id, + "terminate", + undefined, + args.rollback + ); } else { const accountId = await requireAuth(config); id = await getInstanceIdFromArgs(accountId, args, config); - await updateInstanceStatus(config, accountId, args.name, id, "terminate"); + await updateInstanceStatus( + config, + accountId, + args.name, + id, + "terminate", + undefined, + args.rollback + ); } logger.info( diff --git a/packages/wrangler/src/workflows/local.ts b/packages/wrangler/src/workflows/local.ts index 62f27f96a0..505872aedd 100644 --- a/packages/wrangler/src/workflows/local.ts +++ b/packages/wrangler/src/workflows/local.ts @@ -143,9 +143,14 @@ export async function updateLocalInstanceStatus( workflowName: string, instanceId: string, action: "pause" | "resume" | "restart" | "terminate", - from?: WorkflowInstanceRestartFrom + from?: WorkflowInstanceRestartFrom, + rollback?: boolean ): Promise { - const body = from ? { action, from } : { action }; + const body = { + action, + ...(from ? { from } : {}), + ...(action === "terminate" && rollback === true ? { rollback: true } : {}), + }; await fetchLocalResult<{ success: boolean }>( port, diff --git a/packages/wrangler/src/workflows/utils.ts b/packages/wrangler/src/workflows/utils.ts index 5357ad80e4..da821d5e71 100644 --- a/packages/wrangler/src/workflows/utils.ts +++ b/packages/wrangler/src/workflows/utils.ts @@ -122,9 +122,14 @@ export async function updateInstanceStatus( workflowName: string, instanceId: string, status: "pause" | "resume" | "restart" | "terminate", - from?: WorkflowInstanceRestartFrom + from?: WorkflowInstanceRestartFrom, + rollback?: boolean ): Promise { - const body = from ? { status, from } : { status }; + const body = { + status, + ...(from ? { from } : {}), + ...(status === "terminate" && rollback === true ? { rollback: true } : {}), + }; await fetchResult( config,