AbstractTransactionsLayer.java

/*
 * Copyright Hyperledger Besu Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.TRY_NEXT_LAYER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.CONFIRMED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.CROSS_LAYER_REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.REPLACED;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.util.Subscribers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractTransactionsLayer implements TransactionsLayer {
  // Class-wide logger.
  private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionsLayer.class);
  // Shared default used when a sender has no transactions in this layer.
  // NOTE(review): this is a mutable TreeMap; the code in this file only reads from it.
  private static final NavigableMap<Long, PendingTransaction> EMPTY_SENDER_TXS = new TreeMap<>();
  // Pool configuration, made available to subclasses.
  protected final TransactionPoolConfiguration poolConfig;
  // The layer that follows this one; most operations are forwarded to it after being applied here.
  protected final TransactionsLayer nextLayer;
  // Called as (existingTx, incomingTx); a true result lets the incoming tx replace the existing one.
  protected final BiFunction<PendingTransaction, PendingTransaction, Boolean>
      transactionReplacementTester;
  // Sink for added/removed/rejected counters and for the gauges registered in the constructor.
  protected final TransactionPoolMetrics metrics;
  // All transactions held by this layer, indexed by transaction hash.
  protected final Map<Hash, PendingTransaction> pendingTransactions = new HashMap<>();
  // The same transactions, grouped by sender and sorted by nonce.
  protected final Map<Address, NavigableMap<Long, PendingTransaction>> txsBySender =
      new HashMap<>();
  // Listeners notified by notifyTransactionAdded.
  private final Subscribers<PendingTransactionAddedListener> onAddedListeners =
      Subscribers.create();
  // Listeners notified by notifyTransactionDropped.
  private final Subscribers<PendingTransactionDroppedListener> onDroppedListeners =
      Subscribers.create();
  // Id returned by the next layer for the most recent subscribeToAdded call.
  private OptionalLong nextLayerOnAddedListenerId = OptionalLong.empty();
  // Id returned by the next layer for the most recent subscribeToDropped call.
  private OptionalLong nextLayerOnDroppedListenerId = OptionalLong.empty();
  // Running sum of memorySize() of the transactions held by this layer.
  protected long spaceUsed = 0;
  // Number of transactions held by this layer, indexed by TransactionType ordinal.
  protected final int[] txCountByType = new int[TransactionType.values().length];
  // Receives the blobs of confirmed transactions that carry blobs (see confirmed).
  private final BlobCache blobCache;

  /**
   * Stores the collaborators of this layer and registers its gauges with the metrics component.
   *
   * @param poolConfig the transaction pool configuration
   * @param nextLayer the layer that follows this one
   * @param transactionReplacementTester called as (existing, incoming) to decide whether the
   *     incoming transaction may replace the existing one
   * @param metrics where gauges are registered and counters are recorded
   * @param blobCache receives the blobs of confirmed blob transactions
   */
  protected AbstractTransactionsLayer(
      final TransactionPoolConfiguration poolConfig,
      final TransactionsLayer nextLayer,
      final BiFunction<PendingTransaction, PendingTransaction, Boolean>
          transactionReplacementTester,
      final TransactionPoolMetrics metrics,
      final BlobCache blobCache) {
    this.poolConfig = poolConfig;
    this.nextLayer = nextLayer;
    this.transactionReplacementTester = transactionReplacementTester;
    this.metrics = metrics;

    // gauges read the live state of this layer every time they are sampled
    metrics.initSpaceUsed(this::getLayerSpaceUsed, name());
    metrics.initTransactionCount(pendingTransactions::size, name());
    metrics.initUniqueSenderCount(txsBySender::size, name());
    for (final TransactionType type : TransactionType.values()) {
      metrics.initTransactionCountByType(() -> txCountByType[type.ordinal()], name(), type);
    }

    this.blobCache = blobCache;
  }

  /**
   * Tells whether this layer allows gaps. Consulted by {@code tryFillGap}: only when this returns
   * {@code false} does a successful, non-replacement add try to promote the transaction with the
   * following nonce from the next layer.
   *
   * @return true if gaps are allowed in this layer
   */
  protected abstract boolean gapsAllowed();

  /** Drops every transaction and zeroes the counters of this layer, then resets the next layer. */
  @Override
  public void reset() {
    // forget the transactions held here
    txsBySender.clear();
    pendingTransactions.clear();

    // and the bookkeeping derived from them
    Arrays.fill(txCountByType, 0);
    spaceUsed = 0;

    nextLayer.reset();
  }

  /**
   * Looks up a transaction by hash in this layer first, then in the following layers.
   *
   * @param transactionHash the hash of the transaction to find
   * @return the transaction held by this layer, otherwise whatever the next layer returns
   */
  @Override
  public Optional<Transaction> getByHash(final Hash transactionHash) {
    final PendingTransaction heldHere = pendingTransactions.get(transactionHash);
    return heldHere != null
        ? Optional.of(heldHere.getTransaction())
        : nextLayer.getByHash(transactionHash);
  }

  /**
   * Checks whether the transaction is held by this layer or by any following layer.
   *
   * @param transaction the transaction to look for, matched by hash in this layer
   * @return true if this layer or the next layer contains it
   */
  @Override
  public boolean contains(final Transaction transaction) {
    if (pendingTransactions.containsKey(transaction.getHash())) {
      return true;
    }
    return nextLayer.contains(transaction);
  }

  /**
   * Collects the transactions of this layer followed by those of the following layers.
   *
   * @return a new mutable list, with the transactions of this layer first
   */
  @Override
  public List<PendingTransaction> getAll() {
    final List<PendingTransaction> fromNextLayers = nextLayer.getAll();
    return Stream.concat(pendingTransactions.values().stream(), fromNextLayers.stream())
        .collect(Collectors.toCollection(ArrayList::new));
  }

  /**
   * Sums the space used by this layer and by all the following layers.
   *
   * @return space used by this layer plus the cumulative space reported by the next layer
   */
  @Override
  public long getCumulativeUsedSpace() {
    final long usedHere = getLayerSpaceUsed();
    final long usedByNextLayers = nextLayer.getCumulativeUsedSpace();
    return usedHere + usedByNextLayers;
  }

  /**
   * Returns the space accounted to this layer only; also used as the space-used gauge.
   *
   * @return the current value of the {@code spaceUsed} counter
   */
  protected long getLayerSpaceUsed() {
    return this.spaceUsed;
  }

  /**
   * Decides what to do with an incoming transaction. Called by {@code add} only when the
   * transaction does not target a nonce already held by this layer for its sender.
   *
   * <p>As used by {@code add}: {@code TRY_NEXT_LAYER} forwards the transaction to the next layer,
   * a successful result stores it in this layer, any other result is recorded as a rejection and
   * must carry an invalid reason.
   *
   * @param pendingTransaction the incoming transaction
   * @param gap the value received by {@code add}, passed through unchanged
   * @return the outcome for this layer
   */
  protected abstract TransactionAddedResult canAdd(
      final PendingTransaction pendingTransaction, final int gap);

  /**
   * Tries to add a transaction to this layer, forwarding it to the next layer when this layer
   * answers {@code TRY_NEXT_LAYER}.
   *
   * @param pendingTransaction the transaction to add
   * @param gap passed to {@code canAdd} and, when forwarding, to {@code addToNextLayer}
   * @return the result of the add, from this layer or from the next one
   */
  @Override
  public TransactionAddedResult add(final PendingTransaction pendingTransaction, final int gap) {

    // a tx with the same sender and nonce already in this layer takes precedence over canAdd
    final TransactionAddedResult replacementResult = maybeReplaceTransaction(pendingTransaction);
    final TransactionAddedResult result =
        replacementResult != null ? replacementResult : canAdd(pendingTransaction, gap);

    if (result.equals(TRY_NEXT_LAYER)) {
      return addToNextLayer(pendingTransaction, gap);
    }

    if (!result.isSuccess()) {
      final var rejectReason = result.maybeInvalidReason().orElseThrow();
      metrics.incrementRejected(pendingTransaction, rejectReason, name());
      LOG.atTrace()
          .setMessage("Transaction {} rejected reason {}")
          .addArgument(pendingTransaction::toTraceLog)
          .addArgument(rejectReason)
          .log();
      return result;
    }

    // this layer stores the result of detachedCopy(), not the instance it received
    processAdded(pendingTransaction.detachedCopy());
    result.maybeReplacedTransaction().ifPresent(this::replaced);

    nextLayer.notifyAdded(pendingTransaction);

    // maybeFull evicts when needed; only when nothing had to be evicted try to fill gaps
    if (!maybeFull()) {
      tryFillGap(result, pendingTransaction);
    }

    notifyTransactionAdded(pendingTransaction);
    return result;
  }

  /**
   * Checks whether this layer exceeds its space or transaction count limit, evicting when it does.
   *
   * @return true if the layer was over a limit and eviction was triggered, false otherwise
   */
  private boolean maybeFull() {
    final long freeSpace = cacheFreeSpace();
    final int excessTxs = pendingTransactions.size() - maxTransactionsNumber();

    final boolean withinLimits = freeSpace >= 0 && excessTxs <= 0;
    if (withinLimits) {
      return false;
    }

    LOG.atDebug()
        .setMessage("Layer full: {}")
        .addArgument(
            () ->
                freeSpace < 0
                    ? "need to free " + (-freeSpace) + " space"
                    : "need to evict " + excessTxs + " transaction(s)")
        .log();

    // a negative free space is the amount of space that has to be released
    evict(-freeSpace, excessTxs);
    return true;
  }

  /**
   * After a successful add, pulls from the following layers the transaction with the next nonce
   * for the same sender, repeating for each promoted transaction while the layer is not full.
   *
   * @param addStatus the result of the add that triggered this attempt
   * @param pendingTransaction the transaction that was just added to this layer
   */
  private void tryFillGap(
      final TransactionAddedResult addStatus, final PendingTransaction pendingTransaction) {
    // nothing to fill when the add replaced an existing tx or when this layer tolerates gaps
    if (addStatus.isReplacement() || gapsAllowed()) {
      return;
    }

    final PendingTransaction followingTx =
        nextLayer.promoteFor(pendingTransaction.getSender(), pendingTransaction.getNonce());
    if (followingTx == null) {
      return;
    }

    processAdded(followingTx);
    if (!maybeFull()) {
      tryFillGap(ADDED, followingTx);
    }
  }

  /**
   * Reacts to a transaction that was added to a previous layer, then propagates the notification
   * to the next layer.
   *
   * @param pendingTransaction the transaction added to a previous layer
   */
  @Override
  public void notifyAdded(final PendingTransaction pendingTransaction) {
    final Address sender = pendingTransaction.getSender();
    final var senderTxs = txsBySender.get(sender);

    if (senderTxs != null) {
      final long addedNonce = pendingTransaction.getNonce();
      final long lowestNonceHere = senderTxs.firstKey();

      if (lowestNonceHere < addedNonce) {
        // the world state has been updated but the confirmed txs have not been processed yet
        confirmed(sender, addedNonce);
      } else if (lowestNonceHere == addedNonce) {
        // cross layer replacement: the same nonce has been added to a previous layer
        final PendingTransaction replacedTx = senderTxs.pollFirstEntry().getValue();
        processRemove(senderTxs, replacedTx.getTransaction(), CROSS_LAYER_REPLACED);

        if (senderTxs.isEmpty()) {
          txsBySender.remove(sender);
        }
      } else {
        internalNotifyAdded(senderTxs, pendingTransaction);
      }
    }

    nextLayer.notifyAdded(pendingTransaction);
  }

  /**
   * Subclass hook called by {@code notifyAdded} when this layer holds transactions for the sender
   * and the lowest nonce held here is greater than the nonce of the transaction added to a
   * previous layer.
   *
   * @param senderTxs the transactions of the sender held by this layer, sorted by nonce
   * @param pendingTransaction the transaction that was added to a previous layer
   */
  protected abstract void internalNotifyAdded(
      final NavigableMap<Long, PendingTransaction> senderTxs,
      final PendingTransaction pendingTransaction);

  /**
   * Hands over to a previous layer the transaction that directly follows the given nonce for the
   * sender, if this layer holds it as the lowest nonce of that sender. Otherwise the request is
   * forwarded to the next layer.
   *
   * @param sender the sender whose transaction could be promoted
   * @param nonce the nonce already present in the requesting layer; the candidate has nonce + 1
   * @return the transaction removed from this layer, or whatever the next layer returns
   */
  @Override
  public PendingTransaction promoteFor(final Address sender, final long nonce) {
    final var senderTxs = txsBySender.get(sender);
    if (senderTxs != null) {
      final long expectedNonce = nonce + 1;
      if (senderTxs.firstKey() == expectedNonce) {
        final PendingTransaction promotedTx = senderTxs.pollFirstEntry().getValue();
        // processRemove records the removal metric with the PROMOTED reason label, so it must
        // not be incremented again here, otherwise every promotion is counted twice
        processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);

        if (senderTxs.isEmpty()) {
          txsBySender.remove(sender);
        }
        return promotedTx;
      }
    }
    return nextLayer.promoteFor(sender, nonce);
  }

  /**
   * Forwards a transaction to the next layer, using the transactions this layer holds for its
   * sender (or an empty map when there are none) to compute the distance.
   *
   * @param pendingTransaction the transaction to forward
   * @param distance the distance to use when this layer holds nothing for the sender
   * @return the result returned by the next layer
   */
  private TransactionAddedResult addToNextLayer(
      final PendingTransaction pendingTransaction, final int distance) {
    final NavigableMap<Long, PendingTransaction> senderTxs =
        txsBySender.getOrDefault(pendingTransaction.getSender(), EMPTY_SENDER_TXS);
    return addToNextLayer(senderTxs, pendingTransaction, distance);
  }

  /**
   * Adds a transaction to the next layer. When this layer holds transactions for the sender, the
   * distance passed on is the number of nonces missing between the highest nonce held here and
   * the nonce of the transaction; otherwise the given distance is passed on unchanged.
   *
   * @param senderTxs the transactions of the sender held by this layer, sorted by nonce
   * @param pendingTransaction the transaction to add to the next layer
   * @param distance the distance to use when {@code senderTxs} is empty
   * @return the result returned by the next layer
   */
  protected TransactionAddedResult addToNextLayer(
      final NavigableMap<Long, PendingTransaction> senderTxs,
      final PendingTransaction pendingTransaction,
      final int distance) {
    final int nextLayerDistance =
        senderTxs.isEmpty()
            ? distance
            : (int) (pendingTransaction.getNonce() - (senderTxs.lastKey() + 1));
    return nextLayer.add(pendingTransaction, nextLayerDistance);
  }

  /**
   * Stores a transaction in both indexes of this layer, updates counters and metrics, and finally
   * lets the subclass react through {@code internalAdd}.
   *
   * @param addedTx the transaction to store in this layer
   */
  private void processAdded(final PendingTransaction addedTx) {
    pendingTransactions.put(addedTx.getHash(), addedTx);

    // get or create the nonce-sorted map of the sender
    final Address sender = addedTx.getSender();
    NavigableMap<Long, PendingTransaction> senderTxs = txsBySender.get(sender);
    if (senderTxs == null) {
      senderTxs = new TreeMap<>();
      txsBySender.put(sender, senderTxs);
    }
    senderTxs.put(addedTx.getNonce(), addedTx);

    increaseCounters(addedTx);
    metrics.incrementAdded(addedTx, name());
    internalAdd(senderTxs, addedTx);
  }

  /**
   * Subclass hook called by {@code processAdded} after the transaction has been stored in
   * {@code pendingTransactions} and {@code txsBySender} and the counters and metrics have been
   * updated.
   *
   * @param senderTxs the transactions of the sender held by this layer, already including the
   *     added one
   * @param addedTx the transaction just added
   */
  protected abstract void internalAdd(
      final NavigableMap<Long, PendingTransaction> senderTxs, final PendingTransaction addedTx);

  /**
   * The limit on the number of transactions for this layer. It is compared to the size of {@code
   * pendingTransactions} to detect overflow in {@code maybeFull} and to compute the free slots in
   * {@code promoteTransactions}.
   *
   * @return the maximum number of transactions this layer should hold
   */
  protected abstract int maxTransactionsNumber();

  /**
   * Moves transactions to the next layer until enough space and enough slots have been released.
   * Transactions are taken from the sender of the transaction returned by {@code getEvictable},
   * highest nonce first; when that sender is exhausted the process repeats with a new candidate.
   *
   * @param spaceToFree amount of space that still needs to be released
   * @param txsToEvict number of transactions that still need to be evicted
   */
  private void evict(final long spaceToFree, final int txsToEvict) {
    final var candidate = getEvictable();
    if (candidate == null) {
      return;
    }

    final var victimSender = candidate.getSender();
    // expected to be non null, since the sender has at least the candidate tx
    final var victimTxs = txsBySender.get(victimSender);

    long releasedSpace = 0;
    int releasedSlots = 0;
    while ((releasedSpace < spaceToFree || txsToEvict > releasedSlots) && !victimTxs.isEmpty()) {
      final PendingTransaction highestNonceTx = victimTxs.pollLastEntry().getValue();
      processEvict(victimTxs, highestNonceTx, EVICTED);
      ++releasedSlots;
      releasedSpace += highestNonceTx.memorySize();
      // evicted can always be added to the next layer
      addToNextLayer(victimTxs, highestNonceTx, 0);
    }

    if (victimTxs.isEmpty()) {
      txsBySender.remove(victimSender);
    }

    final long remainingSpace = spaceToFree - releasedSpace;
    final int remainingTxs = txsToEvict - releasedSlots;
    final boolean moreToEvict = remainingSpace > 0 || remainingTxs > 0;

    if (moreToEvict && !txsBySender.isEmpty()) {
      // try next less valuable sender
      evict(remainingSpace, remainingTxs);
    }
  }

  /**
   * Completes the removal of a transaction that has been replaced by another one with the same
   * sender and nonce: drops it from the hash index, updates counters and metrics, lets the
   * subclass react and notifies the dropped listeners.
   *
   * @param replacedTx the transaction that has been replaced
   */
  protected void replaced(final PendingTransaction replacedTx) {
    final Hash replacedHash = replacedTx.getHash();
    pendingTransactions.remove(replacedHash);

    decreaseCounters(replacedTx);
    final String reasonLabel = REPLACED.label();
    metrics.incrementRemoved(replacedTx, reasonLabel, name());

    internalReplaced(replacedTx);
    notifyTransactionDropped(replacedTx);
  }

  /**
   * Subclass hook called by {@code replaced} after the replaced transaction has been removed from
   * {@code pendingTransactions} and the counters and metrics have been updated, and before the
   * dropped listeners are notified.
   *
   * @param replacedTx the transaction that has been replaced
   */
  protected abstract void internalReplaced(final PendingTransaction replacedTx);

  /**
   * Checks whether the incoming transaction targets a sender and nonce already held by this
   * layer, and if so decides the outcome.
   *
   * @param incomingTx the transaction being added
   * @return {@code null} when there is no transaction with the same sender and nonce in this
   *     layer; {@code ALREADY_KNOWN} when it is the very same transaction; {@code
   *     REJECTED_UNDERPRICED_REPLACEMENT} when the replacement tester refuses it; otherwise a
   *     replacement result carrying the existing transaction
   */
  private TransactionAddedResult maybeReplaceTransaction(final PendingTransaction incomingTx) {
    final var senderTxs = txsBySender.get(incomingTx.getSender());
    if (senderTxs == null) {
      return null;
    }

    final var sameNonceTx = senderTxs.get(incomingTx.getNonce());
    if (sameNonceTx == null) {
      return null;
    }

    if (sameNonceTx.getHash().equals(incomingTx.getHash())) {
      return ALREADY_KNOWN;
    }

    return transactionReplacementTester.apply(sameNonceTx, incomingTx)
        ? TransactionAddedResult.createForReplacement(sameNonceTx)
        : REJECTED_UNDERPRICED_REPLACEMENT;
  }

  /**
   * Removes a transaction from the hash index of this layer and, if it was present, updates
   * counters and metrics and lets the subclass react. The per-sender map is not modified here.
   *
   * @param senderTxs the transactions of the sender held by this layer, passed to the hook
   * @param transaction the transaction to remove, matched by hash
   * @param removalReason why the transaction is removed; its label is used for the metric
   * @return the removed pending transaction, or {@code null} if it was not in this layer
   */
  protected PendingTransaction processRemove(
      final NavigableMap<Long, PendingTransaction> senderTxs,
      final Transaction transaction,
      final RemovalReason removalReason) {
    final PendingTransaction removedTx = pendingTransactions.remove(transaction.getHash());
    if (removedTx == null) {
      return null;
    }

    decreaseCounters(removedTx);
    metrics.incrementRemoved(removedTx, removalReason.label(), name());
    internalRemove(senderTxs, removedTx, removalReason);
    return removedTx;
  }

  /**
   * Removes an evicted transaction from the hash index of this layer and, if it was present,
   * updates counters and metrics and lets the subclass react through {@code internalEvict}.
   *
   * @param senderTxs the transactions of the sender held by this layer, passed to the hook
   * @param evictedTx the transaction being evicted
   * @param reason why the transaction is removed; its label is used for the metric
   * @return the removed pending transaction, or {@code null} if it was not in this layer
   */
  protected PendingTransaction processEvict(
      final NavigableMap<Long, PendingTransaction> senderTxs,
      final PendingTransaction evictedTx,
      final RemovalReason reason) {
    final PendingTransaction removedTx = pendingTransactions.remove(evictedTx.getHash());
    if (removedTx == null) {
      return null;
    }

    decreaseCounters(evictedTx);
    metrics.incrementRemoved(evictedTx, reason.label(), name());
    internalEvict(senderTxs, removedTx);
    return removedTx;
  }

  /**
   * Subclass hook called by {@code processEvict} after the evicted transaction has been removed
   * from {@code pendingTransactions} and the counters and metrics have been updated. When reached
   * from {@code evict}, the transaction has already been polled from the per-sender map.
   *
   * @param lessReadySenderTxs the transactions of the sender still held by this layer
   * @param evictedTx the transaction that has been evicted
   */
  protected abstract void internalEvict(
      final NavigableMap<Long, PendingTransaction> lessReadySenderTxs,
      final PendingTransaction evictedTx);

  /**
   * Handles a newly added block. The next layer processes the block first; then this layer
   * removes its confirmed transactions, runs the subclass hook and finally tries to promote
   * transactions from the next layer.
   *
   * @param feeMarket the fee market, passed to the next layer and to the subclass hook
   * @param blockHeader the header of the added block
   * @param maxConfirmedNonceBySender for each sender, the highest nonce confirmed by the block
   */
  @Override
  public final void blockAdded(
      final FeeMarket feeMarket,
      final BlockHeader blockHeader,
      final Map<Address, Long> maxConfirmedNonceBySender) {
    LOG.atDebug()
        .setMessage("Managing new added block {}")
        .addArgument(blockHeader::toLogString)
        .log();

    nextLayer.blockAdded(feeMarket, blockHeader, maxConfirmedNonceBySender);

    for (final Map.Entry<Address, Long> senderAndNonce : maxConfirmedNonceBySender.entrySet()) {
      confirmed(senderAndNonce.getKey(), senderAndNonce.getValue());
    }

    internalBlockAdded(blockHeader, feeMarket);
    promoteTransactions();
  }

  /**
   * Subclass hook called by {@code blockAdded} after the next layer has processed the block and
   * the confirmed transactions have been removed from this layer, and before promotion from the
   * next layer is attempted.
   *
   * @param blockHeader the header of the added block
   * @param feeMarket the fee market received by {@code blockAdded}
   */
  protected abstract void internalBlockAdded(
      final BlockHeader blockHeader, final FeeMarket feeMarket);

  /**
   * Fills this layer with transactions promoted from the next layer, when there are both free
   * slots and free space. Promoted transactions are stored via {@code processAdded}.
   */
  final void promoteTransactions() {
    final int freeSlots = maxTransactionsNumber() - pendingTransactions.size();
    final long freeSpace = cacheFreeSpace();

    if (freeSlots > 0 && freeSpace > 0) {
      // pass on the very same value that was just checked, instead of computing it a second time
      nextLayer.promote(this::promotionFilter, freeSpace, freeSlots).forEach(this::processAdded);
    }
  }

  /**
   * Removes from this layer all the transactions of the sender with a nonce lower than or equal
   * to the confirmed one. Blobs of removed blob transactions are handed to the blob cache. If the
   * sender still has transactions afterwards, the subclass is informed through {@code
   * internalConfirmed}.
   *
   * @param sender the sender whose transactions have been confirmed
   * @param maxConfirmedNonce the highest confirmed nonce for the sender, inclusive
   */
  private void confirmed(final Address sender, final long maxConfirmedNonce) {
    final var senderTxs = txsBySender.get(sender);

    if (senderTxs != null) {
      // view backed by senderTxs: removing through its iterator removes from senderTxs too
      final var confirmedTxs = senderTxs.headMap(maxConfirmedNonce, true);
      // must be read before the removal loop empties the view
      final var highestNonceRemovedTx =
          confirmedTxs.isEmpty() ? null : confirmedTxs.lastEntry().getValue();

      final var itConfirmedTxs = confirmedTxs.values().iterator();
      while (itConfirmedTxs.hasNext()) {
        final var confirmedTx = itConfirmedTxs.next();
        itConfirmedTxs.remove();
        if (confirmedTx.getTransaction().getBlobsWithCommitments().isPresent()) {
          this.blobCache.cacheBlobs(confirmedTx.getTransaction());
        }
        // processRemove records the removal metric with the CONFIRMED reason label, so it must
        // not be incremented again here, otherwise every confirmation is counted twice
        processRemove(senderTxs, confirmedTx.getTransaction(), CONFIRMED);

        LOG.atTrace()
            .setMessage("Removed confirmed pending transactions {}")
            .addArgument(confirmedTx::toTraceLog)
            .log();
      }

      if (senderTxs.isEmpty()) {
        txsBySender.remove(sender);
      } else {
        internalConfirmed(senderTxs, sender, maxConfirmedNonce, highestNonceRemovedTx);
      }
    }
  }

  /**
   * Subclass hook called by {@code confirmed} after the confirmed transactions of a sender have
   * been removed, and only when the sender still has transactions in this layer.
   *
   * @param senderTxs the remaining transactions of the sender held by this layer
   * @param sender the sender whose transactions have been confirmed
   * @param maxConfirmedNonce the highest confirmed nonce for the sender
   * @param highestNonceRemovedTx the removed transaction with the highest nonce, or {@code null}
   *     when this layer held no transaction up to the confirmed nonce
   */
  protected abstract void internalConfirmed(
      final NavigableMap<Long, PendingTransaction> senderTxs,
      final Address sender,
      final long maxConfirmedNonce,
      final PendingTransaction highestNonceRemovedTx);

  /**
   * Subclass hook called by {@code processRemove} when the transaction was present in this layer,
   * after it has been removed from {@code pendingTransactions} and the counters and metrics have
   * been updated.
   *
   * @param senderTxs the per-sender map received by {@code processRemove}
   * @param pendingTransaction the transaction that has been removed
   * @param removalReason why the transaction has been removed
   */
  protected abstract void internalRemove(
      final NavigableMap<Long, PendingTransaction> senderTxs,
      final PendingTransaction pendingTransaction,
      final RemovalReason removalReason);

  /**
   * Selects the transaction whose sender {@code evict} takes transactions from, starting with the
   * highest nonce of that sender.
   *
   * @return the eviction candidate, or {@code null} to stop evicting
   */
  protected abstract PendingTransaction getEvictable();

  /**
   * Accounts for a transaction entering this layer: adds its memory size to the space used and
   * bumps the count for its transaction type.
   *
   * @param pendingTransaction the transaction that has been added
   */
  protected void increaseCounters(final PendingTransaction pendingTransaction) {
    spaceUsed = spaceUsed + pendingTransaction.memorySize();
    final int typeIndex = pendingTransaction.getTransaction().getType().ordinal();
    txCountByType[typeIndex] += 1;
  }

  /**
   * Accounts for a transaction leaving this layer: subtracts its memory size from the space used
   * and lowers the count for its transaction type.
   *
   * @param pendingTransaction the transaction that has been removed
   */
  protected void decreaseCounters(final PendingTransaction pendingTransaction) {
    spaceUsed = spaceUsed - pendingTransaction.memorySize();
    final int typeIndex = pendingTransaction.getTransaction().getType().ordinal();
    txCountByType[typeIndex] -= 1;
  }

  /**
   * The space still available in this layer. {@code maybeFull} treats a negative value as the
   * amount of space that must be released by eviction, and {@code promoteTransactions} promotes
   * only when the value is positive.
   *
   * @return the free space, negative when the layer is over its limit
   */
  protected abstract long cacheFreeSpace();

  /**
   * Filter handed to {@code nextLayer.promote} by {@code promoteTransactions}.
   * NOTE(review): presumably returns true for transactions eligible to move into this layer;
   * confirm against the implementations of promote.
   *
   * @param pendingTransaction the candidate transaction
   * @return the filter outcome for the candidate
   */
  protected abstract boolean promotionFilter(PendingTransaction pendingTransaction);

  /**
   * Collects the transactions received from a local source, from this layer and from the
   * following ones.
   *
   * @return a new mutable list, with the local transactions of this layer first
   */
  @Override
  public List<Transaction> getAllLocal() {
    final List<Transaction> localTxs = new ArrayList<>();
    for (final PendingTransaction candidate : pendingTransactions.values()) {
      if (candidate.isReceivedFromLocalSource()) {
        localTxs.add(candidate.getTransaction());
      }
    }
    localTxs.addAll(nextLayer.getAllLocal());
    return localTxs;
  }

  /**
   * Collects the transactions that have priority, from this layer and from the following ones.
   *
   * @return a new mutable list, with the priority transactions of this layer first
   */
  @Override
  public List<Transaction> getAllPriority() {
    final List<Transaction> priorityTxs = new ArrayList<>();
    for (final PendingTransaction candidate : pendingTransactions.values()) {
      if (candidate.hasPriority()) {
        priorityTxs.add(candidate.getTransaction());
      }
    }
    priorityTxs.addAll(nextLayer.getAllPriority());
    return priorityTxs;
  }

  /**
   * Streams, in nonce order, the transactions this layer holds for a sender.
   *
   * @param sender the sender of interest
   * @return the transactions of the sender in this layer only, empty if there are none
   */
  Stream<PendingTransaction> stream(final Address sender) {
    final NavigableMap<Long, PendingTransaction> senderTxs = txsBySender.get(sender);
    return (senderTxs != null ? senderTxs : EMPTY_SENDER_TXS).values().stream();
  }

  /**
   * Lists the transactions of a sender held by this layer, followed by those held by the
   * following layers.
   *
   * @param sender the sender of interest
   * @return an unmodifiable list of the transactions of the sender
   */
  @Override
  public List<PendingTransaction> getAllFor(final Address sender) {
    final Stream<PendingTransaction> fromThisLayer = stream(sender);
    final Stream<PendingTransaction> fromNextLayers = nextLayer.getAllFor(sender).stream();
    return Stream.concat(fromThisLayer, fromNextLayers).toList();
  }

  /**
   * Not called within this class.
   * NOTE(review): presumably streams every transaction held by this layer, in an order chosen by
   * the subclass; confirm against the implementations.
   *
   * @return a stream of pending transactions
   */
  abstract Stream<PendingTransaction> stream();

  /**
   * Counts the transactions held by this layer and by all the following layers.
   *
   * @return the number of transactions in this layer plus the count of the next layer
   */
  @Override
  public int count() {
    final int heldHere = pendingTransactions.size();
    final int heldByNextLayers = nextLayer.count();
    return heldHere + heldByNextLayers;
  }

  /**
   * Tells every added-listener subscribed to this layer that a transaction has been added.
   *
   * @param pendingTransaction the added transaction; listeners receive its wrapped transaction
   */
  protected void notifyTransactionAdded(final PendingTransaction pendingTransaction) {
    onAddedListeners.forEach(
        addedListener -> {
          final Transaction addedTx = pendingTransaction.getTransaction();
          addedListener.onTransactionAdded(addedTx);
        });
  }

  /**
   * Tells every dropped-listener subscribed to this layer that a transaction has been dropped.
   *
   * @param pendingTransaction the dropped transaction; listeners receive its wrapped transaction
   */
  protected void notifyTransactionDropped(final PendingTransaction pendingTransaction) {
    onDroppedListeners.forEach(
        droppedListener -> {
          final Transaction droppedTx = pendingTransaction.getTransaction();
          droppedListener.onTransactionDropped(droppedTx);
        });
  }

  // For each subscription id handed out by this layer, the id the next layer returned for the
  // same listener. One entry per subscriber is needed: remembering only the most recent id makes
  // an unsubscribe remove the wrong listener from the next layer as soon as there are two
  // subscribers. These maps supersede the single-value OptionalLong fields for the four methods
  // below.
  // NOTE(review): plain HashMaps, like the other mutable state of this layer; this assumes
  // callers serialize access to the layer - confirm.
  private final Map<Long, Long> nextLayerAddedListenerIds = new HashMap<>();
  private final Map<Long, Long> nextLayerDroppedListenerIds = new HashMap<>();

  /**
   * Subscribes a listener to the transactions added to this layer and to the following ones.
   *
   * @param listener the listener to notify
   * @return the subscription id to pass to {@code unsubscribeFromAdded}
   */
  @Override
  public long subscribeToAdded(final PendingTransactionAddedListener listener) {
    final long nextLayerId = nextLayer.subscribeToAdded(listener);
    final long id = onAddedListeners.subscribe(listener);
    nextLayerAddedListenerIds.put(id, nextLayerId);
    return id;
  }

  /**
   * Cancels a subscription created by {@code subscribeToAdded}, in this layer and in the
   * following ones.
   *
   * @param id the subscription id returned by {@code subscribeToAdded}
   */
  @Override
  public void unsubscribeFromAdded(final long id) {
    final Long nextLayerId = nextLayerAddedListenerIds.remove(id);
    if (nextLayerId != null) {
      nextLayer.unsubscribeFromAdded(nextLayerId);
    }
    onAddedListeners.unsubscribe(id);
  }

  /**
   * Subscribes a listener to the transactions dropped by this layer and by the following ones.
   *
   * @param listener the listener to notify
   * @return the subscription id to pass to {@code unsubscribeFromDropped}
   */
  @Override
  public long subscribeToDropped(final PendingTransactionDroppedListener listener) {
    final long nextLayerId = nextLayer.subscribeToDropped(listener);
    final long id = onDroppedListeners.subscribe(listener);
    nextLayerDroppedListenerIds.put(id, nextLayerId);
    return id;
  }

  /**
   * Cancels a subscription created by {@code subscribeToDropped}, in this layer and in the
   * following ones.
   *
   * @param id the subscription id returned by {@code subscribeToDropped}
   */
  @Override
  public void unsubscribeFromDropped(final long id) {
    final Long nextLayerId = nextLayerDroppedListenerIds.remove(id);
    if (nextLayerId != null) {
      nextLayer.unsubscribeFromDropped(nextLayerId);
    }
    onDroppedListeners.unsubscribe(id);
  }

  /**
   * Builds a one line summary made of the stats of this layer followed by those of the following
   * layers.
   *
   * @return the stats of this layer and of the next ones, separated by {@code " | "}
   */
  @Override
  public String logStats() {
    final String thisLayerStats = internalLogStats();
    final String nextLayersStats = nextLayer.logStats();
    return thisLayerStats + " | " + nextLayersStats;
  }

  /**
   * Describes the nonces held for a sender, by this layer and by the following ones.
   *
   * @param sender the sender of interest
   * @return the name of this layer with the nonces it holds for the sender (or {@code Empty}),
   *     followed by the description returned by the next layer
   */
  @Override
  public String logSender(final Address sender) {
    final var senderTxs = txsBySender.get(sender);
    final Object noncesHere = senderTxs == null ? "Empty" : senderTxs.keySet();
    return name() + "[" + noncesHere + "] " + nextLayer.logSender(sender);
  }

  /**
   * Provides the stats of this layer only; {@code logStats} joins them with those of the next
   * layer.
   *
   * @return the textual stats of this layer
   */
  protected abstract String internalLogStats();

  /**
   * Verifies, through assertions, that the internal structures of this layer agree with each
   * other, then repeats the check on the next layer when it is an {@code
   * AbstractTransactionsLayer}. The checks are effective only when assertions are enabled.
   *
   * @param prevLayerTxsBySender the transactions of the previous layers grouped by sender; the
   *     transactions of this layer are merged into it before checking the next layer
   * @return true when the checks complete; a failed check raises an {@code AssertionError}
   */
  boolean consistencyCheck(
      final Map<Address, NavigableMap<Long, PendingTransaction>> prevLayerTxsBySender) {
    // two txs with the same sender and nonce in the hash index are already an inconsistency
    final BinaryOperator<PendingTransaction> noMergeExpected =
        (a, b) -> {
          throw new IllegalArgumentException();
        };
    // rebuild the per-sender view from the hash index, to compare it with the maintained one
    final var controlTxsBySender =
        pendingTransactions.values().stream()
            .collect(
                Collectors.groupingBy(
                    PendingTransaction::getSender,
                    Collectors.toMap(
                        PendingTransaction::getNonce,
                        Function.identity(),
                        noMergeExpected,
                        TreeMap::new)));

    assert txsBySender.equals(controlTxsBySender)
        : "pendingTransactions and txsBySender do not contain the same txs";

    // sum as long, like spaceUsed, so that a large layer cannot overflow an int sum
    assert pendingTransactions.values().stream().mapToLong(PendingTransaction::memorySize).sum()
            == spaceUsed
        : "space used does not match";

    internalConsistencyCheck(prevLayerTxsBySender);

    if (nextLayer instanceof AbstractTransactionsLayer nextAbstractLayer) {
      txsBySender.forEach(
          (sender, txsByNonce) ->
              prevLayerTxsBySender
                  .computeIfAbsent(sender, s -> new TreeMap<>())
                  .putAll(txsByNonce));
      return nextAbstractLayer.consistencyCheck(prevLayerTxsBySender);
    }
    return true;
  }

  /**
   * Subclass hook called by {@code consistencyCheck} after the generic checks of this layer have
   * passed and before the transactions of this layer are merged into the given map.
   *
   * @param prevLayerTxsBySender the transactions of the previous layers grouped by sender
   */
  protected abstract void internalConsistencyCheck(
      final Map<Address, NavigableMap<Long, PendingTransaction>> prevLayerTxsBySender);

  /**
   * Gives access to the blob cache this layer was created with.
   *
   * @return the blob cache that receives the blobs of confirmed transactions
   */
  public BlobCache getBlobCache() {
    return this.blobCache;
  }
}