LayeredPendingTransactions.java
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.reducing;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.INTERNAL_ERROR;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.RECONCILED;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactions;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.evm.account.Account;
import org.hyperledger.besu.evm.account.AccountState;
import org.hyperledger.besu.plugin.data.TransactionSelectionResult;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import kotlin.ranges.LongRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
public class LayeredPendingTransactions implements PendingTransactions {
// Class logger used for the regular diagnostics of this class.
private static final Logger LOG = LoggerFactory.getLogger(LayeredPendingTransactions.class);
// Dedicated logger, named "LOG_FOR_REPLAY", that receives the CSV-like trace records written by
// this class: "T" records for added transactions, "D" records for discarded transactions and
// "B" records for processed blocks.
private static final Logger LOG_FOR_REPLAY = LoggerFactory.getLogger("LOG_FOR_REPLAY");
// Marker attached to the info-level event logged when transaction selection discards a tx.
private static final Marker INVALID_TX_REMOVED = MarkerFactory.getMarker("INVALID_TX_REMOVED");
// Pool settings; this class reads the per-sender future nonce limit from it.
private final TransactionPoolConfiguration poolConfig;
// Container of the pending transactions; storage, lookup, ordering and listener management are
// delegated to it.
private final AbstractPrioritizedTransactions prioritizedTransactions;
/**
 * Creates the pending transactions view on top of the given prioritized transactions container.
 *
 * @param poolConfig the transaction pool settings
 * @param prioritizedTransactions the container this instance delegates to
 */
public LayeredPendingTransactions(
    final TransactionPoolConfiguration poolConfig,
    final AbstractPrioritizedTransactions prioritizedTransactions) {
  this.prioritizedTransactions = prioritizedTransactions;
  this.poolConfig = poolConfig;
}
/**
 * Resets the pool by delegating to {@code prioritizedTransactions.reset()}, while holding the
 * monitor of this instance.
 */
@Override
public synchronized void reset() {
prioritizedTransactions.reset();
}
/**
 * Tries to add a transaction to the pool.
 *
 * <p>The sender nonce is taken from the given account, or is 0 when the account is absent. When
 * the pool tracks a different nonce for the sender, the sender's transactions are reconciled
 * first. The transaction is then rejected if its nonce is lower than the sender nonce or too far
 * ahead of it, otherwise it is handed to the prioritized transactions. Anything thrown by that
 * add triggers a forced reconciliation followed by a single retry.
 *
 * @param pendingTransaction the transaction to add
 * @param maybeSenderAccount the sender account as known by the world state, if any
 * @return the result of the add operation
 */
@Override
public synchronized TransactionAddedResult addTransaction(
    final PendingTransaction pendingTransaction, final Optional<Account> maybeSenderAccount) {

  final long senderNonceInState = maybeSenderAccount.map(AccountState::getNonce).orElse(0L);
  logTransactionForReplayAdd(pendingTransaction, senderNonceInState);

  final boolean poolOutOfSync = hasAccountNonceDisparity(pendingTransaction, senderNonceInState);
  if (poolOutOfSync) {
    reconcileSender(pendingTransaction.getSender(), senderNonceInState);
  }

  final long distanceFromStateNonce = pendingTransaction.getNonce() - senderNonceInState;

  final TransactionAddedResult rejection =
      nonceChecks(pendingTransaction, senderNonceInState, distanceFromStateNonce);
  if (rejection != null) {
    return rejection;
  }

  // nonceChecks has already bounded the distance to [0, max future by sender) before narrowing
  final int distance = (int) distanceFromStateNonce;
  try {
    return prioritizedTransactions.add(pendingTransaction, distance);
  } catch (final Throwable failure) {
    return reconcileAndRetryAdd(pendingTransaction, senderNonceInState, distance, failure);
  }
}
/**
 * Recovers from an unexpected failure raised while adding a transaction: logs the failure and the
 * current state of the sender at debug level, forces a reconciliation of the sender and retries
 * the add once. A failure of the retry is logged at warn level and reported as {@code
 * INTERNAL_ERROR}.
 *
 * @param pendingTransaction the transaction that could not be added
 * @param stateSenderNonce account nonce from the world state
 * @param nonceDistance distance between the transaction nonce and the sender nonce
 * @param throwable the failure raised by the first add attempt
 * @return the result of the retry, or {@code INTERNAL_ERROR} if the retry fails too
 */
private TransactionAddedResult reconcileAndRetryAdd(
    final PendingTransaction pendingTransaction,
    final long stateSenderNonce,
    final int nonceDistance,
    final Throwable throwable) {

  final Address sender = pendingTransaction.getSender();

  LOG.atDebug()
      .setCause(throwable)
      .setMessage(
          "Unexpected error when adding transaction {}, current sender status {}, force a reconcile and retry")
      .addArgument(pendingTransaction::toTraceLog)
      .addArgument(() -> prioritizedTransactions.logSender(sender))
      .log();

  reconcileSender(sender, stateSenderNonce);

  TransactionAddedResult outcome;
  try {
    outcome = prioritizedTransactions.add(pendingTransaction, nonceDistance);
  } catch (final Throwable retryFailure) {
    // the reconciliation was expected to solve the problem, so this is reported at a higher level
    LOG.atWarn()
        .setMessage(
            "Unexpected error when adding transaction {} after reconciliation, current sender status {}")
        .setCause(retryFailure)
        .addArgument(pendingTransaction.toTraceLog())
        .addArgument(prioritizedTransactions.logSender(sender))
        .log();
    outcome = INTERNAL_ERROR;
  }
  return outcome;
}
/**
 * Detects a disparity between the account nonce as seen by the world state and as seen by the
 * txpool. This could happen in the short interval during block import when the world state has
 * been updated but the txpool has not yet processed the confirmed txs, or when a reorg moves the
 * sender nonce back.
 *
 * @param pendingTransaction the incoming transaction to check
 * @param stateSenderNonce account nonce from the world state
 * @return true if the txpool knows a nonce for the sender and it differs from the world state
 *     one, false if they match or the txpool has no nonce for the sender
 */
private boolean hasAccountNonceDisparity(
    final PendingTransaction pendingTransaction, final long stateSenderNonce) {
  final OptionalLong poolNonce =
      prioritizedTransactions.getCurrentNonceFor(pendingTransaction.getSender());

  // nothing to compare when the txpool has no nonce for this sender
  if (!poolNonce.isPresent()) {
    return false;
  }

  final long txPoolSenderNonce = poolNonce.getAsLong();
  if (txPoolSenderNonce == stateSenderNonce) {
    return false;
  }

  LOG.atDebug()
      .setMessage(
          "Nonce disparity detected when adding pending transaction {}. "
              + "Account nonce from world state is {} while current txpool nonce is {}")
      .addArgument(pendingTransaction::toTraceLog)
      .addArgument(stateSenderNonce)
      .addArgument(txPoolSenderNonce)
      .log();
  return true;
}
/**
 * Rebuilds the txpool content of a sender according to the given nonce. It is used when the
 * account nonce as seen by the txpool is not the correct one (see {@link
 * LayeredPendingTransactions#hasAccountNonceDisparity(PendingTransaction, long)} for when this
 * could happen). All the transactions of the sender are removed, then the ones with a nonce not
 * lower than the given nonce are added back.
 *
 * @param sender the sender whose transactions are rebuilt
 * @param stateSenderNonce the world state account nonce to use in the txpool for the sender
 */
private void reconcileSender(final Address sender, final long stateSenderNonce) {
  final var senderTxs = prioritizedTransactions.getAllFor(sender);
  if (senderTxs.isEmpty()) {
    LOG.debug("Sender {} has no transactions to reconcile", sender);
    return;
  }

  LOG.atDebug()
      .setMessage("Sender {} with nonce {} has {} transaction(s) to reconcile {}")
      .addArgument(sender)
      .addArgument(stateSenderNonce)
      .addArgument(senderTxs::size)
      .addArgument(() -> prioritizedTransactions.logSender(sender))
      .log();

  final var survivors = new ArrayDeque<PendingTransaction>(senderTxs.size());

  // removal goes from the last transaction to the first one, which the original code notes as
  // the more performant direction; addFirst keeps the survivors in their original order
  int idx = senderTxs.size();
  while (--idx >= 0) {
    final var removed = senderTxs.get(idx);
    prioritizedTransactions.remove(removed, RECONCILED);
    if (removed.getNonce() >= stateSenderNonce) {
      survivors.addFirst(removed);
    }
  }

  // the deque never holds null elements, so a null head means there is nothing to add back
  final PendingTransaction lowest = survivors.peekFirst();
  if (lowest != null) {
    final int newNonceDistance = (int) Math.max(0, lowest.getNonce() - stateSenderNonce);
    for (final PendingTransaction survivor : survivors) {
      prioritizedTransactions.add(survivor, newNonceDistance);
    }
  }

  LOG.atDebug()
      .setMessage("Sender {} with nonce {} status after reconciliation {}")
      .addArgument(sender)
      .addArgument(stateSenderNonce)
      .addArgument(() -> prioritizedTransactions.logSender(sender))
      .log();
}
/**
 * Writes a "T" record for an incoming transaction to the replay logger, at trace level.
 *
 * @param pendingTransaction the transaction that is being added
 * @param senderNonce the sender nonce to report in the record
 */
private void logTransactionForReplayAdd(
    final PendingTransaction pendingTransaction, final long senderNonce) {
  final Transaction tx = pendingTransaction.getTransaction();

  // record layout: T,sequence,addedAt,sender,sender_nonce,nonce,type,hash,rlp
  LOG_FOR_REPLAY
      .atTrace()
      .setMessage("T,{},{},{},{},{},{},{},{}")
      .addArgument(pendingTransaction.getSequence())
      .addArgument(pendingTransaction.getAddedAt())
      .addArgument(pendingTransaction.getSender())
      .addArgument(senderNonce)
      .addArgument(pendingTransaction.getNonce())
      .addArgument(tx.getType())
      .addArgument(pendingTransaction::getHash)
      .addArgument(
          () -> {
            // the RLP encoding is produced only if the record is actually written
            final BytesValueRLPOutput encoder = new BytesValueRLPOutput();
            tx.writeTo(encoder);
            return encoder.encoded().toHexString();
          })
      .log();
}
/**
 * Reports a transaction discarded by the selection: a "D" record goes to the replay logger at
 * trace level, and an info event carrying the {@code INVALID_TX_REMOVED} marker, with the
 * transaction details as key/value pairs, goes to the class logger.
 *
 * @param pendingTransaction the discarded transaction
 * @param result the selection result that caused the discard
 */
private void logDiscardedTransaction(
    final PendingTransaction pendingTransaction, final TransactionSelectionResult result) {
  // record layout: D,sequence,addedAt,sender,nonce,type,hash,rlp
  LOG_FOR_REPLAY
      .atTrace()
      .setMessage("D,{},{},{},{},{},{},{}")
      .addArgument(pendingTransaction.getSequence())
      .addArgument(pendingTransaction.getAddedAt())
      .addArgument(pendingTransaction.getSender())
      .addArgument(pendingTransaction.getNonce())
      .addArgument(pendingTransaction.getTransaction().getType())
      .addArgument(pendingTransaction::getHash)
      .addArgument(() -> discardedTxRlpHex(pendingTransaction.getTransaction()))
      .log();

  LOG.atInfo()
      .addMarker(INVALID_TX_REMOVED)
      .addKeyValue("txhash", pendingTransaction::getHash)
      .addKeyValue("txlog", pendingTransaction::toTraceLog)
      .addKeyValue("reason", result)
      .addKeyValue("txrlp", () -> discardedTxRlpHex(pendingTransaction.getTransaction()))
      .log();
}

/** Returns the hex string of the RLP encoding of the given transaction. */
private static String discardedTxRlpHex(final Transaction transaction) {
  final BytesValueRLPOutput encoder = new BytesValueRLPOutput();
  transaction.writeTo(encoder);
  return encoder.encoded().toHexString();
}
/**
 * Checks whether the nonce of a transaction is acceptable with respect to the sender nonce.
 *
 * @param pendingTransaction the transaction to check, only used for logging
 * @param senderNonce the current sender nonce, only used for logging
 * @param nonceDistance distance between the transaction nonce and the sender nonce
 * @return {@code ALREADY_KNOWN} when the distance is negative, {@code
 *     NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER} when it reaches the configured max future by sender,
 *     {@code null} when the nonce is acceptable
 */
private TransactionAddedResult nonceChecks(
    final PendingTransaction pendingTransaction,
    final long senderNonce,
    final long nonceDistance) {

  final boolean behindSenderNonce = nonceDistance < 0;
  if (behindSenderNonce) {
    LOG.atTrace()
        .setMessage("Drop already confirmed transaction {}, since current sender nonce is {}")
        .addArgument(pendingTransaction::toTraceLog)
        .addArgument(senderNonce)
        .log();
    return ALREADY_KNOWN;
  }

  if (nonceDistance >= poolConfig.getMaxFutureBySender()) {
    LOG.atTrace()
        .setMessage(
            "Drop too much in the future transaction {}, since current sender nonce is {}")
        .addArgument(pendingTransaction::toTraceLog)
        .addArgument(senderNonce)
        .log();
    return NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
  }

  // null tells the caller that no nonce check rejected the transaction
  return null;
}
/** Does nothing: this implementation has an empty body for this operation. */
@Override
public void evictOldTransactions() {}
/**
 * Returns the local transactions, as provided by {@code prioritizedTransactions.getAllLocal()}.
 *
 * @return the list of local transactions
 */
@Override
public synchronized List<Transaction> getLocalTransactions() {
  final List<Transaction> localTxs = prioritizedTransactions.getAllLocal();
  return localTxs;
}
/**
 * Returns the priority transactions, as provided by {@code
 * prioritizedTransactions.getAllPriority()}.
 *
 * @return the list of priority transactions
 */
@Override
public synchronized List<Transaction> getPriorityTransactions() {
  final List<Transaction> priorityTxs = prioritizedTransactions.getAllPriority();
  return priorityTxs;
}
/**
 * Offers pending transactions to {@code selector} and removes the ones it asks to discard.
 *
 * <p>The outer stream walks {@code prioritizedTransactions.stream()}. For each transaction
 * reached there, the transactions of the same sender are streamed, and those not yet evaluated
 * whose nonce is lower than or equal to it (unsigned comparison) are passed to the selector.
 *
 * <p>The selector result drives the iteration: {@code discard()} queues the transaction for
 * removal with reason {@code INVALIDATED} once the iteration is over, {@code stop()} ends the
 * whole selection, and a result that is not {@code selected()} makes every further transaction
 * of that sender be skipped.
 *
 * @param selector callback that evaluates each candidate transaction
 */
@Override
// There's a small edge case here we could encounter.
// When we pass an upgrade block that has a new transaction type, we start allowing transactions
// of that new type into our pool.
// If we then reorg to a block lower than the upgrade block height _and_ we create a block, that
// block could end up with transactions of the new type.
// This seems like it would be very rare but worth it to document that we don't handle that case
// right now.
public synchronized void selectTransactions(
final PendingTransactions.TransactionSelector selector) {
// transactions to discard; they are removed only after the iteration below has finished
final List<PendingTransaction> invalidTransactions = new ArrayList<>();
// hashes of the transactions already passed to the selector, so none is evaluated twice
final Set<Hash> alreadyChecked = new HashSet<>();
// senders that had a transaction not selected; their remaining transactions are skipped
final Set<Address> skipSenders = new HashSet<>();
// set when the selector asks to stop; it is checked by both takeWhile predicates
final AtomicBoolean completed = new AtomicBoolean(false);
prioritizedTransactions.stream()
.takeWhile(unused -> !completed.get())
.filter(highPrioPendingTx -> !skipSenders.contains(highPrioPendingTx.getSender()))
.peek(this::logSenderTxs)
.forEach(
highPrioPendingTx ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.takeWhile(
candidatePendingTx ->
!skipSenders.contains(candidatePendingTx.getSender())
&& !completed.get())
// keep only the not yet evaluated transactions of the sender, up to and including the
// nonce of the transaction reached by the outer stream
.filter(
candidatePendingTx ->
!alreadyChecked.contains(candidatePendingTx.getHash())
&& Long.compareUnsigned(
candidatePendingTx.getNonce(), highPrioPendingTx.getNonce())
<= 0)
.forEach(
candidatePendingTx -> {
alreadyChecked.add(candidatePendingTx.getHash());
final var res = selector.evaluateTransaction(candidatePendingTx);
LOG.atTrace()
.setMessage("Selection result {} for transaction {}")
.addArgument(res)
.addArgument(candidatePendingTx::toTraceLog)
.log();
// the three checks below are independent: one result can trigger more than one of them
if (res.discard()) {
invalidTransactions.add(candidatePendingTx);
logDiscardedTransaction(candidatePendingTx, res);
}
if (res.stop()) {
completed.set(true);
}
if (!res.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
skipSenders.add(candidatePendingTx.getSender());
LOG.trace("Skipping tx from sender {}", candidatePendingTx.getSender());
}
}));
// the removal is deferred until here, after both streams have been fully consumed
invalidTransactions.forEach(
invalidTx -> prioritizedTransactions.remove(invalidTx, INVALIDATED));
}
/**
 * Logs, at trace level, the given transaction together with all the transactions that the pool
 * holds for its sender.
 *
 * @param highPrioPendingTx the transaction whose sender is reported
 */
private void logSenderTxs(final PendingTransaction highPrioPendingTx) {
  LOG.atTrace()
      .setMessage("highPrioPendingTx {}, senderTxs {}")
      .addArgument(highPrioPendingTx::toTraceLog)
      .addArgument(
          () -> {
            // built lazily, only when the trace event is actually written
            final Address sender = highPrioPendingTx.getSender();
            return prioritizedTransactions.stream(sender)
                .map(PendingTransaction::toTraceLog)
                .collect(Collectors.joining(", "));
          })
      .log();
}
/**
 * Always returns -1.
 *
 * <p>NOTE(review): presumably -1 signals that this pool has no fixed maximum number of
 * transactions; confirm against the callers of this method.
 *
 * @return the constant -1
 */
@Override
public long maxSize() {
return -1;
}
/**
 * Returns the number of transactions reported by {@code prioritizedTransactions.count()}.
 *
 * @return the transaction count
 */
@Override
public synchronized int size() {
  final int txCount = prioritizedTransactions.count();
  return txCount;
}
/**
 * Tells whether the given transaction is present, as reported by {@code
 * prioritizedTransactions.contains(transaction)}.
 *
 * @param transaction the transaction to look for
 * @return true if the transaction is reported as present
 */
@Override
public synchronized boolean containsTransaction(final Transaction transaction) {
  final boolean present = prioritizedTransactions.contains(transaction);
  return present;
}
/**
 * Looks up a transaction by its hash, delegating to {@code
 * prioritizedTransactions.getByHash(transactionHash)}.
 *
 * @param transactionHash the hash of the wanted transaction
 * @return the lookup result returned by the delegate
 */
@Override
public synchronized Optional<Transaction> getTransactionByHash(final Hash transactionHash) {
  final Optional<Transaction> found = prioritizedTransactions.getByHash(transactionHash);
  return found;
}
/**
 * Returns all the pending transactions, as provided by {@code prioritizedTransactions.getAll()}.
 *
 * @return the list of pending transactions
 */
@Override
public synchronized List<PendingTransaction> getPendingTransactions() {
  final List<PendingTransaction> allPending = prioritizedTransactions.getAll();
  return allPending;
}
/**
 * Registers a listener for added transactions through {@code
 * prioritizedTransactions.subscribeToAdded(listener)}.
 *
 * @param listener the listener to register
 * @return the subscription id returned by the delegate
 */
@Override
public long subscribePendingTransactions(final PendingTransactionAddedListener listener) {
  final long subscriptionId = prioritizedTransactions.subscribeToAdded(listener);
  return subscriptionId;
}
/**
 * Removes a listener for added transactions through {@code
 * prioritizedTransactions.unsubscribeFromAdded(id)}.
 *
 * @param id the subscription id to remove
 */
@Override
public void unsubscribePendingTransactions(final long id) {
prioritizedTransactions.unsubscribeFromAdded(id);
}
/**
 * Registers a listener for dropped transactions through {@code
 * prioritizedTransactions.subscribeToDropped(listener)}.
 *
 * @param listener the listener to register
 * @return the subscription id returned by the delegate
 */
@Override
public long subscribeDroppedTransactions(final PendingTransactionDroppedListener listener) {
  final long subscriptionId = prioritizedTransactions.subscribeToDropped(listener);
  return subscriptionId;
}
/**
 * Removes a listener for dropped transactions through {@code
 * prioritizedTransactions.unsubscribeFromDropped(id)}.
 *
 * @param id the subscription id to remove
 */
@Override
public void unsubscribeDroppedTransactions(final long id) {
prioritizedTransactions.unsubscribeFromDropped(id);
}
/**
 * Returns the next nonce for a sender, as provided by {@code
 * prioritizedTransactions.getNextNonceFor(sender)}.
 *
 * <p>NOTE(review): unlike the other read accessors of this class, this method is not
 * synchronized; confirm that the delegate call is safe without holding the monitor.
 *
 * @param sender the sender to query
 * @return the value returned by the delegate
 */
@Override
public OptionalLong getNextNonceForSender(final Address sender) {
  final OptionalLong nextNonce = prioritizedTransactions.getNextNonceFor(sender);
  return nextNonce;
}
/**
 * Updates the pool after a block has been added.
 *
 * <p>The max confirmed nonce by sender and the reorg nonce range by sender are computed from the
 * given transaction lists, then the prioritized transactions are notified of the added block.
 * Anything thrown by that notification is logged at warn level and not propagated. In every case
 * a "B" record for the block is written to the replay logger at the end.
 *
 * @param blockHeader the header of the added block
 * @param confirmedTransactions the transactions confirmed by the added block
 * @param reorgTransactions the transactions affected by a reorg, only used to compute the nonce
 *     ranges that are logged
 * @param feeMarket the fee market passed to the prioritized transactions
 */
@Override
public synchronized void manageBlockAdded(
    final BlockHeader blockHeader,
    final List<Transaction> confirmedTransactions,
    final List<Transaction> reorgTransactions,
    final FeeMarket feeMarket) {
  LOG.atTrace()
      .setMessage("Managing new added block {}")
      .addArgument(blockHeader::toLogString)
      .log();

  final var maxConfirmedNonceBySender = maxNonceBySender(confirmedTransactions);
  final var reorgNonceRangeBySender = nonceRangeBySender(reorgTransactions);

  try {
    prioritizedTransactions.blockAdded(feeMarket, blockHeader, maxConfirmedNonceBySender);
  } catch (final Throwable throwable) {
    // the last argument is the computed nonce range map, matching the label in the message;
    // previously the raw list of reorg transactions was logged under that label
    LOG.warn(
        "Unexpected error {} when managing added block {}, maxNonceBySender {}, reorgNonceRangeBySender {}",
        throwable,
        blockHeader.toLogString(),
        maxConfirmedNonceBySender,
        reorgNonceRangeBySender);
    LOG.warn("Stack trace", throwable);
  }

  logBlockHeaderForReplay(blockHeader, maxConfirmedNonceBySender, reorgNonceRangeBySender);
}
/**
 * Writes a "B" record for a processed block to the replay logger, at trace level.
 *
 * @param blockHeader the header of the block
 * @param maxConfirmedNonceBySender max confirmed nonce for each sender
 * @param reorgNonceRangeBySender range of nonces affected by a reorg for each sender
 */
private void logBlockHeaderForReplay(
    final BlockHeader blockHeader,
    final Map<Address, Long> maxConfirmedNonceBySender,
    final Map<Address, LongRange> reorgNonceRangeBySender) {
  // record layout:
  // B,block number,block hash,[sender,max nonce]...,R,[sender,start,end]...,header rlp
  LOG_FOR_REPLAY
      .atTrace()
      .setMessage("B,{},{},{},R,{},{}")
      .addArgument(blockHeader.getNumber())
      .addArgument(blockHeader.getBlockHash())
      .addArgument(
          () ->
              maxConfirmedNonceBySender.entrySet().stream()
                  .map(
                      confirmed ->
                          String.join(
                              ",",
                              confirmed.getKey().toHexString(),
                              String.valueOf(confirmed.getValue())))
                  .collect(Collectors.joining(",")))
      .addArgument(
          () ->
              reorgNonceRangeBySender.entrySet().stream()
                  .map(
                      reorg -> {
                        final LongRange range = reorg.getValue();
                        return String.join(
                            ",",
                            reorg.getKey().toHexString(),
                            String.valueOf(range.getStart()),
                            String.valueOf(range.getEndInclusive()));
                      })
                  .collect(Collectors.joining(",")))
      .addArgument(
          () -> {
            final BytesValueRLPOutput encoder = new BytesValueRLPOutput();
            blockHeader.writeTo(encoder);
            return encoder.encoded().toHexString();
          })
      .log();
}
/**
 * Computes, for each sender, the highest nonce among the given transactions, starting the
 * reduction from 0.
 *
 * @param confirmedTransactions the transactions to inspect
 * @return a map from each sender to its highest nonce
 */
private Map<Address, Long> maxNonceBySender(final List<Transaction> confirmedTransactions) {
  // the three-argument reducing extracts the nonce and folds it in a single collector
  return confirmedTransactions.stream()
      .collect(
          Collectors.groupingBy(
              Transaction::getSender,
              Collectors.reducing(0L, Transaction::getNonce, Math::max)));
}
/**
 * Computes, for each sender, the range of nonces covered by the given transactions.
 *
 * @param confirmedTransactions the transactions to inspect
 * @return a map from each sender to the inclusive range going from its lowest to its highest
 *     nonce
 */
private Map<Address, LongRange> nonceRangeBySender(
    final List<Transaction> confirmedTransactions) {

  /** Mutable accumulator of the lowest and highest nonce seen so far. */
  class MutableLongRange {
    // initial values of an accumulator that has not seen any nonce yet
    long start = Long.MAX_VALUE;
    long end = 0;

    void update(final long nonce) {
      start = Math.min(start, nonce);
      end = Math.max(end, nonce);
    }

    MutableLongRange combine(final MutableLongRange other) {
      // merge the bounds directly, so that combining with an accumulator that has not seen any
      // nonce leaves this one unchanged, as required by the Collector identity constraint;
      // feeding other.start and other.end through update would instead widen the range with the
      // initial values of an empty accumulator
      start = Math.min(start, other.start);
      end = Math.max(end, other.end);
      return this;
    }

    LongRange toImmutable() {
      return new LongRange(start, end);
    }
  }

  return confirmedTransactions.stream()
      .collect(
          groupingBy(
              Transaction::getSender,
              mapping(
                  Transaction::getNonce,
                  Collector.of(
                      MutableLongRange::new,
                      MutableLongRange::update,
                      MutableLongRange::combine,
                      MutableLongRange::toImmutable))));
}
/**
 * Always returns an empty string.
 *
 * @return the empty string
 */
@Override
public synchronized String toTraceLog() {
return "";
}
/**
 * Returns the statistics string produced by {@code prioritizedTransactions.logStats()}.
 *
 * @return the statistics as text
 */
@Override
public synchronized String logStats() {
  final String stats = prioritizedTransactions.logStats();
  return stats;
}
/**
 * Asks the blob cache obtained from {@code prioritizedTransactions.getBlobCache()} to restore
 * the given transaction.
 *
 * @param transaction the transaction to restore
 * @return the result returned by the blob cache
 */
@Override
public Optional<Transaction> restoreBlob(final Transaction transaction) {
  final var blobCache = prioritizedTransactions.getBlobCache();
  return blobCache.restoreBlob(transaction);
}
}