NewPooledTransactionHashesMessageProcessor.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.eth.transactions;

import static java.time.Instant.now;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.BufferedGetPooledTransactionsFromPeerFetcher;
import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.rlp.RLPException;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NewPooledTransactionHashesMessageProcessor {

  private static final Logger LOG =
      LoggerFactory.getLogger(NewPooledTransactionHashesMessageProcessor.class);

  static final String METRIC_LABEL = "new_pooled_transaction_hashes";

  private final ConcurrentHashMap<EthPeer, BufferedGetPooledTransactionsFromPeerFetcher>
      scheduledTasks;

  private final PeerTransactionTracker transactionTracker;
  private final TransactionPool transactionPool;
  private final TransactionPoolConfiguration transactionPoolConfiguration;
  private final EthContext ethContext;
  private final TransactionPoolMetrics metrics;

  public NewPooledTransactionHashesMessageProcessor(
      final PeerTransactionTracker transactionTracker,
      final TransactionPool transactionPool,
      final TransactionPoolConfiguration transactionPoolConfiguration,
      final EthContext ethContext,
      final TransactionPoolMetrics metrics) {
    this.transactionTracker = transactionTracker;
    this.transactionPool = transactionPool;
    this.transactionPoolConfiguration = transactionPoolConfiguration;
    this.ethContext = ethContext;
    this.metrics = metrics;
    metrics.initExpiredMessagesCounter(METRIC_LABEL);
    this.scheduledTasks = new ConcurrentHashMap<>();
  }

  void processNewPooledTransactionHashesMessage(
      final EthPeer peer,
      final NewPooledTransactionHashesMessage transactionsMessage,
      final Instant startedAt,
      final Duration keepAlive) {
    // Check if message is not expired.
    if (startedAt.plus(keepAlive).isAfter(now())) {
      this.processNewPooledTransactionHashesMessage(peer, transactionsMessage);
    } else {
      metrics.incrementExpiredMessages(METRIC_LABEL);
    }
  }

  @SuppressWarnings("UnstableApiUsage")
  private void processNewPooledTransactionHashesMessage(
      final EthPeer peer, final NewPooledTransactionHashesMessage transactionsMessage) {
    try {
      final List<Hash> incomingTransactionHashes = transactionsMessage.pendingTransactionHashes();

      LOG.atTrace()
          .setMessage(
              "Received pooled transaction hashes message from {}... incoming hashes {}, incoming list {}")
          .addArgument(() -> peer == null ? null : peer.getLoggableId())
          .addArgument(incomingTransactionHashes::size)
          .addArgument(incomingTransactionHashes)
          .log();

      final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask =
          scheduledTasks.computeIfAbsent(
              peer,
              ethPeer -> {
                final ScheduledFuture<?> scheduledFuture =
                    ethContext
                        .getScheduler()
                        .scheduleFutureTaskWithFixedDelay(
                            new FetcherCreatorTask(peer),
                            transactionPoolConfiguration
                                .getUnstable()
                                .getEth65TrxAnnouncedBufferingPeriod(),
                            transactionPoolConfiguration
                                .getUnstable()
                                .getEth65TrxAnnouncedBufferingPeriod());

                return new BufferedGetPooledTransactionsFromPeerFetcher(
                    ethContext,
                    scheduledFuture,
                    peer,
                    transactionPool,
                    transactionTracker,
                    metrics,
                    METRIC_LABEL);
              });

      bufferedTask.addHashes(
          incomingTransactionHashes.stream()
              .filter(hash -> transactionPool.getTransactionByHash(hash).isEmpty())
              .collect(Collectors.toList()));
    } catch (final RLPException ex) {
      if (peer != null) {
        LOG.debug(
            "Malformed pooled transaction hashes message received (BREACH_OF_PROTOCOL), disconnecting: {}",
            peer,
            ex);
        LOG.trace("Message data: {}", transactionsMessage.getData());
        peer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
      }
    }
  }

  public class FetcherCreatorTask implements Runnable {
    final EthPeer peer;

    public FetcherCreatorTask(final EthPeer peer) {
      this.peer = peer;
    }

    @Override
    public void run() {
      if (peer != null) {
        if (peer.isDisconnected()) {
          scheduledTasks.remove(peer).getScheduledFuture().cancel(true);
        } else if (peer.hasAvailableRequestCapacity()) {
          scheduledTasks.get(peer).requestTransactions();
        }
      }
    }
  }
}