CompleteBlocksTask.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.eth.sync.tasks;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Collections.emptyList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toMap;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.GetBodiesFromPeerTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Given a set of headers, "completes" them by repeatedly requesting additional data (bodies) needed
 * to create the blocks that correspond to the supplied headers.
 */
public class CompleteBlocksTask extends AbstractRetryingPeerTask<List<Block>> {
  private static final Logger LOG = LoggerFactory.getLogger(CompleteBlocksTask.class);

  private static final int MIN_SIZE_INCOMPLETE_LIST = 1;
  private static final int DEFAULT_RETRIES = 4;

  private final EthContext ethContext;
  private final ProtocolSchedule protocolSchedule;

  private final List<BlockHeader> headers;
  private final Map<Long, Block> blocks;
  private final MetricsSystem metricsSystem;

  private CompleteBlocksTask(
      final ProtocolSchedule protocolSchedule,
      final EthContext ethContext,
      final List<BlockHeader> headers,
      final int maxRetries,
      final MetricsSystem metricsSystem) {
    super(ethContext, maxRetries, Collection::isEmpty, metricsSystem);
    checkArgument(headers.size() > 0, "Must supply a non-empty headers list");
    this.protocolSchedule = protocolSchedule;
    this.ethContext = ethContext;
    this.metricsSystem = metricsSystem;

    this.headers = headers;
    this.blocks =
        headers.stream()
            .filter(this::hasEmptyBody)
            .collect(
                toMap(
                    BlockHeader::getNumber,
                    header ->
                        new Block(
                            header,
                            createEmptyBodyBasedOnProtocolSchedule(protocolSchedule, header))));
  }

  @Nonnull
  private BlockBody createEmptyBodyBasedOnProtocolSchedule(
      final ProtocolSchedule protocolSchedule, final BlockHeader header) {
    return new BlockBody(
        Collections.emptyList(),
        Collections.emptyList(),
        isWithdrawalsEnabled(protocolSchedule, header)
            ? Optional.of(Collections.emptyList())
            : Optional.empty(),
        Optional.empty(),
        Optional.empty());
  }

  private boolean isWithdrawalsEnabled(
      final ProtocolSchedule protocolSchedule, final BlockHeader header) {
    return protocolSchedule.getByBlockHeader(header).getWithdrawalsProcessor().isPresent();
  }

  private boolean hasEmptyBody(final BlockHeader header) {
    return header.getOmmersHash().equals(Hash.EMPTY_LIST_HASH)
        && header.getTransactionsRoot().equals(Hash.EMPTY_TRIE_HASH)
        && header
            .getWithdrawalsRoot()
            .map(wsRoot -> wsRoot.equals(Hash.EMPTY_TRIE_HASH))
            .orElse(true);
  }

  public static CompleteBlocksTask forHeaders(
      final ProtocolSchedule protocolSchedule,
      final EthContext ethContext,
      final List<BlockHeader> headers,
      final int maxRetries,
      final MetricsSystem metricsSystem) {
    return new CompleteBlocksTask(protocolSchedule, ethContext, headers, maxRetries, metricsSystem);
  }

  public static CompleteBlocksTask forHeaders(
      final ProtocolSchedule protocolSchedule,
      final EthContext ethContext,
      final List<BlockHeader> headers,
      final MetricsSystem metricsSystem) {
    return new CompleteBlocksTask(
        protocolSchedule, ethContext, headers, DEFAULT_RETRIES, metricsSystem);
  }

  @Override
  protected CompletableFuture<List<Block>> executePeerTask(final Optional<EthPeer> assignedPeer) {
    return requestBodies(assignedPeer).thenCompose(this::processBodiesResult);
  }

  private CompletableFuture<List<Block>> requestBodies(final Optional<EthPeer> assignedPeer) {
    final List<BlockHeader> incompleteHeaders = incompleteHeaders();
    if (incompleteHeaders.isEmpty()) {
      return completedFuture(emptyList());
    }
    LOG.debug(
        "Requesting bodies to complete {} blocks, starting with {}.",
        incompleteHeaders.size(),
        incompleteHeaders.get(0).getNumber());
    return executeSubTask(
        () -> {
          final GetBodiesFromPeerTask task =
              GetBodiesFromPeerTask.forHeaders(
                  protocolSchedule, ethContext, incompleteHeaders, metricsSystem);
          assignedPeer.ifPresent(task::assignPeer);
          return task.run().thenApply(PeerTaskResult::getResult);
        });
  }

  private CompletableFuture<List<Block>> processBodiesResult(final List<Block> blocksResult) {
    blocksResult.forEach((block) -> blocks.put(block.getHeader().getNumber(), block));

    if (incompleteHeaders().isEmpty()) {
      result.complete(
          headers.stream().map(h -> blocks.get(h.getNumber())).collect(Collectors.toList()));
    }

    return completedFuture(blocksResult);
  }

  private List<BlockHeader> incompleteHeaders() {
    final List<BlockHeader> collectedHeaders =
        headers.stream()
            .filter(h -> blocks.get(h.getNumber()) == null)
            .collect(Collectors.toList());
    if (!collectedHeaders.isEmpty() && getRetryCount() > 1) {
      final int subSize = (int) Math.ceil((double) collectedHeaders.size() / getRetryCount());
      if (getRetryCount() > getMaxRetries()) {
        return collectedHeaders.subList(0, MIN_SIZE_INCOMPLETE_LIST);
      } else {
        return collectedHeaders.subList(0, subSize);
      }
    }

    return collectedHeaders;
  }
}