diff --git a/common/worker_versioning/worker_versioning.go b/common/worker_versioning/worker_versioning.go index 9be76a58421..6d0f4025019 100644 --- a/common/worker_versioning/worker_versioning.go +++ b/common/worker_versioning/worker_versioning.go @@ -657,10 +657,22 @@ func GetOverridePinnedVersion(override *workflowpb.VersioningOverride) *deployme } return nil } + +func GetOverrideOneTimeTargetVersion(override *workflowpb.VersioningOverride) *deploymentpb.WorkerDeploymentVersion { + return override.GetOneTime().GetTargetDeploymentVersion() +} + +func GetOverrideTargetDeploymentVersion(override *workflowpb.VersioningOverride) *deploymentpb.WorkerDeploymentVersion { + if OverrideIsPinned(override) { + return GetOverridePinnedVersion(override) + } + return GetOverrideOneTimeTargetVersion(override) +} + func ExtractVersioningBehaviorFromOverride(override *workflowpb.VersioningOverride) enumspb.VersioningBehavior { if override.GetAutoUpgrade() { return enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE - } else if override.GetPinned() != nil { + } else if override.GetPinned() != nil || override.GetOneTime() != nil { return enumspb.VERSIONING_BEHAVIOR_PINNED } @@ -737,6 +749,11 @@ func ValidateVersioningOverrideAndGetReactivationEligibility(ctx context.Context return false, 0, serviceerror.NewInvalidArgument("must specify pinned override behavior if override is pinned.") } return validateVersionAndGetReactivationEligibility(ctx, p.GetVersion(), matchingClient, versionCache, tq, tqType, namespaceID) + } else if oneTime := override.GetOneTime(); oneTime != nil { + if oneTime.GetTargetDeploymentVersion() == nil { + return false, 0, serviceerror.NewInvalidArgument("must provide target deployment version if override is one-time.") + } + return validateVersionAndGetReactivationEligibility(ctx, oneTime.GetTargetDeploymentVersion(), matchingClient, versionCache, tq, tqType, namespaceID) } //nolint:staticcheck // SA1019: worker versioning v0.31 diff --git a/common/worker_versioning/worker_versioning_test.go b/common/worker_versioning/worker_versioning_test.go index ff1d4eb8407..48383879f15 100644 --- a/common/worker_versioning/worker_versioning_test.go +++ b/common/worker_versioning/worker_versioning_test.go @@ -1060,6 +1060,61 @@ func TestValidateVersioningOverrideAndGetReactivationEligibility(t *testing.T) { }, expectError: false, }, + { + name: "v0.32: One-time override, with cache hit, returns cached reactivation eligibility", + override: &workflowpb.VersioningOverride{ + Override: &workflowpb.VersioningOverride_OneTime{ + OneTime: &workflowpb.VersioningOverride_OneTimeOverride{ + TargetDeploymentVersion: testVersion, + }, + }, + }, + setupCache: func(c *testVersionMembershipCache) { + c.Put(testNamespaceID, testTaskQueue, enumspb.TASK_QUEUE_TYPE_WORKFLOW, testVersion.DeploymentName, testVersion.BuildId, true, false, 42) + }, + setupMock: func(m *matchingservicemock.MockMatchingServiceClient) { + m.EXPECT().CheckTaskQueueVersionMembership(gomock.Any(), gomock.Any()).Times(0) + }, + expectError: false, + expectedShouldSkipReactivation: false, + expectedRevisionNumber: 42, + }, + { + name: "v0.32: One-time override, with cache miss, RPC returns member and active", + override: &workflowpb.VersioningOverride{ + Override: &workflowpb.VersioningOverride_OneTime{ + OneTime: &workflowpb.VersioningOverride_OneTimeOverride{ + TargetDeploymentVersion: testVersion, + }, + }, + }, + setupCache: func(c *testVersionMembershipCache) {}, + setupMock: func(m *matchingservicemock.MockMatchingServiceClient) { + m.EXPECT().CheckTaskQueueVersionMembership( + gomock.Any(), + gomock.Any(), + ).Return(&matchingservice.CheckTaskQueueVersionMembershipResponse{ + IsMember: true, + ShouldSkipReactivation: true, + RevisionNumber: 7, + }, nil) + }, + expectError: false, + expectedShouldSkipReactivation: true, + expectedRevisionNumber: 7, + }, + { + name: "v0.32: One-time override, without target deployment version, returns error", + override: &workflowpb.VersioningOverride{ + Override: &workflowpb.VersioningOverride_OneTime{ + OneTime: &workflowpb.VersioningOverride_OneTimeOverride{}, + }, + }, + setupCache: func(c *testVersionMembershipCache) {}, + setupMock: func(m *matchingservicemock.MockMatchingServiceClient) {}, + expectError: true, + errorContains: "must provide target deployment version if override is one-time", + }, { name: "v0.32: Pinned override, with cache hit (drained), returns isDrainedOrInactive=true and cached revision", override: &workflowpb.VersioningOverride{ diff --git a/go.mod b/go.mod index 8c3ae7e2fab..f17059a6c6f 100644 --- a/go.mod +++ b/go.mod @@ -64,7 +64,7 @@ require ( go.opentelemetry.io/otel/sdk v1.43.0 go.opentelemetry.io/otel/sdk/metric v1.43.0 go.opentelemetry.io/otel/trace v1.43.0 - go.temporal.io/api v1.62.15-0.20260615235047-378792ab2240 + go.temporal.io/api v1.62.15-0.20260618002053-7c062185c563 go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2 go.temporal.io/sdk v1.41.1 go.uber.org/fx v1.24.0 diff --git a/go.sum b/go.sum index fdb33780b70..9bce20bcf8b 100644 --- a/go.sum +++ b/go.sum @@ -471,8 +471,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0 h1:R go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0/go.mod h1:I89cynRj8y+383o7tEQVg2SVA6SRgDVIouWPUVXjx0U= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0 h1:CQvJSldHRUN6Z8jsUeYv8J0lXRvygALXIzsmAeCcZE0= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0/go.mod h1:xSQ+mEfJe/GjK1LXEyVOoSI1N9JV9ZI923X5kup43W4= -go.temporal.io/api v1.62.15-0.20260615235047-378792ab2240 h1:Up/CNfkScGxN1TdrGZ3ez+0k6MIIhuhlbBgdZnrPhm0= -go.temporal.io/api v1.62.15-0.20260615235047-378792ab2240/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= +go.temporal.io/api v1.62.15-0.20260618002053-7c062185c563 h1:gPketS2mBLHDxz8l2xldQxQFEmm14h+sqmaJ5QQmKi4= +go.temporal.io/api v1.62.15-0.20260618002053-7c062185c563/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2 h1:1hKeH3GyR6YD6LKMHGCZ76t6h1Sgha0hXVQBxWi3dlQ= go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2/go.mod h1:T8dnzVPeO+gaUTj9eDgm/lT2lZH4+JXNvrGaQGyVi50= go.temporal.io/sdk v1.41.1 h1:yOpvsHyDD1lNuwlGBv/SUodCPhjv9nDeC9lLHW/fJUA= diff --git a/service/history/api/updateworkflowoptions/api.go b/service/history/api/updateworkflowoptions/api.go index 01ee1b25616..1b6d5a38b43 100644 --- a/service/history/api/updateworkflowoptions/api.go +++ b/service/history/api/updateworkflowoptions/api.go @@ -223,6 +223,38 @@ func mergeWorkflowExecutionOptions( mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() } + if _, ok := updateFields["versioningOverride.pinned"]; ok { + mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() + } + + if _, ok := updateFields["versioningOverride.pinned.behavior"]; ok { + mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() + } + + if _, ok := updateFields["versioningOverride.pinned.version"]; ok { + mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() + } + + if _, ok := updateFields["versioningOverride.autoUpgrade"]; ok { + mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() + } + + if _, ok := updateFields["versioningOverride.oneTime"]; ok { + mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() + } + + if _, ok := updateFields["versioningOverride.oneTime.targetDeploymentVersion"]; ok { + mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() + } + + if _, ok := updateFields["versioningOverride.oneTime.targetDeploymentVersion.deploymentName"]; ok { + mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() + } + + if _, ok := updateFields["versioningOverride.oneTime.targetDeploymentVersion.buildId"]; ok { + mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() + } + // ==== Priority if _, ok := updateFields["priority"]; ok { diff --git a/service/history/api/updateworkflowoptions/api_test.go b/service/history/api/updateworkflowoptions/api_test.go index ba177416a5a..ec5ae7803d0 100644 --- a/service/history/api/updateworkflowoptions/api_test.go +++ b/service/history/api/updateworkflowoptions/api_test.go @@ -100,6 +100,18 @@ var ( PinnedVersion: "X.B", }, } + oneTimeOverrideOptions = &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Override: &workflowpb.VersioningOverride_OneTime{ + OneTime: &workflowpb.VersioningOverride_OneTimeOverride{ + TargetDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{ + DeploymentName: "X", + BuildId: "C", + }, + }, + }, + }, + } ) func TestMergeOptions_VersionOverrideMask(t *testing.T) { @@ -161,6 +173,63 @@ func TestMergeOptions_PartialMask(t *testing.T) { } +func TestMergeOptions_VersionOverrideOneofNestedMask(t *testing.T) { + testCases := []struct { + name string + mask *fieldmaskpb.FieldMask + }{ + { + name: "one_time field", + mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.one_time"}}, + }, + { + name: "one_time target version field", + mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.one_time.target_deployment_version"}}, + }, + { + name: "one_time target version deployment name field", + mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.one_time.target_deployment_version.deployment_name"}}, + }, + { + name: "one_time target version build id field", + mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.one_time.target_deployment_version.build_id"}}, + }, + { + name: "pinned oneof field", + mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.pinned"}}, + }, + { + name: "pinned nested version field", + mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.pinned.version"}}, + }, + { + name: "auto_upgrade oneof field", + mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.auto_upgrade"}}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + input := proto.Clone(pinnedOverrideOptionsB).(*workflowpb.WorkflowExecutionOptions) + requested := proto.Clone(oneTimeOverrideOptions).(*workflowpb.WorkflowExecutionOptions) + if tc.name == "auto_upgrade oneof field" { + requested = &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Override: &workflowpb.VersioningOverride_AutoUpgrade{ + AutoUpgrade: true, + }, + }, + } + } + + merged, optionsToReapply, err := mergeWorkflowExecutionOptions(input, requested, tc.mask) + require.NoError(t, err) + require.True(t, proto.Equal(requested, merged)) + require.False(t, optionsToReapply.hasChanges()) + }) + } +} + func TestMergeOptions_EmptyMask(t *testing.T) { emptyUpdateMask := &fieldmaskpb.FieldMask{Paths: []string{}} input := pinnedOverrideOptionsB diff --git a/service/history/api/worker_versioning_util.go b/service/history/api/worker_versioning_util.go index dee031418d5..5ef53939961 100644 --- a/service/history/api/worker_versioning_util.go +++ b/service/history/api/worker_versioning_util.go @@ -21,7 +21,8 @@ type VersionReactivationSignalerFn func( ) error // ReactivateVersionWorkflowIfPinned sends a reactivation signal to the version workflow -// when workflows are pinned to a potentially DRAINED/INACTIVE version. +// when workflow execution options target a potentially DRAINED/INACTIVE version +// (for example, by specifying a Pinned or OneTime override via update options). // This is a fire-and-forget operation - the signal is sent asynchronously and errors are // logged by the signaler implementation. The signaler itself is responsible for per-pod // dedup by revision number; cross-pod duplicates fold at the receiver via a deterministic @@ -47,13 +48,8 @@ func ReactivateVersionWorkflowIfPinned( return } - // Only process if we're pinning to a specific version - if !worker_versioning.OverrideIsPinned(override) { - return - } - - pinnedVersion := worker_versioning.GetOverridePinnedVersion(override) - if pinnedVersion == nil { + targetVersion := worker_versioning.GetOverrideTargetDeploymentVersion(override) + if targetVersion == nil { return } @@ -61,6 +57,6 @@ func ReactivateVersionWorkflowIfPinned( // Errors are logged by the signaler implementation (e.g. via convertAndRecordError). However, // errors are not propagated to the caller as this is a fire-and-forget operation. go func() { - signaler(context.Background(), namespaceEntry, pinnedVersion.GetDeploymentName(), pinnedVersion.GetBuildId(), revisionNumber) //nolint:errcheck + signaler(context.Background(), namespaceEntry, targetVersion.GetDeploymentName(), targetVersion.GetBuildId(), revisionNumber) //nolint:errcheck }() } diff --git a/service/history/interfaces/mutable_state.go b/service/history/interfaces/mutable_state.go index f5f9203b264..a0887a58bd3 100644 --- a/service/history/interfaces/mutable_state.go +++ b/service/history/interfaces/mutable_state.go @@ -376,8 +376,8 @@ type ( // GetEffectiveDeployment returns the effective deployment in the following order: // 1. DeploymentVersionTransition.Deployment: this is returned when the wf is transitioning to a // new deployment - // 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override - // at wf start time, or later via UpdateWorkflowExecutionOptions. + // 2. VersioningOverride target: this is returned when user has set a PINNED override or + // pending one-time move, either at wf start time or later via UpdateWorkflowExecutionOptions. // 3. Deployment: this is returned when there is no transition and no override (the most // common case). Deployment is set based on the worker-sent deployment in the latest WFT // completion. Exception: if Deployment is set but the workflow's effective behavior is @@ -387,8 +387,8 @@ type ( // GetEffectiveVersioningBehavior returns the effective versioning behavior in the following // order: // 1. DeploymentVersionTransition: if there is a transition, then effective behavior is AUTO_UPGRADE. - // 2. VersioningOverride.Behavior: this is returned when user has set a behavior override - // at wf start time, or later via UpdateWorkflowExecutionOptions. + // 2. VersioningOverride behavior: this is returned when user has set a behavior override + // or pending one-time move at wf start time, or later via UpdateWorkflowExecutionOptions. // 3. Behavior: this is returned when there is no override (most common case). Behavior is // set based on the worker-sent deployment in the latest WFT completion. GetEffectiveVersioningBehavior() enumspb.VersioningBehavior diff --git a/service/history/transfer_queue_active_task_executor.go b/service/history/transfer_queue_active_task_executor.go index 9b70923721b..d4f1f80b8d9 100644 --- a/service/history/transfer_queue_active_task_executor.go +++ b/service/history/transfer_queue_active_task_executor.go @@ -984,14 +984,14 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution( } } - // Pinned override is inherited if Task Queue of new run is compatible with the override version. - var inheritedPinnedOverride *workflowpb.VersioningOverride - if o := mutableState.GetExecutionInfo().GetVersioningInfo().GetVersioningOverride(); worker_versioning.OverrideIsPinned(o) { - inheritedPinnedOverride = o + // Pinned and one-time overrides are inherited if Task Queue of new run is compatible with the override version. + var inheritedVersioningOverride *workflowpb.VersioningOverride + if o := mutableState.GetExecutionInfo().GetVersioningInfo().GetVersioningOverride(); worker_versioning.GetOverrideTargetDeploymentVersion(o) != nil { + inheritedVersioningOverride = o newTQ := attributes.GetTaskQueue().GetName() if newTQ != mutableState.GetExecutionInfo().GetTaskQueue() && !newTQInPinnedVersion || - attributes.GetNamespaceId() != mutableState.GetExecutionInfo().GetNamespaceId() { // don't inherit pinned version if child is in a different namespace - inheritedPinnedOverride = nil + attributes.GetNamespaceId() != mutableState.GetExecutionInfo().GetNamespaceId() { // don't inherit override if child is in a different namespace + inheritedVersioningOverride = nil } } @@ -1091,7 +1091,7 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution( inheritedBuildId, initiatedEvent.GetUserMetadata(), shouldTerminateAndStartChild, - inheritedPinnedOverride, + inheritedVersioningOverride, inheritedPinnedVersion, priorities.Merge(mutableState.GetExecutionInfo().Priority, attributes.Priority), inheritedAutoUpgradeInfo, @@ -1665,7 +1665,7 @@ func (t *transferQueueActiveTaskExecutor) startWorkflow( inheritedBuildId string, userMetadata *sdkpb.UserMetadata, shouldTerminateAndStartChild bool, - inheritedPinnedOverride *workflowpb.VersioningOverride, + inheritedVersioningOverride *workflowpb.VersioningOverride, inheritedPinnedVersion *deploymentpb.WorkerDeploymentVersion, priority *commonpb.Priority, inheritedAutoUpgradeInfo *deploymentpb.InheritedAutoUpgradeInfo, @@ -1690,7 +1690,7 @@ func (t *transferQueueActiveTaskExecutor) startWorkflow( Memo: attributes.Memo, SearchAttributes: attributes.SearchAttributes, UserMetadata: userMetadata, - VersioningOverride: inheritedPinnedOverride, + VersioningOverride: inheritedVersioningOverride, Priority: priority, TimeSkippingConfig: attributes.GetTimeSkippingConfig(), } diff --git a/service/history/transfer_queue_active_task_executor_test.go b/service/history/transfer_queue_active_task_executor_test.go index 0a7f0eb824c..a5a7205010e 100644 --- a/service/history/transfer_queue_active_task_executor_test.go +++ b/service/history/transfer_queue_active_task_executor_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/suite" commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" + deploymentpb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" sdkpb "go.temporal.io/api/sdk/v1" @@ -2168,6 +2169,113 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su s.NoError(resp.ExecutionErr) } +func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_InheritsPendingOneTimeOverride() { + execution := &commonpb.WorkflowExecution{ + WorkflowId: "some random workflow ID", + RunId: uuid.NewString(), + } + workflowType := "some random workflow type" + taskQueueName := "some random task queue" + + childWorkflowID := "some random child workflow ID" + childRunID := uuid.NewString() + childWorkflowType := "some random child workflow type" + targetVersion := &deploymentpb.WorkerDeploymentVersion{ + DeploymentName: "my_app", + BuildId: "build_2", + } + versioningOverride := &workflowpb.VersioningOverride{ + Override: &workflowpb.VersioningOverride_OneTime{ + OneTime: &workflowpb.VersioningOverride_OneTimeOverride{ + TargetDeploymentVersion: targetVersion, + }, + }, + } + + mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetWorkflowId(), execution.GetRunId()) + _, err := mutableState.AddWorkflowExecutionStartedEvent( + execution, + &historyservice.StartWorkflowExecutionRequest{ + Attempt: 1, + NamespaceId: s.namespaceID.String(), + StartRequest: &workflowservice.StartWorkflowExecutionRequest{ + WorkflowId: execution.WorkflowId, + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName}, + WorkflowExecutionTimeout: durationpb.New(2 * time.Second), + WorkflowTaskTimeout: durationpb.New(1 * time.Second), + }, + }, + ) + s.NoError(err) + + wt := addWorkflowTaskScheduledEvent(mutableState) + event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.NewString()) + wt.StartedEventID = event.GetEventId() + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + + _, err = mutableState.AddWorkflowExecutionOptionsUpdatedEvent( + versioningOverride, false, "", nil, nil, uuid.NewString(), nil, nil, false, nil) + s.NoError(err) + + taskID := s.mustGenerateTaskID() + event, ci := addStartChildWorkflowExecutionInitiatedEvent( + mutableState, + event.GetEventId(), + s.namespace, + s.namespaceID, + childWorkflowID, + childWorkflowType, + taskQueueName, + nil, + 1*time.Second, + 1*time.Second, + 1*time.Second, + enumspb.PARENT_CLOSE_POLICY_TERMINATE, + ) + + transferTask := &tasks.StartChildExecutionTask{ + WorkflowKey: definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ), + Version: s.version, + TaskID: taskID, + InitiatedEventID: event.GetEventId(), + VisibilityTimestamp: time.Now().UTC(), + } + + rootExecutionInfo := &workflowspb.RootExecutionInfo{ + Execution: &commonpb.WorkflowExecution{ + WorkflowId: execution.WorkflowId, + RunId: execution.RunId, + }, + } + + childClock := vclock.NewVectorClock(rand.Int63(), rand.Int31(), rand.Int63()) + persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + expectedRequest := s.createChildWorkflowExecutionRequest( + s.namespace, + transferTask, + mutableState, + ci, + rootExecutionInfo, + nil, + ) + expectedRequest.StartRequest.VersioningOverride = versioningOverride + expectedRequest.VersioningOverride = versioningOverride + expectedRequest.InheritedPinnedVersion = targetVersion + s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), protomock.Eq(expectedRequest)). + Return(&historyservice.StartWorkflowExecutionResponse{RunId: childRunID, Clock: childClock}, nil) + s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) + s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), gomock.Any()).Return(&historyservice.ScheduleWorkflowTaskResponse{}, nil) + + resp := s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) + s.NoError(resp.ExecutionErr) +} + // 1. Creates a parent workflow, initiates a child workflow, then pauses the parent // 2. Executes the StartChildExecutionTask // 3. Asserts: diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 997b33bb393..c3477459dcd 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -9629,8 +9629,8 @@ func (ms *MutableStateImpl) initVersionedTransitionInDB() { // GetEffectiveDeployment returns the effective deployment in the following order: // 1. DeploymentVersionTransition.Deployment: this is returned when the wf is transitioning to a // new deployment -// 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override -// at wf start time, or later via UpdateWorkflowExecutionOptions. +// 2. VersioningOverride target: this is returned when user has set a PINNED override or +// pending one-time move at wf start time, or later via UpdateWorkflowExecutionOptions. // 3. Deployment: this is returned when there is no transition and no override (the most // common case). Deployment is set based on the worker-sent deployment in the latest WFT // completion. Exception: if Deployment is set but the workflow's effective behavior is @@ -9643,18 +9643,10 @@ func (ms *MutableStateImpl) GetEffectiveDeployment() *deploymentpb.Deployment { func (ms *MutableStateImpl) GetWorkerDeploymentSA() string { versioningInfo := ms.GetExecutionInfo().GetVersioningInfo() - if override := versioningInfo.GetVersioningOverride(); override != nil && - worker_versioning.OverrideIsPinned(override) { - if v := override.GetPinned().GetVersion(); v != nil { + if override := versioningInfo.GetVersioningOverride(); override != nil { + if v := worker_versioning.GetOverrideTargetDeploymentVersion(override); v != nil { return v.GetDeploymentName() } - //nolint:staticcheck // SA1019: worker versioning v0.31 - if vs := override.GetPinnedVersion(); vs != "" { - v, _ := worker_versioning.WorkerDeploymentVersionFromStringV31(vs) - return v.GetDeploymentName() - } - //nolint:staticcheck // SA1019: worker versioning v0.30 - return override.GetDeployment().GetSeriesName() } if v := versioningInfo.GetDeploymentVersion(); v != nil { return v.GetDeploymentName() @@ -9664,17 +9656,10 @@ func (ms *MutableStateImpl) GetWorkerDeploymentSA() string { func (ms *MutableStateImpl) GetWorkerDeploymentVersionSA() string { versioningInfo := ms.GetExecutionInfo().GetVersioningInfo() - if override := versioningInfo.GetVersioningOverride(); override != nil && - worker_versioning.OverrideIsPinned(override) { - if v := override.GetPinned().GetVersion(); v != nil { + if override := versioningInfo.GetVersioningOverride(); override != nil { + if v := worker_versioning.GetOverrideTargetDeploymentVersion(override); v != nil { return worker_versioning.ExternalWorkerDeploymentVersionToString(v) } - //nolint:staticcheck // SA1019: worker versioning v0.31 - if vs := override.GetPinnedVersion(); vs != "" { - return worker_versioning.ExternalWorkerDeploymentVersionToString(worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(vs)) - } - //nolint:staticcheck // SA1019: worker versioning v0.30 - return worker_versioning.ExternalWorkerDeploymentVersionToString(worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(override.GetDeployment())) } if v := versioningInfo.GetDeploymentVersion(); v != nil { return worker_versioning.ExternalWorkerDeploymentVersionToString(v) @@ -9687,7 +9672,7 @@ func (ms *MutableStateImpl) GetWorkflowVersioningBehaviorSA() enumspb.Versioning if override := ms.executionInfo.GetVersioningInfo().GetVersioningOverride(); override != nil { if override.GetAutoUpgrade() { return enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE - } else if worker_versioning.OverrideIsPinned(override) { + } else if worker_versioning.GetOverrideTargetDeploymentVersion(override) != nil { return enumspb.VERSIONING_BEHAVIOR_PINNED } //nolint:staticcheck // SA1019: worker versioning v0.31 and v0.30 @@ -9717,8 +9702,8 @@ func (ms *MutableStateImpl) GetDeploymentTransition() *workflowpb.DeploymentTran // GetEffectiveVersioningBehavior returns the effective versioning behavior in the following // order: // 1. DeploymentVersionTransition: if there is a transition, then effective behavior is AUTO_UPGRADE. -// 2. VersioningOverride.Behavior: this is returned when user has set a behavior override -// at wf start time, or later via UpdateWorkflowExecutionOptions. +// 2. VersioningOverride behavior: this is returned when user has set a behavior override +// or pending one-time move at wf start time, or later via UpdateWorkflowExecutionOptions. // 3. Behavior: this is returned when there is no override (most common case). Behavior is // set based on the worker-sent deployment in the latest WFT completion. func (ms *MutableStateImpl) GetEffectiveVersioningBehavior() enumspb.VersioningBehavior { diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 103ab1400ca..96b67337449 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -852,6 +852,21 @@ func (s *mutableStateSuite) TestEffectiveDeployment() { versioningInfo.Behavior = enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE s.verifyEffectiveDeployment(deployment2, enumspb.VERSIONING_BEHAVIOR_PINNED) + // worker/base says AUTO_UPGRADE, but a pending one-time override routes + // the next workflow task as PINNED to the one-time target. + if useV32 { + versioningInfo.DeploymentVersion = deploymentVersion1 + versioningInfo.VersioningOverride = &workflowpb.VersioningOverride{ + Override: &workflowpb.VersioningOverride_OneTime{ + OneTime: &workflowpb.VersioningOverride_OneTimeOverride{ + TargetDeploymentVersion: deploymentVersion2, + }, + }, + } + versioningInfo.Behavior = enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE + s.verifyEffectiveDeployment(deployment2, enumspb.VERSIONING_BEHAVIOR_PINNED) + } + // ------- With transition if useV32 { @@ -919,6 +934,45 @@ func (s *mutableStateSuite) verifyEffectiveDeployment( s.Equal(expectedBehavior, s.mutableState.GetEffectiveVersioningBehavior()) } +func oneTimeOverride(deployment *deploymentpb.Deployment) *workflowpb.VersioningOverride { + return &workflowpb.VersioningOverride{ + Override: &workflowpb.VersioningOverride_OneTime{ + OneTime: &workflowpb.VersioningOverride_OneTimeOverride{ + TargetDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(deployment), + }, + }, + } +} + +func (s *mutableStateSuite) requireOneTimeOverride(expectedDeployment *deploymentpb.Deployment) { + s.requireOneTimeOverrideInVersioningInfo(s.mutableState.GetExecutionInfo().GetVersioningInfo(), expectedDeployment) +} + +func (s *mutableStateSuite) requireOneTimeOverrideInVersioningInfo( + versioningInfo *workflowpb.WorkflowExecutionVersioningInfo, + expectedDeployment *deploymentpb.Deployment, +) { + s.Require().NotNil(versioningInfo) + override := versioningInfo.GetVersioningOverride() + s.Require().NotNil(override) + oneTime := override.GetOneTime() + s.Require().NotNil(oneTime) + targetVersion := oneTime.GetTargetDeploymentVersion() + s.Require().NotNil(targetVersion) + expectedVersion := worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(expectedDeployment) + s.Equal(expectedVersion.GetDeploymentName(), targetVersion.GetDeploymentName()) + s.Equal(expectedVersion.GetBuildId(), targetVersion.GetBuildId()) +} + +func (s *mutableStateSuite) requireEffectiveDeploymentForState( + ms historyi.MutableState, + expectedDeployment *deploymentpb.Deployment, + expectedBehavior enumspb.VersioningBehavior, +) { + s.True(ms.GetEffectiveDeployment().Equal(expectedDeployment)) + s.Equal(expectedBehavior, ms.GetEffectiveVersioningBehavior()) +} + // Creates a mutable state with first WFT completed on the given deployment and behavior set // to the given behavior, testing expected output after Add, Start, and Complete Workflow Task. func (s *mutableStateSuite) createMutableStateWithVersioningBehavior( @@ -1333,6 +1387,154 @@ func (s *mutableStateSuite) TestOverride_BaseDeploymentUpdatedOnCompletion() { s.verifyOverrides(baseBehavior, enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED, deployment2, nil) } +func (s *mutableStateSuite) TestOneTimeOverrideClearedAfterTargetWorkflowTaskCompletion() { + tq := &taskqueuepb.TaskQueue{Name: "tq"} + s.createMutableStateWithVersioningBehavior(enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE, deployment1, tq) + + _, err := s.mutableState.AddWorkflowExecutionOptionsUpdatedEvent( + oneTimeOverride(deployment2), false, "", nil, nil, uuid.NewString(), nil, nil, false, nil) + s.NoError(err) + s.verifyEffectiveDeployment(deployment2, enumspb.VERSIONING_BEHAVIOR_PINNED) + + wft, err := s.mutableState.AddWorkflowTaskScheduledEvent(true, enumsspb.WORKFLOW_TASK_TYPE_NORMAL) + s.NoError(err) + _, wft, err = s.mutableState.AddWorkflowTaskStartedEvent( + wft.ScheduledEventID, + "", + tq, + "", + nil, + nil, + nil, + false, + nil, + 0, + ) + s.NoError(err) + + _, err = s.mutableState.AddWorkflowTaskCompletedEvent( + wft, + &workflowservice.RespondWorkflowTaskCompletedRequest{ + VersioningBehavior: enumspb.VERSIONING_BEHAVIOR_PINNED, + Deployment: deployment2, //nolint:staticcheck // SA1019: worker versioning v0.30 + }, + workflowTaskCompletionLimits, + ) + s.NoError(err) + s.Nil(s.mutableState.GetExecutionInfo().GetVersioningInfo().GetVersioningOverride()) + s.verifyEffectiveDeployment(deployment2, enumspb.VERSIONING_BEHAVIOR_PINNED) +} + +func (s *mutableStateSuite) TestOneTimeOverrideNotClearedAfterDifferentWorkflowTaskCompletion() { + tq := &taskqueuepb.TaskQueue{Name: "tq"} + s.createMutableStateWithVersioningBehavior(enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE, deployment1, tq) + + wft, err := s.mutableState.AddWorkflowTaskScheduledEvent(true, enumsspb.WORKFLOW_TASK_TYPE_NORMAL) + s.NoError(err) + _, wft, err = s.mutableState.AddWorkflowTaskStartedEvent( + wft.ScheduledEventID, + "", + tq, + "", + nil, + nil, + nil, + false, + nil, + 0, + ) + s.NoError(err) + + // Note: Here, we are replicating the case where the UpdateOptionsEvent is persisted while we have an + // Started WFT + _, err = s.mutableState.AddWorkflowExecutionOptionsUpdatedEvent( + oneTimeOverride(deployment2), false, "", nil, nil, uuid.NewString(), nil, nil, false, nil) + s.NoError(err) + s.verifyEffectiveDeployment(deployment2, enumspb.VERSIONING_BEHAVIOR_PINNED) + + _, err = s.mutableState.AddWorkflowTaskCompletedEvent( + wft, + &workflowservice.RespondWorkflowTaskCompletedRequest{ + VersioningBehavior: enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE, + Deployment: deployment1, //nolint:staticcheck // SA1019: worker versioning v0.30 + }, + workflowTaskCompletionLimits, + ) + s.NoError(err) + s.requireOneTimeOverride(deployment2) + s.verifyEffectiveDeployment(deployment2, enumspb.VERSIONING_BEHAVIOR_PINNED) +} + +func (s *mutableStateSuite) TestOneTimeOverrideSearchAttributesUseTargetVersion() { + tq := &taskqueuepb.TaskQueue{Name: "tq"} + s.createMutableStateWithVersioningBehavior(enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE, deployment1, tq) + + _, err := s.mutableState.AddWorkflowExecutionOptionsUpdatedEvent( + oneTimeOverride(deployment2), false, "", nil, nil, uuid.NewString(), nil, nil, false, nil) + s.NoError(err) + + expectedVersion := worker_versioning.ExternalWorkerDeploymentVersionToString( + worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(deployment2), + ) + s.Equal(deployment2.GetSeriesName(), s.mutableState.GetWorkerDeploymentSA()) + s.Equal(expectedVersion, s.mutableState.GetWorkerDeploymentVersionSA()) + s.Equal(enumspb.VERSIONING_BEHAVIOR_PINNED, s.mutableState.GetWorkflowVersioningBehaviorSA()) +} + +func (s *mutableStateSuite) TestOneTimeOverrideContinueAsNewAfterTargetWorkflowTaskCompletionAllowsAutoUpgrade() { + tq := &taskqueuepb.TaskQueue{Name: "tq"} + s.createMutableStateWithVersioningBehavior(enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE, deployment1, tq) + + _, err := s.mutableState.AddWorkflowExecutionOptionsUpdatedEvent( + oneTimeOverride(deployment2), false, "", nil, nil, uuid.NewString(), nil, nil, false, nil) + s.NoError(err) + + wft, err := s.mutableState.AddWorkflowTaskScheduledEvent(true, enumsspb.WORKFLOW_TASK_TYPE_NORMAL) + s.NoError(err) + _, wft, err = s.mutableState.AddWorkflowTaskStartedEvent( + wft.ScheduledEventID, + "", + tq, + "", + nil, + nil, + nil, + false, + nil, + 0, + ) + s.NoError(err) + + completedEvent, err := s.mutableState.AddWorkflowTaskCompletedEvent( + wft, + &workflowservice.RespondWorkflowTaskCompletedRequest{ + VersioningBehavior: enumspb.VERSIONING_BEHAVIOR_PINNED, + Deployment: deployment2, //nolint:staticcheck // SA1019: worker versioning v0.30 + }, + workflowTaskCompletionLimits, + ) + s.NoError(err) + s.Nil(s.mutableState.GetExecutionInfo().GetVersioningInfo().GetVersioningOverride()) + s.mutableState.SetVersioningRevisionNumber(123) + + s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes() + s.mockEventsCache.EXPECT().GetEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&historypb.HistoryEvent{}, nil).AnyTimes() + _, newRunMutableState, err := s.mutableState.AddContinueAsNewEvent( + context.Background(), + completedEvent.GetEventId(), + "", + &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{ + WorkflowRunTimeout: s.mutableState.GetExecutionInfo().WorkflowRunTimeout, + InitialVersioningBehavior: enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE, + }, + nil, + ) + s.NoError(err) + s.Nil(newRunMutableState.GetExecutionInfo().GetVersioningInfo().GetVersioningOverride()) + s.requireEffectiveDeploymentForState(newRunMutableState, deployment2, enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE) +} + func (s *mutableStateSuite) TestChecksum() { // set the checksum probabilities to 100% for exercising during test s.mockConfig.MutableStateChecksumGenProbability = func(namespace string) int { return 100 } diff --git a/service/history/workflow/util.go b/service/history/workflow/util.go index d3d731c41d3..fc15dc57992 100644 --- a/service/history/workflow/util.go +++ b/service/history/workflow/util.go @@ -167,8 +167,8 @@ func (mse MutableStateWithEffects) CanAddEvent() bool { // GetEffectiveDeployment returns the effective deployment in the following order: // 1. DeploymentVersionTransition.Deployment: this is returned when the wf is transitioning to a // new deployment -// 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override -// at wf start time, or later via UpdateWorkflowExecutionOptions. +// 2. VersioningOverride target: this is returned when user has set a PINNED override or +// pending one-time move at wf start time, or later via UpdateWorkflowExecutionOptions. // 3. Deployment: this is returned when there is no transition and no override (the most // common case). Deployment is set based on the worker-sent deployment in the latest WFT // completion. Exception: if Deployment is set but the workflow's effective behavior is @@ -189,8 +189,12 @@ func GetEffectiveDeployment(versioningInfo *workflowpb.WorkflowExecutionVersioni } else if transition := versioningInfo.GetDeploymentTransition(); transition != nil { // //nolint:staticcheck // SA1019: worker versioning v0.30 return transition.GetDeployment() } else if override := versioningInfo.GetVersioningOverride(); override != nil && - (override.GetBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED || //nolint:staticcheck // SA1019: worker versioning v0.31 and v0.30 + (worker_versioning.GetOverrideOneTimeTargetVersion(override) != nil || + override.GetBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED || //nolint:staticcheck // SA1019: worker versioning v0.31 and v0.30 override.GetPinned() != nil) { + if oneTimeTarget := worker_versioning.GetOverrideOneTimeTargetVersion(override); oneTimeTarget != nil { + return worker_versioning.DeploymentFromExternalDeploymentVersion(oneTimeTarget) + } if pinnedVersion := override.GetPinned().GetVersion(); pinnedVersion != nil { return worker_versioning.DeploymentFromExternalDeploymentVersion(pinnedVersion) } @@ -217,8 +221,8 @@ func GetEffectiveDeployment(versioningInfo *workflowpb.WorkflowExecutionVersioni // GetEffectiveVersioningBehavior returns the effective versioning behavior in the following // order: // 1. DeploymentVersionTransition: if there is a transition, then effective behavior is AUTO_UPGRADE. -// 2. VersioningOverride.Behavior: this is returned when user has set a behavior override -// at wf start time, or later via UpdateWorkflowExecutionOptions. +// 2. VersioningOverride behavior: this is returned when user has set a behavior override +// or pending one-time move at wf start time, or later via UpdateWorkflowExecutionOptions. // 3. Behavior: this is returned when there is no override (most common case). Behavior is // set based on the worker-sent deployment in the latest WFT completion. func GetEffectiveVersioningBehavior(versioningInfo *workflowpb.WorkflowExecutionVersioningInfo) enumspb.VersioningBehavior { @@ -227,7 +231,7 @@ func GetEffectiveVersioningBehavior(versioningInfo *workflowpb.WorkflowExecution } else if t := versioningInfo.GetVersionTransition(); t != nil { return enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE } else if override := versioningInfo.GetVersioningOverride(); override != nil { - if override.GetAutoUpgrade() || override.GetPinned() != nil { // v0.32 override behavior + if override.GetAutoUpgrade() || override.GetPinned() != nil || override.GetOneTime() != nil { // v0.32 override behavior if override.GetAutoUpgrade() { return enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE } diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 47d74160c76..bcd508c4648 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -1358,6 +1358,9 @@ func (m *workflowTaskStateMachine) afterAddWorkflowTaskCompletedEvent( versioningInfo.Version = worker_versioning.WorkerDeploymentVersionToStringV31(worker_versioning.DeploymentVersionFromDeployment(wftDeployment)) versioningInfo.DeploymentVersion = worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(wftDeployment) } + if clearOneTimeOverrideAfterCompletedWFT(versioningInfo, wftDeployment) { + versioningInfo.VersioningOverride = nil + } // Deployment and behavior after applying the data came from the completed wft. wfDeploymentAfter := m.ms.GetEffectiveDeployment() @@ -1409,6 +1412,21 @@ func (m *workflowTaskStateMachine) afterAddWorkflowTaskCompletedEvent( return nil } +func clearOneTimeOverrideAfterCompletedWFT( + versioningInfo *workflowpb.WorkflowExecutionVersioningInfo, + wftDeployment *deploymentpb.Deployment, +) bool { + if versioningInfo == nil { + return false + } + oneTimeTarget := worker_versioning.GetOverrideOneTimeTargetVersion(versioningInfo.GetVersioningOverride()) + completedVersion := worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(wftDeployment) + return oneTimeTarget != nil && + completedVersion != nil && + oneTimeTarget.GetDeploymentName() == completedVersion.GetDeploymentName() && + oneTimeTarget.GetBuildId() == completedVersion.GetBuildId() +} + func (m *workflowTaskStateMachine) emitWorkflowTaskAttemptStats( attempt int32, ) { diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 48435d66dda..bf5ea3f79dc 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -2253,6 +2253,600 @@ func (s *Versioning3Suite) makePinnedOverride(tv *testvars.TestVars) *workflowpb }} } +func (s *Versioning3Suite) makeOneTimeOverride(tv *testvars.TestVars) *workflowpb.VersioningOverride { + return &workflowpb.VersioningOverride{ + Override: &workflowpb.VersioningOverride_OneTime{ + OneTime: &workflowpb.VersioningOverride_OneTimeOverride{ + TargetDeploymentVersion: tv.ExternalDeploymentVersion(), + }, + }} +} + +func (s *Versioning3Suite) describeVersioningInfo( + env *testcore.TestEnv, + execution *commonpb.WorkflowExecution, +) *workflowpb.WorkflowExecutionVersioningInfo { + resp, err := env.FrontendClient().DescribeWorkflowExecution( + s.Context(), + &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: env.Namespace().String(), + Execution: execution, + }, + ) + s.NoError(err) + return resp.GetWorkflowExecutionInfo().GetVersioningInfo() +} + +func (s *Versioning3Suite) requireOneTimeOverride( + env *testcore.TestEnv, + execution *commonpb.WorkflowExecution, + tv *testvars.TestVars, +) { + versioningInfo := s.describeVersioningInfo(env, execution) + oneTime := versioningInfo.GetVersioningOverride().GetOneTime() + s.NotNil(oneTime) + s.ProtoEqual(tv.ExternalDeploymentVersion(), oneTime.GetTargetDeploymentVersion()) +} + +func (s *Versioning3Suite) requireNoVersioningOverride( + env *testcore.TestEnv, + execution *commonpb.WorkflowExecution, +) { + versioningInfo := s.describeVersioningInfo(env, execution) + s.Nil(versioningInfo.GetVersioningOverride()) +} + +func (s *Versioning3Suite) updateVersioningOverride( + env *testcore.TestEnv, + execution *commonpb.WorkflowExecution, + override *workflowpb.VersioningOverride, +) { + _, err := env.FrontendClient().UpdateWorkflowExecutionOptions(s.Context(), &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + WorkflowExecution: execution, + WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{VersioningOverride: override}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + s.NoError(err) +} + +func (s *Versioning3Suite) pollWorkflowTask( + env *testcore.TestEnv, + tv *testvars.TestVars, +) *workflowservice.PollWorkflowTaskQueueResponse { + task, err := env.FrontendClient().PollWorkflowTaskQueue(s.Context(), &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: env.Namespace().String(), + Identity: tv.WorkerIdentity(), + TaskQueue: tv.TaskQueue(), + DeploymentOptions: tv.WorkerDeploymentOptions(true), + }) + s.NoError(err) + s.NotEmpty(task.GetTaskToken()) + return task +} + +func (s *Versioning3Suite) completeWorkflowTask( + env *testcore.TestEnv, + tv *testvars.TestVars, + task *workflowservice.PollWorkflowTaskQueueResponse, + request *workflowservice.RespondWorkflowTaskCompletedRequest, +) { + request.Namespace = env.Namespace().String() + request.Identity = tv.WorkerIdentity() + request.TaskToken = task.GetTaskToken() + _, err := env.FrontendClient().RespondWorkflowTaskCompleted(s.Context(), request) + s.NoError(err) +} + +func startChildWorkflowCommand(tv *testvars.TestVars) *commandpb.Command { + attributes := &commandpb.StartChildWorkflowExecutionCommandAttributes{ + WorkflowId: tv.WorkflowID(), + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + Input: tv.Any().Payloads(), + } + + return &commandpb.Command{ + CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{ + StartChildWorkflowExecutionCommandAttributes: attributes, + }, + } +} + +func (s *Versioning3Suite) pollUntilChildWorkflowTask( + env *testcore.TestEnv, + tv *testvars.TestVars, + childWorkflowID string, + handleChild func(*workflowservice.PollWorkflowTaskQueueResponse) *workflowservice.RespondWorkflowTaskCompletedRequest, +) *commonpb.WorkflowExecution { + // Starting the child can create two WFTs on the same version/task queue: + // one parent follow-up WFT for ChildWorkflowExecutionStarted, and one child + // first WFT. Drain the parent follow-up if it arrives first. + const maxWorkflowTasksAfterChildStart = 2 + var childExecution *commonpb.WorkflowExecution + for i := 0; i < maxWorkflowTasksAfterChildStart && childExecution == nil; i++ { + s.pollWftAndHandle(env, tv, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + if task.GetWorkflowExecution().GetWorkflowId() == childWorkflowID { + childExecution = task.GetWorkflowExecution() + return handleChild(task), nil + } + return respondEmptyWft(tv, false, vbPinned), nil + }) + } + s.NotNil(childExecution) + return childExecution +} + +// TestOneTimeOverride_TargetWorkflowTaskClearsOverride verifies the core +// one-time override lifecycle. The workflow first runs on version 1, then an +// operator sets a one-time override to version 2. The next WFT must route to +// version 2 while the override is pending. Once that WFT completes from version +// 2, the server should clear the override and keep the workflow on the base +// behavior/version reported by that completion. +func (s *Versioning3Suite) TestOneTimeOverride_TargetWorkflowTaskClearsOverride() { + env := s.setupEnv() + tv1 := env.Tv().WithBuildIDNumber(1) + tv2 := tv1.WithBuildIDNumber(2) + + execution, _ := s.drainWorkflowTaskAfterSetCurrent(env, tv1) + s.pollUntilRegistered(env, tv2) + + s.updateVersioningOverride(env, execution, s.makeOneTimeOverride(tv2)) + s.requireOneTimeOverride(env, execution, tv2) + + s.triggerNormalWFT(env, tv1, execution) + s.pollWftAndHandle(env, tv2, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondEmptyWft(tv2, false, vbPinned), nil + }) + + s.requireNoVersioningOverride(env, execution) + s.verifyWorkflowVersioning(env, tv1, vbPinned, tv2.Deployment(), nil, nil) +} + +// TestOneTimeOverride_PendingWorkflowTaskRoutesToTargetAndClears verifies that +// a one-time override applies to a WFT that was scheduled but not yet started. +// The workflow is started while version 1 is current, leaving its first WFT +// pending. An operator then sets a one-time override to version 2 before any +// worker starts that task. The first WFT should be dispatched to version 2, and +// completion from version 2 should consume and clear the override. +func (s *Versioning3Suite) TestOneTimeOverride_PendingWorkflowTaskRoutesToTargetAndClears() { + env := s.setupEnv() + tv1 := env.Tv().WithBuildIDNumber(1) + tv2 := tv1.WithBuildIDNumber(2) + + s.pollUntilRegistered(env, tv1) + s.setCurrentDeployment(env, tv1) + s.pollUntilRegistered(env, tv2) + + runID := s.startWorkflow(env, tv1, nil) + execution := tv1.WithRunID(runID).WorkflowExecution() + + s.updateVersioningOverride(env, execution, s.makeOneTimeOverride(tv2)) + s.requireOneTimeOverride(env, execution, tv2) + + s.pollWftAndHandle(env, tv2, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondEmptyWft(tv2, false, vbPinned), nil + }) + + s.requireNoVersioningOverride(env, execution) + s.verifyWorkflowVersioning(env, tv1, vbPinned, tv2.Deployment(), nil, nil) +} + +// TestOneTimeOverride_TargetWorkflowTaskReportsAutoUpgrade verifies that a +// one-time override only controls routing until the target WFT completes. The +// workflow first runs on version 1, then an operator sets a one-time override to +// version 2. The version 2 worker completes that WFT and reports AutoUpgrade, +// so the override should clear and the workflow's base state should become +// AutoUpgrade on version 2. When version 3 later becomes current, the workflow +// should route to version 3 through normal AutoUpgrade routing. +func (s *Versioning3Suite) TestOneTimeOverride_TargetWorkflowTaskReportsAutoUpgrade() { + env := s.setupEnv() + tv1 := env.Tv().WithBuildIDNumber(1) + tv2 := tv1.WithBuildIDNumber(2) + tv3 := tv1.WithBuildIDNumber(3) + + execution, _ := s.drainWorkflowTaskAfterSetCurrent(env, tv1) + s.pollUntilRegistered(env, tv2) + + s.updateVersioningOverride(env, execution, s.makeOneTimeOverride(tv2)) + s.requireOneTimeOverride(env, execution, tv2) + + s.triggerNormalWFT(env, tv1, execution) + s.pollWftAndHandle(env, tv2, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondEmptyWft(tv2, false, vbUnpinned), nil + }) + + s.requireNoVersioningOverride(env, execution) + s.verifyWorkflowVersioning(env, tv1, vbUnpinned, tv2.Deployment(), nil, nil) + + s.pollUntilRegistered(env, tv3) + s.setCurrentDeployment(env, tv3) + + s.triggerNormalWFT(env, tv1, execution) + s.pollWftAndHandle(env, tv3, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondEmptyWft(tv3, false, vbUnpinned), nil + }) + + s.verifyWorkflowVersioning(env, tv1, vbUnpinned, tv3.Deployment(), nil, nil) +} + +// TestOneTimeOverride_StartedWorkflowTaskOnPreviousVersionDoesNotClear verifies +// the race where an operator sets a one-time override while a WFT from the +// previous version is already started. That old WFT completion should update +// base behavior/version from the old worker, but it must not consume the +// one-time override because it did not complete on the target version. The next +// WFT should still route to the one-time target and clear the override there. +func (s *Versioning3Suite) TestOneTimeOverride_StartedWorkflowTaskOnPreviousVersionDoesNotClear() { + env := s.setupEnv() + tv1 := env.Tv().WithBuildIDNumber(1) + tv2 := tv1.WithBuildIDNumber(2) + + execution, _ := s.drainWorkflowTaskAfterSetCurrent(env, tv1) + s.pollUntilRegistered(env, tv2) + + s.triggerNormalWFT(env, tv1, execution) + startedTask := s.pollWorkflowTask(env, tv1) + + s.updateVersioningOverride(env, execution, s.makeOneTimeOverride(tv2)) + s.requireOneTimeOverride(env, execution, tv2) + + s.completeWorkflowTask(env, tv1, startedTask, respondEmptyWft(tv1, false, vbUnpinned)) + + s.requireOneTimeOverride(env, execution, tv2) + s.verifyWorkflowVersioning(env, tv1, vbUnpinned, tv1.Deployment(), s.makeOneTimeOverride(tv2), nil) + + s.triggerNormalWFT(env, tv1, execution) + s.pollWftAndHandle(env, tv2, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondEmptyWft(tv2, false, vbPinned), nil + }) + + s.requireNoVersioningOverride(env, execution) + s.verifyWorkflowVersioning(env, tv1, vbPinned, tv2.Deployment(), nil, nil) +} + +// TestOneTimeOverride_TargetWorkflowTaskContinueAsNewDoesNotInheritOverride +// verifies the CAN boundary after the one-time move has actually happened. The +// workflow routes to version 2 through the one-time override, and that version 2 +// WFT completes with Continue-As-New. Since the WFT completed on the target +// version, the override should clear before the new run's versioning state is +// computed. The continued run should not inherit a stale one-time override. +func (s *Versioning3Suite) TestOneTimeOverride_TargetWorkflowTaskContinueAsNewDoesNotInheritOverride() { + env := s.setupEnv() + tv1 := env.Tv().WithBuildIDNumber(1) + tv2 := tv1.WithBuildIDNumber(2) + + execution, _ := s.drainWorkflowTaskAfterSetCurrent(env, tv1) + s.pollUntilRegistered(env, tv2) + + s.updateVersioningOverride(env, execution, s.makeOneTimeOverride(tv2)) + s.requireOneTimeOverride(env, execution, tv2) + + s.triggerNormalWFT(env, tv1, execution) + s.pollWftAndHandle(env, tv2, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ + ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{ + WorkflowType: tv1.WorkflowType(), + TaskQueue: tv1.TaskQueue(), + Input: tv1.Any().Payloads(), + }, + }, + }, + }, + VersioningBehavior: vbPinned, + DeploymentOptions: tv2.WorkerDeploymentOptions(true), + }, nil + }) + + s.pollWftAndHandle(env, tv2, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.NotEqual(execution.GetRunId(), task.GetWorkflowExecution().GetRunId()) // The workflow CAN'ed + s.requireNoVersioningOverride(env, task.GetWorkflowExecution()) + return respondEmptyWft(tv2, false, vbPinned), nil + }) + + currentExecution := &commonpb.WorkflowExecution{WorkflowId: execution.GetWorkflowId()} + s.requireNoVersioningOverride(env, currentExecution) + s.verifyWorkflowVersioning(env, tv1, vbPinned, tv2.Deployment(), nil, nil) +} + +// TestOneTimeOverride_ClearedMoveAllowsUpgradeOnContinueAsNewToNewCurrent +// verifies the main one-time override use case. An operator moves a running +// pinned workflow from version 1 to patched version 2. After one successful WFT +// on version 2, the override clears and version 2 becomes the workflow's base +// pinned version. When version 3 later becomes current, an explicit +// upgrade-on-CAN should not be blocked by the old override; the new run should +// start on version 3 through normal AutoUpgrade initial-versioning behavior. +func (s *Versioning3Suite) TestOneTimeOverride_ClearedMoveAllowsUpgradeOnContinueAsNewToNewCurrent() { + env := s.setupEnv() + tv1 := env.Tv().WithBuildIDNumber(1) + tv2 := tv1.WithBuildIDNumber(2) + tv3 := tv1.WithBuildIDNumber(3) + + execution, _ := s.drainWorkflowTaskAfterSetCurrent(env, tv1) + s.pollUntilRegistered(env, tv2) + + s.updateVersioningOverride(env, execution, s.makeOneTimeOverride(tv2)) + s.requireOneTimeOverride(env, execution, tv2) + + s.triggerNormalWFT(env, tv1, execution) + s.pollWftAndHandle(env, tv2, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondEmptyWft(tv2, false, vbPinned), nil + }) + + s.requireNoVersioningOverride(env, execution) + s.verifyWorkflowVersioning(env, tv1, vbPinned, tv2.Deployment(), nil, nil) + + s.pollUntilRegistered(env, tv3) + s.setCurrentDeployment(env, tv3) + + s.triggerNormalWFT(env, tv1, execution) + s.pollWftAndHandle(env, tv2, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ + ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{ + WorkflowType: tv1.WorkflowType(), + TaskQueue: tv1.TaskQueue(), + Input: tv1.Any().Payloads(), + InitialVersioningBehavior: enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE, + }, + }, + }, + }, + VersioningBehavior: vbPinned, + DeploymentOptions: tv2.WorkerDeploymentOptions(true), + }, nil + }) + + var newRunID string + s.pollWftAndHandle(env, tv3, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + newRunID = task.GetWorkflowExecution().GetRunId() + s.NotEqual(execution.GetRunId(), newRunID) + s.requireNoVersioningOverride(env, task.GetWorkflowExecution()) + return respondCompleteWorkflow(tv3, vbPinned), nil + }) + s.NotEmpty(newRunID) + + currentExecution := &commonpb.WorkflowExecution{WorkflowId: execution.GetWorkflowId()} + s.requireNoVersioningOverride(env, currentExecution) + s.verifyWorkflowVersioning(env, tv3, vbPinned, tv3.Deployment(), nil, nil) +} + +// TestOneTimeOverride_ResetReappliesPendingMove verifies reset/reapply +// semantics for a consumed one-time override. The original run moves from +// version 1 to version 2 and consumes the one-time override. Reset then goes +// back to a point before the override was set. Default reset reapply should +// reapply the WorkflowExecutionOptionsUpdated event that set the one-time +// override, so the reset run gets one pending WFT routed to version 2. +func (s *Versioning3Suite) TestOneTimeOverride_ResetReappliesPendingMove() { + env := s.setupEnv() + tv1 := env.Tv().WithBuildIDNumber(1) + tv2 := tv1.WithBuildIDNumber(2) + + execution, _ := s.drainWorkflowTaskAfterSetCurrent(env, tv1) + s.pollUntilRegistered(env, tv2) + + var resetEventID int64 + for _, event := range env.GetHistory(env.Namespace().String(), execution) { + if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { + resetEventID = event.GetEventId() // just capturing the resetID here so that we come back to this event when we actually do the reset! + break + } + } + s.NotZero(resetEventID) + + s.updateVersioningOverride(env, execution, s.makeOneTimeOverride(tv2)) + s.requireOneTimeOverride(env, execution, tv2) + + s.triggerNormalWFT(env, tv1, execution) + s.pollWftAndHandle(env, tv2, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondEmptyWft(tv2, false, vbPinned), nil + }) + s.requireNoVersioningOverride(env, execution) + s.verifyWorkflowVersioning(env, tv1, vbPinned, tv2.Deployment(), nil, nil) + + resetResp, err := env.FrontendClient().ResetWorkflowExecution(s.Context(), &workflowservice.ResetWorkflowExecutionRequest{ + Namespace: env.Namespace().String(), + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: execution.GetWorkflowId(), + RunId: execution.GetRunId(), + }, + Reason: "reset before one-time override", + WorkflowTaskFinishEventId: resetEventID, + RequestId: uuid.NewString(), + }) + s.NoError(err) + + resetExecution := &commonpb.WorkflowExecution{ + WorkflowId: execution.GetWorkflowId(), + RunId: resetResp.GetRunId(), + } + s.requireOneTimeOverride(env, resetExecution, tv2) + + s.pollWftAndHandle(env, tv2, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.Equal(resetResp.GetRunId(), task.GetWorkflowExecution().GetRunId()) + return respondEmptyWft(tv2, false, vbPinned), nil + }) + + s.requireNoVersioningOverride(env, resetExecution) + s.verifyWorkflowVersioning(env, tv1, vbPinned, tv2.Deployment(), nil, nil) +} + +// TestOneTimeOverride_SameNamespaceChildInheritsPendingOverride verifies the +// child-workflow boundary when a one-time override is pending. A parent WFT is +// already started on version 1, then an operator sets a one-time override to +// version 2. That old parent WFT starts a same-namespace child. The parent WFT +// itself must not consume the one-time override because it completed on version +// 1, but the child should inherit the pending override, route its first WFT to +// version 2, and clear the override after that first child WFT completes. +func (s *Versioning3Suite) TestOneTimeOverride_SameNamespaceChildInheritsPendingOverride() { + env := s.setupEnv() + tv1 := env.Tv().WithBuildIDNumber(1) + tv2 := tv1.WithBuildIDNumber(2) + childTV1 := tv1.WithWorkflowIDNumber(2) + childTV2 := childTV1.WithBuildIDNumber(2) + + execution, _ := s.drainWorkflowTaskAfterSetCurrent(env, tv1) + s.pollUntilRegistered(env, tv2) + + s.triggerNormalWFT(env, tv1, execution) + startedTask := s.pollWorkflowTask(env, tv1) + + s.updateVersioningOverride(env, execution, s.makeOneTimeOverride(tv2)) + s.requireOneTimeOverride(env, execution, tv2) + + s.completeWorkflowTask(env, tv1, startedTask, &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + startChildWorkflowCommand(childTV1), + }, + VersioningBehavior: vbPinned, + DeploymentOptions: tv1.WorkerDeploymentOptions(true), + }) + s.requireOneTimeOverride(env, execution, tv2) + + childExecution := s.pollUntilChildWorkflowTask(env, tv2, childTV1.WorkflowID(), + func(task *workflowservice.PollWorkflowTaskQueueResponse) *workflowservice.RespondWorkflowTaskCompletedRequest { + s.requireOneTimeOverride(env, task.GetWorkflowExecution(), childTV2) + return respondEmptyWft(tv2, false, vbPinned) + }) + + s.requireNoVersioningOverride(env, childExecution) + s.verifyWorkflowVersioning(env, childTV2, vbPinned, childTV2.Deployment(), nil, nil) +} + +// TestOneTimeOverride_CrossTaskQueueChildInheritsOnlyWhenTargetOwnsTaskQueue +// verifies the child task-queue ownership boundary. If the one-time target +// version is registered on the child's task queue, the child inherits the +// pending one-time override and routes its first WFT to the target. If the +// target version is not registered on the child's task queue, the child does not +// inherit the parent override and starts through the child task queue's own +// current routing. +func (s *Versioning3Suite) TestOneTimeOverride_CrossTaskQueueChildInheritsOnlyWhenTargetOwnsTaskQueue() { + for _, tc := range []struct { + name string + targetPresentInChildTaskQueue bool + }{ + { + name: "target present in child task queue", + targetPresentInChildTaskQueue: true, + }, + { + name: "target not present in child task queue", + targetPresentInChildTaskQueue: false, + }, + } { + s.Run(tc.name, func(s *Versioning3Suite) { + env := s.setupEnv() + tv1 := env.Tv().WithBuildIDNumber(1) + tv2 := tv1.WithBuildIDNumber(2) + childTV1 := tv1.WithWorkflowIDNumber(2).WithTaskQueueNumber(2) + childTV2 := childTV1.WithBuildIDNumber(2) + + execution, _ := s.drainWorkflowTaskAfterSetCurrent(env, tv1) + s.pollUntilRegistered(env, tv2) + s.pollUntilRegistered(env, childTV1) + s.waitForDeploymentDataPropagation(env, childTV1, versionStatusCurrent, false, tqTypeWf) + if tc.targetPresentInChildTaskQueue { + s.pollUntilRegistered(env, childTV2) + s.waitForDeploymentDataPropagation(env, childTV2, versionStatusInactive, false, tqTypeWf) + } + + s.triggerNormalWFT(env, tv1, execution) + startedTask := s.pollWorkflowTask(env, tv1) + + s.updateVersioningOverride(env, execution, s.makeOneTimeOverride(tv2)) + s.requireOneTimeOverride(env, execution, tv2) + + s.completeWorkflowTask(env, tv1, startedTask, &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + startChildWorkflowCommand(childTV1), + }, + VersioningBehavior: vbPinned, + DeploymentOptions: tv1.WorkerDeploymentOptions(true), + }) + + if tc.targetPresentInChildTaskQueue { + s.pollWftAndHandle(env, childTV2, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.Equal(childTV1.WorkflowID(), task.GetWorkflowExecution().GetWorkflowId()) + s.requireOneTimeOverride(env, task.GetWorkflowExecution(), childTV2) + return respondEmptyWft(childTV2, false, vbPinned), nil + }) + + s.requireNoVersioningOverride(env, childTV1.WorkflowExecution()) + s.verifyWorkflowVersioning(env, childTV2, vbPinned, childTV2.Deployment(), nil, nil) + } else { + s.pollWftAndHandle(env, childTV1, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.Equal(childTV1.WorkflowID(), task.GetWorkflowExecution().GetWorkflowId()) + s.requireNoVersioningOverride(env, task.GetWorkflowExecution()) + return respondEmptyWft(childTV1, false, vbPinned), nil + }) + + s.verifyWorkflowVersioning(env, childTV1, vbPinned, childTV1.Deployment(), nil, nil) + } + }) + } +} + +// TestOneTimeOverride_InvalidTargetVersionRejected verifies that the operator +// cannot set a one-time override to a version that is not registered on the +// workflow's task queue. The update should fail before persistence, leaving the +// workflow without a pending override. +func (s *Versioning3Suite) TestOneTimeOverride_InvalidTargetVersionRejected() { + env := s.setupEnv() + tv1 := env.Tv().WithBuildIDNumber(1) + missingTV := tv1.WithBuildIDNumber(2) + + execution, _ := s.drainWorkflowTaskAfterSetCurrent(env, tv1) + + _, err := env.FrontendClient().UpdateWorkflowExecutionOptions(s.Context(), &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + WorkflowExecution: execution, + WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{VersioningOverride: s.makeOneTimeOverride(missingTV)}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + s.Error(err) + var failedPrecondition *serviceerror.FailedPrecondition + s.ErrorAs(err, &failedPrecondition) + s.Contains(err.Error(), worker_versioning.ErrPinnedVersionNotInTaskQueueSubstring) + s.requireNoVersioningOverride(env, execution) +} + // testPinnedCaN_UpgradeOnCaN tests ContinueAsNew of a Pinned workflow with InitialVersioningBehavior // set to AUTO_UPGRADE using task polling directly (without SDK). This allows testing the feature // before it's exposed in the SDK.