From 9c683c5b10244525b8110c55c368fac548734b65 Mon Sep 17 00:00:00 2001 From: Xi Chen <32928346+xichen01@users.noreply.github.com> Date: Fri, 20 Sep 2024 11:13:47 +0800 Subject: [PATCH] HDDS-15409. SCM Pipeline Add SupportedStorageTier Field --- .../hadoop/hdds/client/StorageTier.java | 7 + .../hadoop/hdds/client/StorageTierUtil.java | 43 +++-- .../hadoop/hdds/scm/pipeline/Pipeline.java | 53 ++++++ .../hdds/client/StorageTierUtilTest.java | 151 ++++++++++++++++++ .../common/impl/StorageLocationReport.java | 25 --- .../src/main/proto/hdds.proto | 1 + .../hadoop/hdds/scm/node/NodeUtils.java | 72 +++++++++ .../hadoop/hdds/scm/node/SCMNodeManager.java | 45 ++++++ .../hdds/scm/pipeline/ECPipelineProvider.java | 14 +- .../hdds/scm/pipeline/PipelineProvider.java | 2 +- .../scm/pipeline/PipelineReportHandler.java | 29 ++++ .../scm/pipeline/RatisPipelineProvider.java | 21 ++- .../scm/pipeline/SimplePipelineProvider.java | 12 +- .../hdds/scm/container/MockNodeManager.java | 16 +- .../pipeline/MockRatisPipelineProvider.java | 7 + .../scm/pipeline/TestPipelineManagerImpl.java | 86 +++++++++- .../pipeline/TestRatisPipelineProvider.java | 32 ++-- .../pipeline/TestSimplePipelineProvider.java | 10 +- .../hadoop/hdds/protocol/StorageTierTest.java | 3 +- .../hdds/scm/pipeline/TestSCMRestart.java | 4 + 20 files changed, 558 insertions(+), 75 deletions(-) create mode 100644 hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/StorageTierUtilTest.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/StorageTier.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/StorageTier.java index 1e5c40dfe64b..24b783f1b14b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/StorageTier.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/StorageTier.java @@ -104,6 +104,13 @@ public boolean isUniform() { return isUniform; } + public StorageType getUniformStorageType() { + if (!isUniform()) { + throw new IllegalArgumentException("Uniform storage type is not supported"); + } + return storageTypes.get(0); + } + /** * Maps a StorageTier to its corresponding StorageType based on replication type. * diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/StorageTierUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/StorageTierUtil.java index d68c71a31da0..f7ff2afe2b73 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/StorageTierUtil.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/StorageTierUtil.java @@ -17,7 +17,9 @@ package org.apache.hadoop.hdds.client; +import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.scm.exceptions.SCMException; @@ -45,22 +47,31 @@ public static void validateNotEmpty(StorageTier storageTier) throws SCMException } } - /** - * Returns the StorageType for uniform StorageTier. - * - * @param storageTier the StorageTier to get StorageType from - * @return The uniform StorageTier corresponding StorageType - * @throws SCMException if the StorageTier is non-uniform or the EMPTY StorageTier - */ - public static StorageType getStorageTypeForUniformStorageTier(StorageTier storageTier, ReplicationConfig config) - throws SCMException { - validateNotEmpty(storageTier); - List storageTypes = storageTier.getStorageTypes(config.getRequiredNodes()); - if (storageTier.isUniform()) { - return storageTypes.get(0); - } else { - throw new SCMException("Unsupported non-uniform storage tier " + storageTier, - SCMException.ResultCodes.UNSUPPORTED_NON_UNIFORM_STORAGE_TIER); + public static List findSupportedStorageTiers( + List> dnStorageTypes) { + List supportedStorageTiers = new ArrayList<>(); + if (dnStorageTypes.isEmpty()) { + return supportedStorageTiers; + } + // We only support uniform storage tiers currently + for (StorageTier storageTier : StorageTier.values()) { + if (storageTier.equals(StorageTier.EMPTY)) { + continue; + } + if (!storageTier.isUniform()) { + throw new UnsupportedOperationException(storageTier + " is not a uniform storage tier"); + } + boolean supportedTier = true; + for (Set dnStorageType : dnStorageTypes) { + if (!dnStorageType.contains(storageTier.getUniformStorageType())) { + supportedTier = false; + break; + } + } + if (supportedTier) { + supportedStorageTiers.add(storageTier); + } } + return supportedStorageTiers; } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index d168fdc5a4d7..4f0df43014b3 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -33,6 +33,8 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -42,6 +44,7 @@ import org.apache.hadoop.hdds.client.ReplicatedReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.client.StorageTier; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -86,6 +89,9 @@ public final class Pipeline { private Instant creationTimestamp; // suggested leader id with high priority private final DatanodeID suggestedLeaderId; + private List supportedStorageTier; + + private final ReadWriteLock lock; private final Instant stateEnterTime; @@ -114,6 +120,8 @@ private Pipeline(Builder b) { replicaIndexes = b.replicaIndexes; creationTimestamp = b.creationTimestamp != null ? b.creationTimestamp : Instant.now(); stateEnterTime = Instant.now(); + supportedStorageTier = b.supportedStorageTier; + lock = new ReentrantReadWriteLock(); } public static Codec getCodec() { @@ -170,6 +178,32 @@ public DatanodeID getSuggestedLeaderId() { return suggestedLeaderId; } + /** + * Return the Pipeline supported StorageTier. + * + * @return Supported StorageTier + */ + public List getSupportedStorageTier() { + lock.readLock().lock(); + try { + return supportedStorageTier; + } finally { + lock.readLock().unlock(); + } + } + + /** + * Set the storageTier supported by the pipeline. + */ + public void setSupportedStorageTier(List supportedStorageTier) { + lock.writeLock().lock(); + try { + this.supportedStorageTier = supportedStorageTier; + } finally { + lock.writeLock().unlock(); + } + } + /** * Set the creation timestamp. Only for protobuf now. */ @@ -399,6 +433,12 @@ public HddsProtos.Pipeline getProtobufMessage(int clientVersion, Set supportedStorageTier = new ArrayList<>(); + for (HddsProtos.StorageTierProto storageTierProto : pipeline.getSupportedStorageTierList()) { + supportedStorageTier.add(StorageTier.fromProto(storageTierProto)); + } final ReplicationConfig config = ReplicationConfig .fromProto(pipeline.getType(), pipeline.getFactor(), @@ -501,6 +545,7 @@ public static Builder toBuilder(HddsProtos.Pipeline pipeline) { .setLeaderId(leaderId) .setSuggestedLeaderId(suggestedLeaderId) .setNodeOrder(pipeline.getMemberOrdersList()) + .setSupportedStorageTier(supportedStorageTier) .setCreateTimestamp(pipeline.getCreationTimeStamp()); } @@ -548,6 +593,7 @@ public String toString() { b.append(']') .append(", ReplicationConfig: ").append(replicationConfig) .append(", State:").append(getPipelineState()) + .append(", SupportStorageTier:").append(getSupportedStorageTier()) .append(", leaderId:").append(leaderId != null ? leaderId.toString() : "") .append(", CreationTimestamp").append(getCreationTimestamp().atZone(ZoneId.systemDefault())) .append('}'); @@ -572,6 +618,7 @@ public static final class Builder { private Instant creationTimestamp = null; private DatanodeID suggestedLeaderId = null; private Map replicaIndexes = ImmutableMap.of(); + private List supportedStorageTier; private Builder() { } @@ -594,6 +641,7 @@ private Builder(Pipeline pipeline) { } replicaIndexes = b.build(); } + this.supportedStorageTier = pipeline.getSupportedStorageTier(); } public Builder setId(DatanodeID datanodeID) { @@ -676,6 +724,11 @@ public Builder setReplicaIndexes(Map indexes) { return this; } + public Builder setSupportedStorageTier(List supportedStorageTier) { + this.supportedStorageTier = supportedStorageTier; + return this; + } + public Pipeline build() { Objects.requireNonNull(id, "id == null"); Objects.requireNonNull(replicationConfig, "replicationConfig == null"); diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/StorageTierUtilTest.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/StorageTierUtilTest.java new file mode 100644 index 000000000000..f443d72f390b --- /dev/null +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/StorageTierUtilTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.client; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.fs.StorageType; +import org.junit.jupiter.api.Test; + +/** + * Provides {@link StorageTierUtil} factory methods for testing. + */ +class StorageTierUtilTest { + @Test + void testFindSupportedStorageTiers() { + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.SSD), + setOf(StorageType.SSD), + setOf(StorageType.SSD)), + 1, setOf(StorageTier.SSD)); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.SSD, StorageType.DISK), + setOf(StorageType.SSD, StorageType.DISK), + setOf(StorageType.SSD)), + 1, setOf(StorageTier.SSD)); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.SSD, StorageType.DISK), + setOf(StorageType.SSD, StorageType.DISK), + setOf(StorageType.SSD, StorageType.DISK)), + 2, setOf(StorageTier.SSD, StorageTier.DISK)); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.SSD, StorageType.DISK, StorageType.ARCHIVE), + setOf(StorageType.SSD, StorageType.DISK), + setOf(StorageType.SSD, StorageType.DISK)), + 2, setOf(StorageTier.SSD, StorageTier.DISK)); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.SSD, StorageType.DISK, StorageType.ARCHIVE), + setOf(StorageType.SSD, StorageType.DISK, StorageType.ARCHIVE), + setOf(StorageType.SSD, StorageType.DISK, StorageType.ARCHIVE)), + 3, setOf(StorageTier.SSD, StorageTier.DISK, StorageTier.ARCHIVE)); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.SSD, StorageType.DISK)), + 2, setOf(StorageTier.SSD, StorageTier.DISK)); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.SSD)), + 1, setOf(StorageTier.SSD)); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.DISK)), + 1, setOf(StorageTier.DISK)); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.ARCHIVE)), + 1, setOf(StorageTier.ARCHIVE)); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.SSD, StorageType.DISK), + setOf(StorageType.SSD, StorageType.DISK), + setOf(StorageType.SSD, StorageType.DISK), + setOf(StorageType.SSD, StorageType.DISK), + setOf(StorageType.SSD, StorageType.DISK)), + 2, setOf(StorageTier.SSD, StorageTier.DISK)); + } + + @Test + void testFindInvalidStorageTiers() { + assertStorageTiers(createDnStorageTypes(), 0, new HashSet<>()); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.DISK), + setOf(StorageType.SSD), + setOf(StorageType.SSD)), + 0, new HashSet<>()); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.DISK), + setOf(StorageType.DISK), + setOf(StorageType.SSD)), + 0, new HashSet<>()); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.ARCHIVE), + setOf(StorageType.DISK), + setOf(StorageType.SSD)), + 0, new HashSet<>()); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.RAM_DISK), + setOf(StorageType.SSD), + setOf(StorageType.SSD)), + 0, new HashSet<>()); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.RAM_DISK), + setOf(StorageType.RAM_DISK), + setOf(StorageType.RAM_DISK)), + 0, new HashSet<>()); + + assertStorageTiers(createDnStorageTypes( + setOf(StorageType.PROVIDED), + setOf(StorageType.PROVIDED), + setOf(StorageType.PROVIDED)), + 0, new HashSet<>()); + } + + private void assertStorageTiers(List> dnStorageTypes, + int expectedSize, Set expectedTiers) { + List supportedTiers = + StorageTierUtil.findSupportedStorageTiers(dnStorageTypes); + HashSet tiers = new HashSet<>(supportedTiers); + assertEquals(expectedSize, tiers.size()); + assertEquals(expectedTiers, tiers); + } + + private List> createDnStorageTypes(Set... storageSets) { + return Arrays.asList(storageSets); + } + + private Set setOf(StorageType... types) { + return new HashSet<>(Arrays.asList(types)); + } + + private Set setOf(StorageTier... tiers) { + return new HashSet<>(Arrays.asList(tiers)); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java index 75c2f9732223..5bfcc78e9e67 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java @@ -132,31 +132,6 @@ public long getFsAvailable() { return fsAvailable; } - public static StorageType getStorageType(StorageTypeProto proto) throws - IllegalArgumentException { - StorageType storageType; - switch (proto) { - case SSD: - storageType = StorageType.SSD; - break; - case DISK: - storageType = StorageType.DISK; - break; - case ARCHIVE: - storageType = StorageType.ARCHIVE; - break; - case PROVIDED: - storageType = StorageType.PROVIDED; - break; - case RAM_DISK: - storageType = StorageType.RAM_DISK; - break; - default: - throw new IllegalArgumentException("Illegal Storage Type specified: " + proto); - } - return storageType; - } - /** * Returns the StorageReportProto protoBuf message for the Storage Location * report. diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index 5b7399215d8e..81b1e89089e9 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -143,6 +143,7 @@ message Pipeline { optional DatanodeIDProto leaderDatanodeID = 101; optional DatanodeIDProto suggestedLeaderDatanodeID = 102; + repeated StorageTierProto supportedStorageTier = 103; } message KeyValue { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java new file mode 100644 index 000000000000..b49719b5b2ca --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdds.client.StorageTier; +import org.apache.hadoop.hdds.client.StorageTierUtil; +import org.apache.hadoop.hdds.client.StorageTypeUtils; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.StorageTypeProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; + +/** + * Util class for Node operations. + */ +public final class NodeUtils { + + private NodeUtils() { + } + + public static List getDatanodesStorageTypes( + List dns, NodeManager nodeManager) { + List> dnStorageTypes = new ArrayList<>(); + for (DatanodeDetails dn : dns) { + DatanodeInfo datanodeInfo = nodeManager.getDatanodeInfo(dn); + dnStorageTypes.add(getDatanodeStorageTypes(datanodeInfo)); + } + return StorageTierUtil.findSupportedStorageTiers(dnStorageTypes); + } + + public static Set getDatanodeStorageTypes( + DatanodeInfo datanodeInfo) { + List storageReportProtos = + datanodeInfo.getStorageReports(); + if (storageReportProtos == null || storageReportProtos.isEmpty()) { + return Collections.emptySet(); + } + + Set uniqueStorageTypes = new HashSet<>(); + for (StorageReportProto storageReportProto : storageReportProtos) { + uniqueStorageTypes.add(StorageTypeUtils.getFromProtobuf(( + storageReportProto.getStorageType()))); + } + return uniqueStorageTypes; + } + + public static StorageTypeProto getStorageTypeFromStorageReportProto( + StorageReportProto storageReportProto) { + return storageReportProto.hasStorageType() + ? storageReportProto.getStorageType() : StorageTypeProto.DISK; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 5e3d333e2b42..ca7a32217132 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -53,6 +53,7 @@ import javax.management.ObjectName; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.StorageTier; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.StorageUnit; @@ -713,6 +714,7 @@ public void processNodeReport(DatanodeDetails datanodeDetails, datanodeInfo.updateStorageReports(nodeReport.getStorageReportList()); datanodeInfo.updateMetaDataStorageReports(nodeReport. getMetadataStorageReportList()); + updateSupportedStorageTier(datanodeInfo); metrics.incNumNodeReportProcessed(); } } catch (NodeNotFoundException e) { @@ -722,6 +724,49 @@ public void processNodeReport(DatanodeDetails datanodeDetails, } } + private void updateSupportedStorageTier(DatanodeInfo datanodeInfo) { + if (scmContext.getScm() == null) { + LOG.debug("Skip the updating of Pipeline supported StorageTier for Recon"); + return; + } + + PipelineManager pipelineManager = scmContext.getScm().getPipelineManager(); + if (pipelineManager == null) { + LOG.debug("Skip the updating of Pipeline supported StorageTier for Recon"); + return; + } + + Set pipelines = nodeStateManager.getPipelineByDnID( + datanodeInfo.getID()); + for (PipelineID pipelineId : pipelines) { + try { + Pipeline pipeline = pipelineManager.getPipeline(pipelineId); + Set currentTiers = + pipeline.getSupportedStorageTier() != null + ? new HashSet<>(pipeline.getSupportedStorageTier()) + : Collections.emptySet(); + List newSupportedTiers = + NodeUtils.getDatanodesStorageTypes(pipeline.getNodes(), this); + Set newSupportedTierSet = + new HashSet<>(newSupportedTiers); + if (!currentTiers.equals(newSupportedTierSet)) { + pipeline.setSupportedStorageTier(newSupportedTiers); + LOG.info("Updated supported storage tiers for Pipeline ID {} from {} " + + "to {} by Datanode {}", + pipeline.getId(), currentTiers, newSupportedTierSet, + datanodeInfo.getID()); + } + } catch (PipelineNotFoundException e) { + LOG.debug("Reported Datanode {} pipeline {} is not found", + datanodeInfo.getID(), pipelineId); + } catch (RuntimeException e) { + LOG.warn("Failed to update supported storage tiers for Pipeline ID {} " + + "by Datanode {} due to: {}", + pipelineId, datanodeInfo.getID(), e.getMessage(), e); + } + } + } + /** * Process Layout Version report. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java index 6b55f23d30c8..70c4f4c53b60 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java @@ -28,15 +28,16 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.StorageTier; -import org.apache.hadoop.hdds.client.StorageTierUtil; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.NodeUtils; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,10 +91,16 @@ protected Pipeline create(ECReplicationConfig replicationConfig, List excludedNodes, List favoredNodes, StorageTier storageTier) throws IOException { - StorageType storageType = StorageTierUtil.getStorageTypeForUniformStorageTier(storageTier, replicationConfig); + StorageType storageType = storageTier.getUniformStorageType(); List dns = placementPolicy .chooseDatanodes(excludedNodes, favoredNodes, replicationConfig.getRequiredNodes(), 0, this.containerSizeBytes, storageType); + List storageTiers = + NodeUtils.getDatanodesStorageTypes(dns, getNodeManager()); + if (!storageTiers.contains(storageTier)) { + throw new SCMException(String.format("Cannot create pipeline for StorageTier %s replicationConfig: %s", + storageTier, replicationConfig), SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } return create(replicationConfig, dns); } @@ -140,12 +147,15 @@ public Pipeline createForRead( private Pipeline createPipelineInternal(ECReplicationConfig repConfig, List dns, Map indexes) { + List storageTiers = + NodeUtils.getDatanodesStorageTypes(dns, getNodeManager()); return Pipeline.newBuilder() .setId(PipelineID.randomId()) .setState(Pipeline.PipelineState.ALLOCATED) .setReplicationConfig(repConfig) .setNodes(dns) .setReplicaIndexes(indexes) + .setSupportedStorageTier(storageTiers) .build(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java index 0a02cb5fd3bd..b207378ce3f3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java @@ -89,7 +89,7 @@ List pickNodesNotUsed(REPLICATION_CONFIG replicationConfig, long dataSizeRequired, StorageTier storageTier) throws SCMException { StorageTierUtil.validateNotEmpty(storageTier); - StorageType storageType = StorageTierUtil.getStorageTypeForUniformStorageTier(storageTier, replicationConfig); + StorageType storageType = storageTier.getUniformStorageType(); int nodesRequired = replicationConfig.getRequiredNodes(); List healthyDNs = pickAllNodesNotUsed(replicationConfig); List healthyDNsWithSpace = healthyDNs.stream() diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java index 9b9d08751574..32a1e337ab99 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -18,9 +18,14 @@ package org.apache.hadoop.hdds.scm.pipeline; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.client.StorageTier; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; @@ -29,6 +34,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.node.NodeUtils; import org.apache.hadoop.hdds.scm.safemode.SafeModeManager; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -119,6 +125,7 @@ protected void processPipelineReport(PipelineReport report, setReportedDatanode(pipeline, dn); setPipelineLeaderId(report, pipeline, dn); + updateSupportedStorageTiers(pipeline); if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { if (LOGGER.isDebugEnabled()) { @@ -139,6 +146,28 @@ protected void processPipelineReport(PipelineReport report, } } + private void updateSupportedStorageTiers(Pipeline pipeline) { + try { + Set currentSupportedTiers = + pipeline.getSupportedStorageTier() != null + ? new HashSet<>(pipeline.getSupportedStorageTier()) + : Collections.emptySet(); + + Set newSupportedTiers = + new HashSet<>(NodeUtils.getDatanodesStorageTypes(pipeline.getNodes(), + scmContext.getScm().getScmNodeManager())); + + if (!currentSupportedTiers.equals(newSupportedTiers)) { + pipeline.setSupportedStorageTier(new ArrayList<>(newSupportedTiers)); + LOGGER.info("Updated supported storage tiers for Pipeline ID {} from {} to {}", + pipeline.getId(), currentSupportedTiers, newSupportedTiers); + } + } catch (Exception e) { + LOGGER.warn("Failed to update supported storage tiers for Pipeline ID {} due to: {}", + pipeline.getId(), e.getMessage(), e); + } + } + protected void setReportedDatanode(Pipeline pipeline, DatanodeDetails dn) throws IOException { pipeline.reportDatanode(dn); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index 9b763e6d5c2e..8f8d1d595ce1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -40,6 +41,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.NodeUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; import org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicy; import org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicyFactory; @@ -154,8 +156,8 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig, : String.format(": %d", pipelineNumberLimit); throw new SCMException( - String.format("Cannot create pipeline as it would exceed the limit %s replicationConfig: %s", - limitInfo, replicationConfig), + String.format("Cannot create pipeline for StorageTier %s as it would exceed the limit %s " + + "replicationConfig: %s", storageTier, limitInfo, replicationConfig), SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE ); } @@ -169,7 +171,7 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig, break; case THREE: StorageTierUtil.validateNotEmpty(storageTier); - StorageType storageType = StorageTierUtil.getStorageTypeForUniformStorageTier(storageTier, replicationConfig); + StorageType storageType = storageTier.getUniformStorageType(); List excludeDueToEngagement = filterPipelineEngagement(); if (!excludeDueToEngagement.isEmpty()) { if (excludedNodes.isEmpty()) { @@ -188,6 +190,12 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig, DatanodeDetails suggestedLeader = leaderChoosePolicy.chooseLeader(dns); + List storageTiers = NodeUtils.getDatanodesStorageTypes(dns, getNodeManager()); + if (!storageTiers.contains(storageTier)) { + throw new SCMException(String.format("Cannot create pipeline for StorageTier %s replicationConfig: %s", + storageTier, replicationConfig), SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + Preconditions.checkArgument(storageTiers.contains(storageTier)); Pipeline pipeline = Pipeline.newBuilder() .setId(PipelineID.randomId()) .setState(PipelineState.ALLOCATED) @@ -195,6 +203,7 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig, .setNodes(dns) .setSuggestedLeaderId( suggestedLeader != null ? suggestedLeader.getID() : null) + .setSupportedStorageTier(storageTiers) .build(); // Send command to datanodes to create pipeline @@ -207,8 +216,8 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig, createCommand.setTerm(scmContext.getTermOfLeader()); dns.forEach(node -> { - LOG.info("Sending CreatePipelineCommand for pipeline:{} to datanode:{}", - pipeline.getId(), node); + LOG.info("Sending CreatePipelineCommand for pipeline:{} to datanode:{} storageTier:{}", + pipeline.getId(), node, storageTier); eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, new CommandForDatanode<>(node, createCommand)); }); @@ -219,11 +228,13 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig, @Override public Pipeline create(RatisReplicationConfig replicationConfig, List nodes) { + List storageTiers = NodeUtils.getDatanodesStorageTypes(nodes, getNodeManager()); return Pipeline.newBuilder() .setId(PipelineID.randomId()) .setState(PipelineState.ALLOCATED) .setReplicationConfig(replicationConfig) .setNodes(nodes) + .setSupportedStorageTier(storageTiers) .build(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java index 621029389377..13065c02c7ec 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java @@ -29,8 +29,10 @@ import org.apache.hadoop.hdds.client.StorageTypeUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; /** @@ -56,7 +58,7 @@ public Pipeline create(StandaloneReplicationConfig replicationConfig, StorageTie public Pipeline create(StandaloneReplicationConfig replicationConfig, List excludedNodes, List favoredNodes, StorageTier storageTier) throws IOException { - StorageType storageType = StorageTierUtil.getStorageTypeForUniformStorageTier(storageTier, replicationConfig); + StorageType storageType = storageTier.getUniformStorageType(); List dns = pickNodesNotUsed(replicationConfig); dns = dns.stream().filter(dn -> ((DatanodeInfo) dn).getStorageReports().stream() @@ -73,23 +75,31 @@ public Pipeline create(StandaloneReplicationConfig replicationConfig, } Collections.shuffle(dns); + List storageTiers = NodeUtils.getDatanodesStorageTypes(dns, getNodeManager()); + if (!storageTiers.contains(storageTier)) { + throw new SCMException(String.format("Cannot create pipeline for StorageTier %s replicationConfig: %s", + storageTier, replicationConfig), SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } return Pipeline.newBuilder() .setId(PipelineID.randomId()) .setState(PipelineState.OPEN) .setReplicationConfig(replicationConfig) .setNodes(dns.subList(0, replicationConfig.getReplicationFactor().getNumber())) + .setSupportedStorageTier(storageTiers) .build(); } @Override public Pipeline create(StandaloneReplicationConfig replicationConfig, List nodes) { + List storageTiers = NodeUtils.getDatanodesStorageTypes(nodes, getNodeManager()); return Pipeline.newBuilder() .setId(PipelineID.randomId()) .setState(PipelineState.OPEN) .setReplicationConfig(replicationConfig) .setNodes(nodes) + .setSupportedStorageTier(storageTiers) .build(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 096720afacaa..11e24a249132 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdds.client.StorageTypeUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; @@ -120,6 +121,8 @@ public class MockNodeManager implements NodeManager { private final OzoneConfiguration conf = new OzoneConfiguration(); private StorageType storageType = null; + private Map nodeStorageTypeMap = new HashMap<>(); + { this.healthyNodes = new LinkedList<>(); this.staleNodes = new LinkedList<>(); @@ -289,10 +292,12 @@ public List getNodes( long capacity = nodeMetricMap.get(dd).getCapacity().get(); long used = nodeMetricMap.get(dd).getScmUsed().get(); long remaining = nodeMetricMap.get(dd).getRemaining().get(); + StorageType nodeStorageType = nodeStorageTypeMap.get(dd.getID()) != null ? + nodeStorageTypeMap.get(dd.getID()) : storageType; StorageReportProto storage1 = HddsTestUtils.createStorageReport( di.getID(), "/data1-" + di.getID(), capacity, used, remaining, - storageType == null ? null : getStorageTypeProto(storageType)); + nodeStorageType == null ? null : getStorageTypeProto(nodeStorageType)); MetadataStorageReportProto metaStorage1 = HddsTestUtils.createMetadataStorageReport( "/metadata1-" + di.getID(), capacity, used, remaining, null); @@ -457,9 +462,12 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dd) { long capacity = nodeMetricMap.get(dd).getCapacity().get(); long used = nodeMetricMap.get(dd).getScmUsed().get(); long remaining = nodeMetricMap.get(dd).getRemaining().get(); + StorageType nodeStorageType = nodeStorageTypeMap.get(dd.getID()) != null ? + nodeStorageTypeMap.get(dd.getID()) : storageType; StorageReportProto storage1 = HddsTestUtils.createStorageReport( di.getID(), "/data1-" + di.getUuidString(), - capacity, used, remaining, null); + capacity, used, remaining, + nodeStorageType == null ? null : StorageTypeUtils.getStorageTypeProto(nodeStorageType)); MetadataStorageReportProto metaStorage1 = HddsTestUtils.createMetadataStorageReport( "/metadata1-" + di.getUuidString(), capacity, used, @@ -1046,4 +1054,8 @@ public void setCurrentState(long currentState) { } } + + public void setStorageTypeForNode(DatanodeID uuid, StorageType st) { + this.nodeStorageTypeMap.put(uuid, st); + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java index 4ad8ee4a302f..a1bd5194012c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeUtils; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; @@ -71,6 +72,8 @@ public Pipeline create(RatisReplicationConfig replicationConfig, StorageTier sto return super.create(replicationConfig, storageTier); } else { Pipeline initialPipeline = super.create(replicationConfig, storageTier); + List storageTiers = + NodeUtils.getDatanodesStorageTypes(initialPipeline.getNodes(), getNodeManager()); Pipeline pipeline = Pipeline.newBuilder() .setId(initialPipeline.getId()) // overwrite pipeline state to main ALLOCATED @@ -79,6 +82,7 @@ public Pipeline create(RatisReplicationConfig replicationConfig, StorageTier sto .fromProtoTypeAndFactor(initialPipeline.getType(), replicationConfig.getReplicationFactor())) .setNodes(initialPipeline.getNodes()) + .setSupportedStorageTier(storageTiers) .build(); return pipeline; } @@ -95,11 +99,14 @@ public static void markPipelineHealthy(Pipeline pipeline) @Override public Pipeline create(RatisReplicationConfig replicationConfig, List nodes) { + List storageTiers = NodeUtils.getDatanodesStorageTypes(nodes, getNodeManager()); return Pipeline.newBuilder() .setId(PipelineID.randomId()) .setState(Pipeline.PipelineState.OPEN) .setReplicationConfig(replicationConfig) .setNodes(nodes) + .setSupportedStorageTier(storageTiers) .build(); } + } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index 4e53f6d629ae..b58b834ca74d 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -63,10 +63,12 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.client.StorageTier; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; @@ -163,6 +165,18 @@ public void cleanup() throws Exception { } } + private PipelineManagerImpl createPipelineManager(NodeManager manager, boolean isLeader) + throws IOException { + return PipelineManagerImpl.newPipelineManager(conf, + SCMHAManagerStub.getInstance(isLeader), + manager, + SCMDBDefinition.PIPELINES.getTable(dbStore), + new EventQueue(), + scmContext, + serviceManager, + testClock); + } + private PipelineManagerImpl createPipelineManager(boolean isLeader) throws IOException { return PipelineManagerImpl.newPipelineManager(conf, @@ -377,7 +391,7 @@ public void testPipelineReport() throws Exception { // get pipeline report from each dn in the pipeline PipelineReportHandler pipelineReportHandler = new PipelineReportHandler(scmSafeModeManager, pipelineManager, - SCMContext.emptyContext(), conf); + scmContext, conf); nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline, pipelineReportHandler, false)); sendPipelineReport(nodes.get(nodes.size() - 1), pipeline, @@ -405,6 +419,68 @@ public void testPipelineReport() throws Exception { } } + @Test + public void testPipelineReportUpdateSupportedStorageTier() throws Exception { + // Set Env + int nodeCount = 3; + MockNodeManager localNodeManager = new MockNodeManager(true, nodeCount, StorageType.DISK); + SCMContext localScmContext = spy(SCMContext.emptyContext()); + StorageContainerManager localScm = mock(StorageContainerManager.class); + when(localScm.getScmNodeManager()).thenReturn(localNodeManager); + when(localScmContext.getScm()).thenReturn(localScm); + + PipelineManagerImpl pipelineManager = createPipelineManager(localNodeManager, true); + SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(conf, + localNodeManager, pipelineManager, mock(ContainerManager.class), + serviceManager, new EventQueue(), scmContext); + List nodes = localNodeManager.getNodes(NodeStatus.inServiceHealthy()); + assertEquals(nodeCount, nodes.size()); + Pipeline pipeline = pipelineManager.createPipeline(RatisReplicationConfig + .getInstance(ReplicationFactor.THREE)); + assertTrue(pipeline.getSupportedStorageTier().contains(StorageTier.DISK)); + + assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy()); + PipelineReportHandler pipelineReportHandler = new PipelineReportHandler( + scmSafeModeManager, pipelineManager, localScmContext, conf); + nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline, + pipelineReportHandler, false)); + sendPipelineReport(nodes.get(nodes.size() - 1), pipeline, + pipelineReportHandler, true); + + // All the Datanode Volume StorageType is DISK so the Pipeline StorageTier will be StorageTier.DISK + assertTrue(pipelineManager.getPipeline(pipeline.getId()).isHealthy()); + assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen()); + assertEquals(1, pipeline.getSupportedStorageTier().size()); + assertTrue(pipeline.getSupportedStorageTier().contains(StorageTier.DISK)); + + // Only the first Datanode updated its NodeReport to SSD, + // Pipeline's Datanode StorageType [SSD, DISK, DISK] is not a valid StorageTier + localNodeManager.setStorageTypeForNode(nodes.get(0).getID(), StorageType.SSD); + sendPipelineReport(nodes.get(0), pipeline, pipelineReportHandler, false); + assertEquals(0, + pipelineManager.getPipeline(pipeline.getId()).getSupportedStorageTier().size()); + + // The first and second Datanode updated its NodeReport to SSD + // Pipeline's Datanode StorageType [SSD, SSD, DISK] is not a valid StorageTier + localNodeManager.setStorageTypeForNode(nodes.get(1).getID(), StorageType.SSD); + sendPipelineReport(nodes.get(1), pipeline, pipelineReportHandler, false); + assertEquals(0, + pipelineManager.getPipeline(pipeline.getId()).getSupportedStorageTier().size()); + + // The first and second Datanode updated its NodeReport to SSD + // Pipeline's Datanode StorageType [SSD, SSD, SSD] is StorageTier.SSD + localNodeManager.setStorageTypeForNode(nodes.get(2).getID(), StorageType.SSD); + sendPipelineReport(nodes.get(2), pipeline, pipelineReportHandler, true); + assertEquals(1, + pipelineManager.getPipeline(pipeline.getId()).getSupportedStorageTier().size()); + assertTrue(pipelineManager.getPipeline(pipeline.getId()) + .getSupportedStorageTier().contains(StorageTier.SSD)); + + // close the pipeline and clean up + pipelineManager.closePipeline(pipeline.getId()); + pipelineManager.close(); + } + @Test public void testPipelineCreationFailedMetric() throws Exception { PipelineManagerImpl pipelineManager = createPipelineManager(true); @@ -478,7 +554,7 @@ public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { serviceManager, new EventQueue(), scmContext); PipelineReportHandler pipelineReportHandler = new PipelineReportHandler(scmSafeModeManager, pipelineManager, - SCMContext.emptyContext(), conf); + scmContext, conf); // Report pipelines with leaders List nodes = pipeline.getNodes(); @@ -855,7 +931,7 @@ public void testWaitForAllocatedPipeline() throws IOException { = new HealthyPipelineChoosePolicy(); ContainerManager containerManager = mock(ContainerManager.class); - + WritableContainerProvider provider; String owner = "TEST"; Pipeline allocatedPipeline; @@ -876,7 +952,7 @@ public void testWaitForAllocatedPipeline() throws IOException { ContainerInfo container = HddsTestUtils. getContainer(HddsProtos.LifeCycleState.OPEN, allocatedPipeline.getId()); - + pipelineManager.addContainerToPipeline( allocatedPipeline.getId(), container.containerID()); doReturn(container).when(containerManager).getMatchingContainer(anyLong(), @@ -902,7 +978,7 @@ public void testWaitForAllocatedPipeline() throws IOException { return call.callRealMethod(); }).when(pipelineManagerSpy).waitOnePipelineReady(any(), anyLong()); - + ContainerInfo c = provider.getContainer(1, repConfig, owner, new ExcludeList()); assertEquals(c, container, "Expected container was returned"); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java index af965a44f30a..c6405cf53535 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java @@ -48,7 +48,6 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; -import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; @@ -88,6 +87,7 @@ public class TestRatisPipelineProvider { private File testDir; private DBStore dbStore; private int nodeCount = 10; + private List datanodeList; public void init(int maxPipelinePerNode, StorageTier storageTier) throws Exception { init(maxPipelinePerNode, new OzoneConfiguration(), storageTier); @@ -102,11 +102,12 @@ public void init(int maxPipelinePerNode, OzoneConfiguration conf, File dir, StorageTier storageTier) throws Exception { assertTrue(storageTier.isUniform(), "Only support uniform StorageTier"); assertFalse(storageTier.equals(StorageTier.EMPTY), "not support the EMPTY StorageTier"); - StorageType storageType = storageTier.getStorageTypes( - RatisReplicationConfig.getInstance(ReplicationFactor.ONE).getRequiredNodes()).get(0); + StorageType storageType = storageTier. + getStorageTypes(ReplicationFactor.ONE.getNumber()).get(0); conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.getAbsolutePath()); dbStore = DBStoreBuilder.createDBStore(conf, SCMDBDefinition.get()); nodeManager = new MockNodeManager(true, nodeCount, storageType); + datanodeList = nodeManager.getNodes(NodeStatus.inServiceHealthy()); nodeManager.setNumPipelinePerDatanode(maxPipelinePerNode); SCMHAManager scmhaManager = SCMHAManagerStub.getInstance(true); conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, @@ -132,11 +133,12 @@ void cleanup() throws Exception { private static void assertPipelineProperties( Pipeline pipeline, HddsProtos.ReplicationFactor expectedFactor, HddsProtos.ReplicationType expectedReplicationType, - Pipeline.PipelineState expectedState) { + Pipeline.PipelineState expectedState, StorageTier expectedStorageTier) { assertEquals(expectedState, pipeline.getPipelineState()); assertEquals(expectedReplicationType, pipeline.getType()); assertEquals(expectedFactor.getNumber(), pipeline.getReplicationConfig().getRequiredNodes()); assertEquals(expectedFactor.getNumber(), pipeline.getNodes().size()); + assertTrue(pipeline.getSupportedStorageTier().contains(expectedStorageTier)); } private void createPipelineAndAssertions( @@ -145,7 +147,7 @@ private void createPipelineAndAssertions( Pipeline pipeline = provider.create(RatisReplicationConfig .getInstance(factor), storageTier); assertPipelineProperties(pipeline, factor, REPLICATION_TYPE, - Pipeline.PipelineState.ALLOCATED); + Pipeline.PipelineState.ALLOCATED, storageTier); HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage( ClientVersion.CURRENT_VERSION); stateManager.addPipeline(pipelineProto); @@ -156,7 +158,7 @@ private void createPipelineAndAssertions( HddsProtos.Pipeline pipelineProto1 = pipeline1.getProtobufMessage( ClientVersion.CURRENT_VERSION); assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE, - Pipeline.PipelineState.ALLOCATED); + Pipeline.PipelineState.ALLOCATED, storageTier); // New pipeline should not overlap with the previous created pipeline assertThat(intersection(pipeline.getNodes(), pipeline1.getNodes()).size()) .isLessThan(factor.getNumber()); @@ -184,7 +186,7 @@ public void testCreatePipelineWithFactorOne(StorageTier storageTier) throws Exce private List createListOfNodes(int count) { List nodes = new ArrayList<>(); for (int i = 0; i < count; i++) { - nodes.add(MockDatanodeDetails.randomDatanodeDetails()); + nodes.add(datanodeList.get(i)); } return nodes; } @@ -197,7 +199,7 @@ public void testCreatePipelineWithFactor(StorageTier storageTier) throws Excepti Pipeline pipeline = provider.create(RatisReplicationConfig .getInstance(factor), storageTier); assertPipelineProperties(pipeline, factor, REPLICATION_TYPE, - Pipeline.PipelineState.ALLOCATED); + Pipeline.PipelineState.ALLOCATED, storageTier); HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage( ClientVersion.CURRENT_VERSION); stateManager.addPipeline(pipelineProto); @@ -206,7 +208,7 @@ public void testCreatePipelineWithFactor(StorageTier storageTier) throws Excepti Pipeline pipeline1 = provider.create(RatisReplicationConfig .getInstance(factor), storageTier); assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE, - Pipeline.PipelineState.ALLOCATED); + Pipeline.PipelineState.ALLOCATED, storageTier); HddsProtos.Pipeline pipelineProto1 = pipeline1.getProtobufMessage( ClientVersion.CURRENT_VERSION); stateManager.addPipeline(pipelineProto1); @@ -223,13 +225,13 @@ public void testCreatePipelineWithNodes() throws Exception { provider.create(RatisReplicationConfig.getInstance(factor), createListOfNodes(factor.getNumber())); assertPipelineProperties(pipeline, factor, REPLICATION_TYPE, - Pipeline.PipelineState.OPEN); + Pipeline.PipelineState.OPEN, StorageTier.getDefaultTier()); factor = HddsProtos.ReplicationFactor.ONE; pipeline = provider.create(RatisReplicationConfig.getInstance(factor), createListOfNodes(factor.getNumber())); assertPipelineProperties(pipeline, factor, REPLICATION_TYPE, - Pipeline.PipelineState.OPEN); + Pipeline.PipelineState.OPEN, StorageTier.getDefaultTier()); } @Test @@ -248,8 +250,7 @@ public void testCreateFactorTHREEPipelineWithSameDatanodes() RatisReplicationConfig.getInstance(ReplicationFactor.THREE), healthyNodes); Pipeline pipeline3 = provider.createForRead( - RatisReplicationConfig.getInstance(ReplicationFactor.THREE), - replicas); + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), replicas); assertEquals(pipeline1.getNodeSet(), pipeline2.getNodeSet()); assertEquals(pipeline2.getNodeSet(), pipeline3.getNodeSet()); @@ -288,7 +289,7 @@ public void testCreatePipelinesDnExclude(StorageTier storageTier) throws Excepti Pipeline pipeline = provider.create( RatisReplicationConfig.getInstance(factor), storageTier); assertPipelineProperties(pipeline, factor, REPLICATION_TYPE, - Pipeline.PipelineState.ALLOCATED); + Pipeline.PipelineState.ALLOCATED, storageTier); HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage( ClientVersion.CURRENT_VERSION); nodeManager.addPipeline(pipeline); @@ -459,7 +460,8 @@ public void testCreatePipelineThrowErrorWithDataNodeLimit(int limit, int pipelin // Validate exception message. String expectedError = String.format( - "Cannot create pipeline as it would exceed the limit per datanode: %d replicationConfig: RATIS/THREE", limit); + "Cannot create pipeline for StorageTier DISK as it would exceed the limit per" + + " datanode: %d replicationConfig: RATIS/THREE", limit); assertEquals(expectedError, exception.getMessage()); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java index 42aa1943aa7a..7a0add3f6bcb 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hdds.client.StorageTier; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.exceptions.SCMException; @@ -42,6 +41,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub; import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.utils.db.DBStoreBuilder; import org.apache.hadoop.ozone.ClientVersion; @@ -63,6 +63,7 @@ public class TestSimplePipelineProvider { @TempDir private File testDir; private DBStore dbStore; + private List datanodeList; @BeforeEach public void startup() throws Exception { @@ -78,6 +79,7 @@ public void init(StorageTier storageTier) throws Exception { StorageType storageType = storageTier.getStorageTypes( RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE).getRequiredNodes()).get(0); NodeManager nodeManager = new MockNodeManager(true, 10, storageType); + datanodeList = nodeManager.getNodes(NodeStatus.inServiceHealthy()); SCMHAManager scmhaManager = SCMHAManagerStub.getInstance(true); stateManager = PipelineStateManagerImpl.newBuilder() .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore)) @@ -109,6 +111,7 @@ public void testCreatePipelineWithFactor(StorageTier storageTier) throws Excepti assertEquals(pipeline.getReplicationConfig().getRequiredNodes(), factor.getNumber()); assertEquals(pipeline.getPipelineState(), Pipeline.PipelineState.OPEN); assertEquals(pipeline.getNodes().size(), factor.getNumber()); + assertTrue(pipeline.getSupportedStorageTier().contains(storageTier)); factor = HddsProtos.ReplicationFactor.ONE; Pipeline pipeline1 = @@ -122,12 +125,13 @@ public void testCreatePipelineWithFactor(StorageTier storageTier) throws Excepti .getReplicationFactor(), factor); assertEquals(pipeline1.getPipelineState(), Pipeline.PipelineState.OPEN); assertEquals(pipeline1.getNodes().size(), factor.getNumber()); + assertTrue(pipeline.getSupportedStorageTier().contains(storageTier)); } private List createListOfNodes(int nodeCount) { List nodes = new ArrayList<>(); for (int i = 0; i < nodeCount; i++) { - nodes.add(MockDatanodeDetails.randomDatanodeDetails()); + nodes.add(datanodeList.get(i)); } return nodes; } @@ -147,6 +151,7 @@ public void testCreatePipelineWithNodes() .getReplicationFactor(), factor); assertEquals(pipeline.getPipelineState(), Pipeline.PipelineState.OPEN); assertEquals(pipeline.getNodes().size(), factor.getNumber()); + assertTrue(pipeline.getSupportedStorageTier().contains(StorageTier.getDefaultTier())); factor = HddsProtos.ReplicationFactor.ONE; pipeline = provider.create(StandaloneReplicationConfig.getInstance(factor), @@ -157,6 +162,7 @@ public void testCreatePipelineWithNodes() .getReplicationFactor(), factor); assertEquals(pipeline.getPipelineState(), Pipeline.PipelineState.OPEN); assertEquals(pipeline.getNodes().size(), factor.getNumber()); + assertTrue(pipeline.getSupportedStorageTier().contains(StorageTier.getDefaultTier())); } @Test diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/hdds/protocol/StorageTierTest.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/hdds/protocol/StorageTierTest.java index 0630dd45cd3e..af978dfef366 100644 --- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/hdds/protocol/StorageTierTest.java +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/hdds/protocol/StorageTierTest.java @@ -59,7 +59,8 @@ void testGetStorageTypesWithReplicationConfig() { } for (ReplicationConfig replicationConfig : Arrays.asList(ratisOne, ratisThree, standaloneOne, standaloneThree)) { - List storageTypes = tier.getStorageTypes(replicationConfig.getRequiredNodes()); + List storageTypes = tier.getStorageTypes( + replicationConfig.getRequiredNodes()); if (tier.equals(StorageTier.EMPTY)) { assertEquals(0, storageTypes.size()); } else { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java index 431e88574906..5375206bab7e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java @@ -104,6 +104,10 @@ public void testPipelineWithScmRestart() assertNotSame(ratisPipeline2AfterRestart, ratisPipeline2); assertEquals(ratisPipeline1AfterRestart, ratisPipeline1); assertEquals(ratisPipeline2AfterRestart, ratisPipeline2); + assertEquals(ratisPipeline1AfterRestart.getSupportedStorageTier(), + ratisPipeline1.getSupportedStorageTier()); + assertEquals(ratisPipeline2AfterRestart.getSupportedStorageTier(), + ratisPipeline2.getSupportedStorageTier()); // Try creating a new container, it should be from the same pipeline // as was before restart