Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,40 @@ public class ContainerHealthSchemaManager {
*/
static final int MAX_IN_CLAUSE_CHUNK_SIZE = 1_000;

/**
* Minimum length of a contiguous container-id run that is deleted with a
* range ({@code BETWEEN}) term instead of being expanded into individual
* {@code IN} values.
*
* <p>Derby compiles every statement to JVM bytecode whose size grows with the
* number of id literals in the predicate. A contiguous run of {@code L} ids
* costs {@code L} operands when listed in an {@code IN} clause, but only
* {@link #RANGE_OPERAND_COST} (the two endpoints) when written as
* {@code BETWEEN lo AND hi}, independent of {@code L}. The range form is
* therefore strictly smaller once {@code L > RANGE_OPERAND_COST}, i.e. for
* runs of 3 or more ids; runs of 1–2 ids are left as {@code IN} values since
* the range form would save nothing. This lets the optimization benefit
* realistic, fragmented unhealthy-container sets rather than only rare
* 1,000+ contiguous ranges.</p>
*/
static final int MIN_RANGE_RUN_LENGTH = 3;

/** Estimated predicate operand cost of a single {@code BETWEEN} range term. */
private static final int RANGE_OPERAND_COST = 2;

/**
* Upper bound on the estimated number of id operands packed into one combined
* DELETE predicate before it is flushed.
*
* <p>Range terms count as {@link #RANGE_OPERAND_COST} operands and scattered
* ids as one operand each. The cap reuses {@link #MAX_IN_CLAUSE_CHUNK_SIZE} —
* the id count already proven to keep a pure {@code IN} delete well under
* Derby's 64 KB per-method bytecode limit — so a mixed range/{@code IN}
* predicate of the same operand budget stays within the same safe
* envelope.</p>
*/
static final int MAX_PREDICATE_OPERANDS = MAX_IN_CLAUSE_CHUNK_SIZE;

private final ContainerSchemaDefinition containerSchemaDefinition;
private final int unhealthyContainersFetchSize;

Expand Down Expand Up @@ -205,28 +239,101 @@ public void replaceUnhealthyContainerRecordsAtomically(

private int deleteScmStatesForContainers(DSLContext dslContext,
List<Long> containerIds) {

// Sort and de-duplicate so contiguous runs are detectable regardless of the
// order in which the caller supplied the ids.
List<Long> sortedIds = containerIds.stream()
.distinct()
.sorted()
.collect(Collectors.toList());

int totalDeleted = 0;

for (int from = 0; from < containerIds.size(); from += MAX_IN_CLAUSE_CHUNK_SIZE) {
int to = Math.min(from + MAX_IN_CLAUSE_CHUNK_SIZE, containerIds.size());
List<Long> chunk = containerIds.subList(from, to);

int deleted = dslContext.deleteFrom(UNHEALTHY_CONTAINERS)
.where(UNHEALTHY_CONTAINERS.CONTAINER_ID.in(chunk))
.and(UNHEALTHY_CONTAINERS.CONTAINER_STATE.in(
UnHealthyContainerStates.MISSING.toString(),
UnHealthyContainerStates.EMPTY_MISSING.toString(),
UnHealthyContainerStates.UNDER_REPLICATED.toString(),
UnHealthyContainerStates.OVER_REPLICATED.toString(),
UnHealthyContainerStates.MIS_REPLICATED.toString(),
UnHealthyContainerStates.NEGATIVE_SIZE.toString(),
UnHealthyContainerStates.REPLICA_MISMATCH.toString()))
.execute();
totalDeleted += deleted;
// Build each DELETE predicate as the combination of compact range terms (one
// BETWEEN per contiguous run of at least MIN_RANGE_RUN_LENGTH ids) and a
// single IN list for the remaining scattered ids. Terms are accumulated
// until the estimated operand count reaches MAX_PREDICATE_OPERANDS, then the
// statement is flushed so the generated Derby bytecode stays within budget.
List<long[]> ranges = new ArrayList<>();
List<Long> points = new ArrayList<>();
int operandsUsed = 0;

int i = 0;
while (i < sortedIds.size()) {
int runStart = i;
while (i + 1 < sortedIds.size()
&& sortedIds.get(i + 1) == sortedIds.get(i) + 1) {
i++;
}
int runLength = i - runStart + 1;

// A contiguous run is cheaper as a BETWEEN (RANGE_OPERAND_COST operands,
// regardless of length) than as IN values (one operand each) once it spans
// at least MIN_RANGE_RUN_LENGTH ids; shorter runs stay as IN points.
boolean asRange = runLength >= MIN_RANGE_RUN_LENGTH;
int termOperands = asRange ? RANGE_OPERAND_COST : runLength;

if (operandsUsed > 0
&& operandsUsed + termOperands > MAX_PREDICATE_OPERANDS) {
totalDeleted += executeCombinedDelete(dslContext, ranges, points);
ranges.clear();
points.clear();
operandsUsed = 0;
}

if (asRange) {
ranges.add(new long[] {sortedIds.get(runStart), sortedIds.get(i)});
} else {
for (int j = runStart; j <= i; j++) {
points.add(sortedIds.get(j));
}
}
operandsUsed += termOperands;
i++;
}

if (operandsUsed > 0) {
totalDeleted += executeCombinedDelete(dslContext, ranges, points);
}
return totalDeleted;
}

/**
* Executes a single DELETE whose container-id predicate is the combination of the
* given contiguous ranges (as {@code BETWEEN} terms) and scattered ids (as a
* single {@code IN} list), restricted to SCM-generated states.
*/
private int executeCombinedDelete(DSLContext dslContext,
List<long[]> ranges, List<Long> points) {
Condition idCondition = null;
if (!points.isEmpty()) {
idCondition = UNHEALTHY_CONTAINERS.CONTAINER_ID.in(points);
}
for (long[] range : ranges) {
Condition rangeCondition =
UNHEALTHY_CONTAINERS.CONTAINER_ID.between(range[0], range[1]);
idCondition = (idCondition == null)
? rangeCondition : idCondition.or(rangeCondition);
}
if (idCondition == null) {
return 0;
}
return dslContext.deleteFrom(UNHEALTHY_CONTAINERS)
.where(idCondition.and(scmGeneratedStatesCondition()))
.execute();
}

private Condition scmGeneratedStatesCondition() {
return UNHEALTHY_CONTAINERS.CONTAINER_STATE.in(
UnHealthyContainerStates.MISSING.toString(),
UnHealthyContainerStates.EMPTY_MISSING.toString(),
UnHealthyContainerStates.UNDER_REPLICATED.toString(),
UnHealthyContainerStates.OVER_REPLICATED.toString(),
UnHealthyContainerStates.MIS_REPLICATED.toString(),
UnHealthyContainerStates.NEGATIVE_SIZE.toString(),
UnHealthyContainerStates.REPLICA_MISMATCH.toString());
}

/**
* Returns previous in-state-since timestamps for tracked unhealthy states.
* The key is a stable containerId + state tuple.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.recon.persistence;

import static org.apache.ozone.recon.schema.generated.tables.UnhealthyContainersTable.UNHEALTHY_CONTAINERS;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager.UnhealthyContainerRecord;
import org.apache.ozone.recon.schema.ContainerSchemaDefinition;
import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
import org.jooq.DSLContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
* Functional tests for {@link ContainerHealthSchemaManager} delete handling.
*
* <p>{@code deleteScmStatesForContainers} coalesces long contiguous container
* id runs into single range ({@code BETWEEN}) deletes while folding shorter
* runs and scattered ids into chunked {@code IN} deletes. These tests assert
* that both paths remove exactly the requested rows for contiguous, scattered,
* and mixed id distributions, and that non-SCM states are never removed.</p>
*/
public class TestContainerHealthSchemaManager extends AbstractReconSqlDBTest {

private ContainerSchemaDefinition schemaDefinition;
private ContainerHealthSchemaManager schemaManager;

@BeforeEach
public void setUpSchemaManager() {
schemaDefinition = getSchemaDefinition(ContainerSchemaDefinition.class);
schemaManager =
new ContainerHealthSchemaManager(schemaDefinition, new OzoneConfiguration());
}

@Test
public void testDeleteContiguousRangeRemovesExactlyRequestedRows() {
// A contiguous run (>= MIN_RANGE_RUN_LENGTH) is removed via the range path.
int runLength = ContainerHealthSchemaManager.MIN_RANGE_RUN_LENGTH + 50;
insertRange(1, runLength, UnHealthyContainerStates.UNDER_REPLICATED);
// Survivors that are not part of the delete list.
insertIds(UnHealthyContainerStates.UNDER_REPLICATED, 5000L, 5002L, 5004L);

List<Long> idsToDelete = contiguous(1, runLength);
idsToDelete.add(1L); // duplicate -> must be de-duplicated
Collections.shuffle(idsToDelete); // unsorted -> must be sorted internally

schemaManager.batchDeleteSCMStatesForContainers(idsToDelete);

assertEquals(Arrays.asList(5000L, 5002L, 5004L), remainingContainerIds());
}

@Test
public void testDeleteScatteredIdsRemovesExactlyRequestedRows() {
for (long id = 1; id <= 10; id++) {
insertIds(UnHealthyContainerStates.MISSING, id);
}

// Delete only the odd ids, supplied out of order.
schemaManager.batchDeleteSCMStatesForContainers(
new ArrayList<>(Arrays.asList(9L, 1L, 5L, 7L, 3L)));

assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), remainingContainerIds());
}

@Test
public void testDeleteMixedRunAndScatteredRemovesExactlyRequestedRows() {
int runLength = ContainerHealthSchemaManager.MIN_RANGE_RUN_LENGTH + 50;
insertRange(1, runLength, UnHealthyContainerStates.UNDER_REPLICATED);
insertIds(UnHealthyContainerStates.UNDER_REPLICATED, 9001L, 9003L, 9005L);
// Survivor not present in the delete list.
insertIds(UnHealthyContainerStates.UNDER_REPLICATED, 9004L);

List<Long> idsToDelete = contiguous(1, runLength);
idsToDelete.add(9001L);
idsToDelete.add(9003L);
idsToDelete.add(9005L);

schemaManager.batchDeleteSCMStatesForContainers(idsToDelete);

assertEquals(Collections.singletonList(9004L), remainingContainerIds());
}

@Test
public void testDeletePreservesNonScmStateInsideRange() {
int runLength = ContainerHealthSchemaManager.MIN_RANGE_RUN_LENGTH + 100;
insertRange(1, runLength, UnHealthyContainerStates.UNDER_REPLICATED);
// ALL_REPLICAS_BAD is not an SCM-generated state, so it must survive even
// though container 50 falls inside the deleted [1, runLength] range.
insertIds(UnHealthyContainerStates.ALL_REPLICAS_BAD, 50L);

schemaManager.batchDeleteSCMStatesForContainers(contiguous(1, runLength));

assertEquals(Collections.singletonList(50L), remainingContainerIds());
assertEquals(0L, countByState(UnHealthyContainerStates.UNDER_REPLICATED));
assertEquals(1L, countByState(UnHealthyContainerStates.ALL_REPLICAS_BAD));
}

@Test
public void testDeleteShortRunsAndPointsCombinedInOneStatement() {
// Short contiguous runs of length >= MIN_RANGE_RUN_LENGTH become BETWEEN
// terms; runs of 1-2 ids stay as IN points. All are combined into a single
// bounded predicate. Inserted rows cover both deleted and surviving ids.
insertRange(1, 5, UnHealthyContainerStates.MISSING); // range term
insertRange(20, 24, UnHealthyContainerStates.MISSING); // range term
insertIds(UnHealthyContainerStates.MISSING, 40L, 41L); // 2 points (kept short)
insertIds(UnHealthyContainerStates.MISSING, 60L); // single point
// Survivors interleaved with the deleted ids.
insertIds(UnHealthyContainerStates.MISSING, 10L, 30L, 42L, 61L);

List<Long> idsToDelete = new ArrayList<>();
idsToDelete.addAll(contiguous(1, 5));
idsToDelete.addAll(contiguous(20, 24));
idsToDelete.addAll(Arrays.asList(40L, 41L, 60L));
Collections.shuffle(idsToDelete);

schemaManager.batchDeleteSCMStatesForContainers(idsToDelete);

assertEquals(Arrays.asList(10L, 30L, 42L, 61L), remainingContainerIds());
}

@Test
public void testDeleteScatteredIdsSpanningMultipleStatements() {
// More scattered ids than fit in a single predicate, forcing the operand
// budget to flush across multiple DELETE statements.
int count = ContainerHealthSchemaManager.MAX_PREDICATE_OPERANDS + 50;
List<Long> idsToDelete = new ArrayList<>(count);
for (int k = 0; k < count; k++) {
// Even ids only -> every id is isolated (no contiguous runs).
long id = 2L * (k + 1);
insertIds(UnHealthyContainerStates.UNDER_REPLICATED, id);
idsToDelete.add(id);
}
// A survivor that must remain untouched.
insertIds(UnHealthyContainerStates.UNDER_REPLICATED, 1L);

schemaManager.batchDeleteSCMStatesForContainers(idsToDelete);

assertEquals(Collections.singletonList(1L), remainingContainerIds());
}

private void insertRange(long startInclusive, long endInclusive,
UnHealthyContainerStates state) {
List<UnhealthyContainerRecord> records = new ArrayList<>();
for (long id = startInclusive; id <= endInclusive; id++) {
records.add(record(id, state));
}
schemaManager.insertUnhealthyContainerRecords(records);
}

private void insertIds(UnHealthyContainerStates state, long... ids) {
List<UnhealthyContainerRecord> records = new ArrayList<>();
for (long id : ids) {
records.add(record(id, state));
}
schemaManager.insertUnhealthyContainerRecords(records);
}

private UnhealthyContainerRecord record(long id, UnHealthyContainerStates state) {
return new UnhealthyContainerRecord(id, state.toString(), 1L, 3, 2, 1, "test");
}

private List<Long> contiguous(long startInclusive, long endInclusive) {
List<Long> ids = new ArrayList<>();
for (long id = startInclusive; id <= endInclusive; id++) {
ids.add(id);
}
return ids;
}

private List<Long> remainingContainerIds() {
DSLContext dsl = schemaDefinition.getDSLContext();
return dsl.selectDistinct(UNHEALTHY_CONTAINERS.CONTAINER_ID)
.from(UNHEALTHY_CONTAINERS)
.orderBy(UNHEALTHY_CONTAINERS.CONTAINER_ID.asc())
.fetch(UNHEALTHY_CONTAINERS.CONTAINER_ID);
}

private long countByState(UnHealthyContainerStates state) {
DSLContext dsl = schemaDefinition.getDSLContext();
return dsl.fetchCount(dsl.selectFrom(UNHEALTHY_CONTAINERS)
.where(UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString())));
}
}