TransactionBroadcaster.java
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import static org.hyperledger.besu.datatypes.TransactionType.BLOB;
import static org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction.toTransactionList;
import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TransactionBroadcaster implements TransactionBatchAddedListener {
private static final Logger LOG = LoggerFactory.getLogger(TransactionBroadcaster.class);
private static final EnumSet<TransactionType> ANNOUNCE_HASH_ONLY_TX_TYPES = EnumSet.of(BLOB);
private static final Boolean HASH_ONLY_BROADCAST = Boolean.TRUE;
private static final Boolean FULL_BROADCAST = Boolean.FALSE;
private final PeerTransactionTracker transactionTracker;
private final TransactionsMessageSender transactionsMessageSender;
private final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender;
private final EthContext ethContext;
private final Random random;
public TransactionBroadcaster(
final EthContext ethContext,
final PeerTransactionTracker transactionTracker,
final TransactionsMessageSender transactionsMessageSender,
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender) {
this(
ethContext,
transactionTracker,
transactionsMessageSender,
newPooledTransactionHashesMessageSender,
null);
}
@VisibleForTesting
protected TransactionBroadcaster(
final EthContext ethContext,
final PeerTransactionTracker transactionTracker,
final TransactionsMessageSender transactionsMessageSender,
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender,
final Long seed) {
this.transactionTracker = transactionTracker;
this.transactionsMessageSender = transactionsMessageSender;
this.newPooledTransactionHashesMessageSender = newPooledTransactionHashesMessageSender;
this.ethContext = ethContext;
this.random = seed != null ? new Random(seed) : new Random();
}
public void relayTransactionPoolTo(
final EthPeer peer, final Collection<PendingTransaction> pendingTransactions) {
if (!pendingTransactions.isEmpty()) {
if (peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) {
sendTransactionHashes(toTransactionList(pendingTransactions), List.of(peer));
} else {
// we need to exclude txs that support hash only broadcasting
final var fullBroadcastTxs =
pendingTransactions.stream()
.map(PendingTransaction::getTransaction)
.filter(tx -> !ANNOUNCE_HASH_ONLY_TX_TYPES.contains(tx.getType()))
.toList();
sendFullTransactions(fullBroadcastTxs, List.of(peer));
}
}
}
@Override
public void onTransactionsAdded(final Collection<Transaction> transactions) {
final int currPeerCount = ethContext.getEthPeers().peerCount();
if (currPeerCount == 0) {
return;
}
final int numPeersToSendFullTransactions = (int) Math.round(Math.sqrt(currPeerCount));
final Map<Boolean, List<Transaction>> transactionByBroadcastMode =
transactions.stream()
.collect(
Collectors.partitioningBy(
tx -> ANNOUNCE_HASH_ONLY_TX_TYPES.contains(tx.getType())));
final List<EthPeer> sendOnlyFullTransactionPeers = new ArrayList<>(currPeerCount);
final List<EthPeer> sendOnlyHashPeers = new ArrayList<>(currPeerCount);
final List<EthPeer> sendMixedPeers = new ArrayList<>(currPeerCount);
ethContext
.getEthPeers()
.streamAvailablePeers()
.forEach(
peer -> {
if (peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) {
sendOnlyHashPeers.add(peer);
} else {
sendOnlyFullTransactionPeers.add(peer);
}
});
if (sendOnlyFullTransactionPeers.size() < numPeersToSendFullTransactions) {
final int delta =
Math.min(
numPeersToSendFullTransactions - sendOnlyFullTransactionPeers.size(),
sendOnlyHashPeers.size());
Collections.shuffle(sendOnlyHashPeers, random);
// move peers from the mixed list to reach the required size for full transaction peers
movePeersBetweenLists(sendOnlyHashPeers, sendMixedPeers, delta);
}
LOG.atTrace()
.setMessage(
"Sending full transactions to {} peers, transaction hashes only to {} peers and mixed to {} peers."
+ " Peers w/o eth/65 {}, peers with eth/65 {}")
.addArgument(sendOnlyFullTransactionPeers::size)
.addArgument(sendOnlyHashPeers::size)
.addArgument(sendMixedPeers::size)
.addArgument(sendOnlyFullTransactionPeers)
.addArgument(() -> sendOnlyHashPeers.toString() + sendMixedPeers)
.log();
sendToFullTransactionsPeers(
transactionByBroadcastMode.get(FULL_BROADCAST), sendOnlyFullTransactionPeers);
sendToOnlyHashPeers(transactionByBroadcastMode, sendOnlyHashPeers);
sendToMixedPeers(transactionByBroadcastMode, sendMixedPeers);
}
private void sendToFullTransactionsPeers(
final List<Transaction> fullBroadcastTransactions, final List<EthPeer> fullTransactionPeers) {
sendFullTransactions(fullBroadcastTransactions, fullTransactionPeers);
}
private void sendToOnlyHashPeers(
final Map<Boolean, List<Transaction>> txsByHashOnlyBroadcast,
final List<EthPeer> hashOnlyPeers) {
final List<Transaction> allTransactions =
txsByHashOnlyBroadcast.values().stream().flatMap(List::stream).toList();
sendTransactionHashes(allTransactions, hashOnlyPeers);
}
private void sendToMixedPeers(
final Map<Boolean, List<Transaction>> txsByHashOnlyBroadcast,
final List<EthPeer> mixedPeers) {
sendFullTransactions(txsByHashOnlyBroadcast.get(FULL_BROADCAST), mixedPeers);
sendTransactionHashes(txsByHashOnlyBroadcast.get(HASH_ONLY_BROADCAST), mixedPeers);
}
private void sendFullTransactions(
final List<Transaction> transactions, final List<EthPeer> fullTransactionPeers) {
if (!transactions.isEmpty()) {
fullTransactionPeers.forEach(
peer -> {
transactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(
() -> transactionsMessageSender.sendTransactionsToPeer(peer));
});
}
}
private void sendTransactionHashes(
final List<Transaction> transactions, final List<EthPeer> transactionHashPeers) {
if (!transactions.isEmpty()) {
transactionHashPeers.stream()
.forEach(
peer -> {
transactions.forEach(
transaction -> transactionTracker.addToPeerHashSendQueue(peer, transaction));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(
() ->
newPooledTransactionHashesMessageSender.sendTransactionHashesToPeer(
peer));
});
}
}
private void movePeersBetweenLists(
final List<EthPeer> sourceList, final List<EthPeer> destinationList, final int num) {
final int stopIndex = sourceList.size() - num;
for (int i = sourceList.size() - 1; i >= stopIndex; i--) {
destinationList.add(sourceList.remove(i));
}
}
}