Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;

import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -88,6 +89,9 @@ public class FSORepairTool extends RepairTool {
private static final String REACHABLE_TABLE = "reachable";
private static final String PENDING_TO_DELETE_TABLE = "pendingToDelete";

/**
 * Number of buffered temp.db puts after which {@code BatchedTempWriter}
 * commits its current batch and starts a new one. Kept non-final and
 * package-private so tests can lower it (e.g. to 1) to force the
 * commit/reset path to run on every entry.
 */
@VisibleForTesting
static int tempDbBatchSize = 10_000;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to add a CLI option for batch size:

  • allows the user to adjust it without a rebuild (in case that is needed)
  • avoids @VisibleForTesting

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick feedback! @adoroszlai Good call, replaced with a --batch-size CLI option in 4fde712


@CommandLine.Option(names = {"--db"},
required = true,
description = "Path to OM RocksDB")
Expand Down Expand Up @@ -287,29 +291,31 @@ private void markReachableObjectsInBucket(OmVolumeArgs volume, OmBucketInfo buck
// Directory keys should have the form /volumeID/bucketID/parentID/name.
Stack<String> dirKeyStack = new Stack<>();

// Since the tool uses parent directories to check for reachability, add
// a reachable entry for the bucket as well.
addReachableEntry(volume, bucket, bucket);
// Initialize the stack with all immediate child directories of the
// bucket, and mark them all as reachable.
Collection<String> childDirs = getChildDirectoriesAndMarkAsReachable(volume, bucket, bucket);
dirKeyStack.addAll(childDirs);

while (!dirKeyStack.isEmpty()) {
// Get one directory and process its immediate children.
String currentDirKey = dirKeyStack.pop();
OmDirectoryInfo currentDir = directoryTable.get(currentDirKey);
if (currentDir == null) {
if (isVerbose()) {
info("Directory key" + currentDirKey + "to be processed was not found in the directory table.");
try (BatchedTempWriter writer = new BatchedTempWriter(reachableTable)) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of this diff is re-indentation from wrapping the existing DFS in this try-with-resources. The only behavioral change is opening a single BatchedTempWriter for the bucket and threading it into addReachableEntry/getChildDirectoriesAndMarkAsReachable. The walk itself is unchanged.

// Since the tool uses parent directories to check for reachability, add
// a reachable entry for the bucket as well.
addReachableEntry(volume, bucket, bucket, writer);
// Initialize the stack with all immediate child directories of the
// bucket, and mark them all as reachable.
Collection<String> childDirs = getChildDirectoriesAndMarkAsReachable(volume, bucket, bucket, writer);
dirKeyStack.addAll(childDirs);

while (!dirKeyStack.isEmpty()) {
// Get one directory and process its immediate children.
String currentDirKey = dirKeyStack.pop();
OmDirectoryInfo currentDir = directoryTable.get(currentDirKey);
if (currentDir == null) {
if (isVerbose()) {
info("Directory key" + currentDirKey + "to be processed was not found in the directory table.");
}
continue;
}
continue;
}

// TODO revisit this for a more memory efficient implementation,
// possibly making better use of RocksDB iterators.
childDirs = getChildDirectoriesAndMarkAsReachable(volume, bucket, currentDir);
dirKeyStack.addAll(childDirs);
// TODO revisit this for a more memory efficient implementation,
// possibly making better use of RocksDB iterators.
childDirs = getChildDirectoriesAndMarkAsReachable(volume, bucket, currentDir, writer);
dirKeyStack.addAll(childDirs);
}
}
}

Expand All @@ -321,48 +327,50 @@ private void markPendingToDeleteObjectsInBucket(OmVolumeArgs volume, OmBucketInf
// Find all deleted directories in this bucket and process their children
String bucketPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX + bucket.getObjectID();

try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> deletedDirIterator =
deletedDirectoryTable.iterator()) {
deletedDirIterator.seek(bucketPrefix);
while (deletedDirIterator.hasNext()) {
Table.KeyValue<String, OmKeyInfo> deletedDirEntry = deletedDirIterator.next();
String deletedDirKey = deletedDirEntry.getKey();

// Only process deleted directories in this bucket
if (!deletedDirKey.startsWith(bucketPrefix)) {
break;
}
try (BatchedTempWriter writer = new BatchedTempWriter(pendingToDeleteTable)) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as markReachableObjectsInBucket: the large diff here is re-indentation from the try-with-resources wrap. Logic is unchanged apart from threading the writer.

try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> deletedDirIterator =
deletedDirectoryTable.iterator()) {
deletedDirIterator.seek(bucketPrefix);
while (deletedDirIterator.hasNext()) {
Table.KeyValue<String, OmKeyInfo> deletedDirEntry = deletedDirIterator.next();
String deletedDirKey = deletedDirEntry.getKey();

// Only process deleted directories in this bucket
if (!deletedDirKey.startsWith(bucketPrefix)) {
break;
}

// Extract the objectID from the deleted directory entry
OmKeyInfo deletedDirInfo = deletedDirEntry.getValue();
long deletedObjectID = deletedDirInfo.getObjectID();
// Extract the objectID from the deleted directory entry
OmKeyInfo deletedDirInfo = deletedDirEntry.getValue();
long deletedObjectID = deletedDirInfo.getObjectID();

// Build the prefix that children would have: /volID/bucketID/deletedObjectID/
String childPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX + bucket.getObjectID() +
OM_KEY_PREFIX + deletedObjectID + OM_KEY_PREFIX;
// Build the prefix that children would have: /volID/bucketID/deletedObjectID/
String childPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX + bucket.getObjectID() +
OM_KEY_PREFIX + deletedObjectID + OM_KEY_PREFIX;

// Find all children of this deleted directory and mark as pendingToDelete
Collection<String> childDirs = getChildDirectoriesAndMarkAsPendingToDelete(childPrefix);
dirKeyStack.addAll(childDirs);
// Find all children of this deleted directory and mark as pendingToDelete
Collection<String> childDirs = getChildDirectoriesAndMarkAsPendingToDelete(childPrefix, writer);
dirKeyStack.addAll(childDirs);
}
}
}

while (!dirKeyStack.isEmpty()) {
// Get one directory and process its immediate children.
String currentDirKey = dirKeyStack.pop();
OmDirectoryInfo currentDir = directoryTable.get(currentDirKey);
if (currentDir == null) {
if (isVerbose()) {
info("Directory key" + currentDirKey + "to be processed was not found in the directory table.");
while (!dirKeyStack.isEmpty()) {
// Get one directory and process its immediate children.
String currentDirKey = dirKeyStack.pop();
OmDirectoryInfo currentDir = directoryTable.get(currentDirKey);
if (currentDir == null) {
if (isVerbose()) {
info("Directory key" + currentDirKey + "to be processed was not found in the directory table.");
}
continue;
}
continue;
}

// For pendingToDelete directories, we need to build the prefix based on their objectID
String childPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX + bucket.getObjectID() +
OM_KEY_PREFIX + currentDir.getObjectID() + OM_KEY_PREFIX;
Collection<String> childDirs = getChildDirectoriesAndMarkAsPendingToDelete(childPrefix);
dirKeyStack.addAll(childDirs);
// For pendingToDelete directories, we need to build the prefix based on their objectID
String childPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX + bucket.getObjectID() +
OM_KEY_PREFIX + currentDir.getObjectID() + OM_KEY_PREFIX;
Collection<String> childDirs = getChildDirectoriesAndMarkAsPendingToDelete(childPrefix, writer);
dirKeyStack.addAll(childDirs);
}
}
}

Expand Down Expand Up @@ -467,7 +475,7 @@ protected void markDirectoryForDeletion(String volumeName, String bucketName,
}

private Collection<String> getChildDirectoriesAndMarkAsReachable(OmVolumeArgs volume, OmBucketInfo bucket,
WithObjectID currentDir) throws IOException {
WithObjectID currentDir, BatchedTempWriter writer) throws IOException {

Collection<String> childDirs = new ArrayList<>();

Expand All @@ -486,7 +494,7 @@ private Collection<String> getChildDirectoriesAndMarkAsReachable(OmVolumeArgs vo
break;
}
// This directory was reached by search.
addReachableEntry(volume, bucket, childDirEntry.getValue());
addReachableEntry(volume, bucket, childDirEntry.getValue(), writer);
childDirs.add(childDirKey);
reachableStats.addDir();
}
Expand All @@ -495,7 +503,8 @@ private Collection<String> getChildDirectoriesAndMarkAsReachable(OmVolumeArgs vo
return childDirs;
}

private Collection<String> getChildDirectoriesAndMarkAsPendingToDelete(String dirPrefix) throws IOException {
private Collection<String> getChildDirectoriesAndMarkAsPendingToDelete(String dirPrefix,
BatchedTempWriter writer) throws IOException {
Collection<String> childDirs = new ArrayList<>();

// Find child directories and mark them as pendingToDelete
Expand All @@ -515,7 +524,7 @@ private Collection<String> getChildDirectoriesAndMarkAsPendingToDelete(String di
// Ensure this is an immediate child, not a deeper descendant
String relativePath = childDirKey.substring(dirPrefix.length());
if (!relativePath.contains(OM_KEY_PREFIX)) {
addPendingToDeleteEntry(childDirKey);
addPendingToDeleteEntry(childDirKey, writer);
childDirs.add(childDirKey);
pendingToDeleteStats.addDir();
}
Expand All @@ -537,7 +546,7 @@ private Collection<String> getChildDirectoriesAndMarkAsPendingToDelete(String di
// Ensure this is an immediate child, not a deeper descendant
String relativePath = childFileKey.substring(dirPrefix.length());
if (!relativePath.contains(OM_KEY_PREFIX)) {
addPendingToDeleteEntry(childFileKey);
addPendingToDeleteEntry(childFileKey, writer);
pendingToDeleteStats.addFile(childFileEntry.getValue().getDataSize());
}
}
Expand All @@ -546,23 +555,55 @@ private Collection<String> getChildDirectoriesAndMarkAsPendingToDelete(String di
return childDirs;
}

/** Buffers writes to a temp.db table and flushes them in bounded batches. */
private final class BatchedTempWriter implements AutoCloseable {
private final Table<String, CodecBuffer> table;
private BatchOperation batch;
private int pending;

/**
 * Creates a writer whose puts go to {@code table}, starting with a fresh
 * batch obtained from {@code tempDB}.
 *
 * @param table the temp.db table that buffered keys are written to
 */
BatchedTempWriter(Table<String, CodecBuffer> table) {
  batch = tempDB.initBatchOperation();
  this.table = table;
}

/**
 * Queues {@code key} (with an empty value) in the current batch and commits
 * the batch once the number of queued entries reaches
 * {@code tempDbBatchSize}.
 *
 * @param key the table key to record
 * @throws IOException if adding to the batch or committing it fails
 */
void put(String key) throws IOException {
  // Only the key carries information; the value is always the empty buffer.
  table.putWithBatch(batch, key, CodecBuffer.getEmptyBuffer());
  pending += 1;
  if (pending < tempDbBatchSize) {
    return;
  }
  flush();
}

/**
 * Commits the currently buffered entries to temp.db and starts a new, empty
 * batch.
 *
 * <p>The fresh batch is installed and the pending counter reset before the
 * commit is attempted, and the full batch is closed in a {@code finally}
 * block. This way a failed commit can neither leak the full batch nor leave
 * this writer holding a batch that a later {@code close()} would try to
 * commit a second time.
 *
 * @throws IOException if committing the batch fails
 */
private void flush() throws IOException {
  final BatchOperation full = batch;
  // Swap first: if this throws, the writer state is untouched and still valid.
  batch = tempDB.initBatchOperation();
  pending = 0;
  try {
    tempDB.commitBatchOperation(full);
  } finally {
    // Always release the full batch, whether or not the commit succeeded.
    full.close();
  }
}

@Override
public void close() throws IOException {
if (pending > 0) {
tempDB.commitBatchOperation(batch);
}
batch.close();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set pending = 0?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strictly needed, since the writer isn't reused after close(), but agreed that it's more defensive. Updated in c322eab.

}
}

/**
* Add the specified object to the reachable table, indicating it is part
* of the connected FSO tree.
*/
private void addReachableEntry(OmVolumeArgs volume, OmBucketInfo bucket, WithObjectID object) throws IOException {
String reachableKey = buildReachableKey(volume, bucket, object);
// No value is needed for this table.
reachableTable.put(reachableKey, CodecBuffer.getEmptyBuffer());
private void addReachableEntry(OmVolumeArgs volume, OmBucketInfo bucket, WithObjectID object,
BatchedTempWriter writer) throws IOException {
writer.put(buildReachableKey(volume, bucket, object));
}

/**
* Add the specified object to the pendingToDelete table, indicating it is part
* of the disconnected FSO tree.
*/
private void addPendingToDeleteEntry(String originalKey) throws IOException {
// No value is needed for this table.
pendingToDeleteTable.put(originalKey, CodecBuffer.getEmptyBuffer());
private void addPendingToDeleteEntry(String originalKey, BatchedTempWriter writer) throws IOException {
writer.put(originalKey);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,28 @@ void testConnectedTreeOneBucket(boolean dryRun) {
assertEquals(expectedOutput, reportOutput);
}

/**
 * Runs a dry run with the temp.db batch size forced to 1, so that every
 * single write triggers a batch commit and reset for both the reachable and
 * pendingToDelete tables, and checks that the resulting report is identical
 * to the full expected report. The original batch size is restored
 * afterwards.
 */
@Order(ORDER_DRY_RUN)
@Test
public void testBatchedTempWrites() {
  final int savedBatchSize = FSORepairTool.tempDbBatchSize;
  try {
    FSORepairTool.tempDbBatchSize = 1;

    final String expected = serializeReport(fullReport);

    final int status = dryRun();
    assertEquals(0, status, err.getOutput());

    final String actual = extractRelevantSection(out.getOutput());
    assertEquals(expected, actual);
  } finally {
    // Restore the shared static so later tests see the default batch size.
    FSORepairTool.tempDbBatchSize = savedBatchSize;
  }
}

/**
* Test to verify the file size of the tree.
*/
Expand Down