Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package org.apache.hadoop.hdds.scm.pipeline;

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.UuidCodec;
import org.apache.hadoop.ozone.util.UUIDUtil;
import org.apache.ratis.util.MemoizedSupplier;

/**
Expand Down Expand Up @@ -52,6 +54,19 @@ public static PipelineID randomId() {
return new PipelineID(UUID.randomUUID());
}

/**
* Generates a random PipelineID using {@link java.util.Random} instead of
* {@link java.security.SecureRandom}. This avoids contention on the shared
* {@code SecureRandom} instance and is suitable for non-sensitive,
* throwaway IDs such as read pipelines, where predictability of the next
* ID has no security impact.
*/
public static PipelineID insecureRandomId() {
byte[] bytes = UUIDUtil.insecureRandomUUIDBytes();
ByteBuffer buf = ByteBuffer.wrap(bytes);
return new PipelineID(new UUID(buf.getLong(), buf.getLong()));
}

public static PipelineID valueOf(UUID id) {
return new PipelineID(id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,27 @@
package org.apache.hadoop.ozone.util;

import java.security.SecureRandom;
import java.util.Random;
import java.util.function.Consumer;

/**
* Helper methods to deal with random UUIDs.
*/
public final class UUIDUtil {
private static final ThreadLocal<SecureRandom> GENERATOR = ThreadLocal.withInitial(SecureRandom::new);
private static final ThreadLocal<Random> INSECURE_GENERATOR = ThreadLocal.withInitial(Random::new);

public static byte[] randomUUIDBytes() {
return getUUIDBytes(GENERATOR.get()::nextBytes);
}

public static byte[] insecureRandomUUIDBytes() {
return getUUIDBytes(INSECURE_GENERATOR.get()::nextBytes);
}

private static byte[] getUUIDBytes(Consumer<byte[]> generator) {
final byte[] bytes = new byte[16];
GENERATOR.get().nextBytes(bytes);
generator.accept(bytes);
// See RFC 4122 section 4.4
bytes[6] &= 0x0f;
bytes[6] |= 0x40;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.hadoop.hdds.scm.container;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.builder.CompareToBuilder;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
Expand Down Expand Up @@ -165,6 +168,12 @@ public int compareTo(ContainerReplica that) {
.build();
}

public static List<DatanodeDetails> toDatanodeDetailsList(Set<ContainerReplica> replicas) {
return replicas.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
}

/**
* Returns a new Builder to construct ContainerReplica.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ protected Pipeline create(ECReplicationConfig replicationConfig,
ecIndex++;
}

return createPipelineInternal(replicationConfig, nodes, dnIndexes);
return newPipelineBuilder(replicationConfig, nodes)
.setId(PipelineID.randomId())
.setReplicaIndexes(dnIndexes)
.build();
}

@Override
Expand All @@ -130,17 +133,11 @@ public Pipeline createForRead(

dns.sort(Comparator.comparing(nodeStatusMap::get, CREATE_FOR_READ_COMPARATOR));

return createPipelineInternal(replicationConfig, dns, map);
}

private Pipeline createPipelineInternal(ECReplicationConfig repConfig,
List<DatanodeDetails> dns, Map<DatanodeDetails, Integer> indexes) {
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(Pipeline.PipelineState.ALLOCATED)
.setReplicationConfig(repConfig)
.setNodes(dns)
.setReplicaIndexes(indexes)
// Use insecureRandomId for throwaway read pipeline IDs to avoid
// contention on the shared SecureRandom instance.
return newPipelineBuilder(replicationConfig, dns)
.setId(PipelineID.insecureRandomId())
.setReplicaIndexes(map)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,11 @@ List<DatanodeDetails> pickAllNodesNotUsed(
}
return dns;
}

protected Pipeline.Builder newPipelineBuilder(ReplicationConfig replicationConfig, List<DatanodeDetails> nodes) {
return Pipeline.newBuilder()
.setNodes(nodes)
.setReplicationConfig(replicationConfig)
.setState(Pipeline.PipelineState.ALLOCATED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
Expand Down Expand Up @@ -182,13 +181,9 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,

DatanodeDetails suggestedLeader = leaderChoosePolicy.chooseLeader(dns);

Pipeline pipeline = Pipeline.newBuilder()
Pipeline pipeline = newPipelineBuilder(RatisReplicationConfig.getInstance(factor), dns)
.setId(PipelineID.randomId())
.setState(PipelineState.ALLOCATED)
.setReplicationConfig(RatisReplicationConfig.getInstance(factor))
.setNodes(dns)
.setSuggestedLeaderId(
suggestedLeader != null ? suggestedLeader.getID() : null)
.setSuggestedLeaderId(suggestedLeader != null ? suggestedLeader.getID() : null)
.build();

// Send command to datanodes to create pipeline
Expand All @@ -213,22 +208,20 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
@Override
public Pipeline create(RatisReplicationConfig replicationConfig,
List<DatanodeDetails> nodes) {
return Pipeline.newBuilder()
return newPipelineBuilder(replicationConfig, nodes)
.setId(PipelineID.randomId())
.setState(PipelineState.ALLOCATED)
.setReplicationConfig(replicationConfig)
.setNodes(nodes)
.build();
}

@Override
public Pipeline createForRead(
RatisReplicationConfig replicationConfig,
Set<ContainerReplica> replicas) {
return create(replicationConfig, replicas
.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList()));
// Use insecureRandomId for throwaway read pipeline IDs to avoid
// contention on the shared SecureRandom instance.
return newPipelineBuilder(replicationConfig, ContainerReplica.toDatanodeDetailsList(replicas))
.setId(PipelineID.insecureRandomId())
.build();
}

private List<DatanodeDetails> filterPipelineEngagement() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
Expand Down Expand Up @@ -61,38 +61,37 @@ public Pipeline create(StandaloneReplicationConfig replicationConfig,
}

Collections.shuffle(dns);
return Pipeline.newBuilder()
return newPipelineBuilder(replicationConfig, dns.subList(0, replicationConfig.getReplicationFactor().getNumber()))
.setId(PipelineID.randomId())
.setState(PipelineState.OPEN)
.setReplicationConfig(replicationConfig)
.setNodes(dns.subList(0,
replicationConfig.getReplicationFactor().getNumber()))
.build();
}

@Override
public Pipeline create(StandaloneReplicationConfig replicationConfig,
List<DatanodeDetails> nodes) {
return Pipeline.newBuilder()
return newPipelineBuilder(replicationConfig, nodes)
.setId(PipelineID.randomId())
.setState(PipelineState.OPEN)
.setReplicationConfig(replicationConfig)
.setNodes(nodes)
.build();
}

@Override
public Pipeline createForRead(StandaloneReplicationConfig replicationConfig,
Set<ContainerReplica> replicas) {
return create(replicationConfig, replicas
.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList()));
// Use insecureRandomId for throwaway read pipeline IDs to avoid
// contention on the shared SecureRandom instance.
return newPipelineBuilder(replicationConfig, ContainerReplica.toDatanodeDetailsList(replicas))
.setId(PipelineID.insecureRandomId())
.build();
}

@Override
public void close(Pipeline pipeline) throws IOException {

}

@Override
protected Pipeline.Builder newPipelineBuilder(ReplicationConfig replicationConfig, List<DatanodeDetails> nodes) {
return super.newPipelineBuilder(replicationConfig, nodes)
.setState(PipelineState.OPEN);
}
}