PeerDiscoveryAgent.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.p2p.discovery;
import static com.google.common.base.Preconditions.checkArgument;
import org.hyperledger.besu.crypto.Hash;
import org.hyperledger.besu.crypto.SignatureAlgorithm;
import org.hyperledger.besu.crypto.SignatureAlgorithmFactory;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.chain.VariablesStorage;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.p2p.config.DiscoveryConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.Packet;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerRequirement;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PingPacketData;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.TimerUtil;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURLImpl;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.nat.NatService;
import org.hyperledger.besu.plugin.data.EnodeURL;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.NetworkUtility;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import com.google.common.net.InetAddresses;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.units.bigints.UInt64;
import org.ethereum.beacon.discovery.schema.EnrField;
import org.ethereum.beacon.discovery.schema.IdentitySchema;
import org.ethereum.beacon.discovery.schema.NodeRecord;
import org.ethereum.beacon.discovery.schema.NodeRecordFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The peer discovery agent is the network component that sends and receives peer discovery messages
 * via UDP.
 *
 * <p>This abstract base class owns the shared state (configuration, node key, local node, the
 * {@link PeerDiscoveryController}) and the local node record handling. Subclasses provide the
 * timer, the worker executor, the listening socket and the actual packet transmission through the
 * abstract methods declared below.
 */
public abstract class PeerDiscoveryAgent {
private static final Logger LOG = LoggerFactory.getLogger(PeerDiscoveryAgent.class);
// Memoized lookup of the signature algorithm; used when building the local node record.
private static final com.google.common.base.Supplier<SignatureAlgorithm> SIGNATURE_ALGORITHM =
Suppliers.memoize(SignatureAlgorithmFactory::getInstance);
// The devp2p specification says only accept packets up to 1280, but some
// clients ignore that, so we add in a little extra padding.
private static final int MAX_PACKET_SIZE_BYTES = 1600;
/* Peers built from the configured bootnodes; handed to the controller as its bootstrap nodes. */
protected final List<DiscoveryPeer> bootstrapPeers;
/* Requirements registered via addPeerRequirement; combined into one when the controller is built. */
private final List<PeerRequirement> peerRequirements = new CopyOnWriteArrayList<>();
private final PeerPermissions peerPermissions;
/* Queried in start() for the externally visible IP address to advertise. */
private final NatService natService;
private final MetricsSystem metricsSystem;
private final RlpxAgent rlpxAgent;
/* Used by checkForkId to validate the fork id reported by a peer. */
private final ForkIdManager forkIdManager;
private final PeerTable peerTable;
/* IPv6 availability, captured once when the class is initialized; deriveHost uses it to reject
 * IPv6 "from" hosts when IPv6 is not available.
 */
private static final boolean isIpv6Available = NetworkUtility.isIPv6Available();
/* The peer controller, which takes care of the state machine of peers. Empty until the agent has
 * been started.
 */
protected Optional<PeerDiscoveryController> controller = Optional.empty();
/* The keypair used to sign messages. */
protected final NodeKey nodeKey;
/* Encoded bytes of this node's public key; used as the node id. */
private final Bytes id;
protected final DiscoveryConfiguration config;
/* This is the {@link DiscoveryPeer} object representing our local node.
 * This value is empty on construction, and is set after the discovery agent is started.
 */
private Optional<DiscoveryPeer> localNode = Optional.empty();
/* Is discovery enabled? Set to true once start() has finished setting up the listener, and to
 * false when start() is called with discovery disabled in the configuration.
 */
private boolean isActive = false;
/* Storage the serialized local node record is read from and written to. */
private final VariablesStorage variablesStorage;
/* Supplies the fork id for the current chain head as a list of byte values. */
private final Supplier<List<Bytes>> forkIdSupplier;
/* Address advertised to other peers; assigned in start() from the NAT service lookup. */
private String advertisedAddress;
protected PeerDiscoveryAgent(
final NodeKey nodeKey,
final DiscoveryConfiguration config,
final PeerPermissions peerPermissions,
final NatService natService,
final MetricsSystem metricsSystem,
final StorageProvider storageProvider,
final ForkIdManager forkIdManager,
final RlpxAgent rlpxAgent,
final PeerTable peerTable) {
this.metricsSystem = metricsSystem;
checkArgument(nodeKey != null, "nodeKey cannot be null");
checkArgument(config != null, "provided configuration cannot be null");
validateConfiguration(config);
this.peerPermissions = peerPermissions;
this.natService = natService;
this.bootstrapPeers =
config.getBootnodes().stream().map(DiscoveryPeer::fromEnode).collect(Collectors.toList());
this.config = config;
this.nodeKey = nodeKey;
this.id = nodeKey.getPublicKey().getEncodedBytes();
this.variablesStorage = storageProvider.createVariablesStorage();
this.forkIdManager = forkIdManager;
this.forkIdSupplier = () -> forkIdManager.getForkIdForChainHead().getForkIdAsBytesList();
this.rlpxAgent = rlpxAgent;
this.peerTable = peerTable;
}
/**
 * Supplies the timer utility that is handed to the {@link PeerDiscoveryController} when it is
 * built.
 *
 * @return the timer utility for the controller
 */
protected abstract TimerUtil createTimer();
/**
 * Supplies the worker executor that is handed to the {@link PeerDiscoveryController} when it is
 * built.
 *
 * @return the asynchronous executor for the controller
 */
protected abstract PeerDiscoveryController.AsyncExecutor createWorkerExecutor();
/**
 * Sets up the listener for incoming discovery traffic. Invoked by {@link #start(int)} when
 * discovery is enabled; the port of the resulting address is used as this node's discovery port.
 *
 * @return a future completing with the local address the listener is bound to
 */
protected abstract CompletableFuture<InetSocketAddress> listenForConnections();
/**
 * Transmits a packet to a peer. Invoked by {@code handleOutgoingPacket}, which updates the peer's
 * last-contacted time when the returned future completes normally and calls {@code
 * handleOutgoingPacketError} when it completes exceptionally.
 *
 * @param peer the recipient
 * @param packet the packet to send
 * @return a future completing when the send attempt has finished
 */
protected abstract CompletableFuture<Void> sendOutgoingPacket(
final DiscoveryPeer peer, final Packet packet);
/**
 * Stops the agent.
 *
 * <p>NOTE(review): what exactly is released and what the future completes with is defined by the
 * implementing subclass and is not visible in this class.
 *
 * @return a future completing once the agent has stopped
 */
public abstract CompletableFuture<?> stop();
/**
 * Starts peer discovery if it is enabled in the configuration.
 *
 * <p>When enabled, the advertised address is resolved through the NAT service, the listener is set
 * up, and once it is bound the local node is created, its node record is updated and the peer
 * discovery controller is started.
 *
 * @param tcpPort the TCP listening port to advertise for this node
 * @return a future completing with the bound discovery port, or with 0 if discovery is disabled
 */
public CompletableFuture<Integer> start(final int tcpPort) {
  if (!config.isActive()) {
    this.isActive = false;
    return CompletableFuture.completedFuture(0);
  }

  final String bindHost = config.getBindHost();
  final int bindPort = config.getBindPort();
  final String ipv6Status = NetworkUtility.isIPv6Available() ? "available" : "not available";
  LOG.info(
      "Starting peer discovery agent on host={}, port={}. IPv6 {}.",
      bindHost,
      bindPort,
      ipv6Status);

  // override advertised host if we detect an external IP address via NAT manager
  this.advertisedAddress = natService.queryExternalIPAddress(config.getAdvertisedHost());

  return listenForConnections()
      .thenApply(
          boundAddress -> {
            // The listener is up: finish initializing with the port that was actually bound.
            final int boundDiscoveryPort = boundAddress.getPort();
            final EnodeURL localEnode =
                EnodeURLImpl.builder()
                    .nodeId(id)
                    .ipAddress(advertisedAddress)
                    .listeningPort(tcpPort)
                    .discoveryPort(boundDiscoveryPort)
                    .build();
            final DiscoveryPeer self = DiscoveryPeer.fromEnode(localEnode);
            this.localNode = Optional.of(self);
            this.isActive = true;
            LOG.info("P2P peer discovery agent started and listening on {}", boundAddress);
            updateNodeRecord();
            startController(self);
            return boundDiscoveryPort;
          });
}
/**
 * Brings the local node record (ENR) up to date and attaches it to the local node.
 *
 * <p>Does nothing when discovery is disabled in the configuration. Otherwise the record read from
 * the variables storage is reused if its public key, IP address, UDP port, TCP port and fork id
 * fields all equal the current values. If there is no stored record, or any field differs, a new
 * record with the next sequence number is built, signed with the node key, written to the
 * variables storage and used instead.
 *
 * @throws IllegalStateException if discovery is enabled but the local node has not been set yet
 */
public void updateNodeRecord() {
  if (!config.isActive()) {
    return;
  }
  // Verify the precondition before touching storage, so that a missing local node can never
  // leave a record with zeroed ports and a bumped sequence number persisted.
  final DiscoveryPeer ourNode =
      localNode.orElseThrow(() -> new IllegalStateException("Local node should be set here"));

  final NodeRecordFactory nodeRecordFactory = NodeRecordFactory.DEFAULT;
  final Optional<NodeRecord> existingNodeRecord =
      variablesStorage.getLocalEnrSeqno().map(nodeRecordFactory::fromBytes);

  // NOTE(review): the address is always stored under the IP_V4 field — confirm the advertised
  // address can only be IPv4 here.
  final Bytes addressBytes = Bytes.of(InetAddresses.forString(advertisedAddress).getAddress());
  final EnodeURL enodeURL = ourNode.getEnodeURL();
  final Integer discoveryPort = enodeURL.getDiscoveryPort().orElse(0);
  final Integer listeningPort = enodeURL.getListeningPort().orElse(0);
  final String forkIdEnrField = "eth";
  // Read the fork id once so the value compared against the stored record is the same value
  // that gets written if a new record is needed.
  final List<Bytes> currentForkId = forkIdSupplier.get();

  final NodeRecord newNodeRecord =
      existingNodeRecord
          .filter(
              nodeRecord ->
                  // NOTE(review): the record below is written with the compressed public key,
                  // while this compares against the raw id bytes — confirm this can match.
                  id.equals(nodeRecord.get(EnrField.PKEY_SECP256K1))
                      && addressBytes.equals(nodeRecord.get(EnrField.IP_V4))
                      && discoveryPort.equals(nodeRecord.get(EnrField.UDP))
                      && listeningPort.equals(nodeRecord.get(EnrField.TCP))
                      && currentForkId.equals(nodeRecord.get(forkIdEnrField)))
          .orElseGet(
              () -> {
                // Something changed (or nothing was stored): publish under the next sequence
                // number.
                final UInt64 sequenceNumber =
                    existingNodeRecord.map(NodeRecord::getSeq).orElse(UInt64.ZERO).add(1);
                final SignatureAlgorithm signatureAlgorithm = SIGNATURE_ALGORITHM.get();
                final NodeRecord nodeRecord =
                    nodeRecordFactory.createFromValues(
                        sequenceNumber,
                        new EnrField(EnrField.ID, IdentitySchema.V4),
                        new EnrField(
                            signatureAlgorithm.getCurveName(),
                            signatureAlgorithm.compressPublicKey(
                                signatureAlgorithm.createPublicKey(id))),
                        new EnrField(EnrField.IP_V4, addressBytes),
                        new EnrField(EnrField.TCP, listeningPort),
                        new EnrField(EnrField.UDP, discoveryPort),
                        new EnrField(forkIdEnrField, Collections.singletonList(currentForkId)));
                // Only the first 64 bytes of the encoded signature are stored in the record.
                nodeRecord.setSignature(
                    nodeKey
                        .sign(Hash.keccak256(nodeRecord.serializeNoSignature()))
                        .encodedBytes()
                        .slice(0, 64));

                LOG.info("Writing node record to disk. {}", nodeRecord);
                final var variablesUpdater = variablesStorage.updater();
                variablesUpdater.setLocalEnrSeqno(nodeRecord.serialize());
                variablesUpdater.commit();
                return nodeRecord;
              });

  ourNode.setNodeRecord(newNodeRecord);
}
/**
 * Registers an additional peer requirement. The registered requirements are combined into a
 * single requirement at the moment the peer discovery controller is created.
 *
 * @param peerRequirement the requirement to register
 */
public void addPeerRequirement(final PeerRequirement peerRequirement) {
  peerRequirements.add(peerRequirement);
}
/**
 * Checks the fork id reported by a peer against the fork id manager.
 *
 * @param peer the peer whose fork id should be checked
 * @return the result of the fork id manager's check, or {@code true} if the peer has no fork id
 */
public boolean checkForkId(final DiscoveryPeer peer) {
  final Optional<Boolean> forkIdCheck = peer.getForkId().map(forkIdManager::peerCheck);
  // A peer that did not report a fork id is accepted.
  return forkIdCheck.orElse(true);
}
/**
 * Creates the peer discovery controller for the given local node, publishes it in the {@code
 * controller} field and starts it.
 *
 * @param localNode the peer representing this node
 */
private void startController(final DiscoveryPeer localNode) {
  final PeerDiscoveryController newController = createController(localNode);
  controller = Optional.of(newController);
  newController.start();
}
/**
 * Assembles a {@link PeerDiscoveryController} wired to this agent: outgoing packets are routed
 * through {@code handleOutgoingPacket}, and the timer and worker executor come from the subclass.
 *
 * @param localNode the peer representing this node
 * @return the newly built, not yet started, controller
 */
private PeerDiscoveryController createController(final DiscoveryPeer localNode) {
  final TimerUtil timer = createTimer();
  final PeerDiscoveryController.AsyncExecutor workerExecutor = createWorkerExecutor();
  // Requirements registered up to this point are folded into a single requirement.
  final PeerRequirement combinedRequirement = PeerRequirement.combine(peerRequirements);
  final boolean filterOnEnrForkId = config.isFilterOnEnrForkIdEnabled();

  return PeerDiscoveryController.builder()
      .nodeKey(nodeKey)
      .localPeer(localNode)
      .bootstrapNodes(bootstrapPeers)
      .outboundMessageHandler(this::handleOutgoingPacket)
      .timerUtil(timer)
      .workerExecutor(workerExecutor)
      .peerRequirement(combinedRequirement)
      .peerPermissions(peerPermissions)
      .metricsSystem(metricsSystem)
      .filterOnEnrForkId(filterOnEnrForkId)
      .rlpxAgent(rlpxAgent)
      .peerTable(peerTable)
      .build();
}
/**
 * Checks a packet size against the maximum this agent accepts.
 *
 * @param packetSize the size of the packet in bytes
 * @return {@code true} if the size does not exceed {@code MAX_PACKET_SIZE_BYTES}
 */
protected boolean validatePacketSize(final int packetSize) {
  final boolean withinLimit = packetSize <= MAX_PACKET_SIZE_BYTES;
  return withinLimit;
}
/**
 * Turns an incoming packet into a {@link DiscoveryPeer} describing its sender and passes both to
 * the peer discovery controller, if one is running.
 *
 * <p>The sender's TCP port is taken from the "from" endpoint of a ping packet when available and
 * otherwise falls back to the UDP source port. The host is chosen by {@code deriveHost}.
 *
 * @param sourceEndpoint the endpoint the packet was received from
 * @param packet the received packet
 */
protected void handleIncomingPacket(final Endpoint sourceEndpoint, final Packet packet) {
  final int sourceUdpPort = sourceEndpoint.getUdpPort();
  final Optional<Integer> pingTcpPort =
      packet
          .getPacketData(PingPacketData.class)
          .flatMap(PingPacketData::getFrom)
          .flatMap(Endpoint::getTcpPort);
  final int senderTcpPort = pingTcpPort.orElse(sourceUdpPort);
  final String senderHost = deriveHost(sourceEndpoint, packet);

  final DiscoveryPeer sender =
      DiscoveryPeer.fromEnode(
          EnodeURLImpl.builder()
              .nodeId(packet.getNodeId())
              .ipAddress(senderHost)
              .listeningPort(senderTcpPort)
              .discoveryPort(sourceUdpPort)
              .build());

  // Notify the peer controller.
  controller.ifPresent(activeController -> activeController.onMessage(packet, sender));
}
/**
 * Derives the host of a packet's sender from the source endpoint and the P2P PING packet.
 *
 * <p>If the packet is a PING whose "from" endpoint carries a usable host, that host is returned.
 * A host is considered usable when it is a valid IP address literal, is not the unspecified
 * address, is not a localhost address (127.0.0.1 is the default if a custom value is not specified
 * with --p2p-host or via a suitable --nat-method), and is not an IPv6 address while IPv6 is
 * unavailable on this machine. In every other case the UDP source host is returned. Some
 * implementations send 127.0.0.1 or 255.255.255.255 anyway, but this reduces the chance of an
 * unexpected change in behaviour as a result of https://github.com/hyperledger/besu/issues/6224
 * being fixed.
 *
 * @param sourceEndpoint source endpoint of the packet
 * @param packet P2P PING packet
 * @return host address as string
 */
static String deriveHost(final Endpoint sourceEndpoint, final Packet packet) {
  final Optional<String> pingPacketHost =
      packet
          .getPacketData(PingPacketData.class)
          .flatMap(PingPacketData::getFrom)
          .map(Endpoint::getHost);

  final Optional<String> usablePingHost =
      pingPacketHost
          .filter(InetAddresses::isInetAddress)
          .filter(h -> !NetworkUtility.isUnspecifiedAddress(h))
          .filter(h -> !NetworkUtility.isLocalhostAddress(h))
          .filter(h -> isIpv6Available || !NetworkUtility.isIpV6Address(h));

  // Branch explicitly instead of logging from inside a stream pipeline.
  if (usablePingHost.isPresent()) {
    final String host = usablePingHost.get();
    LOG.atTrace()
        .setMessage(
            "Using \"From\" endpoint {} specified in ping packet. Ignoring UDP source host {}")
        .addArgument(host)
        .addArgument(sourceEndpoint::getHost)
        .log();
    return host;
  }

  // fall back to source endpoint "from" if ping packet from address does not satisfy filters
  final String sourceHost = sourceEndpoint.getHost();
  LOG.atTrace()
      .setMessage("Ignoring \"From\" endpoint {} in ping packet. Using UDP source host {}")
      .addArgument(pingPacketHost.orElse("not specified"))
      .addArgument(sourceHost)
      .log();
  return sourceHost;
}
/**
 * Sends a packet to the given recipient and records the outcome: on success the recipient's
 * last-contacted time is set to the current time, on failure the error is passed to {@code
 * handleOutgoingPacketError}.
 *
 * @param peer the recipient
 * @param packet the packet to send
 */
protected void handleOutgoingPacket(final DiscoveryPeer peer, final Packet packet) {
  sendOutgoingPacket(peer, packet)
      .whenComplete(
          (ignored, failure) -> {
            if (failure == null) {
              peer.setLastContacted(System.currentTimeMillis());
            } else {
              handleOutgoingPacketError(failure, peer, packet);
            }
          });
}
/**
 * Handles a failed attempt to send a packet. Invoked by {@code handleOutgoingPacket} when the
 * future returned by {@code sendOutgoingPacket} completes exceptionally.
 *
 * @param err the error the send attempt failed with
 * @param peer the intended recipient
 * @param packet the packet that could not be sent
 */
protected abstract void handleOutgoingPacketError(
final Throwable err, final DiscoveryPeer peer, final Packet packet);
/**
 * Streams the peers known to the peer discovery controller.
 *
 * @return the controller's discovered peers, or an empty stream if no controller is running
 */
public Stream<DiscoveryPeer> streamDiscoveredPeers() {
  return controller
      .map(PeerDiscoveryController::streamDiscoveredPeers)
      .orElseGet(Stream::empty);
}
/**
 * Asks the peer discovery controller to drop the given peer. Does nothing if no controller is
 * running.
 *
 * @param peer the peer to drop
 */
public void dropPeer(final PeerId peer) {
  controller.ifPresent(activeController -> activeController.dropPeer(peer));
}
/**
 * Returns the peer under which this node advertises itself.
 *
 * @return the local node, or empty if it has not been set yet
 */
public Optional<DiscoveryPeer> getAdvertisedPeer() {
  return this.localNode;
}
/**
 * Returns this node's id, which is the encoded form of the node key's public key.
 *
 * @return the node id
 */
public Bytes getId() {
  return this.id;
}
/**
 * Validates the discovery configuration: the bind host and the advertised host must be IP address
 * literals, the bind port must be 0 or a valid port, the bootnode list must be present and the
 * bucket size must be positive.
 *
 * @param config the configuration to validate
 * @throws IllegalArgumentException if any of the checks fails
 */
private static void validateConfiguration(final DiscoveryConfiguration config) {
  final String bindHost = config.getBindHost();
  checkArgument(
      bindHost != null && InetAddresses.isInetAddress(bindHost), "valid bind host required");

  final String advertisedHost = config.getAdvertisedHost();
  checkArgument(
      advertisedHost != null && InetAddresses.isInetAddress(advertisedHost),
      "valid advertisement host required");

  // Port 0 is accepted in addition to the regular valid port range.
  final int bindPort = config.getBindPort();
  checkArgument(
      bindPort == 0 || NetworkUtility.isValidPort(bindPort), "valid port number required");

  checkArgument(config.getBootnodes() != null, "bootstrapPeers cannot be null");
  checkArgument(config.getBucketSize() > 0, "bucket size cannot be negative nor zero");
}
/**
 * Reports whether this agent is currently active.
 *
 * <p>The flag becomes {@code true} once {@link #start(int)} has finished setting up the listener,
 * and is {@code false} before that or when discovery is disabled in the configuration.
 *
 * @return {@code true} if the {@link PeerDiscoveryAgent} is active on this node, {@code false}
 *     otherwise
 */
public boolean isActive() {
  return this.isActive;
}
/**
 * Requests bonding with the given peer. The request is passed to the peer discovery controller
 * only if a controller is running and the peer can be converted to a {@link DiscoveryPeer}.
 *
 * @param peer the peer to bond with
 */
public void bond(final Peer peer) {
  controller.ifPresent(
      activeController ->
          DiscoveryPeer.from(peer).ifPresent(activeController::handleBondingRequest));
}
/**
 * Exposes the local node for tests.
 *
 * @return the local node, or empty if it has not been set yet
 */
@VisibleForTesting
public Optional<DiscoveryPeer> getLocalNode() {
  return this.localNode;
}
}