Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,24 @@ public class ContainerHealthSchemaManager {
*/
static final int MAX_IN_CLAUSE_CHUNK_SIZE = 1_000;

/**
* Minimum length of a contiguous container-id run that justifies deleting it
* with a single range ({@code BETWEEN}) statement instead of folding the ids
* into a chunked {@code IN} delete.
*
* <p>A {@code BETWEEN} predicate is constant size and removes an entire
* contiguous range in one statement, whereas an {@code IN} delete is capped
* at {@link #MAX_IN_CLAUSE_CHUNK_SIZE} ids per statement to stay under
* Derby's 64 KB bytecode limit. Pulling a run out into its own
* {@code BETWEEN} only lowers the total statement count once the run is at
* least one IN-chunk long, so shorter runs and scattered ids stay in the IN
* batches. Real container-id lists are usually dense ranges broken up by gaps
* from deleted containers, so this keeps the common case to a handful of
* statements while never doing worse than plain IN chunking for sparse
* ids.</p>
*/
static final int MIN_RANGE_DELETE_RUN = MAX_IN_CLAUSE_CHUNK_SIZE;

private final ContainerSchemaDefinition containerSchemaDefinition;
private final int unhealthyContainersFetchSize;

Expand Down Expand Up @@ -205,28 +223,76 @@ public void replaceUnhealthyContainerRecordsAtomically(

private int deleteScmStatesForContainers(DSLContext dslContext,
List<Long> containerIds) {

// Sort and de-duplicate so contiguous runs are detectable regardless of the
// order in which the caller supplied the ids.
List<Long> sortedIds = containerIds.stream()
.distinct()
.sorted()
.collect(Collectors.toList());

int totalDeleted = 0;
List<Long> inClauseBatch = new ArrayList<>(MAX_IN_CLAUSE_CHUNK_SIZE);

int i = 0;
while (i < sortedIds.size()) {
int runStart = i;
while (i + 1 < sortedIds.size()
&& sortedIds.get(i + 1) == sortedIds.get(i) + 1) {
i++;
}

for (int from = 0; from < containerIds.size(); from += MAX_IN_CLAUSE_CHUNK_SIZE) {
int to = Math.min(from + MAX_IN_CLAUSE_CHUNK_SIZE, containerIds.size());
List<Long> chunk = containerIds.subList(from, to);

int deleted = dslContext.deleteFrom(UNHEALTHY_CONTAINERS)
.where(UNHEALTHY_CONTAINERS.CONTAINER_ID.in(chunk))
.and(UNHEALTHY_CONTAINERS.CONTAINER_STATE.in(
UnHealthyContainerStates.MISSING.toString(),
UnHealthyContainerStates.EMPTY_MISSING.toString(),
UnHealthyContainerStates.UNDER_REPLICATED.toString(),
UnHealthyContainerStates.OVER_REPLICATED.toString(),
UnHealthyContainerStates.MIS_REPLICATED.toString(),
UnHealthyContainerStates.NEGATIVE_SIZE.toString(),
UnHealthyContainerStates.REPLICA_MISMATCH.toString()))
.execute();
totalDeleted += deleted;
if (i - runStart + 1 >= MIN_RANGE_DELETE_RUN) {
// Long contiguous run: a single constant-size BETWEEN removes the whole
// range without straining Derby's per-statement bytecode budget.
totalDeleted += deleteScmStatesByRange(dslContext,
sortedIds.get(runStart), sortedIds.get(i));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the suggestion to include multiple BETWEEN ranges in the same delete statement?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to keep the BETWEEN deletes and the IN deletes as separate statements. That way, in the worst case it falls back to the same IN chunking the code used before this change, so it is never worse than the old baseline of ceil(n / 1000) statements. In the best case it issues just one statement for a fully contiguous block.

As the comment above says, combining both in one statement might overflow Derby's per-statement bytecode limit (the bytecode-per-method limit).

If you prefer, I can combine both into one statement. Thanks!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using BETWEEN instead of IN for contiguous range of IDs is a good idea.

But I don't think the current change has much effect on production behavior: a contiguous range of 1000+ "unhealthy" containers in Recon is not a real-life scenario. So it adds complexity without benefit.

We should apply this logic for shorter ranges, without executing too many small statements. For that, we need to find the threshold (number of container IDs) over which BETWEEN n AND n+k OR yields shorter code than the corresponding IN clause. Any number of IDs beyond that is a win.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, let me apply that change here, thanks.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using BETWEEN instead of IN for contiguous range of IDs is a good idea.

But I don't think the current change has much effect on production behavior: a contiguous range of 1000+ "unhealthy" containers in Recon is not a real-life scenario. So it adds complexity without benefit.

We should apply this logic for shorter ranges, without executing too many small statements. For that, we need to find the threshold (number of container IDs) over which BETWEEN n AND n+k OR yields shorter code than the corresponding IN clause. Any number of IDs beyond that is a win.

@adoroszlai I have applied the changes, please take a look. Thanks!

} else {
// Short run or isolated id: fold into IN batches that stay within the
// bytecode-safe chunk size.
for (int j = runStart; j <= i; j++) {
inClauseBatch.add(sortedIds.get(j));
if (inClauseBatch.size() == MAX_IN_CLAUSE_CHUNK_SIZE) {
totalDeleted += deleteScmStatesByIds(dslContext, inClauseBatch);
inClauseBatch.clear();
}
}
}
i++;
}

if (!inClauseBatch.isEmpty()) {
totalDeleted += deleteScmStatesByIds(dslContext, inClauseBatch);
}
return totalDeleted;
}

/**
 * Deletes rows whose container id lies in the inclusive range
 * {@code [startContainerId, endContainerId]} and whose state matches
 * {@link #scmGeneratedStatesCondition()}.
 *
 * @param dslContext jOOQ context the DELETE is executed on
 * @param startContainerId lower bound of the id range (inclusive)
 * @param endContainerId upper bound of the id range (inclusive)
 * @return the row count reported by the executed DELETE
 */
private int deleteScmStatesByRange(DSLContext dslContext,
    long startContainerId, long endContainerId) {
  // A BETWEEN predicate has the same size no matter how many ids the range spans.
  Condition idWithinRange = UNHEALTHY_CONTAINERS.CONTAINER_ID
      .between(startContainerId, endContainerId);
  return dslContext
      .deleteFrom(UNHEALTHY_CONTAINERS)
      .where(idWithinRange)
      .and(scmGeneratedStatesCondition())
      .execute();
}

/**
 * Deletes rows whose container id is one of {@code containerIds} and whose
 * state matches {@link #scmGeneratedStatesCondition()}.
 *
 * @param dslContext jOOQ context the DELETE is executed on
 * @param containerIds ids placed in the {@code IN} predicate
 * @return the row count reported by the executed DELETE
 */
private int deleteScmStatesByIds(DSLContext dslContext, List<Long> containerIds) {
  Condition idIsListed = UNHEALTHY_CONTAINERS.CONTAINER_ID.in(containerIds);
  Condition stateIsScmGenerated = scmGeneratedStatesCondition();
  return dslContext
      .deleteFrom(UNHEALTHY_CONTAINERS)
      .where(idIsListed)
      .and(stateIsScmGenerated)
      .execute();
}

/**
 * Builds the state predicate shared by the range and id-list deletes: the
 * row's container state must be one of the seven states listed below.
 *
 * @return an {@code IN} condition over {@code CONTAINER_STATE}
 */
private Condition scmGeneratedStatesCondition() {
  final String[] scmGeneratedStates = {
      UnHealthyContainerStates.MISSING.toString(),
      UnHealthyContainerStates.EMPTY_MISSING.toString(),
      UnHealthyContainerStates.UNDER_REPLICATED.toString(),
      UnHealthyContainerStates.OVER_REPLICATED.toString(),
      UnHealthyContainerStates.MIS_REPLICATED.toString(),
      UnHealthyContainerStates.NEGATIVE_SIZE.toString(),
      UnHealthyContainerStates.REPLICA_MISMATCH.toString(),
  };
  return UNHEALTHY_CONTAINERS.CONTAINER_STATE.in(scmGeneratedStates);
}

/**
* Returns previous in-state-since timestamps for tracked unhealthy states.
* The key is a stable containerId + state tuple.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.recon.persistence;

import static org.apache.ozone.recon.schema.generated.tables.UnhealthyContainersTable.UNHEALTHY_CONTAINERS;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager.UnhealthyContainerRecord;
import org.apache.ozone.recon.schema.ContainerSchemaDefinition;
import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
import org.jooq.DSLContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Functional tests for the delete path of {@link ContainerHealthSchemaManager}.
 *
 * <p>Every test seeds the unhealthy-containers table, invokes
 * {@code batchDeleteSCMStatesForContainers} with a particular id distribution
 * (one long contiguous run, scattered ids, or a mix of both) and then checks
 * which container ids are still present. One test additionally verifies that a
 * row in the {@code ALL_REPLICAS_BAD} state survives even though its container
 * id is part of the delete request.</p>
 */
public class TestContainerHealthSchemaManager extends AbstractReconSqlDBTest {

  private ContainerSchemaDefinition schemaDefinition;
  private ContainerHealthSchemaManager schemaManager;

  /** Creates a fresh schema manager on top of the test database before each test. */
  @BeforeEach
  public void setUpSchemaManager() {
    schemaDefinition = getSchemaDefinition(ContainerSchemaDefinition.class);
    OzoneConfiguration conf = new OzoneConfiguration();
    schemaManager = new ContainerHealthSchemaManager(schemaDefinition, conf);
  }

  /**
   * A single run longer than {@code MIN_RANGE_DELETE_RUN}, supplied shuffled
   * and with one duplicate id, must remove the run and nothing else.
   */
  @Test
  public void testDeleteContiguousRangeRemovesExactlyRequestedRows() {
    final int lastRunId = ContainerHealthSchemaManager.MIN_RANGE_DELETE_RUN + 200;
    final List<Long> expectedSurvivors = Arrays.asList(5000L, 5002L, 5004L);

    insertRange(1, lastRunId, UnHealthyContainerStates.UNDER_REPLICATED);
    // These rows are never requested for deletion.
    insertIds(UnHealthyContainerStates.UNDER_REPLICATED, 5000L, 5002L, 5004L);

    List<Long> request = contiguous(1, lastRunId);
    // Duplicate entry plus random order: the delete must cope with both.
    request.add(1L);
    Collections.shuffle(request);

    schemaManager.batchDeleteSCMStatesForContainers(request);

    assertEquals(expectedSurvivors, remainingContainerIds());
  }

  /** Out-of-order, non-adjacent ids must be removed while their neighbours stay. */
  @Test
  public void testDeleteScatteredIdsRemovesExactlyRequestedRows() {
    // Seed ids 1..10, one insert call per id.
    for (Long id : contiguous(1, 10)) {
      insertIds(UnHealthyContainerStates.MISSING, id);
    }

    List<Long> oddIdsOutOfOrder = new ArrayList<>(Arrays.asList(9L, 1L, 5L, 7L, 3L));
    schemaManager.batchDeleteSCMStatesForContainers(oddIdsOutOfOrder);

    List<Long> evenIds = Arrays.asList(2L, 4L, 6L, 8L, 10L);
    assertEquals(evenIds, remainingContainerIds());
  }

  /**
   * A long run combined with a few scattered ids must remove all requested
   * rows and keep an unrequested id that sits between the scattered ones.
   */
  @Test
  public void testDeleteMixedRunAndScatteredRemovesExactlyRequestedRows() {
    final int lastRunId = ContainerHealthSchemaManager.MIN_RANGE_DELETE_RUN + 100;

    insertRange(1, lastRunId, UnHealthyContainerStates.UNDER_REPLICATED);
    insertIds(UnHealthyContainerStates.UNDER_REPLICATED, 9001L, 9003L, 9005L);
    // 9004 is stored but never requested, so it has to survive.
    insertIds(UnHealthyContainerStates.UNDER_REPLICATED, 9004L);

    List<Long> request = contiguous(1, lastRunId);
    request.addAll(Arrays.asList(9001L, 9003L, 9005L));

    schemaManager.batchDeleteSCMStatesForContainers(request);

    assertEquals(Collections.singletonList(9004L), remainingContainerIds());
  }

  /**
   * A row in the {@code ALL_REPLICAS_BAD} state must not be deleted even when
   * its container id lies inside the requested contiguous run.
   */
  @Test
  public void testDeletePreservesNonScmStateInsideRange() {
    final int lastRunId = ContainerHealthSchemaManager.MIN_RANGE_DELETE_RUN + 100;

    insertRange(1, lastRunId, UnHealthyContainerStates.UNDER_REPLICATED);
    // Container 50 now has a second row in a state the delete must not touch.
    insertIds(UnHealthyContainerStates.ALL_REPLICAS_BAD, 50L);

    schemaManager.batchDeleteSCMStatesForContainers(contiguous(1, lastRunId));

    assertEquals(Collections.singletonList(50L), remainingContainerIds());
    assertEquals(0L, countByState(UnHealthyContainerStates.UNDER_REPLICATED));
    assertEquals(1L, countByState(UnHealthyContainerStates.ALL_REPLICAS_BAD));
  }

  /** Inserts one record per id in {@code [startInclusive, endInclusive]} in a single batch. */
  private void insertRange(long startInclusive, long endInclusive,
      UnHealthyContainerStates state) {
    insertBatch(contiguous(startInclusive, endInclusive), state);
  }

  /** Inserts one record per given id in a single batch. */
  private void insertIds(UnHealthyContainerStates state, long... ids) {
    List<Long> boxedIds = new ArrayList<>(ids.length);
    for (long id : ids) {
      boxedIds.add(id);
    }
    insertBatch(boxedIds, state);
  }

  /** Builds the records for {@code ids} and hands them to the schema manager in one call. */
  private void insertBatch(List<Long> ids, UnHealthyContainerStates state) {
    List<UnhealthyContainerRecord> batch = new ArrayList<>(ids.size());
    for (Long id : ids) {
      batch.add(record(id, state));
    }
    schemaManager.insertUnhealthyContainerRecords(batch);
  }

  /** Creates a record for the given id and state; the remaining fields are fixed test values. */
  private UnhealthyContainerRecord record(long id, UnHealthyContainerStates state) {
    String stateName = state.toString();
    return new UnhealthyContainerRecord(id, stateName, 1L, 3, 2, 1, "test");
  }

  /** Returns a mutable list holding every id in {@code [startInclusive, endInclusive]}. */
  private List<Long> contiguous(long startInclusive, long endInclusive) {
    List<Long> run = new ArrayList<>();
    long next = startInclusive;
    while (next <= endInclusive) {
      run.add(next);
      next++;
    }
    return run;
  }

  /** Returns the distinct container ids still stored, in ascending order. */
  private List<Long> remainingContainerIds() {
    return schemaDefinition.getDSLContext()
        .selectDistinct(UNHEALTHY_CONTAINERS.CONTAINER_ID)
        .from(UNHEALTHY_CONTAINERS)
        .orderBy(UNHEALTHY_CONTAINERS.CONTAINER_ID.asc())
        .fetch(UNHEALTHY_CONTAINERS.CONTAINER_ID);
  }

  /** Counts the stored rows whose container state equals {@code state}. */
  private long countByState(UnHealthyContainerStates state) {
    final DSLContext ctx = schemaDefinition.getDSLContext();
    final String stateName = state.toString();
    return ctx.fetchCount(
        ctx.selectFrom(UNHEALTHY_CONTAINERS)
            .where(UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(stateName)));
  }
}