Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/workflows-terminate-rollback.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"@cloudflare/workflows-shared": minor
"wrangler": minor
"miniflare": minor
---

Add rollback support when terminating Workflow instances

`WorkflowInstance.terminate({ rollback: true })` now runs registered rollback handlers before marking a local Workflow instance as terminated. Wrangler also supports this via `wrangler workflows instances terminate --rollback`, including local mode.

The rollback option is only sent for terminate operations and is rejected by the Local Explorer API for pause, resume, and restart actions.
5 changes: 5 additions & 0 deletions packages/miniflare/scripts/openapi-filter-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,11 @@ const config = {
},
},
},
rollback: {
type: "boolean",
description:
"The option to trigger rollbacks when terminating the workflow instance.",
},
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1747,6 +1747,10 @@ export type WorkflowsChangeInstanceStatusData = {
*/
type?: "do" | "sleep" | "waitForEvent";
};
/**
* The option to trigger rollbacks when terminating the workflow instance.
*/
rollback?: boolean;
};
path: {
workflow_name: WorkflowsWorkflowName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,7 @@ export const zWorkflowsChangeInstanceStatusData = z.object({
type: z.enum(["do", "sleep", "waitForEvent"]).optional(),
})
.optional(),
rollback: z.boolean().optional(),
}),
path: z.object({
workflow_name: zWorkflowsWorkflowName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1810,6 +1810,10 @@
"description": "The step type. Defaults to do."
}
}
},
"rollback": {
"type": "boolean",
"description": "The option to trigger rollbacks when terminating the workflow instance."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import type {
WorkflowsWorkflow,
} from "../generated";
import type { zWorkflowsListInstancesData } from "../generated/zod.gen";
import type { RestartFromStep } from "@cloudflare/workflows-shared/src/binding";
import type {
RestartFromStep,
WorkflowInstanceTerminateOptions,
} from "@cloudflare/workflows-shared/src/binding";
import type { z } from "zod";

// ============================================================================
Expand All @@ -35,7 +38,7 @@ interface WorkflowHandle {
pause(): Promise<void>;
resume(): Promise<void>;
restart(options?: { from?: RestartFromStep }): Promise<void>;
terminate(): Promise<void>;
terminate(options?: WorkflowInstanceTerminateOptions): Promise<void>;
sendEvent(args: { payload: unknown; type: string }): Promise<void>;
status(): Promise<{ status: string; output?: unknown; error?: unknown }>;
}
Expand Down Expand Up @@ -886,6 +889,14 @@ export async function changeWorkflowInstanceStatus(
);
}

if (body.rollback !== undefined && action !== "terminate") {
return errorResponse(
400,
10001,
"'rollback' is only valid when terminating."
);
}

const handle = await workflow.get(instanceId);

switch (action) {
Expand All @@ -909,7 +920,10 @@ export async function changeWorkflowInstanceStatus(
break;
}
case "terminate":
await handle.terminate();
// TODO(vaish): remove cast once @cloudflare/workers-types ships terminate options
await (handle as unknown as WorkflowHandle).terminate(
body.rollback === true ? { rollback: true } : undefined
);
break;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type {
WorkflowBinding,
WorkflowInstanceRestartOptions,
WorkflowInstanceTerminateOptions,
} from "@cloudflare/workflows-shared/src/binding";
import type { WorkflowIntrospectionOperation } from "@cloudflare/workflows-shared/src/types";

Expand Down Expand Up @@ -104,9 +105,16 @@ class InstanceImpl implements WorkflowInstance {
await instance.resume();
}

public async terminate(): Promise<void> {
public async terminate(
options?: WorkflowInstanceTerminateOptions
): Promise<void> {
using instance = await this.getInstance();
await instance.terminate();
// TODO(vaish): remove cast once @cloudflare/workers-types ships terminate options
await (
instance.terminate as (
options?: WorkflowInstanceTerminateOptions
) => Promise<void>
)(options);
}

public async restart(
Expand Down
13 changes: 11 additions & 2 deletions packages/workflows-shared/src/binding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ export interface RestartFromStep {
type?: "do" | "sleep" | "waitForEvent";
}

export interface WorkflowInstanceTerminateOptions {
/**
* If true, run registered rollback handlers before terminating the instance.
*/
rollback?: boolean;
}

export interface WorkflowInstanceRestartOptions {
from?: RestartFromStep;
}
Expand Down Expand Up @@ -357,9 +364,11 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
await this.stub.changeInstanceStatus("resume");
}

public async terminate(): Promise<void> {
public async terminate(
options?: WorkflowInstanceTerminateOptions
): Promise<void> {
try {
await this.stub.changeInstanceStatus("terminate");
await this.stub.changeInstanceStatus("terminate", undefined, options);
} catch (e) {
// terminate causes instance abortion
if (!isUserTriggeredTerminate(e)) {
Expand Down
40 changes: 34 additions & 6 deletions packages/workflows-shared/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {
import { calcRetryDuration } from "./lib/retries";
import {
parseRollbackOptions,
registerRollbackFn,
ROLLBACK_CACHE_KEY_PREFIX,
} from "./lib/rollback";
import {
Expand Down Expand Up @@ -247,7 +246,7 @@ export class Context extends RpcTarget {
const { cacheKey, rollbackFn, stepContext, output, rollbackConfig } =
options;
if (rollbackFn && this.#rollbackStep === undefined) {
registerRollbackFn(this.#engine.rollbackRegistry, {
this.#engine.registerRollbackFn({
cacheKey,
fn: rollbackFn,
stepContext,
Expand Down Expand Up @@ -297,6 +296,12 @@ export class Context extends RpcTarget {
const { rollback: rollbackFn, rollbackConfig } = rollbackOptions ?? {};

const isRollback = this.#rollbackStep !== undefined;
if (this.#engine.rollbackPhase === "rollback" && !isRollback) {
throw new WorkflowFatalError(
"Cannot execute steps during rollback phase"
);
}

const events = isRollback
? {
start: InstanceEvent.ROLLBACK_STEP_START,
Expand Down Expand Up @@ -937,7 +942,7 @@ export class Context extends RpcTarget {
events.failure,
cacheKey,
stepNameWithCounter,
{}
!isRollback && rollbackFn ? { hasRollback: true } : {}
);
this.#registerRollback({
cacheKey,
Expand Down Expand Up @@ -1039,7 +1044,7 @@ export class Context extends RpcTarget {
events.failure,
cacheKey,
stepNameWithCounter,
{}
!isRollback && rollbackFn ? { hasRollback: true } : {}
);
this.#registerRollback({
cacheKey,
Expand All @@ -1059,6 +1064,7 @@ export class Context extends RpcTarget {
...(lastStreamMeta && {
streamOutput: { cacheKey, meta: lastStreamMeta },
}),
...(!isRollback && rollbackFn ? { hasRollback: true } : {}),
});
this.#registerRollback({
cacheKey,
Expand All @@ -1073,13 +1079,21 @@ export class Context extends RpcTarget {

const result = await doWrapper(closure);

// Check if a pause was requested while this step was running
await this.#checkForPendingPause();
// Check if a pause was requested while this step was running.
// Rollback steps may run while the instance is paused; don't let stale pause
// state abort rollback cleanup after the rollback step itself succeeds.
if (!isRollback) {
await this.#checkForPendingPause();
}

return result;
}

async sleep(name: string, duration: WorkflowSleepDuration): Promise<void> {
if (this.#engine.rollbackPhase === "replay") {
return;
}

if (typeof duration == "string") {
duration = ms(duration);
}
Expand Down Expand Up @@ -1201,6 +1215,10 @@ export class Context extends RpcTarget {
}

async sleepUntil(name: string, timestamp: Date | number): Promise<void> {
if (this.#engine.rollbackPhase === "replay") {
return;
}

if (timestamp instanceof Date) {
timestamp = timestamp.valueOf();
}
Expand All @@ -1223,6 +1241,12 @@ export class Context extends RpcTarget {
timeout?: string | number;
}
): Promise<WorkflowStepEvent<T>> {
if (this.#engine.rollbackPhase === "rollback") {
throw new WorkflowFatalError(
"Cannot execute steps during rollback phase"
);
}

if (!options.timeout) {
options.timeout = "24 hours";
}
Expand All @@ -1242,6 +1266,10 @@ export class Context extends RpcTarget {

const maybeResult = await this.#state.storage.get<Event>(waitForEventKey);

if (this.#engine.rollbackPhase === "replay") {
return maybeResult as WorkflowStepEvent<T>;
}

if (maybeResult) {
const shouldWriteLog =
(await this.#state.storage.get(waitForEventKey)) == undefined;
Expand Down
Loading
Loading