-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Add one-time versioning override #10763
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -657,10 +657,22 @@ func GetOverridePinnedVersion(override *workflowpb.VersioningOverride) *deployme | |
| } | ||
| return nil | ||
| } | ||
|
|
||
| func GetOverrideOneTimeTargetVersion(override *workflowpb.VersioningOverride) *deploymentpb.WorkerDeploymentVersion { | ||
| return override.GetOneTime().GetTargetDeploymentVersion() | ||
| } | ||
|
|
||
| func GetOverrideTargetDeploymentVersion(override *workflowpb.VersioningOverride) *deploymentpb.WorkerDeploymentVersion { | ||
| if OverrideIsPinned(override) { | ||
| return GetOverridePinnedVersion(override) | ||
| } | ||
| return GetOverrideOneTimeTargetVersion(override) | ||
| } | ||
|
|
||
| func ExtractVersioningBehaviorFromOverride(override *workflowpb.VersioningOverride) enumspb.VersioningBehavior { | ||
| if override.GetAutoUpgrade() { | ||
| return enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE | ||
| } else if override.GetPinned() != nil { | ||
| } else if override.GetPinned() != nil || override.GetOneTime() != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest a comment here stating ... this option is treated similar to pinned, except the override is automatically removed after the move has been fulfilled. |
||
| return enumspb.VERSIONING_BEHAVIOR_PINNED | ||
| } | ||
|
|
||
|
|
@@ -737,6 +749,11 @@ func ValidateVersioningOverrideAndGetReactivationEligibility(ctx context.Context | |
| return false, 0, serviceerror.NewInvalidArgument("must specify pinned override behavior if override is pinned.") | ||
| } | ||
| return validateVersionAndGetReactivationEligibility(ctx, p.GetVersion(), matchingClient, versionCache, tq, tqType, namespaceID) | ||
| } else if oneTime := override.GetOneTime(); oneTime != nil { | ||
| if oneTime.GetTargetDeploymentVersion() == nil { | ||
| return false, 0, serviceerror.NewInvalidArgument("must provide target deployment version if override is one-time.") | ||
| } | ||
| return validateVersionAndGetReactivationEligibility(ctx, oneTime.GetTargetDeploymentVersion(), matchingClient, versionCache, tq, tqType, namespaceID) | ||
| } | ||
|
|
||
| //nolint:staticcheck // SA1019: worker versioning v0.31 | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -223,6 +223,38 @@ func mergeWorkflowExecutionOptions( | |
| mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() | ||
| } | ||
|
|
||
| if _, ok := updateFields["versioningOverride.pinned"]; ok { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is something which i just over thought while working on this, or something we missed while we wrote update workflow options? question: why don't we have these mask options for all the paths for api's that were added in the v0.32 phase? cc - @carlydf have added them here, but could be totally off hence the q
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. still working on the rest of my review, but here is one comment I had pending from when I started the review last week! I think we should only allow writes to the full versioningOverride (no paths that go beyond versioningOverride). Before there was a oneof, the relevant fields were behavior and deployment (deprecated v0.30 versioning with Deployment Series). As you can see above, I required if any longer paths were specified, all should be specified. We could follow that pattern and accept longer paths but only if all long paths are sent, or we could reject anything longer than versioningOverride. I think the second option is simpler to reason about, especially with the oneof complexity. Thoughts? |
||
| mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() | ||
| } | ||
|
|
||
| if _, ok := updateFields["versioningOverride.pinned.behavior"]; ok { | ||
| mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() | ||
| } | ||
|
|
||
| if _, ok := updateFields["versioningOverride.pinned.version"]; ok { | ||
| mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() | ||
| } | ||
|
|
||
| if _, ok := updateFields["versioningOverride.autoUpgrade"]; ok { | ||
| mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() | ||
| } | ||
|
|
||
| if _, ok := updateFields["versioningOverride.oneTime"]; ok { | ||
| mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() | ||
| } | ||
|
|
||
| if _, ok := updateFields["versioningOverride.oneTime.targetDeploymentVersion"]; ok { | ||
| mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() | ||
| } | ||
|
|
||
| if _, ok := updateFields["versioningOverride.oneTime.targetDeploymentVersion.deploymentName"]; ok { | ||
| mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() | ||
| } | ||
|
|
||
| if _, ok := updateFields["versioningOverride.oneTime.targetDeploymentVersion.buildId"]; ok { | ||
| mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride() | ||
| } | ||
|
|
||
| // ==== Priority | ||
|
|
||
| if _, ok := updateFields["priority"]; ok { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -984,14 +984,14 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution( | |
| } | ||
| } | ||
|
|
||
| // Pinned override is inherited if Task Queue of new run is compatible with the override version. | ||
| var inheritedPinnedOverride *workflowpb.VersioningOverride | ||
| if o := mutableState.GetExecutionInfo().GetVersioningInfo().GetVersioningOverride(); worker_versioning.OverrideIsPinned(o) { | ||
| inheritedPinnedOverride = o | ||
| // Pinned and one-time overrides are inherited if Task Queue of new run is compatible with the override version. | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the reason why the child workflow path is receiving this special treatment but none of CAN/retry/cron paths are is because of the following idea:
|
||
| var inheritedVersioningOverride *workflowpb.VersioningOverride | ||
| if o := mutableState.GetExecutionInfo().GetVersioningInfo().GetVersioningOverride(); worker_versioning.GetOverrideTargetDeploymentVersion(o) != nil { | ||
| inheritedVersioningOverride = o | ||
| newTQ := attributes.GetTaskQueue().GetName() | ||
| if newTQ != mutableState.GetExecutionInfo().GetTaskQueue() && !newTQInPinnedVersion || | ||
| attributes.GetNamespaceId() != mutableState.GetExecutionInfo().GetNamespaceId() { // don't inherit pinned version if child is in a different namespace | ||
| inheritedPinnedOverride = nil | ||
| attributes.GetNamespaceId() != mutableState.GetExecutionInfo().GetNamespaceId() { // don't inherit override if child is in a different namespace | ||
| inheritedVersioningOverride = nil | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1091,7 +1091,7 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution( | |
| inheritedBuildId, | ||
| initiatedEvent.GetUserMetadata(), | ||
| shouldTerminateAndStartChild, | ||
| inheritedPinnedOverride, | ||
| inheritedVersioningOverride, | ||
| inheritedPinnedVersion, | ||
| priorities.Merge(mutableState.GetExecutionInfo().Priority, attributes.Priority), | ||
| inheritedAutoUpgradeInfo, | ||
|
|
@@ -1665,7 +1665,7 @@ func (t *transferQueueActiveTaskExecutor) startWorkflow( | |
| inheritedBuildId string, | ||
| userMetadata *sdkpb.UserMetadata, | ||
| shouldTerminateAndStartChild bool, | ||
| inheritedPinnedOverride *workflowpb.VersioningOverride, | ||
| inheritedVersioningOverride *workflowpb.VersioningOverride, | ||
| inheritedPinnedVersion *deploymentpb.WorkerDeploymentVersion, | ||
| priority *commonpb.Priority, | ||
| inheritedAutoUpgradeInfo *deploymentpb.InheritedAutoUpgradeInfo, | ||
|
|
@@ -1690,7 +1690,7 @@ func (t *transferQueueActiveTaskExecutor) startWorkflow( | |
| Memo: attributes.Memo, | ||
| SearchAttributes: attributes.SearchAttributes, | ||
| UserMetadata: userMetadata, | ||
| VersioningOverride: inheritedPinnedOverride, | ||
| VersioningOverride: inheritedVersioningOverride, | ||
| Priority: priority, | ||
| TimeSkippingConfig: attributes.GetTimeSkippingConfig(), | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use switch case here, and return nil for auto upgrade case? This tells for auto upgrade the actual version is chosen at task dispatch time.