PeerTable.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.p2p.discovery.internal;
import static java.util.Comparator.comparingInt;
import static java.util.stream.Collectors.toList;
import org.hyperledger.besu.crypto.Hash;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryStatus;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable.AddResult.AddOutcome;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Stream;
import com.google.common.hash.BloomFilter;
import org.apache.tuweni.bytes.Bytes;
/**
 * Implements a Kademlia routing table based on k-buckets with a keccak-256 XOR-based distance
 * metric.
 *
 * <p>Peers are stored in one {@link Bucket} per distance value. A bloom filter over the stored
 * peer IDs lets {@link #get(PeerId)} reject unknown peers without touching a bucket. Because
 * entries cannot be removed from a bloom filter, the filter is rebuilt on a background thread
 * after every {@value #BLOOM_FILTER_REGENERATION_THRESHOLD} evictions.
 *
 * <p>The bloom filter bookkeeping in this class is safe to use while a background rebuild is
 * running. NOTE(review): overall thread-safety additionally depends on {@link Bucket}, which is
 * not visible from here; confirm before relying on concurrent mutation of the table.
 */
public class PeerTable {
  private static final int N_BUCKETS = 256;
  private static final int DEFAULT_BUCKET_SIZE = 16;
  private static final int BLOOM_FILTER_REGENERATION_THRESHOLD = 50; // evictions

  /** Buckets indexed directly by distance; sized so that indices 0..N_BUCKETS are all valid. */
  private final Bucket[] table;

  /** keccak-256 hash of the local node ID; the baseline for every distance computed here. */
  private final Bytes keccak256;

  /** Expected number of insertions used to size the bloom filter. */
  private final int maxEntriesCnt;

  /** Memoized distance per peer ID, populated on successful add and cleared on evict. */
  private final Map<Bytes, Integer> distanceCache;

  /**
   * Guards {@link #evictionCnt} and keeps insertions into {@link #idBloom} atomic with respect to
   * a filter swap, so that a peer added during a rebuild always lands in the live filter.
   */
  private final Object bloomLock = new Object();

  /**
   * The live bloom filter. Volatile because it is replaced from a {@link ForkJoinPool} worker
   * thread and read without locking in {@link #get(PeerId)}.
   */
  private volatile BloomFilter<Bytes> idBloom;

  /** Evictions since the last rebuild was scheduled. Guarded by {@link #bloomLock}. */
  private int evictionCnt = 0;

  /**
   * Builds a new peer table, where distance is calculated using the provided nodeId as a baseline.
   *
   * @param nodeId The ID of the node where this peer table is stored.
   */
  public PeerTable(final Bytes nodeId) {
    this.keccak256 = Hash.keccak256(nodeId);
    this.table =
        Stream.generate(() -> new Bucket(DEFAULT_BUCKET_SIZE))
            .limit(N_BUCKETS + 1)
            .toArray(Bucket[]::new);
    this.distanceCache = new ConcurrentHashMap<>();
    this.maxEntriesCnt = N_BUCKETS * DEFAULT_BUCKET_SIZE;

    // A bloom filter with 4096 expected insertions of 64-byte keys with a 0.1% false positive
    // probability yields a memory footprint of ~7.5kb.
    buildBloomFilter();
  }

  /**
   * Returns the table's representation of a peer, if it exists.
   *
   * <p>The bloom filter is consulted first; only when it reports a possible match is the peer's
   * bucket queried (via {@code Bucket#getAndTouch}).
   *
   * @param peer The peer to query.
   * @return The stored representation, or an empty optional if the peer is not in the table.
   */
  public Optional<DiscoveryPeer> get(final PeerId peer) {
    final Bytes peerId = peer.getId();
    if (!idBloom.mightContain(peerId)) {
      return Optional.empty();
    }
    final int distance = distanceFrom(peer);
    return table[distance].getAndTouch(peerId);
  }

  /**
   * Attempts to add the provided peer to the peer table, and returns an {@link AddResult}
   * signalling one of four outcomes.
   *
   * <h3>Possible outcomes:</h3>
   *
   * <ul>
   *   <li>the operation succeeded and the peer was added to the corresponding k-bucket.
   *   <li>the operation failed because the k-bucket was full, in which case a candidate is proposed
   *       for eviction.
   *   <li>the operation failed because the peer already existed.
   *   <li>the operation was rejected because the peer is at distance 0, i.e. it is ourselves.
   * </ul>
   *
   * @param peer The peer to add.
   * @return An object indicating the outcome of the operation.
   * @see AddOutcome
   */
  public AddResult tryAdd(final DiscoveryPeer peer) {
    final Bytes id = peer.getId();
    final int distance = distanceFrom(peer);

    // Safeguard against adding ourselves to the peer table.
    if (distance == 0) {
      return AddResult.self();
    }

    final Bucket bucket = table[distance];
    // Bucket#add has three possible results:
    //  (1) an empty optional: the peer was added successfully;
    //  (2) a filled optional: the bucket is full and the contained peer is the proposed
    //      eviction candidate;
    //  (3) an IllegalArgumentException: the peer already existed in the bucket.
    final Optional<DiscoveryPeer> res;
    try {
      res = bucket.add(peer);
    } catch (final IllegalArgumentException ex) {
      return AddResult.existed();
    }

    if (res.isPresent()) {
      return AddResult.bucketFull(res.get());
    }

    // Insert under the lock so that a concurrent rebuild cannot swap the filter between reading
    // the field and inserting; otherwise the ID would be written to a discarded filter and
    // get() would wrongly report the peer as absent.
    synchronized (bloomLock) {
      idBloom.put(id);
    }
    distanceCache.put(id, distance);
    return AddResult.added();
  }

  /**
   * Evicts a peer from the underlying table.
   *
   * <p>The cached distance for the peer is dropped regardless of whether the peer turns out to be
   * stored. Each successful eviction is counted; once the count reaches the regeneration
   * threshold, the counter is reset and a bloom filter rebuild is submitted to the common {@link
   * ForkJoinPool}.
   *
   * @param peer The peer to evict.
   * @return Whether the peer existed, and hence the eviction took place.
   */
  public EvictResult tryEvict(final PeerId peer) {
    final Bytes id = peer.getId();
    final int distance = distanceFrom(peer);

    if (distance == 0) {
      return EvictResult.self();
    }

    distanceCache.remove(id);

    final Bucket bucket = table[distance];
    if (bucket.getPeers().isEmpty() || !bucket.evict(peer)) {
      return EvictResult.absent();
    }

    // Count the eviction and decide atomically whether this call crosses the threshold. The
    // counter is reset here, at scheduling time, so that exactly one rebuild is submitted per
    // threshold crossing and evictions that happen while a rebuild is running are not lost.
    final boolean regenerate;
    synchronized (bloomLock) {
      evictionCnt++;
      regenerate = evictionCnt >= BLOOM_FILTER_REGENERATION_THRESHOLD;
      if (regenerate) {
        evictionCnt = 0;
      }
    }

    // Trigger the bloom filter regeneration if needed.
    if (regenerate) {
      ForkJoinPool.commonPool().execute(this::buildBloomFilter);
    }

    return EvictResult.evicted();
  }

  /**
   * Creates a fresh bloom filter holding the IDs of all peers currently in the table and installs
   * it as the live filter.
   *
   * <p>Populating and swapping happen under {@link #bloomLock}, so an insertion performed by
   * {@link #tryAdd(DiscoveryPeer)} is either visible in the bucket snapshot taken here or is
   * applied to the new filter after the swap.
   */
  private void buildBloomFilter() {
    final BloomFilter<Bytes> bf =
        BloomFilter.create((id, val) -> val.putBytes(id.toArray()), maxEntriesCnt, 0.001);
    synchronized (bloomLock) {
      streamAllPeers().map(Peer::getId).forEach(bf::put);
      this.idBloom = bf;
    }
  }

  /**
   * Returns the <code>limit</code> peers (at most) bonded closest to the provided target, based on
   * the XOR distance between the keccak-256 hash of the ID and the keccak-256 hash of the target.
   *
   * @param target The target node ID.
   * @param limit The amount of results to return.
   * @return The <code>limit</code> closest peers, at most.
   */
  public List<DiscoveryPeer> nearestBondedPeers(final Bytes target, final int limit) {
    // Hash the target once; named distinctly so it does not shadow the local node's hash field.
    final Bytes targetKeccak256 = Hash.keccak256(target);
    return streamAllPeers()
        .filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDED)
        .sorted(
            comparingInt(
                (peer) -> PeerDistanceCalculator.distance(peer.keccak256(), targetKeccak256)))
        .limit(limit)
        .collect(toList());
  }

  /**
   * Streams every peer currently stored in the table, bucket by bucket in ascending bucket index.
   *
   * @return A stream over the peers returned by each bucket's {@code getPeers()}.
   */
  public Stream<DiscoveryPeer> streamAllPeers() {
    return Arrays.stream(table).flatMap(e -> e.getPeers().stream());
  }

  /**
   * Calculates the XOR distance between the keccak-256 hashes of our node ID and the provided
   * {@link PeerId}, using the cached value when one is available.
   *
   * @param peer The target peer.
   * @return The distance.
   */
  private int distanceFrom(final PeerId peer) {
    final Integer distance = distanceCache.get(peer.getId());
    return distance == null
        ? PeerDistanceCalculator.distance(keccak256, peer.keccak256())
        : distance;
  }

  /** A class that encapsulates the result of a peer addition to the table. */
  public static class AddResult {
    /** The outcome of the operation. */
    public enum AddOutcome {

      /** The peer was added successfully to its corresponding k-bucket. */
      ADDED,

      /** The bucket for this peer was full. An eviction candidate must be proposed. */
      BUCKET_FULL,

      /** The peer already existed, hence it was not overwritten. */
      ALREADY_EXISTED,

      /** The caller requested to add ourselves. */
      SELF
    }

    private final AddOutcome outcome;
    private final Peer evictionCandidate;

    private AddResult(final AddOutcome outcome, final Peer evictionCandidate) {
      this.outcome = outcome;
      this.evictionCandidate = evictionCandidate;
    }

    /** Creates a result for a peer that was stored successfully. */
    static AddResult added() {
      return new AddResult(AddOutcome.ADDED, null);
    }

    /**
     * Creates a result for a peer that was rejected because its bucket was full.
     *
     * @param evictionCandidate The peer proposed for eviction to make room.
     */
    static AddResult bucketFull(final Peer evictionCandidate) {
      return new AddResult(AddOutcome.BUCKET_FULL, evictionCandidate);
    }

    /** Creates a result for a peer that was already present in the table. */
    static AddResult existed() {
      return new AddResult(AddOutcome.ALREADY_EXISTED, null);
    }

    /** Creates a result for an attempt to add the local node itself. */
    static AddResult self() {
      return new AddResult(AddOutcome.SELF, null);
    }

    /**
     * Returns the outcome of the add operation.
     *
     * @return The outcome.
     */
    public AddOutcome getOutcome() {
      return outcome;
    }

    /**
     * Returns the peer proposed for eviction.
     *
     * @return The eviction candidate; {@code null} unless the outcome is {@link
     *     AddOutcome#BUCKET_FULL}.
     */
    public Peer getEvictionCandidate() {
      return evictionCandidate;
    }
  }

  /** A class that encapsulates the result of a peer eviction from the table. */
  static class EvictResult {
    /** The outcome of the operation. */
    public enum EvictOutcome {

      /** The peer was found in its bucket and removed. */
      EVICTED,

      /** The peer was not stored in the table, so nothing was removed. */
      ABSENT,

      /** The caller requested to evict ourselves. */
      SELF
    }

    private final EvictOutcome outcome;

    private EvictResult(final EvictOutcome outcome) {
      this.outcome = outcome;
    }

    /** Creates a result for a peer that was removed from the table. */
    static EvictResult evicted() {
      return new EvictResult(EvictOutcome.EVICTED);
    }

    /** Creates a result for a peer that was not present in the table. */
    static EvictResult absent() {
      return new EvictResult(EvictOutcome.ABSENT);
    }

    /** Creates a result for an attempt to evict the local node itself. */
    static EvictResult self() {
      return new EvictResult(EvictOutcome.SELF);
    }

    /**
     * Returns the outcome of the evict operation.
     *
     * @return The outcome.
     */
    EvictOutcome getOutcome() {
      return outcome;
    }
  }
}