-
Notifications
You must be signed in to change notification settings - Fork 615
HDDS-14187. Use BatchOperation to batch writes to tables of FSORepairTool #10578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
46852b4
020e56b
4fde712
c322eab
b038d11
f72e970
66ea234
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -101,6 +101,12 @@ public class FSORepairTool extends RepairTool { | |
| description = "Filter by bucket name") | ||
| private String bucketFilter; | ||
|
|
||
| @CommandLine.Option(names = {"--batch-size"}, | ||
| defaultValue = "10000", | ||
| showDefaultValue = CommandLine.Help.Visibility.ALWAYS, | ||
| description = "Number of entries to buffer before flushing a batch of writes to temp.db.") | ||
| private int tempDbBatchSize; | ||
|
|
||
| @Nonnull | ||
| @Override | ||
| protected Component serviceToBeOffline() { | ||
|
|
@@ -109,6 +115,9 @@ protected Component serviceToBeOffline() { | |
|
|
||
| @Override | ||
| public void execute() throws Exception { | ||
| if (tempDbBatchSize < 1) { | ||
| throw new IllegalArgumentException("--batch-size must be at least 1, but was " + tempDbBatchSize); | ||
| } | ||
| try { | ||
| Impl repairTool = new Impl(); | ||
| repairTool.run(); | ||
|
|
@@ -287,29 +296,31 @@ private void markReachableObjectsInBucket(OmVolumeArgs volume, OmBucketInfo buck | |
| // Directory keys should have the form /volumeID/bucketID/parentID/name. | ||
| Stack<String> dirKeyStack = new Stack<>(); | ||
|
|
||
| // Since the tool uses parent directories to check for reachability, add | ||
| // a reachable entry for the bucket as well. | ||
| addReachableEntry(volume, bucket, bucket); | ||
| // Initialize the stack with all immediate child directories of the | ||
| // bucket, and mark them all as reachable. | ||
| Collection<String> childDirs = getChildDirectoriesAndMarkAsReachable(volume, bucket, bucket); | ||
| dirKeyStack.addAll(childDirs); | ||
|
|
||
| while (!dirKeyStack.isEmpty()) { | ||
| // Get one directory and process its immediate children. | ||
| String currentDirKey = dirKeyStack.pop(); | ||
| OmDirectoryInfo currentDir = directoryTable.get(currentDirKey); | ||
| if (currentDir == null) { | ||
| if (isVerbose()) { | ||
| info("Directory key" + currentDirKey + "to be processed was not found in the directory table."); | ||
| try (BatchedTempWriter writer = new BatchedTempWriter(reachableTable)) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Most of this diff is re-indentation from wrapping the existing DFS in this try-with-resources. The only behavioral change is opening a single |
||
| // Since the tool uses parent directories to check for reachability, add | ||
| // a reachable entry for the bucket as well. | ||
| addReachableEntry(volume, bucket, bucket, writer); | ||
| // Initialize the stack with all immediate child directories of the | ||
| // bucket, and mark them all as reachable. | ||
| Collection<String> childDirs = getChildDirectoriesAndMarkAsReachable(volume, bucket, bucket, writer); | ||
| dirKeyStack.addAll(childDirs); | ||
|
|
||
| while (!dirKeyStack.isEmpty()) { | ||
| // Get one directory and process its immediate children. | ||
| String currentDirKey = dirKeyStack.pop(); | ||
| OmDirectoryInfo currentDir = directoryTable.get(currentDirKey); | ||
| if (currentDir == null) { | ||
| if (isVerbose()) { | ||
| info("Directory key" + currentDirKey + "to be processed was not found in the directory table."); | ||
| } | ||
| continue; | ||
| } | ||
| continue; | ||
| } | ||
|
|
||
| // TODO revisit this for a more memory efficient implementation, | ||
| // possibly making better use of RocksDB iterators. | ||
| childDirs = getChildDirectoriesAndMarkAsReachable(volume, bucket, currentDir); | ||
| dirKeyStack.addAll(childDirs); | ||
| // TODO revisit this for a more memory efficient implementation, | ||
| // possibly making better use of RocksDB iterators. | ||
| childDirs = getChildDirectoriesAndMarkAsReachable(volume, bucket, currentDir, writer); | ||
| dirKeyStack.addAll(childDirs); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -321,48 +332,50 @@ private void markPendingToDeleteObjectsInBucket(OmVolumeArgs volume, OmBucketInf | |
| // Find all deleted directories in this bucket and process their children | ||
| String bucketPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX + bucket.getObjectID(); | ||
|
|
||
| try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> deletedDirIterator = | ||
| deletedDirectoryTable.iterator()) { | ||
| deletedDirIterator.seek(bucketPrefix); | ||
| while (deletedDirIterator.hasNext()) { | ||
| Table.KeyValue<String, OmKeyInfo> deletedDirEntry = deletedDirIterator.next(); | ||
| String deletedDirKey = deletedDirEntry.getKey(); | ||
|
|
||
| // Only process deleted directories in this bucket | ||
| if (!deletedDirKey.startsWith(bucketPrefix)) { | ||
| break; | ||
| } | ||
| try (BatchedTempWriter writer = new BatchedTempWriter(pendingToDeleteTable)) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same as |
||
| try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> deletedDirIterator = | ||
| deletedDirectoryTable.iterator()) { | ||
| deletedDirIterator.seek(bucketPrefix); | ||
| while (deletedDirIterator.hasNext()) { | ||
| Table.KeyValue<String, OmKeyInfo> deletedDirEntry = deletedDirIterator.next(); | ||
| String deletedDirKey = deletedDirEntry.getKey(); | ||
|
|
||
| // Only process deleted directories in this bucket | ||
| if (!deletedDirKey.startsWith(bucketPrefix)) { | ||
| break; | ||
| } | ||
|
|
||
| // Extract the objectID from the deleted directory entry | ||
| OmKeyInfo deletedDirInfo = deletedDirEntry.getValue(); | ||
| long deletedObjectID = deletedDirInfo.getObjectID(); | ||
| // Extract the objectID from the deleted directory entry | ||
| OmKeyInfo deletedDirInfo = deletedDirEntry.getValue(); | ||
| long deletedObjectID = deletedDirInfo.getObjectID(); | ||
|
|
||
| // Build the prefix that children would have: /volID/bucketID/deletedObjectID/ | ||
| String childPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX + bucket.getObjectID() + | ||
| OM_KEY_PREFIX + deletedObjectID + OM_KEY_PREFIX; | ||
| // Build the prefix that children would have: /volID/bucketID/deletedObjectID/ | ||
| String childPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX + bucket.getObjectID() + | ||
| OM_KEY_PREFIX + deletedObjectID + OM_KEY_PREFIX; | ||
|
|
||
| // Find all children of this deleted directory and mark as pendingToDelete | ||
| Collection<String> childDirs = getChildDirectoriesAndMarkAsPendingToDelete(childPrefix); | ||
| dirKeyStack.addAll(childDirs); | ||
| // Find all children of this deleted directory and mark as pendingToDelete | ||
| Collection<String> childDirs = getChildDirectoriesAndMarkAsPendingToDelete(childPrefix, writer); | ||
| dirKeyStack.addAll(childDirs); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| while (!dirKeyStack.isEmpty()) { | ||
| // Get one directory and process its immediate children. | ||
| String currentDirKey = dirKeyStack.pop(); | ||
| OmDirectoryInfo currentDir = directoryTable.get(currentDirKey); | ||
| if (currentDir == null) { | ||
| if (isVerbose()) { | ||
| info("Directory key" + currentDirKey + "to be processed was not found in the directory table."); | ||
| while (!dirKeyStack.isEmpty()) { | ||
| // Get one directory and process its immediate children. | ||
| String currentDirKey = dirKeyStack.pop(); | ||
| OmDirectoryInfo currentDir = directoryTable.get(currentDirKey); | ||
| if (currentDir == null) { | ||
| if (isVerbose()) { | ||
| info("Directory key" + currentDirKey + "to be processed was not found in the directory table."); | ||
| } | ||
| continue; | ||
| } | ||
| continue; | ||
| } | ||
|
|
||
| // For pendingToDelete directories, we need to build the prefix based on their objectID | ||
| String childPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX + bucket.getObjectID() + | ||
| OM_KEY_PREFIX + currentDir.getObjectID() + OM_KEY_PREFIX; | ||
| Collection<String> childDirs = getChildDirectoriesAndMarkAsPendingToDelete(childPrefix); | ||
| dirKeyStack.addAll(childDirs); | ||
| // For pendingToDelete directories, we need to build the prefix based on their objectID | ||
| String childPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX + bucket.getObjectID() + | ||
| OM_KEY_PREFIX + currentDir.getObjectID() + OM_KEY_PREFIX; | ||
| Collection<String> childDirs = getChildDirectoriesAndMarkAsPendingToDelete(childPrefix, writer); | ||
| dirKeyStack.addAll(childDirs); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -467,7 +480,7 @@ protected void markDirectoryForDeletion(String volumeName, String bucketName, | |
| } | ||
|
|
||
| private Collection<String> getChildDirectoriesAndMarkAsReachable(OmVolumeArgs volume, OmBucketInfo bucket, | ||
| WithObjectID currentDir) throws IOException { | ||
| WithObjectID currentDir, BatchedTempWriter writer) throws IOException { | ||
|
|
||
| Collection<String> childDirs = new ArrayList<>(); | ||
|
|
||
|
|
@@ -486,7 +499,7 @@ private Collection<String> getChildDirectoriesAndMarkAsReachable(OmVolumeArgs vo | |
| break; | ||
| } | ||
| // This directory was reached by search. | ||
| addReachableEntry(volume, bucket, childDirEntry.getValue()); | ||
| addReachableEntry(volume, bucket, childDirEntry.getValue(), writer); | ||
| childDirs.add(childDirKey); | ||
| reachableStats.addDir(); | ||
| } | ||
|
|
@@ -495,7 +508,8 @@ private Collection<String> getChildDirectoriesAndMarkAsReachable(OmVolumeArgs vo | |
| return childDirs; | ||
| } | ||
|
|
||
| private Collection<String> getChildDirectoriesAndMarkAsPendingToDelete(String dirPrefix) throws IOException { | ||
| private Collection<String> getChildDirectoriesAndMarkAsPendingToDelete(String dirPrefix, | ||
| BatchedTempWriter writer) throws IOException { | ||
| Collection<String> childDirs = new ArrayList<>(); | ||
|
|
||
| // Find child directories and mark them as pendingToDelete | ||
|
|
@@ -515,7 +529,7 @@ private Collection<String> getChildDirectoriesAndMarkAsPendingToDelete(String di | |
| // Ensure this is an immediate child, not a deeper descendant | ||
| String relativePath = childDirKey.substring(dirPrefix.length()); | ||
| if (!relativePath.contains(OM_KEY_PREFIX)) { | ||
| addPendingToDeleteEntry(childDirKey); | ||
| addPendingToDeleteEntry(childDirKey, writer); | ||
| childDirs.add(childDirKey); | ||
| pendingToDeleteStats.addDir(); | ||
| } | ||
|
|
@@ -537,7 +551,7 @@ private Collection<String> getChildDirectoriesAndMarkAsPendingToDelete(String di | |
| // Ensure this is an immediate child, not a deeper descendant | ||
| String relativePath = childFileKey.substring(dirPrefix.length()); | ||
| if (!relativePath.contains(OM_KEY_PREFIX)) { | ||
| addPendingToDeleteEntry(childFileKey); | ||
| addPendingToDeleteEntry(childFileKey, writer); | ||
| pendingToDeleteStats.addFile(childFileEntry.getValue().getDataSize()); | ||
| } | ||
| } | ||
|
|
@@ -546,23 +560,61 @@ private Collection<String> getChildDirectoriesAndMarkAsPendingToDelete(String di | |
| return childDirs; | ||
| } | ||
|
|
||
| /** | ||
| * Buffers writes to a temp.db table and flushes them in bounded batches. | ||
| * <p> | ||
| * Each {@link #put(String)} adds a key with an empty value to the current batch. Once | ||
| * {@code tempDbBatchSize} entries are pending, the batch is committed and a fresh one is opened. | ||
| * {@link #close()} commits whatever is still pending, so instances are meant to be used in a | ||
| * try-with-resources statement. | ||
| */ | ||
| private final class BatchedTempWriter implements AutoCloseable { | ||
| // Table that receives the buffered keys. | ||
| private final Table<String, CodecBuffer> table; | ||
| // Batch currently accumulating writes; replaced by flush() after each commit. | ||
| private BatchOperation batch; | ||
| // Number of entries added to the current batch that have not been committed yet. | ||
| private int pending; | ||
|
|
||
| /** | ||
| * Creates a writer for the given table and opens its first batch on tempDB. | ||
| * | ||
| * @param table the temp.db table that buffered keys are written to | ||
| */ | ||
| BatchedTempWriter(Table<String, CodecBuffer> table) { | ||
| this.table = table; | ||
| this.batch = tempDB.initBatchOperation(); | ||
| } | ||
|
|
||
| /** | ||
| * Adds the key to the current batch and flushes once the batch size limit is reached. | ||
| * | ||
| * @param key the table key to write; it is stored with an empty value | ||
| * @throws IOException if adding to the batch or flushing it fails | ||
| */ | ||
| void put(String key) throws IOException { | ||
| // No value is needed for these tables, so an empty buffer is stored. | ||
| table.putWithBatch(batch, key, CodecBuffer.getEmptyBuffer()); | ||
| if (++pending >= tempDbBatchSize) { | ||
| flush(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Commits the current batch and opens a new one for subsequent puts. | ||
| * | ||
| * @throws IOException if the commit fails; in that case no new batch is opened | ||
| */ | ||
| private void flush() throws IOException { | ||
| commitPending(); | ||
| batch = tempDB.initBatchOperation(); | ||
| } | ||
|
|
||
| /** | ||
| * Commits any pending entries and closes the current batch without opening a new one. | ||
| * | ||
| * @throws IOException if committing the pending entries fails | ||
| */ | ||
| @Override | ||
| public void close() throws IOException { | ||
| // NOTE(review): if a previous flush() failed, batch was already closed by commitPending(), | ||
| // so this closes it a second time. Presumably BatchOperation.close() tolerates that; confirm. | ||
| commitPending(); | ||
| } | ||
|
|
||
| /** | ||
| * Commits the current batch if it holds any entries, then resets the counter and closes the batch. | ||
| * | ||
| * @throws IOException if the commit fails | ||
| */ | ||
| private void commitPending() throws IOException { | ||
| try { | ||
| // Skip the commit when nothing was buffered since the last flush. | ||
| if (pending > 0) { | ||
| tempDB.commitBatchOperation(batch); | ||
| } | ||
| } finally { | ||
| // Runs even when the commit throws, so the batch is always closed and the counter reset. | ||
| pending = 0; | ||
| batch.close(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Add the specified object to the reachable table, indicating it is part | ||
| * of the connected FSO tree. | ||
| */ | ||
| private void addReachableEntry(OmVolumeArgs volume, OmBucketInfo bucket, WithObjectID object) throws IOException { | ||
| String reachableKey = buildReachableKey(volume, bucket, object); | ||
| // No value is needed for this table. | ||
| reachableTable.put(reachableKey, CodecBuffer.getEmptyBuffer()); | ||
| private void addReachableEntry(OmVolumeArgs volume, OmBucketInfo bucket, WithObjectID object, | ||
| BatchedTempWriter writer) throws IOException { | ||
| writer.put(buildReachableKey(volume, bucket, object)); | ||
| } | ||
|
|
||
| /** | ||
| * Add the specified object to the pendingToDelete table, indicating it is part | ||
| * of the disconnected FSO tree. | ||
| */ | ||
| private void addPendingToDeleteEntry(String originalKey) throws IOException { | ||
| // No value is needed for this table. | ||
| pendingToDeleteTable.put(originalKey, CodecBuffer.getEmptyBuffer()); | ||
| private void addPendingToDeleteEntry(String originalKey, BatchedTempWriter writer) throws IOException { | ||
| writer.put(originalKey); | ||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Add `showDefaultValue = Visibility.ALWAYS`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, done 66ea234