AbstractPendingTransactionsSorter.java
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions.sorter;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactions;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolReplacementHandler;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.evm.account.Account;
import org.hyperledger.besu.evm.account.AccountState;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.data.TransactionSelectionResult;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.util.Subscribers;
import java.time.Clock;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
/**
* Holds the current set of pending transactions with the ability to iterate them based on priority
* for mining or look-up by hash.
*
* <p>This class is safe for use across multiple threads.
*/
public abstract class AbstractPendingTransactionsSorter implements PendingTransactions {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractPendingTransactionsSorter.class);
private static final Marker INVALID_TX_REMOVED = MarkerFactory.getMarker("INVALID_TX_REMOVED");
protected final Clock clock;
protected final TransactionPoolConfiguration poolConfig;
protected final Object lock = new Object();
protected final Map<Hash, PendingTransaction> pendingTransactions;
protected final Map<Address, PendingTransactionsForSender> transactionsBySender =
new ConcurrentHashMap<>();
protected final Subscribers<PendingTransactionAddedListener> pendingTransactionSubscribers =
Subscribers.create();
protected final Subscribers<PendingTransactionDroppedListener> transactionDroppedListeners =
Subscribers.create();
protected final LabelledMetric<Counter> transactionRemovedCounter;
protected final Counter localTransactionAddedCounter;
protected final Counter remoteTransactionAddedCounter;
protected final TransactionPoolReplacementHandler transactionReplacementHandler;
protected final Supplier<BlockHeader> chainHeadHeaderSupplier;
private final BlobCache blobCache;
public AbstractPendingTransactionsSorter(
final TransactionPoolConfiguration poolConfig,
final Clock clock,
final MetricsSystem metricsSystem,
final Supplier<BlockHeader> chainHeadHeaderSupplier) {
this.poolConfig = poolConfig;
this.pendingTransactions = new ConcurrentHashMap<>(poolConfig.getTxPoolMaxSize());
this.clock = clock;
this.chainHeadHeaderSupplier = chainHeadHeaderSupplier;
this.transactionReplacementHandler =
new TransactionPoolReplacementHandler(
poolConfig.getPriceBump(), poolConfig.getBlobPriceBump());
final LabelledMetric<Counter> transactionAddedCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.TRANSACTION_POOL,
"transactions_added_total",
"Count of transactions added to the transaction pool",
"source");
localTransactionAddedCounter = transactionAddedCounter.labels("local");
remoteTransactionAddedCounter = transactionAddedCounter.labels("remote");
transactionRemovedCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.TRANSACTION_POOL,
"transactions_removed_total",
"Count of transactions removed from the transaction pool",
"source",
"operation");
metricsSystem.createIntegerGauge(
BesuMetricCategory.TRANSACTION_POOL,
"transactions",
"Current size of the transaction pool",
pendingTransactions::size);
this.blobCache = new BlobCache();
}
@Override
public void reset() {
pendingTransactions.clear();
transactionsBySender.clear();
}
@Override
public void evictOldTransactions() {
final long removeTransactionsBefore =
clock
.instant()
.minus(poolConfig.getPendingTxRetentionPeriod(), ChronoUnit.HOURS)
.toEpochMilli();
pendingTransactions.values().stream()
.filter(transaction -> transaction.getAddedAt() < removeTransactionsBefore)
.forEach(
transactionInfo -> {
LOG.atTrace()
.setMessage("Evicted {} due to age")
.addArgument(transactionInfo::toTraceLog)
.log();
removeTransaction(transactionInfo.getTransaction());
});
}
@Override
public List<Transaction> getLocalTransactions() {
return pendingTransactions.values().stream()
.filter(PendingTransaction::isReceivedFromLocalSource)
.map(PendingTransaction::getTransaction)
.collect(Collectors.toList());
}
@Override
public List<Transaction> getPriorityTransactions() {
return pendingTransactions.values().stream()
.filter(PendingTransaction::hasPriority)
.map(PendingTransaction::getTransaction)
.collect(Collectors.toList());
}
@Override
public TransactionAddedResult addTransaction(
final PendingTransaction transaction, final Optional<Account> maybeSenderAccount) {
final TransactionAddedResult transactionAddedStatus =
internalAddTransaction(transaction, maybeSenderAccount);
if (transactionAddedStatus.equals(ADDED)) {
if (!transaction.isReceivedFromLocalSource()) {
remoteTransactionAddedCounter.inc();
} else {
localTransactionAddedCounter.inc();
}
}
return transactionAddedStatus;
}
void removeTransaction(final Transaction transaction) {
removeTransaction(transaction, false);
notifyTransactionDropped(transaction);
}
@Override
public void manageBlockAdded(
final BlockHeader blockHeader,
final List<Transaction> confirmedTransactions,
final List<Transaction> reorgTransactions,
final FeeMarket feeMarket) {
synchronized (lock) {
confirmedTransactions.forEach(this::transactionAddedToBlock);
manageBlockAdded(blockHeader);
}
}
protected abstract void manageBlockAdded(final BlockHeader blockHeader);
public void transactionAddedToBlock(final Transaction transaction) {
removeTransaction(transaction, true);
}
private void incrementTransactionRemovedCounter(
final boolean receivedFromLocalSource, final boolean addedToBlock) {
final String location = receivedFromLocalSource ? "local" : "remote";
final String operation = addedToBlock ? "addedToBlock" : "dropped";
transactionRemovedCounter.labels(location, operation).inc();
}
// There's a small edge case here we could encounter.
// When we pass an upgrade block that has a new transaction type, we start allowing transactions
// of that new type into our pool.
// If we then reorg to a block lower than the upgrade block height _and_ we create a block, that
// block could end up with transactions of the new type.
// This seems like it would be very rare but worth it to document that we don't handle that case
// right now.
@Override
public void selectTransactions(final TransactionSelector selector) {
synchronized (lock) {
final Set<Transaction> transactionsToRemove = new HashSet<>();
final Map<Address, AccountTransactionOrder> accountTransactions = new HashMap<>();
final Iterator<PendingTransaction> prioritizedTransactions = prioritizedTransactions();
while (prioritizedTransactions.hasNext()) {
final PendingTransaction highestPriorityPendingTransaction = prioritizedTransactions.next();
final AccountTransactionOrder accountTransactionOrder =
accountTransactions.computeIfAbsent(
highestPriorityPendingTransaction.getSender(), this::createSenderTransactionOrder);
for (final PendingTransaction transactionToProcess :
accountTransactionOrder.transactionsToProcess(highestPriorityPendingTransaction)) {
final TransactionSelectionResult result =
selector.evaluateTransaction(transactionToProcess);
if (result.discard()) {
transactionsToRemove.add(transactionToProcess.getTransaction());
logDiscardedTransaction(transactionToProcess, result);
}
if (result.stop()) {
transactionsToRemove.forEach(this::removeTransaction);
return;
}
}
}
transactionsToRemove.forEach(this::removeTransaction);
}
}
private void logDiscardedTransaction(
final PendingTransaction pendingTransaction, final TransactionSelectionResult result) {
LOG.atInfo()
.addMarker(INVALID_TX_REMOVED)
.addKeyValue("txhash", pendingTransaction::getHash)
.addKeyValue("txlog", pendingTransaction::toTraceLog)
.addKeyValue("reason", result)
.addKeyValue(
"txrlp",
() -> {
final BytesValueRLPOutput rlp = new BytesValueRLPOutput();
pendingTransaction.getTransaction().writeTo(rlp);
return rlp.encoded().toHexString();
})
.log();
}
private AccountTransactionOrder createSenderTransactionOrder(final Address address) {
return new AccountTransactionOrder(
transactionsBySender.get(address).streamPendingTransactions());
}
private TransactionAddedResult addTransactionForSenderAndNonce(
final PendingTransaction pendingTransaction, final Optional<Account> maybeSenderAccount) {
PendingTransactionsForSender pendingTxsForSender =
transactionsBySender.computeIfAbsent(
pendingTransaction.getSender(),
address -> new PendingTransactionsForSender(maybeSenderAccount));
PendingTransaction existingPendingTx =
pendingTxsForSender.getPendingTransactionForNonce(pendingTransaction.getNonce());
final Optional<Transaction> maybeReplacedTransaction;
if (existingPendingTx != null) {
if (!transactionReplacementHandler.shouldReplace(
existingPendingTx, pendingTransaction, chainHeadHeaderSupplier.get())) {
LOG.atTrace()
.setMessage("Reject underpriced transaction replacement {}")
.addArgument(pendingTransaction::toTraceLog)
.log();
return REJECTED_UNDERPRICED_REPLACEMENT;
}
LOG.atTrace()
.setMessage("Replace existing transaction {}, with new transaction {}")
.addArgument(existingPendingTx::toTraceLog)
.addArgument(pendingTransaction::toTraceLog)
.log();
maybeReplacedTransaction = Optional.of(existingPendingTx.getTransaction());
} else {
maybeReplacedTransaction = Optional.empty();
}
pendingTxsForSender.updateSenderAccount(maybeSenderAccount);
pendingTxsForSender.trackPendingTransaction(pendingTransaction);
LOG.atTrace()
.setMessage("Tracked transaction by sender {}")
.addArgument(pendingTxsForSender::toTraceLog)
.log();
maybeReplacedTransaction.ifPresent(this::removeTransaction);
return ADDED;
}
private void removePendingTransactionBySenderAndNonce(
final PendingTransaction pendingTransaction) {
final Transaction transaction = pendingTransaction.getTransaction();
Optional.ofNullable(transactionsBySender.get(transaction.getSender()))
.ifPresent(
pendingTxsForSender -> {
pendingTxsForSender.removeTrackedPendingTransaction(pendingTransaction);
if (pendingTxsForSender.transactionCount() == 0) {
LOG.trace(
"Removing sender {} from transactionBySender since no more tracked transactions",
transaction.getSender());
transactionsBySender.remove(transaction.getSender());
} else {
LOG.atTrace()
.setMessage("Tracked transaction by sender {} after the removal of {}")
.addArgument(pendingTxsForSender::toTraceLog)
.addArgument(transaction::toTraceLog)
.log();
}
});
}
private void notifyTransactionAdded(final Transaction transaction) {
pendingTransactionSubscribers.forEach(listener -> listener.onTransactionAdded(transaction));
}
private void notifyTransactionDropped(final Transaction transaction) {
transactionDroppedListeners.forEach(listener -> listener.onTransactionDropped(transaction));
}
@Override
public long maxSize() {
return poolConfig.getTxPoolMaxSize();
}
@Override
public int size() {
return pendingTransactions.size();
}
@Override
public boolean containsTransaction(final Transaction transaction) {
return pendingTransactions.containsKey(transaction.getHash());
}
@Override
public Optional<Transaction> getTransactionByHash(final Hash transactionHash) {
return Optional.ofNullable(pendingTransactions.get(transactionHash))
.map(PendingTransaction::getTransaction);
}
@Override
public List<PendingTransaction> getPendingTransactions() {
return new ArrayList<>(pendingTransactions.values());
}
@Override
public long subscribePendingTransactions(final PendingTransactionAddedListener listener) {
return pendingTransactionSubscribers.subscribe(listener);
}
@Override
public void unsubscribePendingTransactions(final long id) {
pendingTransactionSubscribers.unsubscribe(id);
}
@Override
public long subscribeDroppedTransactions(final PendingTransactionDroppedListener listener) {
return transactionDroppedListeners.subscribe(listener);
}
@Override
public void unsubscribeDroppedTransactions(final long id) {
transactionDroppedListeners.unsubscribe(id);
}
@Override
public OptionalLong getNextNonceForSender(final Address sender) {
final PendingTransactionsForSender pendingTransactionsForSender =
transactionsBySender.get(sender);
return pendingTransactionsForSender == null
? OptionalLong.empty()
: pendingTransactionsForSender.maybeNextNonce();
}
private void removeTransaction(final Transaction transaction, final boolean addedToBlock) {
synchronized (lock) {
final PendingTransaction removedPendingTx = pendingTransactions.remove(transaction.getHash());
if (removedPendingTx != null) {
removePrioritizedTransaction(removedPendingTx);
removePendingTransactionBySenderAndNonce(removedPendingTx);
incrementTransactionRemovedCounter(
removedPendingTx.isReceivedFromLocalSource(), addedToBlock);
if (removedPendingTx.getTransaction().getBlobsWithCommitments().isPresent()
&& addedToBlock) {
this.blobCache.cacheBlobs(removedPendingTx.getTransaction());
}
}
}
}
protected abstract void removePrioritizedTransaction(PendingTransaction removedPendingTx);
protected abstract Iterator<PendingTransaction> prioritizedTransactions();
protected abstract void prioritizeTransaction(final PendingTransaction pendingTransaction);
private TransactionAddedResult internalAddTransaction(
final PendingTransaction pendingTransaction, final Optional<Account> maybeSenderAccount) {
final Transaction transaction = pendingTransaction.getTransaction();
synchronized (lock) {
if (pendingTransactions.containsKey(pendingTransaction.getHash())) {
LOG.atTrace()
.setMessage("Already known transaction {}")
.addArgument(pendingTransaction::toTraceLog)
.log();
return ALREADY_KNOWN;
}
if (transaction.getNonce() - maybeSenderAccount.map(AccountState::getNonce).orElse(0L)
>= poolConfig.getTxPoolMaxFutureTransactionByAccount()) {
LOG.atTrace()
.setMessage(
"Transaction {} not added because nonce too far in the future for sender {}")
.addArgument(transaction::toTraceLog)
.addArgument(
() ->
maybeSenderAccount
.map(Account::getAddress)
.map(Address::toString)
.orElse("unknown"))
.log();
return NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
}
final TransactionAddedResult transactionAddedStatus =
addTransactionForSenderAndNonce(pendingTransaction, maybeSenderAccount);
if (!transactionAddedStatus.equals(ADDED)) {
return transactionAddedStatus;
}
pendingTransactions.put(pendingTransaction.getHash(), pendingTransaction);
prioritizeTransaction(pendingTransaction);
if (pendingTransactions.size() > poolConfig.getTxPoolMaxSize()) {
evictLessPriorityTransactions();
}
}
notifyTransactionAdded(pendingTransaction.getTransaction());
return ADDED;
}
protected abstract PendingTransaction getLeastPriorityTransaction();
private void evictLessPriorityTransactions() {
final PendingTransaction leastPriorityTx = getLeastPriorityTransaction();
// evict all txs for the sender with nonce >= the least priority one to avoid gaps
final var pendingTxsForSender = transactionsBySender.get(leastPriorityTx.getSender());
final var txsToEvict = pendingTxsForSender.getPendingTransactions(leastPriorityTx.getNonce());
// remove backward to avoid gaps
for (int i = txsToEvict.size() - 1; i >= 0; i--) {
removeTransaction(txsToEvict.get(i).getTransaction());
}
}
@Override
public String logStats() {
return "Pending " + pendingTransactions.size();
}
@Override
public String toTraceLog() {
synchronized (lock) {
StringBuilder sb =
new StringBuilder(
"Prioritized transactions { "
+ StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
prioritizedTransactions(), Spliterator.ORDERED),
false)
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining("; "))
+ " }");
return sb.toString();
}
}
/**
* @param transaction to restore blobs onto
* @return an optional copy of the supplied transaction, but with the BlobsWithCommitments
* restored. If none could be restored, empty.
*/
@Override
public Optional<Transaction> restoreBlob(final Transaction transaction) {
return blobCache.restoreBlob(transaction);
}
}