diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 0b055a1c442..6c710fc3daf 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -5742,8 +5742,7 @@ func (wh *WorkflowHandler) StartBatchOperation( escapedSearchValue := sqlparser.String(sqlparser.NewStrVal([]byte(searchValue))) input.Request.VisibilityQuery = fmt.Sprintf("%s = %s", sadefs.TemporalPauseInfo, escapedSearchValue) case *batchpb.BatchOperationUnpauseActivities_MatchAll: - wildCardUnpause := fmt.Sprintf("%s STARTS_WITH 'property:activityType='", sadefs.TemporalPauseInfo) - input.Request.VisibilityQuery = fmt.Sprintf("(%s) AND (%s)", visibilityQuery, wildCardUnpause) + input.Request.VisibilityQuery = visibilityQuery } case *workflowservice.StartBatchOperationRequest_ResetActivitiesOperation: input.BatchType = enumspb.BATCH_OPERATION_TYPE_RESET_ACTIVITY diff --git a/service/history/api/resetactivity/api.go b/service/history/api/resetactivity/api.go index 6b46ee16fff..2c3550e494d 100644 --- a/service/history/api/resetactivity/api.go +++ b/service/history/api/resetactivity/api.go @@ -45,6 +45,10 @@ func Invoke( activityIDs = append(activityIDs, ai.ActivityId) } } + case *workflowservice.ResetActivityRequest_MatchAll: + for _, ai := range mutableState.GetPendingActivityInfos() { + activityIDs = append(activityIDs, ai.ActivityId) + } } if len(activityIDs) == 0 { diff --git a/service/history/api/unpauseactivity/api.go b/service/history/api/unpauseactivity/api.go index d1c2865b239..f3137ccb481 100644 --- a/service/history/api/unpauseactivity/api.go +++ b/service/history/api/unpauseactivity/api.go @@ -97,6 +97,10 @@ func processUnpauseActivityRequest( activityIDs = append(activityIDs, ai.ActivityId) } } + case *workflowservice.UnpauseActivityRequest_UnpauseAll: + for _, ai := range mutableState.GetPendingActivityInfos() { + activityIDs = append(activityIDs, ai.ActivityId) + } } if len(activityIDs) == 0 { diff --git a/service/history/api/updateactivityoptions/api.go b/service/history/api/updateactivityoptions/api.go index 8d7406206c8..951b330da6f 100644 --- a/service/history/api/updateactivityoptions/api.go +++ b/service/history/api/updateactivityoptions/api.go @@ -354,6 +354,10 @@ func getActivityIDs(updateRequest *workflowservice.UpdateActivityOptionsRequest, activityIDs = append(activityIDs, ai.ActivityId) } } + case *workflowservice.UpdateActivityOptionsRequest_MatchAll: + for _, ai := range ms.GetPendingActivityInfos() { + activityIDs = append(activityIDs, ai.ActivityId) + } } return activityIDs } diff --git a/tests/activity_api_batch_reset_test.go b/tests/activity_api_batch_reset_test.go index 332e945bf5a..ffcb19644a5 100644 --- a/tests/activity_api_batch_reset_test.go +++ b/tests/activity_api_batch_reset_test.go @@ -11,9 +11,12 @@ import ( "github.com/temporalio/sqlparser" batchpb "go.temporal.io/api/batch/v1" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" sdkclient "go.temporal.io/sdk/client" + sdkworker "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/searchattribute/sadefs" "go.temporal.io/server/common/testing/parallelsuite" @@ -305,6 +308,100 @@ func (s *ActivityAPIBatchResetClientTestSuite) TestActivityBatchReset_Success_Pr s.NoError(err) } +func (s *ActivityAPIBatchResetClientTestSuite) TestActivityBatchReset_RunningWorkflowsResetAttempts() { + env := newBatchResetEnv(s.T()) + ctx := env.Context() + + const workflowCount = 10 + workflowTypeName := testcore.RandomizeStr("activity-batch-reset-running-workflow") + + internalWorkflow := newInternalWorkflow() + internalWorkflow.initialRetryInterval = 100 * time.Millisecond + internalWorkflow.activityRetryPolicy.InitialInterval = internalWorkflow.initialRetryInterval + + env.SdkWorker().RegisterWorkflowWithOptions(internalWorkflow.WorkflowFunc, workflow.RegisterOptions{Name: workflowTypeName}) + env.SdkWorker().RegisterActivity(internalWorkflow.ActivityFunc) + + workflowRuns := make([]sdkclient.WorkflowRun, 0, workflowCount) + for range workflowCount { + workflowRun, err := env.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), + TaskQueue: env.WorkerTaskQueue(), + }, workflowTypeName) + s.NoError(err) + s.NotNil(workflowRun) + workflowRuns = append(workflowRuns, workflowRun) + } + + s.Await(func(s *ActivityAPIBatchResetClientTestSuite) { + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(s.Context(), workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.Len(description.PendingActivities, 1) + s.Greater(description.PendingActivities[0].Attempt, int32(3)) + } + }, 15*time.Second, 100*time.Millisecond) + + env.SdkWorker().Stop() + + query := fmt.Sprintf("WorkflowType='%s' AND ExecutionStatus = 'Running'", workflowTypeName) + s.Await(func(s *ActivityAPIBatchResetClientTestSuite) { + listResp, err := env.FrontendClient().ListWorkflowExecutions(s.Context(), &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: env.Namespace().String(), + PageSize: workflowCount, + Query: query, + }) + s.NoError(err) + s.Len(listResp.GetExecutions(), workflowCount) + }, 5*time.Second, 500*time.Millisecond) + + jobID := uuid.NewString() + _, err := env.SdkClient().WorkflowService().StartBatchOperation(ctx, &workflowservice.StartBatchOperationRequest{ + Namespace: env.Namespace().String(), + Operation: &workflowservice.StartBatchOperationRequest_ResetActivitiesOperation{ + ResetActivitiesOperation: &batchpb.BatchOperationResetActivities{ + ResetAttempts: true, + ResetHeartbeat: true, + Activity: &batchpb.BatchOperationResetActivities_MatchAll{MatchAll: true}, + }, + }, + VisibilityQuery: query, + JobId: jobID, + Reason: "test", + }) + s.NoError(err) + + s.Await(func(s *ActivityAPIBatchResetClientTestSuite) { + descResp, err := env.FrontendClient().DescribeBatchOperation(s.Context(), &workflowservice.DescribeBatchOperationRequest{ + Namespace: env.Namespace().String(), + JobId: jobID, + }) + s.NoError(err) + s.Equal(enumspb.BATCH_OPERATION_STATE_COMPLETED, descResp.GetState()) + }, 15*time.Second, 100*time.Millisecond) + + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.Len(description.PendingActivities, 1) + s.Equal(int32(1), description.PendingActivities[0].Attempt) + } + + internalWorkflow.letActivitySucceed.Store(true) + + replacementWorker := sdkworker.New(env.SdkClient(), env.WorkerTaskQueue(), sdkworker.Options{}) + replacementWorker.RegisterWorkflowWithOptions(internalWorkflow.WorkflowFunc, workflow.RegisterOptions{Name: workflowTypeName}) + replacementWorker.RegisterActivity(internalWorkflow.ActivityFunc) + s.NoError(replacementWorker.Start()) + defer replacementWorker.Stop() + + for _, workflowRun := range workflowRuns { + var out string + err = workflowRun.Get(ctx, &out) + s.NoError(err) + } +} + func (s *ActivityAPIBatchResetClientTestSuite) TestActivityBatchReset_DontResetAttempts() { env := newBatchResetEnv(s.T()) diff --git a/tests/activity_api_batch_unpause_test.go b/tests/activity_api_batch_unpause_test.go index f8261ea06cb..357d1c88b04 100644 --- a/tests/activity_api_batch_unpause_test.go +++ b/tests/activity_api_batch_unpause_test.go @@ -198,6 +198,109 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Succes s.NoError(err) } +func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_MatchAll() { + env := testcore.NewEnv(s.T(), testcore.WithWorkerService("batch operations")) + ctx := env.Context() + + const workflowCount = 10 + workflowTypeName := testcore.RandomizeStr("activity-batch-unpause-match-all-workflow") + + internalWorkflow := newInternalWorkflow() + + env.SdkWorker().RegisterWorkflowWithOptions(internalWorkflow.WorkflowFunc, workflow.RegisterOptions{Name: workflowTypeName}) + env.SdkWorker().RegisterActivity(internalWorkflow.ActivityFunc) + + workflowRuns := make([]sdkclient.WorkflowRun, 0, workflowCount) + for range workflowCount { + workflowRun, err := env.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), + TaskQueue: env.WorkerTaskQueue(), + }, workflowTypeName) + s.NoError(err) + s.NotNil(workflowRun) + workflowRuns = append(workflowRuns, workflowRun) + } + + s.Await(func(s *ActivityApiBatchUnpauseClientTestSuite) { + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(s.Context(), workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.Len(description.GetPendingActivities(), 1) + s.Positive(internalWorkflow.startedActivityCount.Load()) + } + }, 5*time.Second, 100*time.Millisecond) + + for _, workflowRun := range workflowRuns { + resp, err := env.FrontendClient().PauseActivity(ctx, &workflowservice.PauseActivityRequest{ + Namespace: env.Namespace().String(), + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowRun.GetID(), + }, + Activity: &workflowservice.PauseActivityRequest_Id{Id: "activity-id"}, + }) + s.NoError(err) + s.NotNil(resp) + } + + s.Await(func(s *ActivityApiBatchUnpauseClientTestSuite) { + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(s.Context(), workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.Len(description.PendingActivities, 1) + s.True(description.PendingActivities[0].Paused) + } + }, 5*time.Second, 100*time.Millisecond) + + query := fmt.Sprintf("WorkflowType='%s' AND ExecutionStatus = 'Running'", workflowTypeName) + s.Await(func(s *ActivityApiBatchUnpauseClientTestSuite) { + listResp, err := env.FrontendClient().ListWorkflowExecutions(s.Context(), &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: env.Namespace().String(), + PageSize: workflowCount, + Query: query, + }) + s.NoError(err) + s.Len(listResp.GetExecutions(), workflowCount) + }, 5*time.Second, 500*time.Millisecond) + + jobID := uuid.NewString() + _, err := env.SdkClient().WorkflowService().StartBatchOperation(ctx, &workflowservice.StartBatchOperationRequest{ + Namespace: env.Namespace().String(), + Operation: &workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation{ + UnpauseActivitiesOperation: &batchpb.BatchOperationUnpauseActivities{ + Activity: &batchpb.BatchOperationUnpauseActivities_MatchAll{MatchAll: true}, + }, + }, + VisibilityQuery: query, + JobId: jobID, + Reason: "test", + }) + s.NoError(err) + + s.Await(func(s *ActivityApiBatchUnpauseClientTestSuite) { + descResp, err := env.FrontendClient().DescribeBatchOperation(s.Context(), &workflowservice.DescribeBatchOperationRequest{ + Namespace: env.Namespace().String(), + JobId: jobID, + }) + s.NoError(err) + s.Equal(enumspb.BATCH_OPERATION_STATE_COMPLETED, descResp.GetState()) + }, 15*time.Second, 100*time.Millisecond) + + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.Len(description.PendingActivities, 1) + s.False(description.PendingActivities[0].Paused) + } + + internalWorkflow.letActivitySucceed.Store(true) + + for _, workflowRun := range workflowRuns { + var out string + err = workflowRun.Get(ctx, &out) + s.NoError(err) + } +} + func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Failed() { env := testcore.NewEnv(s.T(), testcore.WithWorkerService("batch operations")) diff --git a/tests/activity_api_batch_update_options_test.go b/tests/activity_api_batch_update_options_test.go index 5c34ca96de6..1c4913a9a54 100644 --- a/tests/activity_api_batch_update_options_test.go +++ b/tests/activity_api_batch_update_options_test.go @@ -12,9 +12,11 @@ import ( activitypb "go.temporal.io/api/activity/v1" batchpb "go.temporal.io/api/batch/v1" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" sdkclient "go.temporal.io/sdk/client" + "go.temporal.io/sdk/workflow" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/searchattribute/sadefs" "go.temporal.io/server/common/testing/parallelsuite" @@ -192,6 +194,98 @@ func (s *ActivityAPIBatchUpdateOptionsSuite) TestActivityBatchUpdateOptionsSucce env.NoError(err) } +func (s *ActivityAPIBatchUpdateOptionsSuite) TestActivityBatchUpdateOptionsMatchAll() { + env := testcore.NewEnv(s.T(), + testcore.WithWorkerService("batch operations"), + testcore.WithDynamicConfig(dynamicconfig.FrontendMaxConcurrentBatchOperationPerNamespace, testcore.ClientSuiteLimit), + ) + + ctx := env.Context() + const workflowCount = 10 + workflowTypeName := testcore.RandomizeStr("activity-batch-update-options-match-all-workflow") + updatedScheduleToClose := 10 * time.Second + + internalWorkflow := newInternalWorkflow() + + env.SdkWorker().RegisterWorkflowWithOptions(internalWorkflow.WorkflowFunc, workflow.RegisterOptions{Name: workflowTypeName}) + env.SdkWorker().RegisterActivity(internalWorkflow.ActivityFunc) + + workflowRuns := make([]sdkclient.WorkflowRun, 0, workflowCount) + for range workflowCount { + workflowRun, err := env.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), + TaskQueue: env.WorkerTaskQueue(), + }, workflowTypeName) + s.NoError(err) + s.NotNil(workflowRun) + workflowRuns = append(workflowRuns, workflowRun) + } + + s.Await(func(s *ActivityAPIBatchUpdateOptionsSuite) { + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(s.Context(), workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.Len(description.GetPendingActivities(), 1) + s.Positive(internalWorkflow.startedActivityCount.Load()) + } + }, 5*time.Second, 100*time.Millisecond) + + query := fmt.Sprintf("WorkflowType='%s' AND ExecutionStatus = 'Running'", workflowTypeName) + s.Await(func(s *ActivityAPIBatchUpdateOptionsSuite) { + listResp, err := env.FrontendClient().ListWorkflowExecutions(s.Context(), &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: env.Namespace().String(), + PageSize: workflowCount, + Query: query, + }) + s.NoError(err) + s.Len(listResp.GetExecutions(), workflowCount) + }, 5*time.Second, 500*time.Millisecond) + + jobID := uuid.NewString() + _, err := env.SdkClient().WorkflowService().StartBatchOperation(ctx, &workflowservice.StartBatchOperationRequest{ + Namespace: env.Namespace().String(), + Operation: &workflowservice.StartBatchOperationRequest_UpdateActivityOptionsOperation{ + UpdateActivityOptionsOperation: &batchpb.BatchOperationUpdateActivityOptions{ + Activity: &batchpb.BatchOperationUpdateActivityOptions_MatchAll{MatchAll: true}, + ActivityOptions: &activitypb.ActivityOptions{ + ScheduleToCloseTimeout: durationpb.New(updatedScheduleToClose), + }, + UpdateMask: &fieldmaskpb.FieldMask{ + Paths: []string{"schedule_to_close_timeout"}, + }, + }, + }, + VisibilityQuery: query, + JobId: jobID, + Reason: "test", + }) + s.NoError(err) + + s.Await(func(s *ActivityAPIBatchUpdateOptionsSuite) { + descResp, err := env.FrontendClient().DescribeBatchOperation(s.Context(), &workflowservice.DescribeBatchOperationRequest{ + Namespace: env.Namespace().String(), + JobId: jobID, + }) + s.NoError(err) + s.Equal(enumspb.BATCH_OPERATION_STATE_COMPLETED, descResp.GetState()) + }, 15*time.Second, 100*time.Millisecond) + + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.Len(description.PendingActivities, 1) + s.Equal(updatedScheduleToClose, description.PendingActivities[0].ActivityOptions.ScheduleToCloseTimeout.AsDuration()) + } + + internalWorkflow.letActivitySucceed.Store(true) + + for _, workflowRun := range workflowRuns { + var out string + err = workflowRun.Get(ctx, &out) + s.NoError(err) + } +} + func (s *ActivityAPIBatchUpdateOptionsSuite) TestActivityBatchUpdateOptionsFailed() { env := testcore.NewEnv(s.T(), testcore.WithWorkerService("batch operations"),