EthProtocolManager.java
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager;
import static com.google.common.base.Preconditions.checkArgument;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.MinedBlockObserver;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.StatusMessage;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidatorRunner;
import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.forkid.ForkId;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
private static final Logger LOG = LoggerFactory.getLogger(EthProtocolManager.class);
private final EthScheduler scheduler;
private final CountDownLatch shutdown;
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final Hash genesisHash;
private final ForkIdManager forkIdManager;
private final BigInteger networkId;
private final EthPeers ethPeers;
private final EthMessages ethMessages;
private final EthContext ethContext;
private final List<Capability> supportedCapabilities;
private final Blockchain blockchain;
private final BlockBroadcaster blockBroadcaster;
private final List<PeerValidator> peerValidators;
private final Optional<MergePeerFilter> mergePeerFilter;
public EthProtocolManager(
final Blockchain blockchain,
final BigInteger networkId,
final WorldStateArchive worldStateArchive,
final TransactionPool transactionPool,
final EthProtocolConfiguration ethereumWireProtocolConfiguration,
final EthPeers ethPeers,
final EthMessages ethMessages,
final EthContext ethContext,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final SynchronizerConfiguration synchronizerConfiguration,
final EthScheduler scheduler,
final ForkIdManager forkIdManager) {
this.networkId = networkId;
this.peerValidators = peerValidators;
this.scheduler = scheduler;
this.blockchain = blockchain;
this.mergePeerFilter = mergePeerFilter;
this.shutdown = new CountDownLatch(1);
this.genesisHash = blockchain.getBlockHashByNumber(0L).orElse(Hash.ZERO);
this.forkIdManager = forkIdManager;
this.ethPeers = ethPeers;
this.ethMessages = ethMessages;
this.ethContext = ethContext;
this.blockBroadcaster = new BlockBroadcaster(ethContext);
this.supportedCapabilities =
calculateCapabilities(synchronizerConfiguration, ethereumWireProtocolConfiguration);
// Run validators
for (final PeerValidator peerValidator : this.peerValidators) {
PeerValidatorRunner.runValidator(ethContext, peerValidator);
}
// Set up request handlers
new EthServer(
blockchain,
worldStateArchive,
transactionPool,
ethMessages,
ethereumWireProtocolConfiguration);
}
@VisibleForTesting
public EthProtocolManager(
final Blockchain blockchain,
final BigInteger networkId,
final WorldStateArchive worldStateArchive,
final TransactionPool transactionPool,
final EthProtocolConfiguration ethereumWireProtocolConfiguration,
final EthPeers ethPeers,
final EthMessages ethMessages,
final EthContext ethContext,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final SynchronizerConfiguration synchronizerConfiguration,
final EthScheduler scheduler) {
this(
blockchain,
networkId,
worldStateArchive,
transactionPool,
ethereumWireProtocolConfiguration,
ethPeers,
ethMessages,
ethContext,
peerValidators,
mergePeerFilter,
synchronizerConfiguration,
scheduler,
new ForkIdManager(
blockchain,
Collections.emptyList(),
Collections.emptyList(),
ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled()));
}
public EthProtocolManager(
final Blockchain blockchain,
final BigInteger networkId,
final WorldStateArchive worldStateArchive,
final TransactionPool transactionPool,
final EthProtocolConfiguration ethereumWireProtocolConfiguration,
final EthPeers ethPeers,
final EthMessages ethMessages,
final EthContext ethContext,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final SynchronizerConfiguration synchronizerConfiguration,
final EthScheduler scheduler,
final List<Long> blockNumberForks,
final List<Long> timestampForks) {
this(
blockchain,
networkId,
worldStateArchive,
transactionPool,
ethereumWireProtocolConfiguration,
ethPeers,
ethMessages,
ethContext,
peerValidators,
mergePeerFilter,
synchronizerConfiguration,
scheduler,
new ForkIdManager(
blockchain,
blockNumberForks,
timestampForks,
ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled()));
}
public EthContext ethContext() {
return ethContext;
}
public BlockBroadcaster getBlockBroadcaster() {
return blockBroadcaster;
}
@Override
public String getSupportedProtocol() {
return EthProtocol.NAME;
}
private List<Capability> calculateCapabilities(
final SynchronizerConfiguration synchronizerConfiguration,
final EthProtocolConfiguration ethProtocolConfiguration) {
final List<Capability> capabilities = new ArrayList<>();
if (SyncMode.isFullSync(synchronizerConfiguration.getSyncMode())) {
capabilities.add(EthProtocol.ETH62);
}
capabilities.add(EthProtocol.ETH63);
capabilities.add(EthProtocol.ETH64);
capabilities.add(EthProtocol.ETH65);
capabilities.add(EthProtocol.ETH66);
// Version 67 removes the GetNodeData and NodeData
// Fast sync depends on GetNodeData and NodeData
// see https://eips.ethereum.org/EIPS/eip-4938
if (!Objects.equals(SyncMode.FAST, synchronizerConfiguration.getSyncMode())) {
capabilities.add(EthProtocol.ETH67);
capabilities.add(EthProtocol.ETH68);
}
capabilities.removeIf(cap -> cap.getVersion() > ethProtocolConfiguration.getMaxEthCapability());
capabilities.removeIf(cap -> cap.getVersion() < ethProtocolConfiguration.getMinEthCapability());
return Collections.unmodifiableList(capabilities);
}
@Override
public int getHighestProtocolVersion() {
return getSupportedCapabilities().stream()
.max(Comparator.comparing(Capability::getVersion))
.map(Capability::getVersion)
.orElse(0);
}
@Override
public List<Capability> getSupportedCapabilities() {
return supportedCapabilities;
}
@Override
public void stop() {
if (stopped.compareAndSet(false, true)) {
LOG.atInfo().setMessage("Stopping {} Subprotocol.").addArgument(getSupportedProtocol()).log();
scheduler.stop();
shutdown.countDown();
} else {
LOG.atInfo()
.setMessage("Attempted to stop already stopped {} Subprotocol.")
.addArgument(this::getSupportedProtocol)
.log();
}
}
@Override
public void awaitStop() throws InterruptedException {
shutdown.await();
scheduler.awaitStop();
LOG.atInfo()
.setMessage("{} Subprotocol stopped.")
.addArgument(this::getSupportedProtocol)
.log();
}
@Override
public void processMessage(final Capability cap, final Message message) {
checkArgument(
getSupportedCapabilities().contains(cap),
"Unsupported capability passed to processMessage(): " + cap);
final MessageData messageData = message.getData();
final int code = messageData.getCode();
EthProtocolLogger.logProcessMessage(cap, code);
final EthPeer ethPeer = ethPeers.peer(message.getConnection());
if (ethPeer == null) {
LOG.atDebug()
.setMessage("Ignoring message received from unknown peer connection: {}")
.addArgument(message::getConnection)
.log();
return;
}
// Handle STATUS processing
if (code == EthPV62.STATUS) {
handleStatusMessage(ethPeer, message);
return;
} else if (!ethPeer.statusHasBeenReceived()) {
// Peers are required to send status messages before any other message type
LOG.atDebug()
.setMessage(
"{} requires a Status ({}) message to be sent first. Instead, received message {} (BREACH_OF_PROTOCOL). Disconnecting from {}.")
.addArgument(() -> this.getClass().getSimpleName())
.addArgument(EthPV62.STATUS)
.addArgument(code)
.addArgument(ethPeer::toString)
.log();
ethPeer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_RECEIVED_OTHER_MESSAGE_BEFORE_STATUS);
return;
}
if (this.mergePeerFilter.isPresent()) {
if (this.mergePeerFilter.get().disconnectIfGossipingBlocks(message, ethPeer)) {
LOG.atDebug()
.setMessage("Post-merge disconnect: peer still gossiping blocks {}")
.addArgument(ethPeer::toString)
.log();
handleDisconnect(
ethPeer.getConnection(), DisconnectReason.SUBPROTOCOL_TRIGGERED_POW_BLOCKS, false);
return;
}
}
final EthMessage ethMessage = new EthMessage(ethPeer, messageData);
if (!ethPeer.validateReceivedMessage(ethMessage, getSupportedProtocol())) {
LOG.debug(
"Unsolicited message received {} (BREACH_OF_PROTOCOL), disconnecting from EthPeer: {}",
ethMessage.getData().getCode(),
ethPeer);
ethPeer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_UNSOLICITED_MESSAGE_RECEIVED);
return;
}
// This will handle responses
ethPeers.dispatchMessage(ethPeer, ethMessage, getSupportedProtocol());
// This will handle requests
Optional<MessageData> maybeResponseData = Optional.empty();
try {
if (EthProtocol.isEth66Compatible(cap) && EthProtocol.requestIdCompatible(code)) {
final Map.Entry<BigInteger, MessageData> requestIdAndEthMessage =
ethMessage.getData().unwrapMessageData();
maybeResponseData =
ethMessages
.dispatch(new EthMessage(ethPeer, requestIdAndEthMessage.getValue()))
.map(responseData -> responseData.wrapMessageData(requestIdAndEthMessage.getKey()));
} else {
maybeResponseData = ethMessages.dispatch(ethMessage);
}
} catch (final RLPException e) {
LOG.atDebug()
.setMessage("Received malformed message {} (BREACH_OF_PROTOCOL), disconnecting: {}, {}")
.addArgument(messageData::getData)
.addArgument(ethPeer::toString)
.addArgument(e::toString)
.log();
ethPeer.disconnect(
DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
}
maybeResponseData.ifPresent(
responseData -> {
try {
ethPeer.send(responseData, getSupportedProtocol());
} catch (final PeerNotConnected __) {
// Peer disconnected before we could respond - nothing to do
}
});
}
@Override
public void handleNewConnection(final PeerConnection connection) {
ethPeers.registerNewConnection(connection, peerValidators);
final EthPeer peer = ethPeers.peer(connection);
final Capability cap = connection.capability(getSupportedProtocol());
final ForkId latestForkId =
cap.getVersion() >= 64 ? forkIdManager.getForkIdForChainHead() : null;
final StatusMessage status =
StatusMessage.create(
cap.getVersion(),
networkId,
blockchain.getChainHead().getTotalDifficulty(),
blockchain.getChainHeadHash(),
genesisHash,
latestForkId);
try {
LOG.atTrace()
.setMessage("Sending status message to {} for connection {}.")
.addArgument(peer::getId)
.addArgument(connection::toString)
.log();
peer.send(status, getSupportedProtocol(), connection);
peer.registerStatusSent(connection);
} catch (final PeerNotConnected peerNotConnected) {
// Nothing to do.
}
LOG.atTrace().setMessage("{}").addArgument(ethPeers::toString).log();
}
@Override
public boolean shouldConnect(final Peer peer, final boolean incoming) {
if (peer.getForkId().map(forkIdManager::peerCheck).orElse(true)) {
LOG.atDebug()
.setMessage("ForkId OK or not available for peer {}")
.addArgument(peer::getLoggableId)
.log();
if (ethPeers.shouldConnect(peer, incoming)) {
return true;
}
} else {
LOG.atDebug()
.setMessage("ForkId check failed for peer {} our fork id {} theirs {}")
.addArgument(peer::getLoggableId)
.addArgument(forkIdManager.getForkIdForChainHead())
.addArgument(peer.getForkId())
.log();
return false;
}
return false;
}
@Override
public void handleDisconnect(
final PeerConnection connection,
final DisconnectReason reason,
final boolean initiatedByPeer) {
if (ethPeers.registerDisconnect(connection)) {
LOG.atDebug()
.setMessage("Disconnect - {} - {} - {} - {} peers left")
.addArgument(initiatedByPeer ? "Inbound" : "Outbound")
.addArgument(reason::toString)
.addArgument(() -> connection.getPeer().getLoggableId())
.addArgument(ethPeers::peerCount)
.log();
LOG.atTrace().setMessage("{}").addArgument(ethPeers::toString).log();
}
}
private void handleStatusMessage(final EthPeer peer, final Message message) {
final StatusMessage status = StatusMessage.readFrom(message.getData());
final ForkId forkId = status.forkId();
peer.getConnection().getPeer().setForkId(forkId);
try {
if (!status.networkId().equals(networkId)) {
LOG.atDebug()
.setMessage("Mismatched network id: {}, peer {}")
.addArgument(status::networkId)
.addArgument(() -> getPeerOrPeerId(peer))
.log();
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED_MISMATCHED_NETWORK);
} else if (!forkIdManager.peerCheck(forkId) && status.protocolVersion() > 63) {
LOG.atDebug()
.setMessage("{} has matching network id ({}), but non-matching fork id: {}")
.addArgument(() -> getPeerOrPeerId(peer))
.addArgument(networkId::toString)
.addArgument(forkId)
.log();
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED_MISMATCHED_FORKID);
} else if (forkIdManager.peerCheck(status.genesisHash())) {
LOG.atDebug()
.setMessage("{} has matching network id ({}), but non-matching genesis hash: {}")
.addArgument(() -> getPeerOrPeerId(peer))
.addArgument(networkId::toString)
.addArgument(status::genesisHash)
.log();
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED_MISMATCHED_GENESIS_HASH);
} else if (mergePeerFilter.isPresent()
&& mergePeerFilter.get().disconnectIfPoW(status, peer)) {
LOG.atDebug()
.setMessage("Post-merge disconnect: peer still PoW {}")
.addArgument(() -> getPeerOrPeerId(peer))
.log();
handleDisconnect(
peer.getConnection(), DisconnectReason.SUBPROTOCOL_TRIGGERED_POW_DIFFICULTY, false);
} else {
LOG.atDebug()
.setMessage("Received status message from {}: {} with connection {}")
.addArgument(peer::toString)
.addArgument(status::toString)
.addArgument(message::getConnection)
.log();
peer.registerStatusReceived(
status.bestHash(),
status.totalDifficulty(),
status.protocolVersion(),
message.getConnection());
}
} catch (final RLPException e) {
LOG.atDebug()
.setMessage("Unable to parse status message from peer {} {}")
.addArgument(peer::getLoggableId)
.addArgument(e)
.log();
// Parsing errors can happen when clients broadcast network ids outside the int range,
// So just disconnect with "subprotocol" error rather than "breach of protocol".
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED_UNPARSABLE_STATUS);
}
}
private Object getPeerOrPeerId(final EthPeer peer) {
return LOG.isTraceEnabled() ? peer : peer.getLoggableId();
}
@Override
public void blockMined(final Block block) {
// This assumes the block has already been included in the chain
final Difficulty totalDifficulty =
blockchain
.getTotalDifficultyByHash(block.getHash())
.orElseThrow(
() ->
new IllegalStateException(
"Unable to get total difficulty from blockchain for mined block."));
blockBroadcaster.propagate(block, totalDifficulty);
}
public List<Bytes> getForkIdAsBytesList() {
final ForkId chainHeadForkId = forkIdManager.getForkIdForChainHead();
return chainHeadForkId == null
? Collections.emptyList()
: chainHeadForkId.getForkIdAsBytesList();
}
}