Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_KEY_NAME;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TIMEOUT;
import static org.apache.hadoop.ozone.om.snapshot.SnapshotDiffManager.getSnapshotRootPath;
import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotActive;
import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.dropColumnFamilyHandle;
import static org.apache.hadoop.ozone.om.snapshot.db.SnapshotDiffDBDefinition.SNAP_DIFF_PURGED_JOB_TABLE_NAME;
Expand Down Expand Up @@ -842,7 +841,7 @@ public SnapshotDiffResponse getSnapshotDiffReport(final String volume,
// Check if fromSnapshot and toSnapshot are equal.
if (Objects.equals(fromSnapshot, toSnapshot)) {
SnapshotDiffReportOzone diffReport = new SnapshotDiffReportOzone(
getSnapshotRootPath(volume, bucket).toString(), volume, bucket,
snapshotDiffManager.getSnapshotRootPath(volume, bucket).toString(), volume, bucket,
fromSnapshot, toSnapshot, Collections.emptyList(), null);
return new SnapshotDiffResponse(diffReport, DONE, 0L);
}
Expand Down Expand Up @@ -873,7 +872,7 @@ public SnapshotDiffResponse getSnapshotDiffResponse(final String volume,
// Check if fromSnapshot and toSnapshot are equal.
if (Objects.equals(fromSnapshot, toSnapshot)) {
SnapshotDiffReportOzone diffReport = new SnapshotDiffReportOzone(
getSnapshotRootPath(volume, bucket).toString(), volume, bucket,
snapshotDiffManager.getSnapshotRootPath(volume, bucket).toString(), volume, bucket,
fromSnapshot, toSnapshot, Collections.emptyList(), null);
return new SnapshotDiffResponse(diffReport, DONE, 0L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,10 +692,10 @@ public synchronized SubmitSnapshotDiffResponse submitSnapshotDiff(
}

@Nonnull
public static OFSPath getSnapshotRootPath(String volume, String bucket) {
public OFSPath getSnapshotRootPath(String volume, String bucket) {
org.apache.hadoop.fs.Path bucketPath = new org.apache.hadoop.fs.Path(
OZONE_URI_DELIMITER + volume + OZONE_URI_DELIMITER + bucket);
return new OFSPath(bucketPath, new OzoneConfiguration());
return new OFSPath(bucketPath, ozoneManager.getConfiguration());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -1170,105 +1167,110 @@ private void uploadSnapshotDiffJobToDb(SnapshotInfo fromSnapshot,

private static Stream<Arguments> threadPoolFullScenarios() {
return Stream.of(
Arguments.of("When there is a wait time between job batches",
500L, 45, 0),
Arguments.of("When there is no wait time between job batches",
0L, 20, 25)
Arguments.of("When the pool drains between job batches",
true, 45, 0),
// 10 running + 10 queued = 20 accepted, remaining 25 rejected
Arguments.of("When the pool does not drain between job batches",
false, 20, 25)

@SaketaChalamchala SaketaChalamchala Jun 22, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch @rhalm.
nit: Define full_thread_pool_size = 2 * OZONE_OM_SNAPSHOT_DIFF_THREAD_POOL_SIZE, then use full_thread_pool_size for the expected accepted jobs and 45 - full_thread_pool_size for the expected rejected jobs here.
Otherwise, LGTM.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review @SaketaChalamchala, done.

);
}

@ParameterizedTest(name = "{0}")
@MethodSource("threadPoolFullScenarios")
public void testThreadPoolIsFull(String description,
long waitBetweenBatches,
boolean drainBetweenBatches,
int expectInProgressJobsCount,
int expectRejectedJobsCount)
throws Exception {
ExecutorService executorService = new ThreadPoolExecutor(100, 100, 0,
TimeUnit.MILLISECONDS, new SynchronousQueue<>()
);

List<SnapshotInfo> snapshotInfos = new ArrayList<>();

for (int i = 0; i < 10; i++) {
UUID snapshotId = UUID.randomUUID();
String snapshotName = "snap-" + snapshotId;
SnapshotInfo snapInfo = new SnapshotInfo.Builder()
.setSnapshotId(snapshotId)
.setVolumeName(VOLUME_NAME)
.setBucketName(BUCKET_NAME)
.setName(snapshotName)
.setSnapshotPath("fromSnapshotPath")
.build();
snapshotInfos.add(snapInfo);

when(snapshotInfoTable.get(getTableKey(VOLUME_NAME, BUCKET_NAME,
snapshotName))).thenReturn(snapInfo);
}

List<SnapshotInfo> snapshotInfos = createTestSnapshots(10);
SnapshotDiffManager spy = spy(snapshotDiffManager);

for (int i = 0; i < snapshotInfos.size(); i++) {
for (int j = i + 1; j < snapshotInfos.size(); j++) {
String fromSnapshotName = snapshotInfos.get(i).getName();
String toSnapshotName = snapshotInfos.get(j).getName();
CountDownLatch blockWorkers = new CountDownLatch(1);
AtomicInteger completedJobs = new AtomicInteger(0);
doAnswer(invocation -> {
blockWorkers.await();
completedJobs.incrementAndGet();
return null;
}).when(spy).generateSnapshotDiffReport(anyString(), anyString(),
eq(VOLUME_NAME), eq(BUCKET_NAME), anyString(), anyString(),
eq(false), eq(false));

doAnswer(invocation -> {
Thread.sleep(250L);
return null;
}).when(spy).generateSnapshotDiffReport(anyString(), anyString(),
eq(VOLUME_NAME), eq(BUCKET_NAME), eq(fromSnapshotName),
eq(toSnapshotName), eq(false), eq(false));
}
if (drainBetweenBatches) {
blockWorkers.countDown();
}

List<Future<SnapshotDiffResponse>> futures = new ArrayList<>();
for (int i = 0; i < snapshotInfos.size(); i++) {
for (int j = i + 1; j < snapshotInfos.size(); j++) {
String fromSnapshotName = snapshotInfos.get(i).getName();
String toSnapshotName = snapshotInfos.get(j).getName();

Future<SnapshotDiffResponse> future = executorService.submit(
() -> submitJob(spy, fromSnapshotName, toSnapshotName));
futures.add(future);
try {
List<SnapshotDiffResponse> responses = new ArrayList<>();
int totalSubmitted = 0;
for (int i = 0; i < snapshotInfos.size(); i++) {
for (int j = i + 1; j < snapshotInfos.size(); j++) {
String fromSnapshotName = snapshotInfos.get(i).getName();
String toSnapshotName = snapshotInfos.get(j).getName();
responses.add(submitJob(spy, fromSnapshotName, toSnapshotName));
totalSubmitted++;
}
if (drainBetweenBatches) {
final int expected = totalSubmitted;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in scope for the current PR (which fixes the flakiness), but this may be considered as a follow-up test improvement:
In the drainBetweenBatches scenario, would a better check be to do the following?

  1. Keep the latch closed until full_thread_pool_size jobs are submitted
  2. Open the latch
  3. Submit new jobs when totalSubmitted - completedJobs.get() < full_thread_pool_size

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea, I opened HDDS-15648 as a follow-up.

attempt(() -> {
if (completedJobs.get() < expected) {
throw new IllegalStateException("Waiting for jobs to complete");
}
return null;
}, 50, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS), null, null);
}
}
Thread.sleep(waitBetweenBatches);
}

// Wait to make sure that all jobs finish before assertion.
Thread.sleep(1000L);
int inProgressJobsCount = 0;
int rejectedJobsCount = 0;

for (Future<SnapshotDiffResponse> future : futures) {
SnapshotDiffResponse response = future.get();
if (response.getJobStatus() == IN_PROGRESS) {
inProgressJobsCount++;
} else if (response.getJobStatus() == REJECTED) {
rejectedJobsCount++;
} else {
throw new IllegalStateException("Unexpected job status.");
int inProgressJobsCount = 0;
int rejectedJobsCount = 0;
for (SnapshotDiffResponse response : responses) {
if (response.getJobStatus() == IN_PROGRESS) {
inProgressJobsCount++;
} else if (response.getJobStatus() == REJECTED) {
rejectedJobsCount++;
} else {
throw new IllegalStateException("Unexpected job status.");
}
}
}

assertEquals(expectInProgressJobsCount, inProgressJobsCount);
assertEquals(expectRejectedJobsCount, rejectedJobsCount);

int notFoundJobs = 0;
for (int i = 0; i < snapshotInfos.size(); i++) {
for (int j = i + 1; j < snapshotInfos.size(); j++) {
SnapshotDiffJob diffJob =
getSnapshotDiffJobFromDb(snapshotInfos.get(i),
snapshotInfos.get(j));
if (diffJob == null) {
notFoundJobs++;
assertEquals(expectInProgressJobsCount, inProgressJobsCount);
assertEquals(expectRejectedJobsCount, rejectedJobsCount);

int notFoundJobs = 0;
for (int i = 0; i < snapshotInfos.size(); i++) {
for (int j = i + 1; j < snapshotInfos.size(); j++) {
SnapshotDiffJob diffJob =
getSnapshotDiffJobFromDb(snapshotInfos.get(i),
snapshotInfos.get(j));
if (diffJob == null) {
notFoundJobs++;
}
}
}

// assert that rejected jobs were removed from the job table as well.
assertEquals(expectRejectedJobsCount, notFoundJobs);
} finally {
blockWorkers.countDown();
}
}

// assert that rejected jobs were removed from the job table as well.
assertEquals(expectRejectedJobsCount, notFoundJobs);
executorService.shutdown();
/**
 * Creates {@code count} snapshots for the test volume and bucket and registers
 * each one with the mocked snapshot info table.
 *
 * <p>Every snapshot gets a random UUID as its id, is named
 * {@code "snap-" + id}, and uses the fixed snapshot path
 * {@code "fromSnapshotPath"}. A lookup of the snapshot's table key on
 * {@code snapshotInfoTable} is stubbed to return that snapshot.
 *
 * @param count number of snapshots to create
 * @return the created snapshots, in creation order
 * @throws IOException declared because the stubbed table lookup can throw it
 */
private List<SnapshotInfo> createTestSnapshots(int count) throws IOException {
  List<SnapshotInfo> created = new ArrayList<>();
  for (int index = 0; index < count; index++) {
    UUID id = UUID.randomUUID();
    String name = "snap-" + id;
    SnapshotInfo.Builder builder = new SnapshotInfo.Builder();
    builder.setSnapshotId(id)
        .setVolumeName(VOLUME_NAME)
        .setBucketName(BUCKET_NAME)
        .setName(name)
        .setSnapshotPath("fromSnapshotPath");
    SnapshotInfo info = builder.build();
    created.add(info);
    // Make the snapshot resolvable by its table key during job submission.
    String tableKey = getTableKey(VOLUME_NAME, BUCKET_NAME, name);
    when(snapshotInfoTable.get(tableKey)).thenReturn(info);
  }
  return created;
}

private SnapshotDiffResponse submitJob(SnapshotDiffManager diffManager,
Expand Down Expand Up @@ -1641,7 +1643,7 @@ public void testGetSnapshotDiffReportWhenDone() throws Exception {

SnapshotDiffManager spy = spy(snapshotDiffManager);
SnapshotDiffReportOzone dummyReport = new SnapshotDiffReportOzone(
SnapshotDiffManager.getSnapshotRootPath(ctx.volumeName, ctx.bucketName).toString(),
spy.getSnapshotRootPath(ctx.volumeName, ctx.bucketName).toString(),
ctx.volumeName, ctx.bucketName, ctx.fromSnapshotName, ctx.toSnapshotName,
expectedEntries, null);
doReturn(dummyReport).when(spy).createPageResponse(any(SnapshotDiffJob.class),
Expand Down
Loading