EthPeer.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager;
import static com.google.common.base.Preconditions.checkArgument;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.EthPV63;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetNodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetPooledTransactionsMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetAccountRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetByteCodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetStorageRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.GetTrieNodesMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.services.permissioning.NodeMessagePermissioningProvider;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EthPeer implements Comparable<EthPeer> {
private static final Logger LOG = LoggerFactory.getLogger(EthPeer.class);
private static final int MAX_OUTSTANDING_REQUESTS = 5;
private PeerConnection connection;
private final int maxTrackedSeenBlocks = 300;
private final Set<Hash> knownBlocks =
Collections.newSetFromMap(
Collections.synchronizedMap(
new LinkedHashMap<>(16, 0.75f, true) {
@Override
protected boolean removeEldestEntry(final Map.Entry<Hash, Boolean> eldest) {
return size() > maxTrackedSeenBlocks;
}
}));
private final Bytes localNodeId;
private Optional<BlockHeader> checkpointHeader = Optional.empty();
private final String protocolName;
private final int maxMessageSize;
private final Clock clock;
private final List<NodeMessagePermissioningProvider> permissioningProviders;
private final ChainState chainHeadState = new ChainState();
private final AtomicBoolean readyForRequests = new AtomicBoolean(false);
private final AtomicBoolean statusHasBeenReceivedFromPeer = new AtomicBoolean(false);
private final AtomicBoolean fullyValidated = new AtomicBoolean(false);
private final AtomicInteger lastProtocolVersion = new AtomicInteger(0);
private volatile long lastRequestTimestamp = 0;
private final Map<String, Map<Integer, RequestManager>> requestManagers;
private final AtomicReference<Consumer<EthPeer>> onStatusesExchanged = new AtomicReference<>();
private final PeerReputation reputation = new PeerReputation();
private final Map<PeerValidator, Boolean> validationStatus = new ConcurrentHashMap<>();
private final Bytes id;
private static final Map<Integer, Integer> roundMessages;
static {
roundMessages = new HashMap<>();
roundMessages.put(EthPV62.BLOCK_HEADERS, EthPV62.GET_BLOCK_HEADERS);
roundMessages.put(EthPV62.BLOCK_BODIES, EthPV62.GET_BLOCK_BODIES);
roundMessages.put(EthPV63.RECEIPTS, EthPV63.GET_RECEIPTS);
roundMessages.put(EthPV63.NODE_DATA, EthPV63.GET_NODE_DATA);
roundMessages.put(EthPV65.POOLED_TRANSACTIONS, EthPV65.GET_POOLED_TRANSACTIONS);
roundMessages.put(SnapV1.ACCOUNT_RANGE, SnapV1.GET_ACCOUNT_RANGE);
roundMessages.put(SnapV1.STORAGE_RANGE, SnapV1.GET_STORAGE_RANGE);
roundMessages.put(SnapV1.BYTECODES, SnapV1.GET_BYTECODES);
roundMessages.put(SnapV1.TRIE_NODES, SnapV1.GET_TRIE_NODES);
}
@VisibleForTesting
public EthPeer(
final PeerConnection connection,
final String protocolName,
final Consumer<EthPeer> onStatusesExchanged,
final List<PeerValidator> peerValidators,
final int maxMessageSize,
final Clock clock,
final List<NodeMessagePermissioningProvider> permissioningProviders,
final Bytes localNodeId) {
this.connection = connection;
this.protocolName = protocolName;
this.maxMessageSize = maxMessageSize;
this.clock = clock;
this.permissioningProviders = permissioningProviders;
this.onStatusesExchanged.set(onStatusesExchanged);
peerValidators.forEach(peerValidator -> validationStatus.put(peerValidator, false));
fullyValidated.set(peerValidators.isEmpty());
this.requestManagers = new ConcurrentHashMap<>();
this.localNodeId = localNodeId;
this.id = connection.getPeer().getId();
initEthRequestManagers();
initSnapRequestManagers();
}
private void initEthRequestManagers() {
final boolean supportsRequestId =
getAgreedCapabilities().stream().anyMatch(EthProtocol::isEth66Compatible);
// eth protocol
requestManagers.put(
protocolName,
Map.ofEntries(
Map.entry(
EthPV62.GET_BLOCK_HEADERS,
new RequestManager(this, supportsRequestId, protocolName)),
Map.entry(
EthPV62.GET_BLOCK_BODIES,
new RequestManager(this, supportsRequestId, protocolName)),
Map.entry(
EthPV63.GET_RECEIPTS, new RequestManager(this, supportsRequestId, protocolName)),
Map.entry(
EthPV63.GET_NODE_DATA, new RequestManager(this, supportsRequestId, protocolName)),
Map.entry(
EthPV65.GET_POOLED_TRANSACTIONS,
new RequestManager(this, supportsRequestId, protocolName))));
}
private void initSnapRequestManagers() {
// snap protocol
requestManagers.put(
SnapProtocol.NAME,
Map.ofEntries(
Map.entry(SnapV1.GET_ACCOUNT_RANGE, new RequestManager(this, true, SnapProtocol.NAME)),
Map.entry(SnapV1.GET_STORAGE_RANGE, new RequestManager(this, true, SnapProtocol.NAME)),
Map.entry(SnapV1.GET_BYTECODES, new RequestManager(this, true, SnapProtocol.NAME)),
Map.entry(SnapV1.GET_TRIE_NODES, new RequestManager(this, true, SnapProtocol.NAME))));
}
public void markValidated(final PeerValidator validator) {
if (!validationStatus.containsKey(validator)) {
throw new IllegalArgumentException("Attempt to update unknown validation status");
}
validationStatus.put(validator, true);
fullyValidated.set(validationStatus.values().stream().allMatch(b -> b));
}
/**
* Check if this peer has been fully validated.
*
* @return {@code true} if all peer validation logic has run and successfully validated this peer
*/
public boolean isFullyValidated() {
return fullyValidated.get();
}
public boolean isDisconnected() {
return connection.isDisconnected();
}
public long addChainEstimatedHeightListener(final ChainState.EstimatedHeightListener listener) {
return chainHeadState.addEstimatedHeightListener(listener);
}
public void removeChainEstimatedHeightListener(final long listenerId) {
chainHeadState.removeEstimatedHeightListener(listenerId);
}
public void recordRequestTimeout(final int requestCode) {
LOG.atDebug()
.setMessage("Timed out while waiting for response from peer {}")
.addArgument(this::getLoggableId)
.log();
LOG.trace("Timed out while waiting for response from peer {}", this);
reputation.recordRequestTimeout(requestCode).ifPresent(this::disconnect);
}
public void recordUselessResponse(final String requestType) {
LOG.atTrace()
.setMessage("Received useless response for request type {} from peer {}")
.addArgument(requestType)
.addArgument(this::getLoggableId)
.log();
reputation.recordUselessResponse(System.currentTimeMillis()).ifPresent(this::disconnect);
}
public void recordUsefulResponse() {
reputation.recordUsefulResponse();
}
public void disconnect(final DisconnectReason reason) {
connection.disconnect(reason);
}
public RequestManager.ResponseStream send(final MessageData messageData) throws PeerNotConnected {
return send(messageData, this.protocolName);
}
public RequestManager.ResponseStream send(
final MessageData messageData, final String protocolName) throws PeerNotConnected {
return send(messageData, protocolName, this.connection);
}
/**
* This method is only used for sending the status message, as it is possible that we have
* multiple connections to the same peer at that time.
*
* @param messageData the data to send
* @param protocolName the protocol to use for sending
* @param connectionToUse the connection to use for sending
* @return the response stream from the peer
* @throws PeerNotConnected if the peer is not connected
*/
public RequestManager.ResponseStream send(
final MessageData messageData,
final String protocolName,
final PeerConnection connectionToUse)
throws PeerNotConnected {
if (connectionToUse.getAgreedCapabilities().stream()
.noneMatch(capability -> capability.getName().equalsIgnoreCase(protocolName))) {
LOG.atDebug()
.setMessage("Protocol {} unavailable for this peer {}")
.addArgument(protocolName)
.addArgument(this.getLoggableId())
.log();
return null;
}
if (permissioningProviders.stream()
.anyMatch(
p -> !p.isMessagePermitted(connectionToUse.getRemoteEnode(), messageData.getCode()))) {
LOG.info(
"Permissioning blocked sending of message code {} to {}",
messageData.getCode(),
this.getLoggableId());
if (LOG.isDebugEnabled()) {
LOG.debug(
"Permissioning blocked by providers {}",
permissioningProviders.stream()
.filter(
p ->
!p.isMessagePermitted(
connectionToUse.getRemoteEnode(), messageData.getCode())));
}
return null;
}
// Check message size is within limits
if (messageData.getSize() > maxMessageSize) {
// This is a bug or else a misconfiguration of the max message size.
LOG.error(
"Sending {} message to peer ({}) which exceeds local message size limit of {} bytes. Message code: {}, Message Size: {}",
protocolName,
this,
maxMessageSize,
messageData.getCode(),
messageData.getSize());
}
if (requestManagers.containsKey(protocolName)) {
final Map<Integer, RequestManager> managers = this.requestManagers.get(protocolName);
if (managers.containsKey(messageData.getCode())) {
return sendRequest(managers.get(messageData.getCode()), messageData);
}
}
connectionToUse.sendForProtocol(protocolName, messageData);
return null;
}
public RequestManager.ResponseStream getHeadersByHash(
final Hash hash, final int maxHeaders, final int skip, final boolean reverse)
throws PeerNotConnected {
final GetBlockHeadersMessage message =
GetBlockHeadersMessage.create(hash, maxHeaders, skip, reverse);
final RequestManager requestManager =
requestManagers.get(protocolName).get(EthPV62.GET_BLOCK_HEADERS);
return sendRequest(requestManager, message);
}
public RequestManager.ResponseStream getHeadersByNumber(
final long blockNumber, final int maxHeaders, final int skip, final boolean reverse)
throws PeerNotConnected {
final GetBlockHeadersMessage message =
GetBlockHeadersMessage.create(blockNumber, maxHeaders, skip, reverse);
return sendRequest(requestManagers.get(protocolName).get(EthPV62.GET_BLOCK_HEADERS), message);
}
public RequestManager.ResponseStream getBodies(final List<Hash> blockHashes)
throws PeerNotConnected {
final GetBlockBodiesMessage message = GetBlockBodiesMessage.create(blockHashes);
return sendRequest(requestManagers.get(protocolName).get(EthPV62.GET_BLOCK_BODIES), message);
}
public RequestManager.ResponseStream getReceipts(final List<Hash> blockHashes)
throws PeerNotConnected {
final GetReceiptsMessage message = GetReceiptsMessage.create(blockHashes);
return sendRequest(requestManagers.get(protocolName).get(EthPV63.GET_RECEIPTS), message);
}
public RequestManager.ResponseStream getNodeData(final Iterable<Hash> nodeHashes)
throws PeerNotConnected {
final GetNodeDataMessage message = GetNodeDataMessage.create(nodeHashes);
return sendRequest(requestManagers.get(protocolName).get(EthPV63.GET_NODE_DATA), message);
}
public RequestManager.ResponseStream getPooledTransactions(final List<Hash> hashes)
throws PeerNotConnected {
final GetPooledTransactionsMessage message = GetPooledTransactionsMessage.create(hashes);
return sendRequest(
requestManagers.get(protocolName).get(EthPV65.GET_POOLED_TRANSACTIONS), message);
}
public RequestManager.ResponseStream getSnapAccountRange(
final Hash stateRoot, final Bytes32 startKeyHash, final Bytes32 endKeyHash)
throws PeerNotConnected {
final GetAccountRangeMessage getAccountRangeMessage =
GetAccountRangeMessage.create(stateRoot, startKeyHash, endKeyHash);
getAccountRangeMessage.setRootHash(Optional.of(stateRoot));
return sendRequest(
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_ACCOUNT_RANGE),
getAccountRangeMessage);
}
public RequestManager.ResponseStream getSnapStorageRange(
final Hash stateRoot,
final List<Bytes32> accountHashes,
final Bytes32 startKeyHash,
final Bytes32 endKeyHash)
throws PeerNotConnected {
final GetStorageRangeMessage getStorageRangeMessage =
GetStorageRangeMessage.create(stateRoot, accountHashes, startKeyHash, endKeyHash);
getStorageRangeMessage.setRootHash(Optional.of(stateRoot));
return sendRequest(
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_STORAGE_RANGE),
getStorageRangeMessage);
}
public RequestManager.ResponseStream getSnapBytecode(
final Hash stateRoot, final List<Bytes32> codeHashes) throws PeerNotConnected {
final GetByteCodesMessage getByteCodes = GetByteCodesMessage.create(codeHashes);
getByteCodes.setRootHash(Optional.of(stateRoot));
return sendRequest(
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_BYTECODES), getByteCodes);
}
public RequestManager.ResponseStream getSnapTrieNode(
final Hash stateRoot, final List<List<Bytes>> paths) throws PeerNotConnected {
final GetTrieNodesMessage getTrieNodes = GetTrieNodesMessage.create(stateRoot, paths);
getTrieNodes.setRootHash(Optional.of(stateRoot));
return sendRequest(
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_TRIE_NODES), getTrieNodes);
}
private RequestManager.ResponseStream sendRequest(
final RequestManager requestManager, final MessageData messageData) throws PeerNotConnected {
lastRequestTimestamp = clock.millis();
return requestManager.dispatchRequest(
msgData -> connection.sendForProtocol(requestManager.getProtocolName(), msgData),
messageData);
}
/**
* Determines the validity of a message received from a peer. A message is considered valid if
* either of the following conditions are met: 1) The message is a request type message (e.g.
* GET_BLOCK_HEADERS), or 2) The message is a response type message (e.g. BLOCK_HEADERS), the node
* has made at least 1 request for that type of message (i.e. it has sent at least 1
* GET_BLOCK_HEADERS request), and it has at least 1 outstanding request of that type which it
* expects to receive a response for.
*
* @param message The message being validated
* @param protocolName The protocol type of the message
* @return true if the message is valid as per the above logic, otherwise false.
*/
public boolean validateReceivedMessage(final EthMessage message, final String protocolName) {
checkArgument(message.getPeer().equals(this), "Mismatched message sent to peer for dispatch");
return getRequestManager(protocolName, message.getData().getCode())
.map(requestManager -> requestManager.outstandingRequests() != 0)
.orElse(true);
}
/**
* Routes messages originating from this peer to listeners.
*
* @param ethMessage the Eth message to dispatch
* @param protocolName Specific protocol name if needed
*/
Optional<RequestManager> dispatch(final EthMessage ethMessage, final String protocolName) {
checkArgument(
ethMessage.getPeer().equals(this), "Mismatched Eth message sent to peer for dispatch");
final int messageCode = ethMessage.getData().getCode();
reputation.resetTimeoutCount(messageCode);
Optional<RequestManager> requestManager = getRequestManager(protocolName, messageCode);
requestManager.ifPresentOrElse(
localRequestManager -> localRequestManager.dispatchResponse(ethMessage),
() -> {
LOG.trace(
"Message {} not expected has just been received for protocol {}, {} ",
messageCode,
protocolName,
this);
});
return requestManager;
}
/**
* Routes messages originating from this peer to listeners.
*
* @param ethMessage the Eth message to dispatch
*/
void dispatch(final EthMessage ethMessage) {
dispatch(ethMessage, protocolName);
}
/**
* Attempt to get a request manager for a received response-type message e.g. BLOCK_HEADERS. If
* the message is a request-type message e.g. GET_BLOCK_HEADERS no request manager will exist so
* Optional.empty() will be returned.
*
* @param protocolName the type of protocol the message is for
* @param code the message code
* @return a request manager for the received response messsage, or Optional.empty() if this is a
* request message
*/
private Optional<RequestManager> getRequestManager(final String protocolName, final int code) {
if (requestManagers.containsKey(protocolName)) {
final Map<Integer, RequestManager> managers = requestManagers.get(protocolName);
final Integer requestCode = roundMessages.getOrDefault(code, -1);
if (managers.containsKey(requestCode)) {
return Optional.of(managers.get(requestCode));
}
}
return Optional.empty();
}
public Map<Integer, AtomicInteger> timeoutCounts() {
return reputation.timeoutCounts();
}
public PeerReputation getReputation() {
return reputation;
}
void handleDisconnect() {
LOG.trace("handleDisconnect - EthPeer {}", this);
requestManagers.forEach(
(protocolName, map) -> map.forEach((code, requestManager) -> requestManager.close()));
}
public void registerKnownBlock(final Hash hash) {
knownBlocks.add(hash);
}
public void registerStatusSent(final PeerConnection connection) {
synchronized (this) {
connection.setStatusSent();
maybeExecuteStatusesExchangedCallback(connection);
}
}
public void registerStatusReceived(
final Hash hash,
final Difficulty td,
final int protocolVersion,
final PeerConnection connection) {
chainHeadState.statusReceived(hash, td);
lastProtocolVersion.set(protocolVersion);
statusHasBeenReceivedFromPeer.set(true);
synchronized (this) {
connection.setStatusReceived();
maybeExecuteStatusesExchangedCallback(connection);
}
}
private void maybeExecuteStatusesExchangedCallback(final PeerConnection newConnection) {
synchronized (this) {
if (newConnection.getStatusExchanged()) {
if (!this.connection.equals(newConnection)) {
if (readyForRequests.get()) {
// We have two connections that are ready for requests, figure out which connection to
// keep
if (compareDuplicateConnections(this.connection, newConnection) > 0) {
LOG.trace("Changed connection from {} to {}", this.connection, newConnection);
this.connection = newConnection;
}
} else {
// use the new connection for now, as it is ready for requests, which the "old" one is
// not
this.connection = newConnection;
}
}
readyForRequests.set(true);
final Consumer<EthPeer> peerConsumer = onStatusesExchanged.getAndSet(null);
if (peerConsumer != null) {
LOG.trace("Status message exchange successful. {}", this);
peerConsumer.accept(this);
}
}
}
}
/**
* Wait until status has been received and verified before using a peer.
*
* @return true if the peer is ready to accept requests for data.
*/
public boolean readyForRequests() {
return readyForRequests.get();
}
/**
* True if the peer has sent its initial status message to us.
*
* @return true if the peer has sent its initial status message to us.
*/
boolean statusHasBeenReceived() {
return statusHasBeenReceivedFromPeer.get();
}
public boolean hasSeenBlock(final Hash hash) {
return knownBlocks.contains(hash);
}
/**
* Return This peer's current chain state.
*
* @return This peer's current chain state.
*/
public ChainState chainState() {
return chainHeadState;
}
public int getLastProtocolVersion() {
return lastProtocolVersion.get();
}
public String getProtocolName() {
return protocolName;
}
/**
* Return A read-only snapshot of this peer's current {@code chainState} }
*
* @return A read-only snapshot of this peer's current {@code chainState} }
*/
public ChainHeadEstimate chainStateSnapshot() {
return chainHeadState.getSnapshot();
}
public void registerHeight(final Hash blockHash, final long height) {
chainHeadState.update(blockHash, height);
}
public int outstandingRequests() {
return requestManagers.values().stream()
.flatMap(m -> m.values().stream())
.mapToInt(RequestManager::outstandingRequests)
.sum();
}
public long getLastRequestTimestamp() {
return lastRequestTimestamp;
}
public boolean hasAvailableRequestCapacity() {
return outstandingRequests() < MAX_OUTSTANDING_REQUESTS;
}
public Set<Capability> getAgreedCapabilities() {
return connection.getAgreedCapabilities();
}
public PeerConnection getConnection() {
return connection;
}
public Bytes nodeId() {
return connection.getPeerInfo().getNodeId();
}
public boolean hasSupportForMessage(final int messageCode) {
return getAgreedCapabilities().stream()
.anyMatch(cap -> EthProtocol.get().isValidMessageCode(cap.getVersion(), messageCode));
}
@Override
public String toString() {
return String.format(
"PeerId: %s %s, validated? %s, disconnected? %s, client: %s, %s, %s",
getLoggableId(),
reputation,
isFullyValidated(),
isDisconnected(),
connection.getPeerInfo().getClientId(),
connection,
connection.getPeer().getEnodeURLString());
}
@Nonnull
public String getLoggableId() {
// 8 bytes plus the 0x prefix is 18 characters
return nodeId().toString().substring(0, 18) + "...";
}
@Override
public int compareTo(final @Nonnull EthPeer ethPeer) {
final int repCompare = this.reputation.compareTo(ethPeer.reputation);
if (repCompare != 0) return repCompare;
final int headStateCompare =
Long.compare(
this.chainHeadState.getBestBlock().getNumber(),
ethPeer.chainHeadState.getBestBlock().getNumber());
if (headStateCompare != 0) return headStateCompare;
return getConnection().getPeerInfo().compareTo(ethPeer.getConnection().getPeerInfo());
}
public void setCheckpointHeader(final BlockHeader header) {
checkpointHeader = Optional.of(header);
}
public Optional<BlockHeader> getCheckpointHeader() {
return checkpointHeader;
}
public Bytes getId() {
return id;
}
/**
* Compares two connections to the same peer to determine which connection should be kept
*
* @param a The first connection
* @param b The second connection
* @return A negative value if {@code a} should be kept, a positive value is {@code b} should be
* kept
*/
private int compareDuplicateConnections(final PeerConnection a, final PeerConnection b) {
if (a.isDisconnected() != b.isDisconnected()) {
// One connection has failed - prioritize the one that hasn't failed
return a.isDisconnected() ? 1 : -1;
}
final Bytes peerId = a.getPeer().getId();
// peerId is the id of the other node
if (a.inboundInitiated() != b.inboundInitiated()) {
// If we have connections initiated in different directions, keep the connection initiated
// by the node with the lower id
if (localNodeId.compareTo(peerId) < 0) {
return a.inboundInitiated() ? 1 : -1;
} else {
return a.inboundInitiated() ? -1 : 1;
}
}
// Otherwise, keep older connection
LOG.trace("comparing timestamps " + a.getInitiatedAt() + " with " + b.getInitiatedAt());
return a.getInitiatedAt() < b.getInitiatedAt() ? -1 : 1;
}
@FunctionalInterface
public interface DisconnectCallback {
void onDisconnect(EthPeer peer);
}
}