From f57fc4bd9edd14f2c2b9730f0e20ea2aa95c08b8 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Wed, 17 Jun 2026 17:43:13 -0600 Subject: [PATCH 1/4] fix: support MatchAll for activity unpause,reset,update-options --- service/frontend/workflow_handler.go | 3 +- service/history/api/resetactivity/api.go | 4 + service/history/api/unpauseactivity/api.go | 4 + .../history/api/updateactivityoptions/api.go | 4 + tests/activity_api_batch_reset_test.go | 97 ++++++++++++++++ tests/activity_api_batch_unpause_test.go | 104 ++++++++++++++++++ .../activity_api_batch_update_options_test.go | 94 ++++++++++++++++ 7 files changed, 308 insertions(+), 2 deletions(-) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 0b055a1c442..6c710fc3daf 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -5742,8 +5742,7 @@ func (wh *WorkflowHandler) StartBatchOperation( escapedSearchValue := sqlparser.String(sqlparser.NewStrVal([]byte(searchValue))) input.Request.VisibilityQuery = fmt.Sprintf("%s = %s", sadefs.TemporalPauseInfo, escapedSearchValue) case *batchpb.BatchOperationUnpauseActivities_MatchAll: - wildCardUnpause := fmt.Sprintf("%s STARTS_WITH 'property:activityType='", sadefs.TemporalPauseInfo) - input.Request.VisibilityQuery = fmt.Sprintf("(%s) AND (%s)", visibilityQuery, wildCardUnpause) + input.Request.VisibilityQuery = visibilityQuery } case *workflowservice.StartBatchOperationRequest_ResetActivitiesOperation: input.BatchType = enumspb.BATCH_OPERATION_TYPE_RESET_ACTIVITY diff --git a/service/history/api/resetactivity/api.go b/service/history/api/resetactivity/api.go index 6b46ee16fff..2c3550e494d 100644 --- a/service/history/api/resetactivity/api.go +++ b/service/history/api/resetactivity/api.go @@ -45,6 +45,10 @@ func Invoke( activityIDs = append(activityIDs, ai.ActivityId) } } + case *workflowservice.ResetActivityRequest_MatchAll: + for _, ai := range mutableState.GetPendingActivityInfos() { + activityIDs = append(activityIDs, ai.ActivityId) + } } if len(activityIDs) == 0 { diff --git a/service/history/api/unpauseactivity/api.go b/service/history/api/unpauseactivity/api.go index d1c2865b239..f3137ccb481 100644 --- a/service/history/api/unpauseactivity/api.go +++ b/service/history/api/unpauseactivity/api.go @@ -97,6 +97,10 @@ func processUnpauseActivityRequest( activityIDs = append(activityIDs, ai.ActivityId) } } + case *workflowservice.UnpauseActivityRequest_UnpauseAll: + for _, ai := range mutableState.GetPendingActivityInfos() { + activityIDs = append(activityIDs, ai.ActivityId) + } } if len(activityIDs) == 0 { diff --git a/service/history/api/updateactivityoptions/api.go b/service/history/api/updateactivityoptions/api.go index 8d7406206c8..951b330da6f 100644 --- a/service/history/api/updateactivityoptions/api.go +++ b/service/history/api/updateactivityoptions/api.go @@ -354,6 +354,10 @@ func getActivityIDs(updateRequest *workflowservice.UpdateActivityOptionsRequest, activityIDs = append(activityIDs, ai.ActivityId) } } + case *workflowservice.UpdateActivityOptionsRequest_MatchAll: + for _, ai := range ms.GetPendingActivityInfos() { + activityIDs = append(activityIDs, ai.ActivityId) + } } return activityIDs } diff --git a/tests/activity_api_batch_reset_test.go b/tests/activity_api_batch_reset_test.go index 332e945bf5a..7bca9d8c2ab 100644 --- a/tests/activity_api_batch_reset_test.go +++ b/tests/activity_api_batch_reset_test.go @@ -11,9 +11,12 @@ import ( "github.com/temporalio/sqlparser" batchpb "go.temporal.io/api/batch/v1" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" sdkclient "go.temporal.io/sdk/client" + sdkworker "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/searchattribute/sadefs" "go.temporal.io/server/common/testing/parallelsuite" @@ -305,6 +308,100 @@ func (s *ActivityAPIBatchResetClientTestSuite) TestActivityBatchReset_Success_Pr s.NoError(err) } +func (s *ActivityAPIBatchResetClientTestSuite) TestActivityBatchReset_RunningWorkflowsResetAttempts() { + env := newBatchResetEnv(s.T()) + ctx := env.Context() + + const workflowCount = 10 + workflowTypeName := testcore.RandomizeStr("activity-batch-reset-running-workflow") + + internalWorkflow := newInternalWorkflow() + internalWorkflow.initialRetryInterval = 100 * time.Millisecond + internalWorkflow.activityRetryPolicy.InitialInterval = internalWorkflow.initialRetryInterval + + env.SdkWorker().RegisterWorkflowWithOptions(internalWorkflow.WorkflowFunc, workflow.RegisterOptions{Name: workflowTypeName}) + env.SdkWorker().RegisterActivity(internalWorkflow.ActivityFunc) + + workflowRuns := make([]sdkclient.WorkflowRun, 0, workflowCount) + for i := 0; i < workflowCount; i++ { + workflowRun, err := env.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), + TaskQueue: env.WorkerTaskQueue(), + }, workflowTypeName) + s.NoError(err) + s.NotNil(workflowRun) + workflowRuns = append(workflowRuns, workflowRun) + } + + s.EventuallyWithT(func(t *assert.CollectT) { + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + require.Len(t, description.PendingActivities, 1) + require.Greater(t, description.PendingActivities[0].Attempt, int32(3)) + } + }, 15*time.Second, 100*time.Millisecond) + + env.SdkWorker().Stop() + + query := fmt.Sprintf("WorkflowType='%s' AND ExecutionStatus = 'Running'", workflowTypeName) + s.EventuallyWithT(func(t *assert.CollectT) { + listResp, err := env.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: env.Namespace().String(), + PageSize: workflowCount, + Query: query, + }) + require.NoError(t, err) + require.Len(t, listResp.GetExecutions(), workflowCount) + }, 5*time.Second, 500*time.Millisecond) + + jobID := uuid.NewString() + _, err := env.SdkClient().WorkflowService().StartBatchOperation(ctx, &workflowservice.StartBatchOperationRequest{ + Namespace: env.Namespace().String(), + Operation: &workflowservice.StartBatchOperationRequest_ResetActivitiesOperation{ + ResetActivitiesOperation: &batchpb.BatchOperationResetActivities{ + ResetAttempts: true, + ResetHeartbeat: true, + Activity: &batchpb.BatchOperationResetActivities_MatchAll{MatchAll: true}, + }, + }, + VisibilityQuery: query, + JobId: jobID, + Reason: "test", + }) + s.NoError(err) + + s.EventuallyWithT(func(t *assert.CollectT) { + descResp, err := env.FrontendClient().DescribeBatchOperation(ctx, &workflowservice.DescribeBatchOperationRequest{ + Namespace: env.Namespace().String(), + JobId: jobID, + }) + require.NoError(t, err) + require.Equal(t, enumspb.BATCH_OPERATION_STATE_COMPLETED, descResp.GetState()) + }, 15*time.Second, 100*time.Millisecond) + + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.Len(description.PendingActivities, 1) + s.Equal(int32(1), description.PendingActivities[0].Attempt) + } + + internalWorkflow.letActivitySucceed.Store(true) + + replacementWorker := sdkworker.New(env.SdkClient(), env.WorkerTaskQueue(), sdkworker.Options{}) + replacementWorker.RegisterWorkflowWithOptions(internalWorkflow.WorkflowFunc, workflow.RegisterOptions{Name: workflowTypeName}) + replacementWorker.RegisterActivity(internalWorkflow.ActivityFunc) + s.NoError(replacementWorker.Start()) + defer replacementWorker.Stop() + + for _, workflowRun := range workflowRuns { + var out string + err = workflowRun.Get(ctx, &out) + s.NoError(err) + } +} + func (s *ActivityAPIBatchResetClientTestSuite) TestActivityBatchReset_DontResetAttempts() { env := newBatchResetEnv(s.T()) diff --git a/tests/activity_api_batch_unpause_test.go b/tests/activity_api_batch_unpause_test.go index f8261ea06cb..2888eda082f 100644 --- a/tests/activity_api_batch_unpause_test.go +++ b/tests/activity_api_batch_unpause_test.go @@ -198,6 +198,110 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Succes s.NoError(err) } +func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_MatchAll() { + env := testcore.NewEnv(s.T(), testcore.WithWorkerService("batch operations")) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + const workflowCount = 10 + workflowTypeName := testcore.RandomizeStr("activity-batch-unpause-match-all-workflow") + + internalWorkflow := newInternalWorkflow() + + env.SdkWorker().RegisterWorkflowWithOptions(internalWorkflow.WorkflowFunc, workflow.RegisterOptions{Name: workflowTypeName}) + env.SdkWorker().RegisterActivity(internalWorkflow.ActivityFunc) + + workflowRuns := make([]sdkclient.WorkflowRun, 0, workflowCount) + for i := 0; i < workflowCount; i++ { + workflowRun, err := env.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), + TaskQueue: env.WorkerTaskQueue(), + }, workflowTypeName) + s.NoError(err) + s.NotNil(workflowRun) + workflowRuns = append(workflowRuns, workflowRun) + } + + s.EventuallyWithT(func(t *assert.CollectT) { + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + require.Len(t, description.GetPendingActivities(), 1) + require.Positive(t, internalWorkflow.startedActivityCount.Load()) + } + }, 5*time.Second, 100*time.Millisecond) + + for _, workflowRun := range workflowRuns { + resp, err := env.FrontendClient().PauseActivity(ctx, &workflowservice.PauseActivityRequest{ + Namespace: env.Namespace().String(), + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowRun.GetID(), + }, + Activity: &workflowservice.PauseActivityRequest_Id{Id: "activity-id"}, + }) + s.NoError(err) + s.NotNil(resp) + } + + s.EventuallyWithT(func(t *assert.CollectT) { + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + require.Len(t, description.PendingActivities, 1) + require.True(t, description.PendingActivities[0].Paused) + } + }, 5*time.Second, 100*time.Millisecond) + + query := fmt.Sprintf("WorkflowType='%s' AND ExecutionStatus = 'Running'", workflowTypeName) + s.EventuallyWithT(func(t *assert.CollectT) { + listResp, err := env.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: env.Namespace().String(), + PageSize: workflowCount, + Query: query, + }) + require.NoError(t, err) + require.Len(t, listResp.GetExecutions(), workflowCount) + }, 5*time.Second, 500*time.Millisecond) + + jobID := uuid.NewString() + _, err := env.SdkClient().WorkflowService().StartBatchOperation(ctx, &workflowservice.StartBatchOperationRequest{ + Namespace: env.Namespace().String(), + Operation: &workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation{ + UnpauseActivitiesOperation: &batchpb.BatchOperationUnpauseActivities{ + Activity: &batchpb.BatchOperationUnpauseActivities_MatchAll{MatchAll: true}, + }, + }, + VisibilityQuery: query, + JobId: jobID, + Reason: "test", + }) + s.NoError(err) + + s.EventuallyWithT(func(t *assert.CollectT) { + descResp, err := env.FrontendClient().DescribeBatchOperation(ctx, &workflowservice.DescribeBatchOperationRequest{ + Namespace: env.Namespace().String(), + JobId: jobID, + }) + require.NoError(t, err) + require.Equal(t, enumspb.BATCH_OPERATION_STATE_COMPLETED, descResp.GetState()) + }, 15*time.Second, 100*time.Millisecond) + + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.Len(description.PendingActivities, 1) + s.False(description.PendingActivities[0].Paused) + } + + internalWorkflow.letActivitySucceed.Store(true) + + for _, workflowRun := range workflowRuns { + var out string + err = workflowRun.Get(ctx, &out) + s.NoError(err) + } +} + func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Failed() { env := testcore.NewEnv(s.T(), testcore.WithWorkerService("batch operations")) diff --git a/tests/activity_api_batch_update_options_test.go b/tests/activity_api_batch_update_options_test.go index 5c34ca96de6..33fb4dc2be3 100644 --- a/tests/activity_api_batch_update_options_test.go +++ b/tests/activity_api_batch_update_options_test.go @@ -12,9 +12,11 @@ import ( activitypb "go.temporal.io/api/activity/v1" batchpb "go.temporal.io/api/batch/v1" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" sdkclient "go.temporal.io/sdk/client" + "go.temporal.io/sdk/workflow" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/searchattribute/sadefs" "go.temporal.io/server/common/testing/parallelsuite" @@ -192,6 +194,98 @@ func (s *ActivityAPIBatchUpdateOptionsSuite) TestActivityBatchUpdateOptionsSucce env.NoError(err) } +func (s *ActivityAPIBatchUpdateOptionsSuite) TestActivityBatchUpdateOptionsMatchAll() { + env := testcore.NewEnv(s.T(), + testcore.WithWorkerService("batch operations"), + testcore.WithDynamicConfig(dynamicconfig.FrontendMaxConcurrentBatchOperationPerNamespace, testcore.ClientSuiteLimit), + ) + + ctx := env.Context() + const workflowCount = 10 + workflowTypeName := testcore.RandomizeStr("activity-batch-update-options-match-all-workflow") + updatedScheduleToClose := 10 * time.Second + + internalWorkflow := newInternalWorkflow() + + env.SdkWorker().RegisterWorkflowWithOptions(internalWorkflow.WorkflowFunc, workflow.RegisterOptions{Name: workflowTypeName}) + env.SdkWorker().RegisterActivity(internalWorkflow.ActivityFunc) + + workflowRuns := make([]sdkclient.WorkflowRun, 0, workflowCount) + for i := 0; i < workflowCount; i++ { + workflowRun, err := env.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), + TaskQueue: env.WorkerTaskQueue(), + }, workflowTypeName) + env.NoError(err) + env.NotNil(workflowRun) + workflowRuns = append(workflowRuns, workflowRun) + } + + env.EventuallyWithT(func(t *assert.CollectT) { + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + require.Len(t, description.GetPendingActivities(), 1) + require.Positive(t, internalWorkflow.startedActivityCount.Load()) + } + }, 5*time.Second, 100*time.Millisecond) + + query := fmt.Sprintf("WorkflowType='%s' AND ExecutionStatus = 'Running'", workflowTypeName) + env.EventuallyWithT(func(t *assert.CollectT) { + listResp, err := env.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: env.Namespace().String(), + PageSize: workflowCount, + Query: query, + }) + require.NoError(t, err) + require.Len(t, listResp.GetExecutions(), workflowCount) + }, 5*time.Second, 500*time.Millisecond) + + jobID := uuid.NewString() + _, err := env.SdkClient().WorkflowService().StartBatchOperation(ctx, &workflowservice.StartBatchOperationRequest{ + Namespace: env.Namespace().String(), + Operation: &workflowservice.StartBatchOperationRequest_UpdateActivityOptionsOperation{ + UpdateActivityOptionsOperation: &batchpb.BatchOperationUpdateActivityOptions{ + Activity: &batchpb.BatchOperationUpdateActivityOptions_MatchAll{MatchAll: true}, + ActivityOptions: &activitypb.ActivityOptions{ + ScheduleToCloseTimeout: durationpb.New(updatedScheduleToClose), + }, + UpdateMask: &fieldmaskpb.FieldMask{ + Paths: []string{"schedule_to_close_timeout"}, + }, + }, + }, + VisibilityQuery: query, + JobId: jobID, + Reason: "test", + }) + env.NoError(err) + + env.EventuallyWithT(func(t *assert.CollectT) { + descResp, err := env.FrontendClient().DescribeBatchOperation(ctx, &workflowservice.DescribeBatchOperationRequest{ + Namespace: env.Namespace().String(), + JobId: jobID, + }) + require.NoError(t, err) + require.Equal(t, enumspb.BATCH_OPERATION_STATE_COMPLETED, descResp.GetState()) + }, 15*time.Second, 100*time.Millisecond) + + for _, workflowRun := range workflowRuns { + description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + env.NoError(err) + env.Len(description.PendingActivities, 1) + env.Equal(updatedScheduleToClose, description.PendingActivities[0].ActivityOptions.ScheduleToCloseTimeout.AsDuration()) + } + + internalWorkflow.letActivitySucceed.Store(true) + + for _, workflowRun := range workflowRuns { + var out string + err = workflowRun.Get(ctx, &out) + env.NoError(err) + } +} + func (s *ActivityAPIBatchUpdateOptionsSuite) TestActivityBatchUpdateOptionsFailed() { env := testcore.NewEnv(s.T(), testcore.WithWorkerService("batch operations"), From 1651dd965cdb19aba4460a6977b12a3e3a01afee Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Thu, 18 Jun 2026 09:30:11 -0600 Subject: [PATCH 2/4] linters --- tests/activity_api_batch_reset_test.go | 26 ++++++------ tests/activity_api_batch_unpause_test.go | 36 ++++++++--------- .../activity_api_batch_update_options_test.go | 40 +++++++++---------- 3 files changed, 51 insertions(+), 51 deletions(-) diff --git a/tests/activity_api_batch_reset_test.go b/tests/activity_api_batch_reset_test.go index 7bca9d8c2ab..ba5dbdb5d9b 100644 --- a/tests/activity_api_batch_reset_test.go +++ b/tests/activity_api_batch_reset_test.go @@ -333,26 +333,26 @@ func (s *ActivityAPIBatchResetClientTestSuite) TestActivityBatchReset_RunningWor workflowRuns = append(workflowRuns, workflowRun) } - s.EventuallyWithT(func(t *assert.CollectT) { + s.Await(func(s *ActivityAPIBatchResetClientTestSuite) { for _, workflowRun := range workflowRuns { - description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) - require.NoError(t, err) - require.Len(t, description.PendingActivities, 1) - require.Greater(t, description.PendingActivities[0].Attempt, int32(3)) + description, err := env.SdkClient().DescribeWorkflowExecution(s.Context(), workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.Len(description.PendingActivities, 1) + s.Greater(description.PendingActivities[0].Attempt, int32(3)) } }, 15*time.Second, 100*time.Millisecond) env.SdkWorker().Stop() query := fmt.Sprintf("WorkflowType='%s' AND ExecutionStatus = 'Running'", workflowTypeName) - s.EventuallyWithT(func(t *assert.CollectT) { - listResp, err := env.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + s.Await(func(s *ActivityAPIBatchResetClientTestSuite) { + listResp, err := env.FrontendClient().ListWorkflowExecutions(s.Context(), &workflowservice.ListWorkflowExecutionsRequest{ Namespace: env.Namespace().String(), PageSize: workflowCount, Query: query, }) - require.NoError(t, err) - require.Len(t, listResp.GetExecutions(), workflowCount) + s.NoError(err) + s.Len(listResp.GetExecutions(), workflowCount) }, 5*time.Second, 500*time.Millisecond) jobID := uuid.NewString() @@ -371,13 +371,13 @@ func (s *ActivityAPIBatchResetClientTestSuite) TestActivityBatchReset_RunningWor }) s.NoError(err) - s.EventuallyWithT(func(t *assert.CollectT) { - descResp, err := env.FrontendClient().DescribeBatchOperation(ctx, &workflowservice.DescribeBatchOperationRequest{ + s.Await(func(s *ActivityAPIBatchResetClientTestSuite) { + descResp, err := env.FrontendClient().DescribeBatchOperation(s.Context(), &workflowservice.DescribeBatchOperationRequest{ Namespace: env.Namespace().String(), JobId: jobID, }) - require.NoError(t, err) - require.Equal(t, enumspb.BATCH_OPERATION_STATE_COMPLETED, descResp.GetState()) + s.NoError(err) + s.Equal(enumspb.BATCH_OPERATION_STATE_COMPLETED, descResp.GetState()) }, 15*time.Second, 100*time.Millisecond) for _, workflowRun := range workflowRuns { diff --git a/tests/activity_api_batch_unpause_test.go b/tests/activity_api_batch_unpause_test.go index 2888eda082f..f75fadef40d 100644 --- a/tests/activity_api_batch_unpause_test.go +++ b/tests/activity_api_batch_unpause_test.go @@ -222,12 +222,12 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_MatchA workflowRuns = append(workflowRuns, workflowRun) } - s.EventuallyWithT(func(t *assert.CollectT) { + s.Await(func(s *ActivityApiBatchUnpauseClientTestSuite) { for _, workflowRun := range workflowRuns { - description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) - require.NoError(t, err) - require.Len(t, description.GetPendingActivities(), 1) - require.Positive(t, internalWorkflow.startedActivityCount.Load()) + description, err := env.SdkClient().DescribeWorkflowExecution(s.Context(), workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.Len(description.GetPendingActivities(), 1) + s.Positive(internalWorkflow.startedActivityCount.Load()) } }, 5*time.Second, 100*time.Millisecond) @@ -243,24 +243,24 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_MatchA s.NotNil(resp) } - s.EventuallyWithT(func(t *assert.CollectT) { + s.Await(func(s *ActivityApiBatchUnpauseClientTestSuite) { for _, workflowRun := range workflowRuns { - description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) - require.NoError(t, err) - require.Len(t, description.PendingActivities, 1) - require.True(t, description.PendingActivities[0].Paused) + description, err := env.SdkClient().DescribeWorkflowExecution(s.Context(), workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.Len(description.PendingActivities, 1) + s.True(description.PendingActivities[0].Paused) } }, 5*time.Second, 100*time.Millisecond) query := fmt.Sprintf("WorkflowType='%s' AND ExecutionStatus = 'Running'", workflowTypeName) - s.EventuallyWithT(func(t *assert.CollectT) { - listResp, err := env.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + s.Await(func(s *ActivityApiBatchUnpauseClientTestSuite) { + listResp, err := env.FrontendClient().ListWorkflowExecutions(s.Context(), &workflowservice.ListWorkflowExecutionsRequest{ Namespace: env.Namespace().String(), PageSize: workflowCount, Query: query, }) - require.NoError(t, err) - require.Len(t, listResp.GetExecutions(), workflowCount) + s.NoError(err) + s.Len(listResp.GetExecutions(), workflowCount) }, 5*time.Second, 500*time.Millisecond) jobID := uuid.NewString() @@ -277,13 +277,13 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_MatchA }) s.NoError(err) - s.EventuallyWithT(func(t *assert.CollectT) { - descResp, err := env.FrontendClient().DescribeBatchOperation(ctx, &workflowservice.DescribeBatchOperationRequest{ + s.Await(func(s *ActivityApiBatchUnpauseClientTestSuite) { + descResp, err := env.FrontendClient().DescribeBatchOperation(s.Context(), &workflowservice.DescribeBatchOperationRequest{ Namespace: env.Namespace().String(), JobId: jobID, }) - require.NoError(t, err) - require.Equal(t, enumspb.BATCH_OPERATION_STATE_COMPLETED, descResp.GetState()) + s.NoError(err) + s.Equal(enumspb.BATCH_OPERATION_STATE_COMPLETED, descResp.GetState()) }, 15*time.Second, 100*time.Millisecond) for _, workflowRun := range workflowRuns { diff --git a/tests/activity_api_batch_update_options_test.go b/tests/activity_api_batch_update_options_test.go index 33fb4dc2be3..df75e973211 100644 --- a/tests/activity_api_batch_update_options_test.go +++ b/tests/activity_api_batch_update_options_test.go @@ -216,29 +216,29 @@ func (s *ActivityAPIBatchUpdateOptionsSuite) TestActivityBatchUpdateOptionsMatch ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), TaskQueue: env.WorkerTaskQueue(), }, workflowTypeName) - env.NoError(err) - env.NotNil(workflowRun) + require.NoError(s.T(), err) + require.NotNil(s.T(), workflowRun) workflowRuns = append(workflowRuns, workflowRun) } - env.EventuallyWithT(func(t *assert.CollectT) { + s.Await(func(s *ActivityAPIBatchUpdateOptionsSuite) { for _, workflowRun := range workflowRuns { - description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) - require.NoError(t, err) - require.Len(t, description.GetPendingActivities(), 1) - require.Positive(t, internalWorkflow.startedActivityCount.Load()) + description, err := env.SdkClient().DescribeWorkflowExecution(s.Context(), workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.Len(description.GetPendingActivities(), 1) + s.Positive(internalWorkflow.startedActivityCount.Load()) } }, 5*time.Second, 100*time.Millisecond) query := fmt.Sprintf("WorkflowType='%s' AND ExecutionStatus = 'Running'", workflowTypeName) - env.EventuallyWithT(func(t *assert.CollectT) { - listResp, err := env.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + s.Await(func(s *ActivityAPIBatchUpdateOptionsSuite) { + listResp, err := env.FrontendClient().ListWorkflowExecutions(s.Context(), &workflowservice.ListWorkflowExecutionsRequest{ Namespace: env.Namespace().String(), PageSize: workflowCount, Query: query, }) - require.NoError(t, err) - require.Len(t, listResp.GetExecutions(), workflowCount) + s.NoError(err) + s.Len(listResp.GetExecutions(), workflowCount) }, 5*time.Second, 500*time.Millisecond) jobID := uuid.NewString() @@ -259,22 +259,22 @@ func (s *ActivityAPIBatchUpdateOptionsSuite) TestActivityBatchUpdateOptionsMatch JobId: jobID, Reason: "test", }) - env.NoError(err) + require.NoError(s.T(), err) - env.EventuallyWithT(func(t *assert.CollectT) { - descResp, err := env.FrontendClient().DescribeBatchOperation(ctx, &workflowservice.DescribeBatchOperationRequest{ + s.Await(func(s *ActivityAPIBatchUpdateOptionsSuite) { + descResp, err := env.FrontendClient().DescribeBatchOperation(s.Context(), &workflowservice.DescribeBatchOperationRequest{ Namespace: env.Namespace().String(), JobId: jobID, }) - require.NoError(t, err) - require.Equal(t, enumspb.BATCH_OPERATION_STATE_COMPLETED, descResp.GetState()) + s.NoError(err) + s.Equal(enumspb.BATCH_OPERATION_STATE_COMPLETED, descResp.GetState()) }, 15*time.Second, 100*time.Millisecond) for _, workflowRun := range workflowRuns { description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) - env.NoError(err) - env.Len(description.PendingActivities, 1) - env.Equal(updatedScheduleToClose, description.PendingActivities[0].ActivityOptions.ScheduleToCloseTimeout.AsDuration()) + require.NoError(s.T(), err) + require.Len(s.T(), description.PendingActivities, 1) + require.Equal(s.T(), updatedScheduleToClose, description.PendingActivities[0].ActivityOptions.ScheduleToCloseTimeout.AsDuration()) } internalWorkflow.letActivitySucceed.Store(true) @@ -282,7 +282,7 @@ func (s *ActivityAPIBatchUpdateOptionsSuite) TestActivityBatchUpdateOptionsMatch for _, workflowRun := range workflowRuns { var out string err = workflowRun.Get(ctx, &out) - env.NoError(err) + require.NoError(s.T(), err) } } From f83fe0b518f3506bf8fbdb522b0fbe70f33cb222 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Thu, 18 Jun 2026 09:33:33 -0600 Subject: [PATCH 3/4] context --- tests/activity_api_batch_unpause_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/activity_api_batch_unpause_test.go b/tests/activity_api_batch_unpause_test.go index f75fadef40d..9a3646b7b49 100644 --- a/tests/activity_api_batch_unpause_test.go +++ b/tests/activity_api_batch_unpause_test.go @@ -200,8 +200,7 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Succes func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_MatchAll() { env := testcore.NewEnv(s.T(), testcore.WithWorkerService("batch operations")) - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + ctx := env.Context() const workflowCount = 10 workflowTypeName := testcore.RandomizeStr("activity-batch-unpause-match-all-workflow") From 9b1dd6a0dd3c7cccaf5e773f65e37a9e938faf24 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Thu, 18 Jun 2026 11:22:05 -0600 Subject: [PATCH 4/4] more lint fixes --- tests/activity_api_batch_reset_test.go | 2 +- tests/activity_api_batch_unpause_test.go | 2 +- tests/activity_api_batch_update_options_test.go | 16 ++++++++-------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/activity_api_batch_reset_test.go b/tests/activity_api_batch_reset_test.go index ba5dbdb5d9b..ffcb19644a5 100644 --- a/tests/activity_api_batch_reset_test.go +++ b/tests/activity_api_batch_reset_test.go @@ -323,7 +323,7 @@ func (s *ActivityAPIBatchResetClientTestSuite) TestActivityBatchReset_RunningWor env.SdkWorker().RegisterActivity(internalWorkflow.ActivityFunc) workflowRuns := make([]sdkclient.WorkflowRun, 0, workflowCount) - for i := 0; i < workflowCount; i++ { + for range workflowCount { workflowRun, err := env.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), TaskQueue: env.WorkerTaskQueue(), diff --git a/tests/activity_api_batch_unpause_test.go b/tests/activity_api_batch_unpause_test.go index 9a3646b7b49..357d1c88b04 100644 --- a/tests/activity_api_batch_unpause_test.go +++ b/tests/activity_api_batch_unpause_test.go @@ -211,7 +211,7 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_MatchA env.SdkWorker().RegisterActivity(internalWorkflow.ActivityFunc) workflowRuns := make([]sdkclient.WorkflowRun, 0, workflowCount) - for i := 0; i < workflowCount; i++ { + for range workflowCount { workflowRun, err := env.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), TaskQueue: env.WorkerTaskQueue(), diff --git a/tests/activity_api_batch_update_options_test.go b/tests/activity_api_batch_update_options_test.go index df75e973211..1c4913a9a54 100644 --- a/tests/activity_api_batch_update_options_test.go +++ b/tests/activity_api_batch_update_options_test.go @@ -211,13 +211,13 @@ func (s *ActivityAPIBatchUpdateOptionsSuite) TestActivityBatchUpdateOptionsMatch env.SdkWorker().RegisterActivity(internalWorkflow.ActivityFunc) workflowRuns := make([]sdkclient.WorkflowRun, 0, workflowCount) - for i := 0; i < workflowCount; i++ { + for range workflowCount { workflowRun, err := env.SdkClient().ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), TaskQueue: env.WorkerTaskQueue(), }, workflowTypeName) - require.NoError(s.T(), err) - require.NotNil(s.T(), workflowRun) + s.NoError(err) + s.NotNil(workflowRun) workflowRuns = append(workflowRuns, workflowRun) } @@ -259,7 +259,7 @@ func (s *ActivityAPIBatchUpdateOptionsSuite) TestActivityBatchUpdateOptionsMatch JobId: jobID, Reason: "test", }) - require.NoError(s.T(), err) + s.NoError(err) s.Await(func(s *ActivityAPIBatchUpdateOptionsSuite) { descResp, err := env.FrontendClient().DescribeBatchOperation(s.Context(), &workflowservice.DescribeBatchOperationRequest{ @@ -272,9 +272,9 @@ func (s *ActivityAPIBatchUpdateOptionsSuite) TestActivityBatchUpdateOptionsMatch for _, workflowRun := range workflowRuns { description, err := env.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) - require.NoError(s.T(), err) - require.Len(s.T(), description.PendingActivities, 1) - require.Equal(s.T(), updatedScheduleToClose, description.PendingActivities[0].ActivityOptions.ScheduleToCloseTimeout.AsDuration()) + s.NoError(err) + s.Len(description.PendingActivities, 1) + s.Equal(updatedScheduleToClose, description.PendingActivities[0].ActivityOptions.ScheduleToCloseTimeout.AsDuration()) } internalWorkflow.letActivitySucceed.Store(true) @@ -282,7 +282,7 @@ func (s *ActivityAPIBatchUpdateOptionsSuite) TestActivityBatchUpdateOptionsMatch for _, workflowRun := range workflowRuns { var out string err = workflowRun.Get(ctx, &out) - require.NoError(s.T(), err) + s.NoError(err) } }