Skip to content
11 changes: 11 additions & 0 deletions .changeset/workflows-terminate-rollback.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"@cloudflare/workflows-shared": minor
"wrangler": minor
"miniflare": minor
---

Add rollback support when terminating Workflow instances

`WorkflowInstance.terminate({ rollback: true })` now runs registered rollback handlers before marking a local Workflow instance as terminated. Wrangler also supports this via `wrangler workflows instances terminate --rollback`, including local mode.

The rollback option is only sent for terminate operations and is rejected by the Local Explorer API for pause, resume, and restart actions.
5 changes: 5 additions & 0 deletions packages/miniflare/scripts/openapi-filter-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,11 @@ const config = {
},
},
},
rollback: {
type: "boolean",
description:
"The option to trigger rollbacks when terminating the workflow instance.",
},
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1747,6 +1747,10 @@ export type WorkflowsChangeInstanceStatusData = {
*/
type?: "do" | "sleep" | "waitForEvent";
};
/**
* The option to trigger rollbacks when terminating the workflow instance.
*/
rollback?: boolean;
};
path: {
workflow_name: WorkflowsWorkflowName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,7 @@ export const zWorkflowsChangeInstanceStatusData = z.object({
type: z.enum(["do", "sleep", "waitForEvent"]).optional(),
})
.optional(),
rollback: z.boolean().optional(),
}),
path: z.object({
workflow_name: zWorkflowsWorkflowName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1810,6 +1810,10 @@
"description": "The step type. Defaults to do."
}
}
},
"rollback": {
"type": "boolean",
"description": "The option to trigger rollbacks when terminating the workflow instance."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import type {
WorkflowsWorkflow,
} from "../generated";
import type { zWorkflowsListInstancesData } from "../generated/zod.gen";
import type { RestartFromStep } from "@cloudflare/workflows-shared/src/binding";
import type {
RestartFromStep,
WorkflowInstanceTerminateOptions,
} from "@cloudflare/workflows-shared/src/binding";
import type { z } from "zod";

// ============================================================================
Expand All @@ -35,7 +38,7 @@ interface WorkflowHandle {
pause(): Promise<void>;
resume(): Promise<void>;
restart(options?: { from?: RestartFromStep }): Promise<void>;
terminate(): Promise<void>;
terminate(options?: WorkflowInstanceTerminateOptions): Promise<void>;
sendEvent(args: { payload: unknown; type: string }): Promise<void>;
status(): Promise<{ status: string; output?: unknown; error?: unknown }>;
}
Expand Down Expand Up @@ -886,6 +889,14 @@ export async function changeWorkflowInstanceStatus(
);
}

if (body.rollback !== undefined && action !== "terminate") {
return errorResponse(
400,
10001,
"'rollback' is only valid when terminating."
);
}

const handle = await workflow.get(instanceId);

switch (action) {
Expand All @@ -909,7 +920,10 @@ export async function changeWorkflowInstanceStatus(
break;
}
case "terminate":
await handle.terminate();
// TODO(vaish): remove cast once @cloudflare/workers-types ships terminate options
await (handle as unknown as WorkflowHandle).terminate(
body.rollback === true ? { rollback: true } : undefined
);
break;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type {
WorkflowBinding,
WorkflowInstanceRestartOptions,
WorkflowInstanceTerminateOptions,
} from "@cloudflare/workflows-shared/src/binding";
import type { WorkflowIntrospectionOperation } from "@cloudflare/workflows-shared/src/types";

Expand Down Expand Up @@ -104,9 +105,16 @@ class InstanceImpl implements WorkflowInstance {
await instance.resume();
}

public async terminate(): Promise<void> {
public async terminate(
options?: WorkflowInstanceTerminateOptions
): Promise<void> {
using instance = await this.getInstance();
await instance.terminate();
// TODO(vaish): remove cast once @cloudflare/workers-types ships terminate options
await (
instance.terminate as (
options?: WorkflowInstanceTerminateOptions
) => Promise<void>
)(options);
}

public async restart(
Expand Down
13 changes: 11 additions & 2 deletions packages/workflows-shared/src/binding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ export interface RestartFromStep {
type?: "do" | "sleep" | "waitForEvent";
}

export interface WorkflowInstanceTerminateOptions {
/**
* If true, run registered rollback handlers before terminating the instance.
*/
rollback?: boolean;
}

export interface WorkflowInstanceRestartOptions {
from?: RestartFromStep;
}
Expand Down Expand Up @@ -357,9 +364,11 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
await this.stub.changeInstanceStatus("resume");
}

public async terminate(): Promise<void> {
public async terminate(
options?: WorkflowInstanceTerminateOptions
): Promise<void> {
try {
await this.stub.changeInstanceStatus("terminate");
await this.stub.changeInstanceStatus("terminate", undefined, options);
} catch (e) {
// terminate causes instance abortion
if (!isUserTriggeredTerminate(e)) {
Expand Down
73 changes: 67 additions & 6 deletions packages/workflows-shared/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {
import { calcRetryDuration } from "./lib/retries";
import {
parseRollbackOptions,
registerRollbackFn,
ROLLBACK_CACHE_KEY_PREFIX,
} from "./lib/rollback";
import {
Expand Down Expand Up @@ -247,7 +246,7 @@ export class Context extends RpcTarget {
const { cacheKey, rollbackFn, stepContext, output, rollbackConfig } =
options;
if (rollbackFn && this.#rollbackStep === undefined) {
registerRollbackFn(this.#engine.rollbackRegistry, {
this.#engine.registerRollbackFn({
cacheKey,
fn: rollbackFn,
stepContext,
Expand Down Expand Up @@ -297,6 +296,12 @@ export class Context extends RpcTarget {
const { rollback: rollbackFn, rollbackConfig } = rollbackOptions ?? {};

const isRollback = this.#rollbackStep !== undefined;
if (this.#engine.rollbackPhase === "rollback" && !isRollback) {
throw new WorkflowFatalError(
"Cannot execute steps during rollback phase"
);
}

const events = isRollback
? {
start: InstanceEvent.ROLLBACK_STEP_START,
Expand Down Expand Up @@ -454,6 +459,17 @@ export class Context extends RpcTarget {
) as Error | undefined;

if (maybeError) {
const cachedState = maybeMap.get(stepStateKey) as StepState | undefined;
this.#registerRollback({
cacheKey,
rollbackFn,
stepContext: {
step: { name, count },
attempt: cachedState?.attemptedCount ?? 1,
config: cachedConfig ?? config,
},
rollbackConfig,
});
maybeError.isUserError = true;
throw maybeError;
}
Expand All @@ -465,6 +481,21 @@ export class Context extends RpcTarget {
config = cachedConfig;
}

if (this.#engine.rollbackPhase === "replay" && !isRollback) {
const cachedState = maybeMap.get(stepStateKey) as StepState | undefined;
this.#registerRollback({
cacheKey,
rollbackFn,
stepContext: {
step: { name, count },
attempt: cachedState?.attemptedCount ?? 1,
config,
},
rollbackConfig,
});
return undefined;
}

const attemptLogs = this.#engine
.readLogsFromStep(cacheKey)
.filter((val) =>
Expand Down Expand Up @@ -538,6 +569,7 @@ export class Context extends RpcTarget {
if (stepState.attemptedCount == 0) {
this.#engine.writeLog(events.start, cacheKey, stepNameWithCounter, {
config,
...(!isRollback && rollbackFn ? { hasRollback: true } : {}),
});
} else {
// in case the engine dies while retrying and wakes up before the retry period
Expand Down Expand Up @@ -623,6 +655,12 @@ export class Context extends RpcTarget {
}
);
stepState.attemptedCount++;
this.#registerRollback({
cacheKey,
rollbackFn,
stepContext: forwardStepContext(),
rollbackConfig,
});
await this.#state.storage.put(stepStateKey, stepState);
const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`;

Expand Down Expand Up @@ -937,7 +975,7 @@ export class Context extends RpcTarget {
events.failure,
cacheKey,
stepNameWithCounter,
{}
!isRollback && rollbackFn ? { hasRollback: true } : {}
);
this.#registerRollback({
cacheKey,
Expand Down Expand Up @@ -1039,7 +1077,7 @@ export class Context extends RpcTarget {
events.failure,
cacheKey,
stepNameWithCounter,
{}
!isRollback && rollbackFn ? { hasRollback: true } : {}
);
this.#registerRollback({
cacheKey,
Expand All @@ -1059,6 +1097,7 @@ export class Context extends RpcTarget {
...(lastStreamMeta && {
streamOutput: { cacheKey, meta: lastStreamMeta },
}),
...(!isRollback && rollbackFn ? { hasRollback: true } : {}),
});
this.#registerRollback({
cacheKey,
Expand All @@ -1073,13 +1112,21 @@ export class Context extends RpcTarget {

const result = await doWrapper(closure);

// Check if a pause was requested while this step was running
await this.#checkForPendingPause();
// Check if a pause was requested while this step was running.
// Rollback steps may run while the instance is paused; don't let stale pause
// state abort rollback cleanup after the rollback step itself succeeds.
if (!isRollback) {
await this.#checkForPendingPause();
}

return result;
}

async sleep(name: string, duration: WorkflowSleepDuration): Promise<void> {
if (this.#engine.rollbackPhase === "replay") {
return;
}

if (typeof duration == "string") {
duration = ms(duration);
}
Expand Down Expand Up @@ -1201,6 +1248,10 @@ export class Context extends RpcTarget {
}

async sleepUntil(name: string, timestamp: Date | number): Promise<void> {
if (this.#engine.rollbackPhase === "replay") {
return;
}

if (timestamp instanceof Date) {
timestamp = timestamp.valueOf();
}
Expand All @@ -1223,6 +1274,12 @@ export class Context extends RpcTarget {
timeout?: string | number;
}
): Promise<WorkflowStepEvent<T>> {
if (this.#engine.rollbackPhase === "rollback") {
throw new WorkflowFatalError(
"Cannot execute steps during rollback phase"
);
}

if (!options.timeout) {
options.timeout = "24 hours";
}
Expand All @@ -1242,6 +1299,10 @@ export class Context extends RpcTarget {

const maybeResult = await this.#state.storage.get<Event>(waitForEventKey);

if (this.#engine.rollbackPhase === "replay") {
return maybeResult as WorkflowStepEvent<T>;
}

if (maybeResult) {
const shouldWriteLog =
(await this.#state.storage.get(waitForEventKey)) == undefined;
Expand Down
Loading
Loading