SparseTransactions.java
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import com.google.common.collect.Iterables;
public class SparseTransactions extends AbstractTransactionsLayer {
private final NavigableSet<PendingTransaction> sparseEvictionOrder =
new TreeSet<>(
Comparator.comparing(PendingTransaction::hasPriority)
.thenComparing(PendingTransaction::getSequence));
private final Map<Address, Integer> gapBySender = new HashMap<>();
private final List<SendersByPriority> orderByGap;
public SparseTransactions(
final TransactionPoolConfiguration poolConfig,
final TransactionsLayer nextLayer,
final TransactionPoolMetrics metrics,
final BiFunction<PendingTransaction, PendingTransaction, Boolean>
transactionReplacementTester,
final BlobCache blobCache) {
super(poolConfig, nextLayer, transactionReplacementTester, metrics, blobCache);
orderByGap = new ArrayList<>(poolConfig.getMaxFutureBySender());
IntStream.range(0, poolConfig.getMaxFutureBySender())
.forEach(i -> orderByGap.add(new SendersByPriority()));
}
@Override
public String name() {
return "sparse";
}
@Override
protected long cacheFreeSpace() {
return poolConfig.getPendingTransactionsLayerMaxCapacityBytes() - getLayerSpaceUsed();
}
@Override
protected boolean gapsAllowed() {
return true;
}
@Override
public void reset() {
super.reset();
sparseEvictionOrder.clear();
gapBySender.clear();
orderByGap.forEach(SendersByPriority::clear);
}
@Override
protected TransactionAddedResult canAdd(
final PendingTransaction pendingTransaction, final int gap) {
gapBySender.compute(
pendingTransaction.getSender(),
(sender, currGap) -> {
if (currGap == null) {
orderByGap.get(gap).add(pendingTransaction);
return gap;
}
if (Long.compareUnsigned(
pendingTransaction.getNonce(), txsBySender.get(sender).firstKey())
< 0) {
orderByGap.get(currGap).remove(sender);
orderByGap.get(gap).add(pendingTransaction);
return gap;
}
return currGap;
});
return TransactionAddedResult.ADDED;
}
@Override
protected void internalAdd(
final NavigableMap<Long, PendingTransaction> senderTxs, final PendingTransaction addedTx) {
sparseEvictionOrder.add(addedTx);
}
@Override
protected int maxTransactionsNumber() {
return Integer.MAX_VALUE;
}
@Override
protected void internalReplaced(final PendingTransaction replacedTx) {
sparseEvictionOrder.remove(replacedTx);
}
@Override
protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket feeMarket) {}
/**
* We only want to promote transactions that have gap == 0, so there will be no gap in the prev
* layers. A promoted transaction is removed from this layer, and the gap data is updated for its
* sender.
*
* @param promotionFilter the prev layer's promotion filter
* @param freeSpace max amount of memory promoted txs can occupy
* @param freeSlots max number of promoted txs
* @return a list of transactions promoted to the prev layer
*/
@Override
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
final long freeSpace,
final int freeSlots) {
long accumulatedSpace = 0;
final List<PendingTransaction> promotedTxs = new ArrayList<>();
final var zeroGapSenders = orderByGap.get(0);
search:
for (final var sender : zeroGapSenders) {
final var senderSeqTxs = getSequentialSubset(txsBySender.get(sender));
for (final var candidateTx : senderSeqTxs.values()) {
if (promotionFilter.test(candidateTx)) {
accumulatedSpace += candidateTx.memorySize();
if (promotedTxs.size() < freeSlots && accumulatedSpace <= freeSpace) {
promotedTxs.add(candidateTx);
} else {
// no room for more txs the search is over exit the loops
break search;
}
} else {
// skip remaining txs for this sender
break;
}
}
}
// remove promoted txs from this layer
promotedTxs.forEach(
promotedTx -> {
final var sender = promotedTx.getSender();
final var senderTxs = txsBySender.get(sender);
senderTxs.remove(promotedTx.getNonce());
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);
if (senderTxs.isEmpty()) {
txsBySender.remove(sender);
orderByGap.get(0).remove(sender);
gapBySender.remove(sender);
} else {
final long firstNonce = senderTxs.firstKey();
final int newGap = (int) (firstNonce - (promotedTx.getNonce() + 1));
if (newGap != 0) {
updateGap(sender, 0, newGap);
}
}
});
if (!promotedTxs.isEmpty()) {
// since we removed some txs we can try to promote from next layer
promoteTransactions();
}
return promotedTxs;
}
private NavigableMap<Long, PendingTransaction> getSequentialSubset(
final NavigableMap<Long, PendingTransaction> senderTxs) {
long lastSequentialNonce = senderTxs.firstKey();
for (final long nonce : senderTxs.tailMap(lastSequentialNonce, false).keySet()) {
if (nonce == lastSequentialNonce + 1) {
++lastSequentialNonce;
} else {
break;
}
}
return senderTxs.headMap(lastSequentialNonce, true);
}
@Override
public void remove(final PendingTransaction invalidatedTx, final RemovalReason reason) {
final var senderTxs = txsBySender.get(invalidatedTx.getSender());
if (senderTxs != null && senderTxs.containsKey(invalidatedTx.getNonce())) {
// gaps are allowed here then just remove
senderTxs.remove(invalidatedTx.getNonce());
processRemove(senderTxs, invalidatedTx.getTransaction(), reason);
if (senderTxs.isEmpty()) {
txsBySender.remove(invalidatedTx.getSender());
}
} else {
nextLayer.remove(invalidatedTx, reason);
}
}
@Override
protected void internalConfirmed(
final NavigableMap<Long, PendingTransaction> senderTxs,
final Address sender,
final long maxConfirmedNonce,
final PendingTransaction highestNonceRemovedTx) {
if (highestNonceRemovedTx != null) {
final int currGap = gapBySender.get(sender);
final int newGap = (int) (senderTxs.firstKey() - (highestNonceRemovedTx.getNonce() + 1));
if (currGap != newGap) {
updateGap(sender, currGap, newGap);
}
} else {
final int currGap = gapBySender.get(sender);
final int newGap = (int) (senderTxs.firstKey() - (maxConfirmedNonce + 1));
if (newGap < currGap) {
updateGap(sender, currGap, newGap);
}
}
}
@Override
protected void internalEvict(
final NavigableMap<Long, PendingTransaction> lessReadySenderTxs,
final PendingTransaction evictedTx) {
sparseEvictionOrder.remove(evictedTx);
if (lessReadySenderTxs.isEmpty()) {
deleteGap(evictedTx.getSender());
}
}
@Override
protected void internalRemove(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction removedTx,
final RemovalReason removalReason) {
sparseEvictionOrder.remove(removedTx);
final Address sender = removedTx.getSender();
if (senderTxs != null && !senderTxs.isEmpty()) {
final int deltaGap = (int) (senderTxs.firstKey() - removedTx.getNonce());
if (deltaGap > 0) {
final int currGap = gapBySender.get(sender);
final int newGap;
if (removalReason.equals(INVALIDATED)) {
newGap = currGap + deltaGap;
} else {
newGap = deltaGap - 1;
}
if (currGap != newGap) {
updateGap(sender, currGap, newGap);
}
}
} else {
deleteGap(sender);
}
}
private void deleteGap(final Address sender) {
orderByGap.get(gapBySender.remove(sender)).remove(sender);
}
@Override
protected PendingTransaction getEvictable() {
return sparseEvictionOrder.first();
}
@Override
protected boolean promotionFilter(final PendingTransaction pendingTransaction) {
return false;
}
@Override
public Stream<PendingTransaction> stream() {
return sparseEvictionOrder.descendingSet().stream();
}
@Override
public OptionalLong getNextNonceFor(final Address sender) {
final Integer gap = gapBySender.get(sender);
if (gap != null && gap == 0) {
final var senderTxs = txsBySender.get(sender);
var currNonce = senderTxs.firstKey();
for (final var nextNonce : senderTxs.keySet()) {
if (nextNonce > currNonce + 1) {
break;
}
currNonce = nextNonce;
}
return OptionalLong.of(currNonce + 1);
}
return OptionalLong.empty();
}
@Override
public OptionalLong getCurrentNonceFor(final Address sender) {
final var senderTxs = txsBySender.get(sender);
if (senderTxs != null) {
final var gap = gapBySender.get(sender);
return OptionalLong.of(senderTxs.firstKey() - gap);
}
return nextLayer.getCurrentNonceFor(sender);
}
@Override
protected void internalNotifyAdded(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction pendingTransaction) {
final Address sender = pendingTransaction.getSender();
final Integer currGap = gapBySender.get(sender);
if (currGap != null) {
final int newGap = (int) (senderTxs.firstKey() - (pendingTransaction.getNonce() + 1));
if (newGap < currGap) {
updateGap(sender, currGap, newGap);
}
}
}
@Override
public String logSender(final Address sender) {
final var senderTxs = txsBySender.get(sender);
return name()
+ "["
+ (Objects.isNull(senderTxs)
? "Empty"
: "gap(" + gapBySender.get(sender) + ") " + senderTxs.keySet())
+ "] "
+ nextLayer.logSender(sender);
}
@Override
public String internalLogStats() {
if (sparseEvictionOrder.isEmpty()) {
return "Sparse: Empty";
}
final Transaction newest = sparseEvictionOrder.last().getTransaction();
final Transaction oldest = sparseEvictionOrder.first().getTransaction();
return "Sparse: "
+ "count="
+ pendingTransactions.size()
+ ", space used: "
+ spaceUsed
+ ", unique senders: "
+ txsBySender.size()
+ ", oldest [gap: "
+ gapBySender.get(oldest.getSender())
+ ", max fee:"
+ oldest.getMaxGasPrice().toHumanReadableString()
+ ", hash: "
+ oldest.getHash()
+ "], newest [gap: "
+ gapBySender.get(newest.getSender())
+ ", max fee: "
+ newest.getMaxGasPrice().toHumanReadableString()
+ ", hash: "
+ newest.getHash()
+ "]";
}
private void updateGap(final Address sender, final int currGap, final int newGap) {
final boolean hasPriority = orderByGap.get(currGap).remove(sender);
orderByGap.get(newGap).add(sender, hasPriority);
gapBySender.put(sender, newGap);
}
@Override
protected void internalConsistencyCheck(
final Map<Address, NavigableMap<Long, PendingTransaction>> prevLayerTxsBySender) {
txsBySender.values().stream()
.filter(senderTxs -> senderTxs.size() > 1)
.map(NavigableMap::entrySet)
.map(Set::iterator)
.forEach(
itNonce -> {
PendingTransaction firstTx = itNonce.next().getValue();
prevLayerTxsBySender.computeIfPresent(
firstTx.getSender(),
(sender, txsByNonce) -> {
final long prevLayerMaxNonce = txsByNonce.lastKey();
assert prevLayerMaxNonce < firstTx.getNonce()
: "first nonce is not greater than previous layer last nonce";
final int gap = (int) (firstTx.getNonce() - (prevLayerMaxNonce + 1));
assert gapBySender.get(firstTx.getSender()).equals(gap) : "gap mismatch";
assert orderByGap.get(gap).contains(firstTx.getSender())
: "orderByGap sender not found";
return txsByNonce;
});
long prevNonce = firstTx.getNonce();
while (itNonce.hasNext()) {
final long currNonce = itNonce.next().getKey();
assert Long.compareUnsigned(prevNonce, currNonce) < 0 : "non incremental nonce";
prevNonce = currNonce;
}
});
}
private static class SendersByPriority implements Iterable<Address> {
final Set<Address> prioritySenders = new HashSet<>();
final Set<Address> standardSenders = new HashSet<>();
void clear() {
prioritySenders.clear();
standardSenders.clear();
}
public void add(final Address sender, final boolean hasPriority) {
if (hasPriority) {
if (standardSenders.contains(sender)) {
throw new IllegalStateException(
"Sender " + sender + " cannot simultaneously have and not have priority");
}
prioritySenders.add(sender);
} else {
if (prioritySenders.contains(sender)) {
throw new IllegalStateException(
"Sender " + sender + " cannot simultaneously have and not have priority");
}
standardSenders.add(sender);
}
}
void add(final PendingTransaction pendingTransaction) {
add(pendingTransaction.getSender(), pendingTransaction.hasPriority());
}
boolean remove(final Address sender) {
if (standardSenders.remove(sender)) {
return false;
}
return prioritySenders.remove(sender);
}
public boolean contains(final Address sender) {
return standardSenders.contains(sender) || prioritySenders.contains(sender);
}
@Override
public Iterator<Address> iterator() {
return Iterables.concat(prioritySenders, standardSenders).iterator();
}
}
}