GetBodiesFromPeerTask.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.eth.manager.task;

import static com.google.common.base.Preconditions.checkArgument;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Deposit;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.ValidatorExit;
import org.hyperledger.besu.ethereum.core.Withdrawal;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.PendingPeerRequest;
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.mainnet.BodyValidation;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Requests bodies from a peer by header, matches up headers to bodies, and returns blocks. */
public class GetBodiesFromPeerTask extends AbstractPeerRequestTask<List<Block>> {
  private static final Logger LOG = LoggerFactory.getLogger(GetBodiesFromPeerTask.class);

  private final ProtocolSchedule protocolSchedule;
  private final List<BlockHeader> headers;
  private final Map<BodyIdentifier, List<BlockHeader>> bodyToHeaders = new HashMap<>();

  private GetBodiesFromPeerTask(
      final ProtocolSchedule protocolSchedule,
      final EthContext ethContext,
      final List<BlockHeader> headers,
      final MetricsSystem metricsSystem) {
    super(ethContext, EthPV62.GET_BLOCK_BODIES, metricsSystem);
    checkArgument(headers.size() > 0);
    this.protocolSchedule = protocolSchedule;

    this.headers = headers;
    headers.forEach(
        (header) -> {
          final BodyIdentifier bodyId = new BodyIdentifier(header);
          bodyToHeaders.putIfAbsent(bodyId, new ArrayList<>(headers.size()));
          bodyToHeaders.get(bodyId).add(header);
        });
  }

  public static GetBodiesFromPeerTask forHeaders(
      final ProtocolSchedule protocolSchedule,
      final EthContext ethContext,
      final List<BlockHeader> headers,
      final MetricsSystem metricsSystem) {
    return new GetBodiesFromPeerTask(protocolSchedule, ethContext, headers, metricsSystem);
  }

  @Override
  protected PendingPeerRequest sendRequest() {
    final List<Hash> blockHashes =
        headers.stream().map(BlockHeader::getHash).collect(Collectors.toList());
    LOG.atTrace()
        .setMessage("Requesting {} bodies with hashes {}.")
        .addArgument(blockHashes.size())
        .addArgument(blockHashes)
        .log();
    final long minimumRequiredBlockNumber = headers.get(headers.size() - 1).getNumber();

    return sendRequestToPeer(
        peer -> {
          LOG.atTrace()
              .setMessage("Requesting {} bodies from peer {}.")
              .addArgument(blockHashes.size())
              .addArgument(peer)
              .log();
          return peer.getBodies(blockHashes);
        },
        minimumRequiredBlockNumber);
  }

  @Override
  protected Optional<List<Block>> processResponse(
      final boolean streamClosed, final MessageData message, final EthPeer peer) {
    if (streamClosed) {
      // All outstanding requests have been responded to, and we still haven't found the response
      // we wanted. It must have been empty or contain data that didn't match.
      peer.recordUselessResponse("bodies");
      return Optional.of(Collections.emptyList());
    }

    final BlockBodiesMessage bodiesMessage = BlockBodiesMessage.readFrom(message);
    final List<BlockBody> bodies = bodiesMessage.bodies(protocolSchedule);
    if (bodies.size() == 0) {
      // Message contains no data - nothing to do
      LOG.debug("Message contains no data. Peer: {}", peer);
      return Optional.empty();
    } else if (bodies.size() > headers.size()) {
      // Message doesn't match our request - nothing to do
      LOG.debug("Message doesn't match our request. Peer: {}", peer);
      return Optional.empty();
    }

    final List<Block> blocks = new ArrayList<>(headers.size());
    for (final BlockBody body : bodies) {
      final List<BlockHeader> headers = bodyToHeaders.get(new BodyIdentifier(body));
      if (headers == null) {
        // This message contains unrelated bodies - exit
        LOG.debug("This message contains unrelated bodies. Peer: {}", peer);
        return Optional.empty();
      }
      headers.forEach(h -> blocks.add(new Block(h, body)));
      // Clear processed headers
      headers.clear();
    }
    LOG.atTrace()
        .setMessage("Associated {} bodies with {} headers to get {} blocks with these hashes: {}")
        .addArgument(bodies.size())
        .addArgument(headers.size())
        .addArgument(blocks.size())
        .addArgument(() -> blocks.stream().map(Block::toLogString).toList())
        .log();
    return Optional.of(blocks);
  }

  static class BodyIdentifier {
    private final Bytes32 transactionsRoot;
    private final Bytes32 ommersHash;
    private final Bytes32 withdrawalsRoot;
    private final Bytes32 depositsRoot;
    private final Bytes32 exitsRoot;

    public BodyIdentifier(
        final Bytes32 transactionsRoot,
        final Bytes32 ommersHash,
        final Bytes32 withdrawalsRoot,
        final Bytes32 depositsRoot,
        final Bytes32 exitsRoot) {
      this.transactionsRoot = transactionsRoot;
      this.ommersHash = ommersHash;
      this.withdrawalsRoot = withdrawalsRoot;
      this.depositsRoot = depositsRoot;
      this.exitsRoot = exitsRoot;
    }

    public BodyIdentifier(final BlockBody body) {
      this(
          body.getTransactions(),
          body.getOmmers(),
          body.getWithdrawals(),
          body.getDeposits(),
          body.getExits());
    }

    public BodyIdentifier(
        final List<Transaction> transactions,
        final List<BlockHeader> ommers,
        final Optional<List<Withdrawal>> withdrawals,
        final Optional<List<Deposit>> deposits,
        final Optional<List<ValidatorExit>> exits) {
      this(
          BodyValidation.transactionsRoot(transactions),
          BodyValidation.ommersHash(ommers),
          withdrawals.map(BodyValidation::withdrawalsRoot).orElse(null),
          deposits.map(BodyValidation::depositsRoot).orElse(null),
          exits.map(BodyValidation::exitsRoot).orElse(null));
    }

    public BodyIdentifier(final BlockHeader header) {
      this(
          header.getTransactionsRoot(),
          header.getOmmersHash(),
          header.getWithdrawalsRoot().orElse(null),
          header.getDepositsRoot().orElse(null),
          header.getExitsRoot().orElse(null));
    }

    @Override
    public boolean equals(final Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;
      BodyIdentifier that = (BodyIdentifier) o;
      return Objects.equals(transactionsRoot, that.transactionsRoot)
          && Objects.equals(ommersHash, that.ommersHash)
          && Objects.equals(withdrawalsRoot, that.withdrawalsRoot)
          && Objects.equals(depositsRoot, that.depositsRoot)
          && Objects.equals(exitsRoot, that.exitsRoot);
    }

    @Override
    public int hashCode() {
      return Objects.hash(transactionsRoot, ommersHash, withdrawalsRoot, depositsRoot, exitsRoot);
    }
  }
}