DefaultP2PNetwork.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.p2p.network;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Util;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.p2p.config.NetworkingConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryAgent;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryStatus;
import org.hyperledger.besu.ethereum.p2p.discovery.VertxPeerDiscoveryAgent;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.peers.DefaultPeerPrivileges;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURLImpl;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
import org.hyperledger.besu.ethereum.p2p.peers.MaintainedPeers;
import org.hyperledger.besu.ethereum.p2p.peers.MutableLocalNode;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerPrivileges;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissionsDenylist;
import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.DisconnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.MessageCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty.TLSConfiguration;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.ShouldConnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.nat.NatMethod;
import org.hyperledger.besu.nat.NatService;
import org.hyperledger.besu.nat.core.NatManager;
import org.hyperledger.besu.nat.core.domain.NatServiceType;
import org.hyperledger.besu.nat.core.domain.NetworkProtocol;
import org.hyperledger.besu.nat.upnp.UpnpNatManager;
import org.hyperledger.besu.plugin.data.EnodeURL;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import io.vertx.core.Vertx;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.devp2p.EthereumNodeRecord;
import org.apache.tuweni.discovery.DNSDaemon;
import org.apache.tuweni.discovery.DNSDaemonListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The peer network service (defunct PeerNetworkingService) is the entrypoint to the peer-to-peer
* components of the Ethereum client. It implements the devp2p framework from the Ethereum
* specifications.
*
* <p>This component manages the peer discovery layer, the RLPx wire protocol and the subprotocols
* supported by this client.
*
* <h2>Peer Discovery</h2>
*
* Ethereum nodes discover one another via a simple UDP protocol that follows some of the techniques
* described in the Kademlia DHT paper. Particularly nodes are classified in a k-bucket table
* composed of 256 buckets, where each bucket contains at most 16 peers whose <i>XOR(SHA3(x))</i>
* distance from us is equal to the index of the bucket. The value <i>x</i> in the distance function
* corresponds to our node ID (public key).
*
* <p>Upper layers in the stack subscribe to events from the peer discovery layer and initiate/drop
* connections accordingly.
*
* <h2>RLPx Wire Protocol</h2>
*
* The RLPx wire protocol is responsible for selecting peers to engage with, authenticating and
* encrypting communications with peers, multiplexing subprotocols, framing messages, controlling
* legality of messages, keeping connections alive, and keeping track of peer reputation.
*
* <h2>Subprotocols</h2>
*
* Subprotocols are pluggable elements on top of the RLPx framework, which can handle a specific set
* of messages. Each subprotocol has a 3-char ASCII denominator and a version number, and statically
* defines a count of messages it can handle.
*
* <p>The RLPx wire protocol dispatches messages to subprotocols based on the capabilities agreed by
* each of the two peers during the protocol handshake.
*
* @see <a href="https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf">Kademlia DHT
* paper</a>
* @see <a href="https://github.com/ethereum/wiki/wiki/Kademlia-Peer-Selection">Kademlia Peer
* Selection</a>
* @see <a href="https://github.com/ethereum/devp2p/blob/master/rlpx.md">devp2p RLPx</a>
*/
public class DefaultP2PNetwork implements P2PNetwork {
// Class-wide logger.
private static final Logger LOG = LoggerFactory.getLogger(DefaultP2PNetwork.class);
// Single-threaded scheduler on which start() registers the periodic maintained-peer and
// discovered-peer connection checks; shut down by stop().
private final ScheduledExecutorService peerConnectionScheduler =
Executors.newSingleThreadScheduledExecutor();
// Discovery layer: bonds with peers and supplies the stream of discovered peers.
private final PeerDiscoveryAgent peerDiscoveryAgent;
// RLPx layer: opens, tracks and closes peer connections.
private final RlpxAgent rlpxAgent;
// Source of the discovery, RLPx and scheduling settings read in start().
private final NetworkingConfiguration config;
// Encoded public key of this node; used as the node id of the local enode.
private final Bytes nodeId;
// Receives the local enode once start() has resolved the advertised address and ports.
private final MutableLocalNode localNode;
// Closed by stop().
private final PeerPermissions peerPermissions;
// Peers that checkMaintainedConnectionPeers() connects to when they have no active connection.
private final MaintainedPeers maintainedPeers;
// Used in start() to run the UPnP port-forward action and in setLocalNode() to query the
// external IP address.
private final NatService natService;
// Set by the first start() call; later start() calls are ignored.
private final AtomicBoolean started = new AtomicBoolean(false);
// Set by the first effective stop() call; later stop() calls are ignored.
private final AtomicBoolean stopped = new AtomicBoolean(false);
// Counted down once when the discovery agent's stop() completes and once when the RLPx agent's
// stop() completes.
private final CountDownLatch shutdownLatch = new CountDownLatch(2);
// Upper bound applied separately to each wait performed by awaitStop().
private final Duration shutdownTimeout = Duration.ofSeconds(15);
// Handed to the DNS daemon created in start().
private final Vertx vertx;
// Assigned in start() only when a DNS discovery URL is configured; null otherwise.
private DNSDaemon dnsDaemon;
/**
* Creates a peer networking service for production purposes.
*
* <p>The caller is expected to provide the IP address to be advertised (normally this node's
* public IP address), as well as TCP and UDP port numbers for the RLPx agent and the discovery
* agent, respectively.
*
* <p>Besides storing its collaborators, the constructor registers a peer requirement with the
* discovery agent and subscribes the reputation manager to disconnect events.
*
* @param localNode A representation of the local node
* @param peerDiscoveryAgent The agent responsible for discovering peers on the network.
* @param rlpxAgent The agent responsible for establishing and tracking RLPx peer connections.
* @param nodeKey The node key through which cryptographic operations can be performed
* @param config The network configuration to use.
* @param peerPermissions An object that determines whether peers are allowed to connect
* @param natService The NAT environment manager.
* @param maintainedPeers A collection of peers for which we are expected to maintain connections
* @param reputationManager An object that inspects disconnections for misbehaving peers that can
* then be added to the denylist.
* @param vertx the Vert.x instance managing network resources
*/
DefaultP2PNetwork(
final MutableLocalNode localNode,
final PeerDiscoveryAgent peerDiscoveryAgent,
final RlpxAgent rlpxAgent,
final NodeKey nodeKey,
final NetworkingConfiguration config,
final PeerPermissions peerPermissions,
final NatService natService,
final MaintainedPeers maintainedPeers,
final PeerDenylistManager reputationManager,
final Vertx vertx) {
this.localNode = localNode;
this.peerDiscoveryAgent = peerDiscoveryAgent;
this.rlpxAgent = rlpxAgent;
this.config = config;
this.natService = natService;
this.maintainedPeers = maintainedPeers;
// The node id is the encoded form of this node's public key.
this.nodeId = nodeKey.getPublicKey().getEncodedBytes();
this.peerPermissions = peerPermissions;
this.vertx = vertx;
final int maxPeers = rlpxAgent.getMaxPeers();
LOG.debug("setting maxPeers {}", maxPeers);
// The registered condition evaluates to true once the RLPx connection count has reached
// maxPeers (captured once, here, at construction time).
peerDiscoveryAgent.addPeerRequirement(() -> rlpxAgent.getConnectionCount() >= maxPeers);
// The reputation manager is registered as a disconnect callback on the RLPx agent.
subscribeDisconnect(reputationManager);
}
/**
* Creates a new, empty {@link Builder} for assembling a {@link DefaultP2PNetwork}.
*
* @return a fresh builder instance
*/
public static Builder builder() {
return new Builder();
}
/**
 * Starts the peer-to-peer network.
 *
 * <p>In order, this method: starts DNS discovery when a DNS discovery URL is configured, starts
 * the RLPx agent and then the discovery agent (blocking until each reports its port), hands the
 * UPnP port-forward action to the NAT service for the {@code UPNP} and {@code UPNPP2PONLY}
 * methods, records the local enode, runs an immediate maintained-peers check and finally
 * schedules the periodic maintained-peers and discovered-peers connection checks.
 *
 * <p>Only the first invocation has any effect; later invocations log a warning and return.
 */
@Override
public void start() {
  if (!started.compareAndSet(false, true)) {
    // Parameterized logging: same rendered message, no eager string concatenation.
    LOG.warn("Attempted to start an already started {}", getClass().getSimpleName());
    return;
  }
  if (config.getDiscovery().isDiscoveryV5Enabled()) {
    LOG.warn("Discovery Protocol v5 is not available");
  }
  final String address = config.getDiscovery().getAdvertisedHost();
  final int configuredDiscoveryPort = config.getDiscovery().getBindPort();
  final int configuredRlpxPort = config.getRlpx().getBindPort();
  Optional.ofNullable(config.getDiscovery().getDNSDiscoveryURL())
      .ifPresent(
          disco -> {
            // These lists are updated every 12h
            // We retrieve the list every 10 minutes (600000 msec)
            LOG.info("Starting DNS discovery with URL {}", disco);
            config
                .getDnsDiscoveryServerOverride()
                .ifPresent(
                    dnsServer ->
                        LOG.info(
                            "Starting DNS discovery with DNS Server override {}", dnsServer));
            dnsDaemon =
                new DNSDaemon(
                    disco,
                    createDaemonListener(),
                    0L,
                    600000L,
                    config.getDnsDiscoveryServerOverride().orElse(null),
                    vertx);
            dnsDaemon.start();
          });
  // Blocks until the RLPx agent has started and reported its listening port.
  final int listeningPort = rlpxAgent.start().join();
  // When both ports are configured as 0, discovery is started on the port the RLPx agent
  // actually obtained; otherwise the configured discovery port is used as-is.
  final int discoveryPort =
      peerDiscoveryAgent
          .start(
              (configuredDiscoveryPort == 0 && configuredRlpxPort == 0)
                  ? listeningPort
                  : configuredDiscoveryPort)
          .join();
  // Requests a UDP forward for discovery and a TCP forward for RLPx.
  final Consumer<? super NatManager> natAction =
      natManager -> {
        final UpnpNatManager upnpNatManager = (UpnpNatManager) natManager;
        upnpNatManager.requestPortForward(
            discoveryPort, NetworkProtocol.UDP, NatServiceType.DISCOVERY);
        upnpNatManager.requestPortForward(
            listeningPort, NetworkProtocol.TCP, NatServiceType.RLPX);
      };
  natService.ifNatEnvironment(NatMethod.UPNP, natAction);
  natService.ifNatEnvironment(NatMethod.UPNPP2PONLY, natAction);
  setLocalNode(address, listeningPort, discoveryPort);
  // Call checkMaintainedConnectionPeers() now that the local node is up, for immediate peer
  // additions
  checkMaintainedConnectionPeers();
  // Periodically check maintained connections
  final int checkMaintainedConnectionsSec = config.getCheckMaintainedConnectionsFrequencySec();
  peerConnectionScheduler.scheduleWithFixedDelay(
      this::checkMaintainedConnectionPeers, 2, checkMaintainedConnectionsSec, TimeUnit.SECONDS);
  // Periodically initiate outgoing connections to discovered peers
  final int checkConnectionsSec = config.getInitiateConnectionsFrequencySec();
  peerConnectionScheduler.scheduleWithFixedDelay(
      this::attemptPeerConnections, checkConnectionsSec, checkConnectionsSec, TimeUnit.SECONDS);
}
/**
 * Stops the network: closes the DNS daemon if one was created, shuts down the connection
 * scheduler, asks the discovery and RLPx agents to stop and closes the peer permissions.
 *
 * <p>Does nothing when the network was never started or has already been stopped. The agents
 * stop asynchronously; use {@link #awaitStop()} to wait for them.
 */
@Override
public void stop() {
  if (!this.started.get()) {
    // Never started, so there is nothing to tear down
    return;
  }
  if (!stopped.compareAndSet(false, true)) {
    // Already stopped by an earlier call
    return;
  }
  final Optional<DNSDaemon> daemon = getDnsDaemon();
  if (daemon.isPresent()) {
    daemon.get().close();
  }
  peerConnectionScheduler.shutdownNow();
  // Each agent releases one count of the shutdown latch when its stop completes,
  // whether it completed normally or exceptionally.
  peerDiscoveryAgent.stop().whenComplete((result, error) -> shutdownLatch.countDown());
  rlpxAgent.stop().whenComplete((result, error) -> shutdownLatch.countDown());
  peerPermissions.close();
}
/**
 * Blocks until the connection scheduler has terminated and both agents have reported that they
 * stopped, waiting at most {@code shutdownTimeout} for each of the two conditions. A timeout is
 * logged as an error rather than thrown.
 *
 * @throws IllegalStateException if the calling thread is interrupted while waiting; the
 *     thread's interrupt status is restored before the exception is thrown
 */
@Override
public void awaitStop() {
  try {
    if (!peerConnectionScheduler.awaitTermination(
        shutdownTimeout.getSeconds(), TimeUnit.SECONDS)) {
      LOG.error(
          "{} did not shutdown cleanly: peerConnectionScheduler executor did not fully terminate.",
          this.getClass().getSimpleName());
    }
    if (!shutdownLatch.await(shutdownTimeout.getSeconds(), TimeUnit.SECONDS)) {
      LOG.error(
          "{} did not shutdown cleanly: some internal services failed to fully terminate.",
          this.getClass().getSimpleName());
    }
  } catch (final InterruptedException ex) {
    // Catching InterruptedException clears the interrupt flag; restore it so that code further
    // up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new IllegalStateException(ex);
  }
}
/**
* Returns the RLPx agent this network delegates connection handling to.
*
* @return the RLPx agent supplied at construction
*/
@Override
public RlpxAgent getRlpxAgent() {
return rlpxAgent;
}
/**
 * Adds a peer to the maintained set, then bonds with it and initiates a connection to it.
 *
 * <p>A peer whose node id equals the local node's id is rejected and nothing is done. The bond
 * and connect attempts are made even if the peer was already in the maintained set.
 *
 * @param peer the peer to maintain a connection to
 * @return {@code true} if the peer was newly added to the maintained set; {@code false} if it
 *     was already present or is the local node
 */
@Override
public boolean addMaintainedConnectionPeer(final Peer peer) {
  if (localNode.isReady() && localNode.getPeer() != null) {
    final EnodeURL localEnode = localNode.getPeer().getEnodeURL();
    if (localEnode != null && peer.getEnodeURL().getNodeId().equals(localEnode.getNodeId())) {
      // Never maintain a connection to ourselves
      return false;
    }
  }
  final boolean newlyAdded = maintainedPeers.add(peer);
  peerDiscoveryAgent.bond(peer);
  rlpxAgent.connect(peer);
  return newlyAdded;
}
/**
 * Removes a peer from the maintained set, drops it from discovery and requests that any
 * connection to it be closed with reason {@code REQUESTED}.
 *
 * <p>The drop and disconnect are issued regardless of whether the peer was in the maintained
 * set.
 *
 * @param peer the peer to stop maintaining
 * @return {@code true} if the peer was present in the maintained set and has been removed
 */
@Override
public boolean removeMaintainedConnectionPeer(final Peer peer) {
  final boolean removed = maintainedPeers.remove(peer);
  peerDiscoveryAgent.dropPeer(peer);
  LOG.debug("Disconnect requested for peer {}.", peer);
  rlpxAgent.disconnect(peer.getId(), DisconnectReason.REQUESTED);
  return removed;
}
/**
* Returns the DNS daemon, if one has been created.
*
* @return the daemon assigned in start(), or an empty Optional when the field is still null
*/
@VisibleForTesting
Optional<DNSDaemon> getDnsDaemon() {
return Optional.ofNullable(dnsDaemon);
}
/**
 * Creates the listener that is handed to the DNS daemon.
 *
 * <p>For every batch of records the listener first converts each record into a
 * {@link DiscoveryPeer} (IP address, public key, and optional UDP/TCP ports), and only after
 * the whole batch has been converted asks the discovery agent to bond with each peer.
 *
 * @return a listener that bonds with every peer found in the received records
 */
@VisibleForTesting
DNSDaemonListener createDaemonListener() {
  return (seq, records) -> {
    final List<DiscoveryPeer> discovered = new ArrayList<>();
    records.forEach(
        enr ->
            discovered.add(
                DiscoveryPeer.fromEnode(
                    EnodeURLImpl.builder()
                        .ipAddress(enr.ip())
                        .nodeId(enr.publicKey().bytes())
                        .discoveryPort(Optional.ofNullable(enr.udp()))
                        .listeningPort(Optional.ofNullable(enr.tcp()))
                        .build())));
    // Iterating an empty list is a no-op, so no explicit emptiness check is needed.
    discovered.forEach(peerDiscoveryAgent::bond);
  };
}
/**
 * Initiates a connection to every maintained peer that has no active connection.
 *
 * <p>Peers that already have an active connection, and the local node itself, are skipped. Does
 * nothing until the local node is ready.
 */
@VisibleForTesting
void checkMaintainedConnectionPeers() {
  if (localNode.isReady()) {
    // Ids of all actively connected peers, plus our own id
    final List<Bytes> excludedIds =
        rlpxAgent
            .streamActiveConnections()
            .map(connection -> connection.getPeer().getId())
            .collect(Collectors.toList());
    excludedIds.add(localNode.getPeer().getEnodeURL().getNodeId());
    maintainedPeers
        .streamPeers()
        .filter(candidate -> !excludedIds.contains(candidate.getId()))
        .forEach(rlpxAgent::connect);
  }
}
/**
 * Initiates outgoing connections to discovered peers.
 *
 * <p>Only peers whose discovery status is {@code BONDED} and that pass the discovery agent's
 * fork-id check are tried, in ascending order of their last attempted connection.
 */
@VisibleForTesting
void attemptPeerConnections() {
  LOG.trace("Initiating connections to discovered peers.");
  streamDiscoveredPeers()
      .filter(candidate -> PeerDiscoveryStatus.BONDED == candidate.getStatus())
      .filter(peerDiscoveryAgent::checkForkId)
      .sorted(Comparator.comparing(DiscoveryPeer::getLastAttemptedConnection))
      .forEach(rlpxAgent::connect);
}
/**
* Returns the connections currently reported by the RLPx agent.
*
* @return a newly collected list of the RLPx agent's connections
*/
@Override
public Collection<PeerConnection> getPeers() {
return rlpxAgent.streamConnections().collect(Collectors.toList());
}
/**
* Returns the number of connections reported by the RLPx agent.
*
* @return the RLPx agent's connection count
*/
@Override
public int getPeerCount() {
return getRlpxAgent().getConnectionCount();
}
/**
* Streams the peers known to the discovery agent.
*
* @return the discovery agent's stream of discovered peers
*/
@Override
public Stream<DiscoveryPeer> streamDiscoveredPeers() {
return peerDiscoveryAgent.streamDiscoveredPeers();
}
/**
* Asks the RLPx agent to connect to the given peer.
*
* @param peer the peer to connect to
* @return the future returned by the RLPx agent for this connection attempt
*/
@Override
public CompletableFuture<PeerConnection> connect(final Peer peer) {
return rlpxAgent.connect(peer);
}
/**
* Registers a message callback for a capability with the RLPx agent.
*
* @param capability the capability whose messages the callback is registered for
* @param callback the callback to register
*/
@Override
public void subscribe(final Capability capability, final MessageCallback callback) {
rlpxAgent.subscribeMessage(capability, callback);
}
/**
* Registers a connect callback with the RLPx agent.
*
* @param callback the callback to register
*/
@Override
public void subscribeConnect(final ConnectCallback callback) {
rlpxAgent.subscribeConnect(callback);
}
/**
* Registers a connect-request callback with the RLPx agent.
*
* @param callback the callback to register
*/
@Override
public void subscribeConnectRequest(final ShouldConnectCallback callback) {
rlpxAgent.subscribeConnectRequest(callback);
}
/**
* Registers a disconnect callback with the RLPx agent.
*
* @param callback the callback to register
*/
@Override
public void subscribeDisconnect(final DisconnectCallback callback) {
rlpxAgent.subscribeDisconnect(callback);
}
/** Closes this network by delegating to {@link #stop()}. */
@Override
public void close() {
stop();
}
/**
* Reports whether the local node is ready.
*
* @return the value of {@code localNode.isReady()}
*/
@Override
public boolean isListening() {
return localNode.isReady();
}
/**
* Reports whether P2P is enabled.
*
* @return always {@code true} for this implementation
*/
@Override
public boolean isP2pEnabled() {
return true;
}
/**
* Reports whether the discovery agent is active.
*
* @return the value of {@code peerDiscoveryAgent.isActive()}
*/
@Override
public boolean isDiscoveryEnabled() {
return peerDiscoveryAgent.isActive();
}
/**
 * Returns the enode URL of the local node, if it is available yet.
 *
 * @return the local enode URL, or an empty Optional while the local node is not ready
 */
@Override
public Optional<EnodeURL> getLocalEnode() {
  if (localNode.isReady()) {
    return Optional.of(localNode.getPeer().getEnodeURL());
  }
  return Optional.empty();
}
/**
 * Builds the local enode URL and stores it on the local node, unless the local node is already
 * ready.
 *
 * @param address the configured advertised host, passed to the NAT service which returns the
 *     address that is actually advertised
 * @param listeningPort the RLPx listening port to publish
 * @param discoveryPort the discovery port to publish
 */
private void setLocalNode(
    final String address, final int listeningPort, final int discoveryPort) {
  if (!localNode.isReady()) {
    // override advertised host if we detect an external IP address via NAT manager
    final String hostToAdvertise = natService.queryExternalIPAddress(address);
    final EnodeURL enode =
        EnodeURLImpl.builder()
            .nodeId(nodeId)
            .ipAddress(hostToAdvertise)
            .listeningPort(listeningPort)
            .discoveryPort(discoveryPort)
            .build();
    LOG.info("Enode URL {}", enode.toString());
    LOG.info("Node address {}", Util.publicKeyToAddress(enode.getNodeId()));
    localNode.setEnode(enode);
  }
}
/** Asks the discovery agent to update its node record. */
@Override
public void updateNodeRecord() {
peerDiscoveryAgent.updateNodeRecord();
}
public static class Builder {
// Required by validate() unless a discovery agent is injected.
private Vertx vertx;
// Optional injected agents; when left null, doBuild() creates them.
private PeerDiscoveryAgent peerDiscoveryAgent;
private RlpxAgent rlpxAgent;
// Defaults to NetworkingConfiguration.create().
private NetworkingConfiguration config = NetworkingConfiguration.create();
// Required by validate(); must be non-empty.
private List<Capability> supportedCapabilities;
// Required by validate().
private NodeKey nodeKey;
// Defaults to an empty maintained-peers collection.
private MaintainedPeers maintainedPeers = new MaintainedPeers();
// Defaults to PeerPermissions.noop(); doBuild() combines it with the misbehaving-peers denylist.
private PeerPermissions peerPermissions = PeerPermissions.noop();
// Defaults to a NatService constructed with an empty Optional.
private NatService natService = new NatService(Optional.empty());
// Required by validate().
private MetricsSystem metricsSystem;
// Required by validate().
private StorageProvider storageProvider;
// Defaults to empty.
private Optional<TLSConfiguration> p2pTLSConfiguration = Optional.empty();
// Passed to the ForkIdManager built in createDiscoveryAgent(); not checked by validate().
private Blockchain blockchain;
// Required by validate().
private List<Long> blockNumberForks;
// Required by validate().
private List<Long> timestampForks;
private boolean legacyForkIdEnabled = false;
// Both suppliers are required by validate() and forwarded to the RLPx agent builder.
private Supplier<Stream<PeerConnection>> allConnectionsSupplier;
private Supplier<Stream<PeerConnection>> allActiveConnectionsSupplier;
// Forwarded to the RLPx agent builder; 0 unless maxPeers(int) is called.
private int maxPeers;
// Created in doBuild() from the node key's encoded public key.
private PeerTable peerTable;
/**
* Validates the builder state and then builds the network.
*
* @return the assembled network
* @throws IllegalStateException if validate() finds a mandatory property missing
*/
public P2PNetwork build() {
validate();
return doBuild();
}
/**
* Assembles the network from the builder state. The statement order matters: fields read by
* the agent factory methods are assigned before those factories are called.
*
* @return a new DefaultP2PNetwork wired with the (possibly newly created) agents
*/
private P2PNetwork doBuild() {
// Set up permissions
// Fold peer reputation into permissions
final PeerPermissionsDenylist misbehavingPeers = PeerPermissionsDenylist.create(500);
final PeerDenylistManager reputationManager =
new PeerDenylistManager(misbehavingPeers, maintainedPeers);
// Reassigned before the agents are created, because both factory methods read this field.
peerPermissions = PeerPermissions.combine(peerPermissions, misbehavingPeers);
// NOTE(review): the literal 5 is presumably the advertised p2p protocol version - confirm
// against MutableLocalNode.create.
final MutableLocalNode localNode =
MutableLocalNode.create(config.getRlpx().getClientId(), 5, supportedCapabilities);
final PeerPrivileges peerPrivileges = new DefaultPeerPrivileges(maintainedPeers);
// Must be assigned before either agent is created: both factory methods pass peerTable on.
peerTable = new PeerTable(nodeKey.getPublicKey().getEncodedBytes());
// An injected agent takes precedence over creating one.
rlpxAgent = rlpxAgent == null ? createRlpxAgent(localNode, peerPrivileges) : rlpxAgent;
// The RLPx agent is resolved first because createDiscoveryAgent() passes it along.
peerDiscoveryAgent = peerDiscoveryAgent == null ? createDiscoveryAgent() : peerDiscoveryAgent;
return new DefaultP2PNetwork(
localNode,
peerDiscoveryAgent,
rlpxAgent,
nodeKey,
config,
peerPermissions,
natService,
maintainedPeers,
reputationManager,
vertx);
}
/**
 * Verifies that every mandatory builder property has been supplied.
 *
 * <p>Vertx is only mandatory when no discovery agent has been injected. Each failure message
 * names the specific property that is missing.
 *
 * @throws IllegalStateException if a mandatory property is missing or the supported
 *     capabilities are empty
 */
private void validate() {
  checkState(nodeKey != null, "NodeKey must be set.");
  checkState(config != null, "NetworkingConfiguration must be set.");
  checkState(
      supportedCapabilities != null && !supportedCapabilities.isEmpty(),
      "Supported capabilities must be set and non-empty.");
  checkState(metricsSystem != null, "MetricsSystem must be set.");
  checkState(storageProvider != null, "StorageProvider must be set.");
  checkState(peerDiscoveryAgent != null || vertx != null, "Vertx must be set.");
  checkState(blockNumberForks != null, "BlockNumberForks must be set.");
  checkState(timestampForks != null, "TimestampForks must be set.");
  // Distinct messages so a failure identifies which of the two suppliers is missing.
  checkState(allConnectionsSupplier != null, "AllConnectionsSupplier must be set.");
  checkState(allActiveConnectionsSupplier != null, "AllActiveConnectionsSupplier must be set.");
}
/**
 * Creates the default Vert.x based discovery agent from the builder state, including a
 * {@link ForkIdManager} built from the blockchain and the configured fork lists.
 *
 * @return a new discovery agent
 */
private PeerDiscoveryAgent createDiscoveryAgent() {
  final ForkIdManager forkIds =
      new ForkIdManager(blockchain, blockNumberForks, timestampForks, this.legacyForkIdEnabled);
  final PeerDiscoveryAgent agent =
      new VertxPeerDiscoveryAgent(
          vertx,
          nodeKey,
          config.getDiscovery(),
          peerPermissions,
          natService,
          metricsSystem,
          storageProvider,
          forkIds,
          rlpxAgent,
          peerTable);
  return agent;
}
/**
* Creates the RLPx agent from the builder state.
*
* @param localNode the local node representation handed to the agent
* @param peerPrivileges the peer privileges handed to the agent
* @return a new RLPx agent
*/
private RlpxAgent createRlpxAgent(
final LocalNode localNode, final PeerPrivileges peerPrivileges) {
return RlpxAgent.builder()
.nodeKey(nodeKey)
.config(config.getRlpx())
.peerPermissions(peerPermissions)
.peerPrivileges(peerPrivileges)
.localNode(localNode)
.metricsSystem(metricsSystem)
.p2pTLSConfiguration(p2pTLSConfiguration)
.allConnectionsSupplier(allConnectionsSupplier)
.allActiveConnectionsSupplier(allActiveConnectionsSupplier)
.maxPeers(maxPeers)
.peerTable(peerTable)
.build();
}
/**
 * Injects the discovery agent to use instead of creating the default one.
 *
 * @param peerDiscoveryAgent the discovery agent; must not be null
 * @return this builder
 */
public Builder peerDiscoveryAgent(final PeerDiscoveryAgent peerDiscoveryAgent) {
  this.peerDiscoveryAgent = checkNotNull(peerDiscoveryAgent);
  return this;
}
/**
 * Injects the RLPx agent to use instead of creating the default one.
 *
 * @param rlpxAgent the RLPx agent; must not be null
 * @return this builder
 */
public Builder rlpxAgent(final RlpxAgent rlpxAgent) {
  this.rlpxAgent = checkNotNull(rlpxAgent);
  return this;
}
/**
 * Sets the Vert.x instance.
 *
 * @param vertx the Vert.x instance; must not be null
 * @return this builder
 */
public Builder vertx(final Vertx vertx) {
  this.vertx = checkNotNull(vertx);
  return this;
}
/**
 * Sets the node key.
 *
 * @param nodeKey the node key; must not be null
 * @return this builder
 */
public Builder nodeKey(final NodeKey nodeKey) {
  this.nodeKey = checkNotNull(nodeKey);
  return this;
}
/**
 * Sets the networking configuration, replacing the default one.
 *
 * @param config the networking configuration; must not be null
 * @return this builder
 */
public Builder config(final NetworkingConfiguration config) {
  this.config = checkNotNull(config);
  return this;
}
/**
 * Sets the supported capabilities.
 *
 * @param supportedCapabilities the capabilities list; must not be null
 * @return this builder
 */
public Builder supportedCapabilities(final List<Capability> supportedCapabilities) {
  this.supportedCapabilities = checkNotNull(supportedCapabilities);
  return this;
}
/**
* Sets the supported capabilities from a varargs array, stored as a list view created with
* {@code Arrays.asList}.
*
* @param supportedCapabilities the capabilities to support
* @return this builder
*/
public Builder supportedCapabilities(final Capability... supportedCapabilities) {
this.supportedCapabilities = Arrays.asList(supportedCapabilities);
return this;
}
/**
 * Sets the peer permissions, replacing the no-op default.
 *
 * @param peerPermissions the peer permissions; must not be null
 * @return this builder
 */
public Builder peerPermissions(final PeerPermissions peerPermissions) {
  this.peerPermissions = checkNotNull(peerPermissions);
  return this;
}
/**
 * Sets the metrics system.
 *
 * @param metricsSystem the metrics system; must not be null
 * @return this builder
 */
public Builder metricsSystem(final MetricsSystem metricsSystem) {
  this.metricsSystem = checkNotNull(metricsSystem);
  return this;
}
/**
 * Sets the NAT service, replacing the default one.
 *
 * @param natService the NAT service; must not be null
 * @return this builder
 */
public Builder natService(final NatService natService) {
  this.natService = checkNotNull(natService);
  return this;
}
/**
 * Sets the maintained peers collection, replacing the empty default.
 *
 * @param maintainedPeers the maintained peers; must not be null
 * @return this builder
 */
public Builder maintainedPeers(final MaintainedPeers maintainedPeers) {
  this.maintainedPeers = checkNotNull(maintainedPeers);
  return this;
}
/**
 * Sets the storage provider.
 *
 * @param storageProvider the storage provider; must not be null
 * @return this builder
 */
public Builder storageProvider(final StorageProvider storageProvider) {
  this.storageProvider = checkNotNull(storageProvider);
  return this;
}
/**
 * Sets the optional P2P TLS configuration.
 *
 * @param p2pTLSConfiguration the TLS configuration wrapper; the Optional itself must not be
 *     null
 * @return this builder
 */
public Builder p2pTLSConfiguration(final Optional<TLSConfiguration> p2pTLSConfiguration) {
  this.p2pTLSConfiguration = checkNotNull(p2pTLSConfiguration);
  return this;
}
/**
 * Sets the blockchain.
 *
 * @param blockchain the blockchain; must not be null
 * @return this builder
 */
public Builder blockchain(final MutableBlockchain blockchain) {
  this.blockchain = checkNotNull(blockchain);
  return this;
}
/**
 * Sets the block-number forks.
 *
 * @param forks the block-number forks; must not be null
 * @return this builder
 */
public Builder blockNumberForks(final List<Long> forks) {
  this.blockNumberForks = checkNotNull(forks);
  return this;
}
/**
 * Sets the timestamp forks.
 *
 * @param forks the timestamp forks; must not be null
 * @return this builder
 */
public Builder timestampForks(final List<Long> forks) {
  this.timestampForks = checkNotNull(forks);
  return this;
}
/**
* Sets the legacy fork-id flag that is passed to the ForkIdManager created for the default
* discovery agent.
*
* @param legacyForkIdEnabled the flag value
* @return this builder
*/
public Builder legacyForkIdEnabled(final boolean legacyForkIdEnabled) {
this.legacyForkIdEnabled = legacyForkIdEnabled;
return this;
}
/**
* Sets the supplier of all peer connections, forwarded to the RLPx agent builder. No null check
* is done here; a missing supplier is reported by validate() at build time.
*
* @param allConnectionsSupplier the supplier of all connections
* @return this builder
*/
public Builder allConnectionsSupplier(
final Supplier<Stream<PeerConnection>> allConnectionsSupplier) {
this.allConnectionsSupplier = allConnectionsSupplier;
return this;
}
/**
* Sets the supplier of all active peer connections, forwarded to the RLPx agent builder. No
* null check is done here; a missing supplier is reported by validate() at build time.
*
* @param allActiveConnectionsSupplier the supplier of all active connections
* @return this builder
*/
public Builder allActiveConnectionsSupplier(
final Supplier<Stream<PeerConnection>> allActiveConnectionsSupplier) {
this.allActiveConnectionsSupplier = allActiveConnectionsSupplier;
return this;
}
/**
* Sets the maximum number of peers, forwarded to the RLPx agent builder.
*
* @param maxPeers the maximum number of peers
* @return this builder
*/
public Builder maxPeers(final int maxPeers) {
this.maxPeers = maxPeers;
return this;
}
}
}