diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java index 15d3a1f4425..59f156febaf 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java @@ -69,6 +69,40 @@ public class ContainerHealthSchemaManager { */ static final int MAX_IN_CLAUSE_CHUNK_SIZE = 1_000; + /** + * Minimum length of a contiguous container-id run that is deleted with a + * range ({@code BETWEEN}) term instead of being expanded into individual + * {@code IN} values. + * + *

<p>Derby compiles every statement to JVM bytecode whose size grows with the + * number of id literals in the predicate. A contiguous run of {@code L} ids + * costs {@code L} operands when listed in an {@code IN} clause, but only + * {@link #RANGE_OPERAND_COST} (the two endpoints) when written as + * {@code BETWEEN lo AND hi}, independent of {@code L}. The range form is + * therefore strictly smaller once {@code L > RANGE_OPERAND_COST}, i.e. for + * runs of 3 or more ids; runs of 1–2 ids are left as {@code IN} values since + * the range form would save nothing. This lets the optimization benefit + * realistic, fragmented unhealthy-container sets rather than only rare + * 1,000+ contiguous ranges.</p>

+ */ + static final int MIN_RANGE_RUN_LENGTH = 3; + + /** Estimated predicate operand cost of a single {@code BETWEEN} range term (its two endpoints). */ + private static final int RANGE_OPERAND_COST = 2; + + /** + * Upper bound on the estimated number of id operands packed into one combined + * DELETE predicate before it is flushed. + * + *

<p>Range terms count as {@link #RANGE_OPERAND_COST} operands each; every id + * left in the {@code IN} list counts as one operand. The cap reuses {@link #MAX_IN_CLAUSE_CHUNK_SIZE} — + * the id count already proven to keep a pure {@code IN} delete well under + * Derby's 64 KB per-method bytecode limit — so a mixed range/{@code IN} + * predicate with the same estimated operand budget is expected to stay within the same safe + * envelope.</p>

+ */ + static final int MAX_PREDICATE_OPERANDS = MAX_IN_CLAUSE_CHUNK_SIZE; + private final ContainerSchemaDefinition containerSchemaDefinition; private final int unhealthyContainersFetchSize; @@ -205,28 +239,101 @@ public void replaceUnhealthyContainerRecordsAtomically( private int deleteScmStatesForContainers(DSLContext dslContext, List<Long> containerIds) { + + // Sort and de-duplicate so contiguous runs are detectable regardless of the + // order in which the caller supplied the ids. + List<Long> sortedIds = containerIds.stream() + .distinct() + .sorted() + .collect(Collectors.toList()); + int totalDeleted = 0; - for (int from = 0; from < containerIds.size(); from += MAX_IN_CLAUSE_CHUNK_SIZE) { - int to = Math.min(from + MAX_IN_CLAUSE_CHUNK_SIZE, containerIds.size()); - List<Long> chunk = containerIds.subList(from, to); - - int deleted = dslContext.deleteFrom(UNHEALTHY_CONTAINERS) - .where(UNHEALTHY_CONTAINERS.CONTAINER_ID.in(chunk)) - .and(UNHEALTHY_CONTAINERS.CONTAINER_STATE.in( - UnHealthyContainerStates.MISSING.toString(), - UnHealthyContainerStates.EMPTY_MISSING.toString(), - UnHealthyContainerStates.UNDER_REPLICATED.toString(), - UnHealthyContainerStates.OVER_REPLICATED.toString(), - UnHealthyContainerStates.MIS_REPLICATED.toString(), - UnHealthyContainerStates.NEGATIVE_SIZE.toString(), - UnHealthyContainerStates.REPLICA_MISMATCH.toString())) - .execute(); - totalDeleted += deleted; + // Build each DELETE predicate as the combination of compact range terms (one + // BETWEEN per contiguous run of at least MIN_RANGE_RUN_LENGTH ids) and a + // single IN list for the remaining scattered ids. Terms are accumulated + // until the estimated operand count reaches MAX_PREDICATE_OPERANDS, then the + // statement is flushed so the generated Derby bytecode stays within budget. 
+ List<long[]> ranges = new ArrayList<>(); + List<Long> points = new ArrayList<>(); + int operandsUsed = 0; + + int i = 0; + while (i < sortedIds.size()) { + int runStart = i; + while (i + 1 < sortedIds.size() + && sortedIds.get(i + 1) == sortedIds.get(i) + 1) { + i++; + } + int runLength = i - runStart + 1; + + // A contiguous run is cheaper as a BETWEEN (RANGE_OPERAND_COST operands, + // regardless of length) than as IN values (one operand each) once it spans + // at least MIN_RANGE_RUN_LENGTH ids; shorter runs stay as IN points. + boolean asRange = runLength >= MIN_RANGE_RUN_LENGTH; + int termOperands = asRange ? RANGE_OPERAND_COST : runLength; + + if (operandsUsed > 0 + && operandsUsed + termOperands > MAX_PREDICATE_OPERANDS) { + totalDeleted += executeCombinedDelete(dslContext, ranges, points); + ranges.clear(); + points.clear(); + operandsUsed = 0; + } + + if (asRange) { + ranges.add(new long[] {sortedIds.get(runStart), sortedIds.get(i)}); + } else { + for (int j = runStart; j <= i; j++) { + points.add(sortedIds.get(j)); + } + } + operandsUsed += termOperands; + i++; + } + + if (operandsUsed > 0) { + totalDeleted += executeCombinedDelete(dslContext, ranges, points); } return totalDeleted; } + /** + * Executes a single DELETE whose container-id predicate is the combination of the + * given contiguous ranges (as {@code BETWEEN} terms) and scattered ids (as a + * single {@code IN} list), restricted to SCM-generated states. + */ + private int executeCombinedDelete(DSLContext dslContext, + List<long[]> ranges, List<Long> points) { + Condition idCondition = null; + if (!points.isEmpty()) { + idCondition = UNHEALTHY_CONTAINERS.CONTAINER_ID.in(points); + } + for (long[] range : ranges) { + Condition rangeCondition = + UNHEALTHY_CONTAINERS.CONTAINER_ID.between(range[0], range[1]); + idCondition = (idCondition == null) + ? 
rangeCondition : idCondition.or(rangeCondition); + } + if (idCondition == null) { + return 0; + } + return dslContext.deleteFrom(UNHEALTHY_CONTAINERS) + .where(idCondition.and(scmGeneratedStatesCondition())) + .execute(); + } + + private Condition scmGeneratedStatesCondition() { + return UNHEALTHY_CONTAINERS.CONTAINER_STATE.in( + UnHealthyContainerStates.MISSING.toString(), + UnHealthyContainerStates.EMPTY_MISSING.toString(), + UnHealthyContainerStates.UNDER_REPLICATED.toString(), + UnHealthyContainerStates.OVER_REPLICATED.toString(), + UnHealthyContainerStates.MIS_REPLICATED.toString(), + UnHealthyContainerStates.NEGATIVE_SIZE.toString(), + UnHealthyContainerStates.REPLICA_MISMATCH.toString()); + } + /** * Returns previous in-state-since timestamps for tracked unhealthy states. * The key is a stable containerId + state tuple. diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestContainerHealthSchemaManager.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestContainerHealthSchemaManager.java new file mode 100644 index 00000000000..79eec1bbd9e --- /dev/null +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestContainerHealthSchemaManager.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.persistence; + +import static org.apache.ozone.recon.schema.generated.tables.UnhealthyContainersTable.UNHEALTHY_CONTAINERS; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager.UnhealthyContainerRecord; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; +import org.jooq.DSLContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Functional tests for {@link ContainerHealthSchemaManager} delete handling. + * + *

<p>{@code deleteScmStatesForContainers} turns each contiguous container id run of at + * least {@code MIN_RANGE_RUN_LENGTH} ids into a range ({@code BETWEEN}) term and folds shorter + * runs and scattered ids into an {@code IN} list, combining both in operand-bounded DELETE statements. These tests assert + * that exactly the requested rows are removed for contiguous, scattered, + * and mixed id distributions, and that non-SCM states are never removed.</p>

+ */ +public class TestContainerHealthSchemaManager extends AbstractReconSqlDBTest { + + private ContainerSchemaDefinition schemaDefinition; + private ContainerHealthSchemaManager schemaManager; + + @BeforeEach + public void setUpSchemaManager() { + schemaDefinition = getSchemaDefinition(ContainerSchemaDefinition.class); + schemaManager = + new ContainerHealthSchemaManager(schemaDefinition, new OzoneConfiguration()); + } + + @Test + public void testDeleteContiguousRangeRemovesExactlyRequestedRows() { + // A contiguous run (>= MIN_RANGE_RUN_LENGTH) is removed via the range path. + int runLength = ContainerHealthSchemaManager.MIN_RANGE_RUN_LENGTH + 50; + insertRange(1, runLength, UnHealthyContainerStates.UNDER_REPLICATED); + // Survivors that are not part of the delete list. + insertIds(UnHealthyContainerStates.UNDER_REPLICATED, 5000L, 5002L, 5004L); + + List<Long> idsToDelete = contiguous(1, runLength); + idsToDelete.add(1L); // duplicate -> must be de-duplicated + Collections.shuffle(idsToDelete); // unsorted -> must be sorted internally + + schemaManager.batchDeleteSCMStatesForContainers(idsToDelete); + + assertEquals(Arrays.asList(5000L, 5002L, 5004L), remainingContainerIds()); + } + + @Test + public void testDeleteScatteredIdsRemovesExactlyRequestedRows() { + for (long id = 1; id <= 10; id++) { + insertIds(UnHealthyContainerStates.MISSING, id); + } + + // Delete only the odd ids, supplied out of order. + schemaManager.batchDeleteSCMStatesForContainers( + new ArrayList<>(Arrays.asList(9L, 1L, 5L, 7L, 3L))); + + assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), remainingContainerIds()); + } + + @Test + public void testDeleteMixedRunAndScatteredRemovesExactlyRequestedRows() { + int runLength = ContainerHealthSchemaManager.MIN_RANGE_RUN_LENGTH + 50; + insertRange(1, runLength, UnHealthyContainerStates.UNDER_REPLICATED); + insertIds(UnHealthyContainerStates.UNDER_REPLICATED, 9001L, 9003L, 9005L); + // Survivor not present in the delete list. 
+ insertIds(UnHealthyContainerStates.UNDER_REPLICATED, 9004L); + + List<Long> idsToDelete = contiguous(1, runLength); + idsToDelete.add(9001L); + idsToDelete.add(9003L); + idsToDelete.add(9005L); + + schemaManager.batchDeleteSCMStatesForContainers(idsToDelete); + + assertEquals(Collections.singletonList(9004L), remainingContainerIds()); + } + + @Test + public void testDeletePreservesNonScmStateInsideRange() { + int runLength = ContainerHealthSchemaManager.MIN_RANGE_RUN_LENGTH + 100; + insertRange(1, runLength, UnHealthyContainerStates.UNDER_REPLICATED); + // ALL_REPLICAS_BAD is not an SCM-generated state, so it must survive even + // though container 50 falls inside the deleted [1, runLength] range. + insertIds(UnHealthyContainerStates.ALL_REPLICAS_BAD, 50L); + + schemaManager.batchDeleteSCMStatesForContainers(contiguous(1, runLength)); + + assertEquals(Collections.singletonList(50L), remainingContainerIds()); + assertEquals(0L, countByState(UnHealthyContainerStates.UNDER_REPLICATED)); + assertEquals(1L, countByState(UnHealthyContainerStates.ALL_REPLICAS_BAD)); + } + + @Test + public void testDeleteShortRunsAndPointsCombinedInOneStatement() { + // Short contiguous runs of length >= MIN_RANGE_RUN_LENGTH become BETWEEN + // terms; runs of 1-2 ids stay as IN points. All are combined into a single + // bounded predicate. Inserted rows cover both deleted and surviving ids. + insertRange(1, 5, UnHealthyContainerStates.MISSING); // range term + insertRange(20, 24, UnHealthyContainerStates.MISSING); // range term + insertIds(UnHealthyContainerStates.MISSING, 40L, 41L); // 2 points (kept short) + insertIds(UnHealthyContainerStates.MISSING, 60L); // single point + // Survivors interleaved with the deleted ids. 
+ insertIds(UnHealthyContainerStates.MISSING, 10L, 30L, 42L, 61L); + + List<Long> idsToDelete = new ArrayList<>(); + idsToDelete.addAll(contiguous(1, 5)); + idsToDelete.addAll(contiguous(20, 24)); + idsToDelete.addAll(Arrays.asList(40L, 41L, 60L)); + Collections.shuffle(idsToDelete); + + schemaManager.batchDeleteSCMStatesForContainers(idsToDelete); + + assertEquals(Arrays.asList(10L, 30L, 42L, 61L), remainingContainerIds()); + } + + @Test + public void testDeleteScatteredIdsSpanningMultipleStatements() { + // More scattered ids than fit in a single predicate, forcing the operand + // budget to flush across multiple DELETE statements. + int count = ContainerHealthSchemaManager.MAX_PREDICATE_OPERANDS + 50; + List<Long> idsToDelete = new ArrayList<>(count); + for (int k = 0; k < count; k++) { + // Even ids only -> every id is isolated (no contiguous runs). + long id = 2L * (k + 1); + insertIds(UnHealthyContainerStates.UNDER_REPLICATED, id); + idsToDelete.add(id); + } + // A survivor that must remain untouched. + insertIds(UnHealthyContainerStates.UNDER_REPLICATED, 1L); + + schemaManager.batchDeleteSCMStatesForContainers(idsToDelete); + + assertEquals(Collections.singletonList(1L), remainingContainerIds()); + } + + private void insertRange(long startInclusive, long endInclusive, + UnHealthyContainerStates state) { + List<UnhealthyContainerRecord> records = new ArrayList<>(); + for (long id = startInclusive; id <= endInclusive; id++) { + records.add(record(id, state)); + } + schemaManager.insertUnhealthyContainerRecords(records); + } + + private void insertIds(UnHealthyContainerStates state, long... 
ids) { + List<UnhealthyContainerRecord> records = new ArrayList<>(); + for (long id : ids) { + records.add(record(id, state)); + } + schemaManager.insertUnhealthyContainerRecords(records); + } + + private UnhealthyContainerRecord record(long id, UnHealthyContainerStates state) { + return new UnhealthyContainerRecord(id, state.toString(), 1L, 3, 2, 1, "test"); + } + + private List<Long> contiguous(long startInclusive, long endInclusive) { + List<Long> ids = new ArrayList<>(); + for (long id = startInclusive; id <= endInclusive; id++) { + ids.add(id); + } + return ids; + } + + private List<Long> remainingContainerIds() { + DSLContext dsl = schemaDefinition.getDSLContext(); + return dsl.selectDistinct(UNHEALTHY_CONTAINERS.CONTAINER_ID) + .from(UNHEALTHY_CONTAINERS) + .orderBy(UNHEALTHY_CONTAINERS.CONTAINER_ID.asc()) + .fetch(UNHEALTHY_CONTAINERS.CONTAINER_ID); + } + + private long countByState(UnHealthyContainerStates state) { + DSLContext dsl = schemaDefinition.getDSLContext(); + return dsl.fetchCount(dsl.selectFrom(UNHEALTHY_CONTAINERS) + .where(UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString()))); + } +}