Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ public boolean isUniform() {
return isUniform;
}

public StorageType getUniformStorageType() {
if (!isUniform()) {
throw new IllegalArgumentException("Uniform storage type is not supported");
}
return storageTypes.get(0);
}

/**
* Maps a StorageTier to its corresponding StorageType based on replication type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.hadoop.hdds.client;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;

Expand Down Expand Up @@ -45,22 +47,31 @@ public static void validateNotEmpty(StorageTier storageTier) throws SCMException
}
}

/**
* Returns the StorageType for uniform StorageTier.
*
* @param storageTier the StorageTier to get StorageType from
* @return The uniform StorageTier corresponding StorageType
* @throws SCMException if the StorageTier is non-uniform or the EMPTY StorageTier
*/
public static StorageType getStorageTypeForUniformStorageTier(StorageTier storageTier, ReplicationConfig config)
throws SCMException {
validateNotEmpty(storageTier);
List<StorageType> storageTypes = storageTier.getStorageTypes(config.getRequiredNodes());
if (storageTier.isUniform()) {
return storageTypes.get(0);
} else {
throw new SCMException("Unsupported non-uniform storage tier " + storageTier,
SCMException.ResultCodes.UNSUPPORTED_NON_UNIFORM_STORAGE_TIER);
public static List<StorageTier> findSupportedStorageTiers(
List<Set<StorageType>> dnStorageTypes) {
List<StorageTier> supportedStorageTiers = new ArrayList<>();
if (dnStorageTypes.isEmpty()) {
return supportedStorageTiers;
}
// We only support uniform storage tiers currently
for (StorageTier storageTier : StorageTier.values()) {
if (storageTier.equals(StorageTier.EMPTY)) {
continue;
}
if (!storageTier.isUniform()) {
throw new UnsupportedOperationException(storageTier + " is not a uniform storage tier");
}
boolean supportedTier = true;
for (Set<StorageType> dnStorageType : dnStorageTypes) {
if (!dnStorageType.contains(storageTier.getUniformStorageType())) {
supportedTier = false;
break;
}
}
if (supportedTier) {
supportedStorageTiers.add(storageTier);
}
}
return supportedStorageTiers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -42,6 +44,7 @@
import org.apache.hadoop.hdds.client.ReplicatedReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.client.StorageTier;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
Expand Down Expand Up @@ -86,6 +89,9 @@ public final class Pipeline {
private Instant creationTimestamp;
// suggested leader id with high priority
private final DatanodeID suggestedLeaderId;
private List<StorageTier> supportedStorageTier;

private final ReadWriteLock lock;

private final Instant stateEnterTime;

Expand Down Expand Up @@ -114,6 +120,8 @@ private Pipeline(Builder b) {
replicaIndexes = b.replicaIndexes;
creationTimestamp = b.creationTimestamp != null ? b.creationTimestamp : Instant.now();
stateEnterTime = Instant.now();
supportedStorageTier = b.supportedStorageTier;
lock = new ReentrantReadWriteLock();
}

public static Codec<Pipeline> getCodec() {
Expand Down Expand Up @@ -170,6 +178,32 @@ public DatanodeID getSuggestedLeaderId() {
return suggestedLeaderId;
}

/**
* Return the Pipeline supported StorageTier.
*
* @return Supported StorageTier
*/
public List<StorageTier> getSupportedStorageTier() {
lock.readLock().lock();
try {
return supportedStorageTier;
} finally {
lock.readLock().unlock();
}
}

/**
* Set the storageTier supported by the pipeline.
*/
public void setSupportedStorageTier(List<StorageTier> supportedStorageTier) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using setSupportedStorageTier here breaks the immutable design of the Pipeline class. A better approach would be to follow the existing pattern of copyWithNodesInOrder to update the pipeline's state.

lock.writeLock().lock();
try {
this.supportedStorageTier = supportedStorageTier;
} finally {
lock.writeLock().unlock();
}
}

/**
* Set the creation timestamp. Only for protobuf now.
*/
Expand Down Expand Up @@ -399,6 +433,12 @@ public HddsProtos.Pipeline getProtobufMessage(int clientVersion, Set<DatanodeDet
builder.setSuggestedLeaderDatanodeID(suggestedLeaderId.toProto());
}

if (supportedStorageTier != null) {
for (StorageTier storageTier : supportedStorageTier) {
builder.addSupportedStorageTier(storageTier.toProto());
}
}

// To save the message size on wire, only transfer the node order based on
// network topology
if (!nodesInOrder.isEmpty()) {
Expand Down Expand Up @@ -488,6 +528,10 @@ public static Builder toBuilder(HddsProtos.Pipeline pipeline) {
HddsProtos.UUID uuid = pipeline.getSuggestedLeaderID();
suggestedLeaderId = DatanodeID.of(uuid);
}
List<StorageTier> supportedStorageTier = new ArrayList<>();
for (HddsProtos.StorageTierProto storageTierProto : pipeline.getSupportedStorageTierList()) {
supportedStorageTier.add(StorageTier.fromProto(storageTierProto));
}

final ReplicationConfig config = ReplicationConfig
.fromProto(pipeline.getType(), pipeline.getFactor(),
Expand All @@ -501,6 +545,7 @@ public static Builder toBuilder(HddsProtos.Pipeline pipeline) {
.setLeaderId(leaderId)
.setSuggestedLeaderId(suggestedLeaderId)
.setNodeOrder(pipeline.getMemberOrdersList())
.setSupportedStorageTier(supportedStorageTier)
.setCreateTimestamp(pipeline.getCreationTimeStamp());
}

Expand Down Expand Up @@ -548,6 +593,7 @@ public String toString() {
b.append(']')
.append(", ReplicationConfig: ").append(replicationConfig)
.append(", State:").append(getPipelineState())
.append(", SupportStorageTier:").append(getSupportedStorageTier())
.append(", leaderId:").append(leaderId != null ? leaderId.toString() : "")
.append(", CreationTimestamp").append(getCreationTimestamp().atZone(ZoneId.systemDefault()))
.append('}');
Expand All @@ -572,6 +618,7 @@ public static final class Builder {
private Instant creationTimestamp = null;
private DatanodeID suggestedLeaderId = null;
private Map<DatanodeDetails, Integer> replicaIndexes = ImmutableMap.of();
private List<StorageTier> supportedStorageTier;

private Builder() { }

Expand All @@ -594,6 +641,7 @@ private Builder(Pipeline pipeline) {
}
replicaIndexes = b.build();
}
this.supportedStorageTier = pipeline.getSupportedStorageTier();
}

public Builder setId(DatanodeID datanodeID) {
Expand Down Expand Up @@ -676,6 +724,11 @@ public Builder setReplicaIndexes(Map<DatanodeDetails, Integer> indexes) {
return this;
}

public Builder setSupportedStorageTier(List<StorageTier> supportedStorageTier) {
this.supportedStorageTier = supportedStorageTier;
return this;
}

public Pipeline build() {
Objects.requireNonNull(id, "id == null");
Objects.requireNonNull(replicationConfig, "replicationConfig == null");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.client;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.fs.StorageType;
import org.junit.jupiter.api.Test;

/**
* Provides {@link StorageTierUtil} factory methods for testing.
*/
class StorageTierUtilTest {
@Test
void testFindSupportedStorageTiers() {
assertStorageTiers(createDnStorageTypes(
setOf(StorageType.SSD),
setOf(StorageType.SSD),
setOf(StorageType.SSD)),
1, setOf(StorageTier.SSD));

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.SSD, StorageType.DISK),
setOf(StorageType.SSD, StorageType.DISK),
setOf(StorageType.SSD)),
1, setOf(StorageTier.SSD));

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.SSD, StorageType.DISK),
setOf(StorageType.SSD, StorageType.DISK),
setOf(StorageType.SSD, StorageType.DISK)),
2, setOf(StorageTier.SSD, StorageTier.DISK));

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.SSD, StorageType.DISK, StorageType.ARCHIVE),
setOf(StorageType.SSD, StorageType.DISK),
setOf(StorageType.SSD, StorageType.DISK)),
2, setOf(StorageTier.SSD, StorageTier.DISK));

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.SSD, StorageType.DISK, StorageType.ARCHIVE),
setOf(StorageType.SSD, StorageType.DISK, StorageType.ARCHIVE),
setOf(StorageType.SSD, StorageType.DISK, StorageType.ARCHIVE)),
3, setOf(StorageTier.SSD, StorageTier.DISK, StorageTier.ARCHIVE));

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.SSD, StorageType.DISK)),
2, setOf(StorageTier.SSD, StorageTier.DISK));

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.SSD)),
1, setOf(StorageTier.SSD));

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.DISK)),
1, setOf(StorageTier.DISK));

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.ARCHIVE)),
1, setOf(StorageTier.ARCHIVE));

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.SSD, StorageType.DISK),
setOf(StorageType.SSD, StorageType.DISK),
setOf(StorageType.SSD, StorageType.DISK),
setOf(StorageType.SSD, StorageType.DISK),
setOf(StorageType.SSD, StorageType.DISK)),
2, setOf(StorageTier.SSD, StorageTier.DISK));
}

@Test
void testFindInvalidStorageTiers() {
assertStorageTiers(createDnStorageTypes(), 0, new HashSet<>());

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.DISK),
setOf(StorageType.SSD),
setOf(StorageType.SSD)),
0, new HashSet<>());

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.DISK),
setOf(StorageType.DISK),
setOf(StorageType.SSD)),
0, new HashSet<>());

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.ARCHIVE),
setOf(StorageType.DISK),
setOf(StorageType.SSD)),
0, new HashSet<>());

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.RAM_DISK),
setOf(StorageType.SSD),
setOf(StorageType.SSD)),
0, new HashSet<>());

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.RAM_DISK),
setOf(StorageType.RAM_DISK),
setOf(StorageType.RAM_DISK)),
0, new HashSet<>());

assertStorageTiers(createDnStorageTypes(
setOf(StorageType.PROVIDED),
setOf(StorageType.PROVIDED),
setOf(StorageType.PROVIDED)),
0, new HashSet<>());
}

private void assertStorageTiers(List<Set<StorageType>> dnStorageTypes,
int expectedSize, Set<StorageTier> expectedTiers) {
List<StorageTier> supportedTiers =
StorageTierUtil.findSupportedStorageTiers(dnStorageTypes);
HashSet<StorageTier> tiers = new HashSet<>(supportedTiers);
assertEquals(expectedSize, tiers.size());
assertEquals(expectedTiers, tiers);
}

private List<Set<StorageType>> createDnStorageTypes(Set<StorageType>... storageSets) {
return Arrays.asList(storageSets);
}

private Set<StorageType> setOf(StorageType... types) {
return new HashSet<>(Arrays.asList(types));
}

private Set<StorageTier> setOf(StorageTier... tiers) {
return new HashSet<>(Arrays.asList(tiers));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,31 +132,6 @@ public long getFsAvailable() {
return fsAvailable;
}

public static StorageType getStorageType(StorageTypeProto proto) throws
IllegalArgumentException {
StorageType storageType;
switch (proto) {
case SSD:
storageType = StorageType.SSD;
break;
case DISK:
storageType = StorageType.DISK;
break;
case ARCHIVE:
storageType = StorageType.ARCHIVE;
break;
case PROVIDED:
storageType = StorageType.PROVIDED;
break;
case RAM_DISK:
storageType = StorageType.RAM_DISK;
break;
default:
throw new IllegalArgumentException("Illegal Storage Type specified: " + proto);
}
return storageType;
}

/**
* Returns the StorageReportProto protoBuf message for the Storage Location
* report.
Expand Down
Loading
Loading