Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,7 @@ var (
WorkerDeploymentVersionCreatedManagedByController = NewCounterDef("worker_deployment_version_created_managed_by_controller")
WorkerDeploymentVersionVisibilityQueryCount = NewCounterDef("worker_deployment_version_visibility_query_count")
WorkerDeploymentVersioningOverrideCounter = NewCounterDef("worker_deployment_versioning_override_count")
WorkerDeploymentVersioningOneTimeOverrideCounter = NewCounterDef("worker_deployment_versioning_one_time_override_count")

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed with @rkannan82 - i think we should add in a metric here since i am curious to see how many folks end up using this option; could be a good sign for us when we build the new move wrapper that we have in our minds

StartDeploymentTransitionCounter = NewCounterDef("start_deployment_transition_count")
VersioningDataPropagationLatency = NewTimerDef("versioning_data_propagation_latency")
SlowVersioningDataPropagationCounter = NewCounterDef("slow_versioning_data_propagation")
Expand Down
25 changes: 24 additions & 1 deletion common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,10 +657,28 @@ func GetOverridePinnedVersion(override *workflowpb.VersioningOverride) *deployme
}
return nil
}

func GetOverrideOneTimeTargetVersion(override *workflowpb.VersioningOverride) *deploymentpb.WorkerDeploymentVersion {
return override.GetOneTime().GetTargetDeploymentVersion()
}

func GetOverrideTargetDeploymentVersion(override *workflowpb.VersioningOverride) *deploymentpb.WorkerDeploymentVersion {
switch {
case OverrideIsPinned(override):
return GetOverridePinnedVersion(override)
case override.GetOneTime() != nil:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switch on the type directly.

switch override.GetOverride().(type) {
  case *workflowpb.VersioningOverride_Pinned:
      return GetOverridePinnedVersion(override)
  case *workflowpb.VersioningOverride_OneTime:
      return override.GetOneTime().GetTargetDeploymentVersion()
  default:
      return nil
  }

return override.GetOneTime().GetTargetDeploymentVersion()
default:
// Auto-upgrade has no stored target version; the version is chosen by matching at task dispatch time
return nil
}
}
Comment thread
cursor[bot] marked this conversation as resolved.

func ExtractVersioningBehaviorFromOverride(override *workflowpb.VersioningOverride) enumspb.VersioningBehavior {
if override.GetAutoUpgrade() {
return enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE
} else if override.GetPinned() != nil {
} else if override.GetPinned() != nil || override.GetOneTime() != nil {
Comment thread
Shivs11 marked this conversation as resolved.
// A pending one-time override routes like pinned; unlike pinned, it clears after a WFT completes on its target.
return enumspb.VERSIONING_BEHAVIOR_PINNED
}

Expand Down Expand Up @@ -737,6 +755,11 @@ func ValidateVersioningOverrideAndGetReactivationEligibility(ctx context.Context
return false, 0, serviceerror.NewInvalidArgument("must specify pinned override behavior if override is pinned.")
}
return validateVersionAndGetReactivationEligibility(ctx, p.GetVersion(), matchingClient, versionCache, tq, tqType, namespaceID)
} else if oneTime := override.GetOneTime(); oneTime != nil {
if oneTime.GetTargetDeploymentVersion() == nil {
return false, 0, serviceerror.NewInvalidArgument("must provide target deployment version if override is one-time.")
}
return validateVersionAndGetReactivationEligibility(ctx, oneTime.GetTargetDeploymentVersion(), matchingClient, versionCache, tq, tqType, namespaceID)
}

//nolint:staticcheck // SA1019: worker versioning v0.31
Expand Down
55 changes: 55 additions & 0 deletions common/worker_versioning/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,61 @@ func TestValidateVersioningOverrideAndGetReactivationEligibility(t *testing.T) {
},
expectError: false,
},
{
name: "v0.32: One-time override, with cache hit, returns cached reactivation eligibility",
override: &workflowpb.VersioningOverride{
Override: &workflowpb.VersioningOverride_OneTime{
OneTime: &workflowpb.VersioningOverride_OneTimeOverride{
TargetDeploymentVersion: testVersion,
},
},
},
setupCache: func(c *testVersionMembershipCache) {
c.Put(testNamespaceID, testTaskQueue, enumspb.TASK_QUEUE_TYPE_WORKFLOW, testVersion.DeploymentName, testVersion.BuildId, true, false, 42)
},
setupMock: func(m *matchingservicemock.MockMatchingServiceClient) {
m.EXPECT().CheckTaskQueueVersionMembership(gomock.Any(), gomock.Any()).Times(0)
},
expectError: false,
expectedShouldSkipReactivation: false,
expectedRevisionNumber: 42,
},
{
name: "v0.32: One-time override, with cache miss, RPC returns member and active",
override: &workflowpb.VersioningOverride{
Override: &workflowpb.VersioningOverride_OneTime{
OneTime: &workflowpb.VersioningOverride_OneTimeOverride{
TargetDeploymentVersion: testVersion,
},
},
},
setupCache: func(c *testVersionMembershipCache) {},
setupMock: func(m *matchingservicemock.MockMatchingServiceClient) {
m.EXPECT().CheckTaskQueueVersionMembership(
gomock.Any(),
gomock.Any(),
).Return(&matchingservice.CheckTaskQueueVersionMembershipResponse{
IsMember: true,
ShouldSkipReactivation: true,
RevisionNumber: 7,
}, nil)
},
expectError: false,
expectedShouldSkipReactivation: true,
expectedRevisionNumber: 7,
},
{
name: "v0.32: One-time override, without target deployment version, returns error",
override: &workflowpb.VersioningOverride{
Override: &workflowpb.VersioningOverride_OneTime{
OneTime: &workflowpb.VersioningOverride_OneTimeOverride{},
},
},
setupCache: func(c *testVersionMembershipCache) {},
setupMock: func(m *matchingservicemock.MockMatchingServiceClient) {},
expectError: true,
errorContains: "must provide target deployment version if override is one-time",
},
{
name: "v0.32: Pinned override, with cache hit (drained), returns isDrainedOrInactive=true and cached revision",
override: &workflowpb.VersioningOverride{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ require (
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/sdk/metric v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
go.temporal.io/api v1.62.15-0.20260615235047-378792ab2240
go.temporal.io/api v1.62.15-0.20260625002849-da8f81b6e9cb
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2
go.temporal.io/sdk v1.41.1
go.uber.org/fx v1.24.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0 h1:R
go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0/go.mod h1:I89cynRj8y+383o7tEQVg2SVA6SRgDVIouWPUVXjx0U=
go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0 h1:CQvJSldHRUN6Z8jsUeYv8J0lXRvygALXIzsmAeCcZE0=
go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0/go.mod h1:xSQ+mEfJe/GjK1LXEyVOoSI1N9JV9ZI923X5kup43W4=
go.temporal.io/api v1.62.15-0.20260615235047-378792ab2240 h1:Up/CNfkScGxN1TdrGZ3ez+0k6MIIhuhlbBgdZnrPhm0=
go.temporal.io/api v1.62.15-0.20260615235047-378792ab2240/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q=
go.temporal.io/api v1.62.15-0.20260625002849-da8f81b6e9cb h1:55BeRudODtRYSrS+5E45f/u1kIW/NWrBPpsXpy6K1bQ=
go.temporal.io/api v1.62.15-0.20260625002849-da8f81b6e9cb/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q=
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2 h1:1hKeH3GyR6YD6LKMHGCZ76t6h1Sgha0hXVQBxWi3dlQ=
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2/go.mod h1:T8dnzVPeO+gaUTj9eDgm/lT2lZH4+JXNvrGaQGyVi50=
go.temporal.io/sdk v1.41.1 h1:yOpvsHyDD1lNuwlGBv/SUodCPhjv9nDeC9lLHW/fJUA=
Expand Down
11 changes: 11 additions & 0 deletions service/history/api/updateworkflowoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,17 @@ func mergeWorkflowExecutionOptions(
mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride()
}

// Keep the deprecated behavior/deployment pair above for compatibility, but require v0.32
// oneof overrides to be replaced atomically instead of pretending nested masks are partial updates.
for key := range updateFields {
if strings.HasPrefix(key, "versioningOverride.") &&
key != "versioningOverride.deployment" &&
key != "versioningOverride.behavior" {
return nil, OptionsToReapply{}, serviceerror.NewInvalidArgument(
"versioning_override doesn't support partial updates")
}
}

// ==== Priority

if _, ok := updateFields["priority"]; ok {
Expand Down
58 changes: 58 additions & 0 deletions service/history/api/updateworkflowoptions/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ var (
PinnedVersion: "X.B",
},
}
oneTimeOverrideOptions = &workflowpb.WorkflowExecutionOptions{
VersioningOverride: &workflowpb.VersioningOverride{
Override: &workflowpb.VersioningOverride_OneTime{
OneTime: &workflowpb.VersioningOverride_OneTimeOverride{
TargetDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{
DeploymentName: "X",
BuildId: "C",
},
},
},
},
}
)

func TestMergeOptions_VersionOverrideMask(t *testing.T) {
Expand Down Expand Up @@ -161,6 +173,52 @@ func TestMergeOptions_PartialMask(t *testing.T) {

}

func TestMergeOptions_VersionOverrideNestedMask(t *testing.T) {
testCases := []struct {
name string
mask *fieldmaskpb.FieldMask
}{
{
name: "one_time field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.one_time"}},
},
{
name: "one_time target version field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.one_time.target_deployment_version"}},
},
{
name: "one_time target version deployment name field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.one_time.target_deployment_version.deployment_name"}},
},
{
name: "one_time target version build id field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.one_time.target_deployment_version.build_id"}},
},
{
name: "pinned oneof field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.pinned"}},
},
{
name: "pinned nested version field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.pinned.version"}},
},
{
name: "auto_upgrade oneof field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.auto_upgrade"}},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
input := proto.Clone(pinnedOverrideOptionsB).(*workflowpb.WorkflowExecutionOptions)
requested := proto.Clone(oneTimeOverrideOptions).(*workflowpb.WorkflowExecutionOptions)

_, _, err := mergeWorkflowExecutionOptions(input, requested, tc.mask)
require.Error(t, err)
})
}
}

func TestMergeOptions_EmptyMask(t *testing.T) {
emptyUpdateMask := &fieldmaskpb.FieldMask{Paths: []string{}}
input := pinnedOverrideOptionsB
Expand Down
14 changes: 5 additions & 9 deletions service/history/api/worker_versioning_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type VersionReactivationSignalerFn func(
) error

// ReactivateVersionWorkflowIfPinned sends a reactivation signal to the version workflow
// when workflows are pinned to a potentially DRAINED/INACTIVE version.
// when workflow execution options target a potentially DRAINED/INACTIVE version
// (for example, by specifying a Pinned or OneTime override via update options).
// This is a fire-and-forget operation - the signal is sent asynchronously and errors are
// logged by the signaler implementation. The signaler itself is responsible for per-pod
// dedup by revision number; cross-pod duplicates fold at the receiver via a deterministic
Expand All @@ -47,20 +48,15 @@ func ReactivateVersionWorkflowIfPinned(
return
}

// Only process if we're pinning to a specific version
if !worker_versioning.OverrideIsPinned(override) {
return
}

pinnedVersion := worker_versioning.GetOverridePinnedVersion(override)
if pinnedVersion == nil {
targetVersion := worker_versioning.GetOverrideTargetDeploymentVersion(override)
if targetVersion == nil {
return
}

// Send the signal asynchronously to avoid adding latency to the caller's request.
// Errors are logged by the signaler implementation (e.g. via convertAndRecordError). However,
// errors are not propagated to the caller as this is a fire-and-forget operation.
go func() {
signaler(context.Background(), namespaceEntry, pinnedVersion.GetDeploymentName(), pinnedVersion.GetBuildId(), revisionNumber) //nolint:errcheck
signaler(context.Background(), namespaceEntry, targetVersion.GetDeploymentName(), targetVersion.GetBuildId(), revisionNumber) //nolint:errcheck
}()
}
8 changes: 4 additions & 4 deletions service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ type (
// GetEffectiveDeployment returns the effective deployment in the following order:
// 1. DeploymentVersionTransition.Deployment: this is returned when the wf is transitioning to a
// new deployment
// 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override
// at wf start time, or later via UpdateWorkflowExecutionOptions.
// 2. VersioningOverride target: this is returned when user has set a PINNED override or
// pending one-time move, either at wf start time or later via UpdateWorkflowExecutionOptions.
// 3. Deployment: this is returned when there is no transition and no override (the most
// common case). Deployment is set based on the worker-sent deployment in the latest WFT
// completion. Exception: if Deployment is set but the workflow's effective behavior is
Expand All @@ -387,8 +387,8 @@ type (
// GetEffectiveVersioningBehavior returns the effective versioning behavior in the following
// order:
// 1. DeploymentVersionTransition: if there is a transition, then effective behavior is AUTO_UPGRADE.
// 2. VersioningOverride.Behavior: this is returned when user has set a behavior override
// at wf start time, or later via UpdateWorkflowExecutionOptions.
// 2. VersioningOverride behavior: this is returned when user has set a behavior override
// or pending one-time move at wf start time, or later via UpdateWorkflowExecutionOptions.
// 3. Behavior: this is returned when there is no override (most common case). Behavior is
// set based on the worker-sent deployment in the latest WFT completion.
GetEffectiveVersioningBehavior() enumspb.VersioningBehavior
Expand Down
18 changes: 9 additions & 9 deletions service/history/transfer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,14 +984,14 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
}
}

// Pinned override is inherited if Task Queue of new run is compatible with the override version.
var inheritedPinnedOverride *workflowpb.VersioningOverride
if o := mutableState.GetExecutionInfo().GetVersioningInfo().GetVersioningOverride(); worker_versioning.OverrideIsPinned(o) {
inheritedPinnedOverride = o
// Pinned and one-time overrides are inherited if Task Queue of new run is compatible with the override version.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason why the child workflow path is receiving this special treatment but none of CAN/retry/cron paths are is because of the following idea:

  • if an operator has a "Started" WFT and were to run a VersioningOverride (One Time Move) operation, and the said WFT returns a command to CAN/retry/cron, we reject those commands since we have a buffered history event thanks to that pending one time move command. Thus, for these three primitives, we shall always only have the case where the task finishes and the pending one time move is cleared, after which we handle commands (i have added versioning tests that are validating this)

  • on the other hand, the child path is a bit different in the sense that we don't reject this start child command even if we do have buffered events.

var inheritedVersioningOverride *workflowpb.VersioningOverride
if o := mutableState.GetExecutionInfo().GetVersioningInfo().GetVersioningOverride(); worker_versioning.GetOverrideTargetDeploymentVersion(o) != nil {
inheritedVersioningOverride = o
newTQ := attributes.GetTaskQueue().GetName()
if newTQ != mutableState.GetExecutionInfo().GetTaskQueue() && !newTQInPinnedVersion ||
attributes.GetNamespaceId() != mutableState.GetExecutionInfo().GetNamespaceId() { // don't inherit pinned version if child is in a different namespace
inheritedPinnedOverride = nil
attributes.GetNamespaceId() != mutableState.GetExecutionInfo().GetNamespaceId() { // don't inherit override if child is in a different namespace
inheritedVersioningOverride = nil
}
}

Expand Down Expand Up @@ -1091,7 +1091,7 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
inheritedBuildId,
initiatedEvent.GetUserMetadata(),
shouldTerminateAndStartChild,
inheritedPinnedOverride,
inheritedVersioningOverride,
inheritedPinnedVersion,
priorities.Merge(mutableState.GetExecutionInfo().Priority, attributes.Priority),
inheritedAutoUpgradeInfo,
Expand Down Expand Up @@ -1665,7 +1665,7 @@ func (t *transferQueueActiveTaskExecutor) startWorkflow(
inheritedBuildId string,
userMetadata *sdkpb.UserMetadata,
shouldTerminateAndStartChild bool,
inheritedPinnedOverride *workflowpb.VersioningOverride,
inheritedVersioningOverride *workflowpb.VersioningOverride,
inheritedPinnedVersion *deploymentpb.WorkerDeploymentVersion,
priority *commonpb.Priority,
inheritedAutoUpgradeInfo *deploymentpb.InheritedAutoUpgradeInfo,
Expand All @@ -1690,7 +1690,7 @@ func (t *transferQueueActiveTaskExecutor) startWorkflow(
Memo: attributes.Memo,
SearchAttributes: attributes.SearchAttributes,
UserMetadata: userMetadata,
VersioningOverride: inheritedPinnedOverride,
VersioningOverride: inheritedVersioningOverride,
Priority: priority,
TimeSkippingConfig: attributes.GetTimeSkippingConfig(),
}
Expand Down
Loading
Loading