GetReceiptsForHeadersTask.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.eth.sync.tasks;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.GetReceiptsFromPeerTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Given a set of headers, repeatedly requests the receipts for those blocks. */
public class GetReceiptsForHeadersTask
    extends AbstractRetryingPeerTask<Map<BlockHeader, List<TransactionReceipt>>> {
  private static final Logger LOG = LoggerFactory.getLogger(GetReceiptsForHeadersTask.class);
  private static final int DEFAULT_RETRIES = 4;

  private final EthContext ethContext;

  private final List<BlockHeader> headers;
  private final Map<BlockHeader, List<TransactionReceipt>> receipts;
  private final MetricsSystem metricsSystem;

  private GetReceiptsForHeadersTask(
      final EthContext ethContext,
      final List<BlockHeader> headers,
      final int maxRetries,
      final MetricsSystem metricsSystem) {
    super(ethContext, maxRetries, Map::isEmpty, metricsSystem);
    checkArgument(headers.size() > 0, "Must supply a non-empty headers list");
    this.ethContext = ethContext;
    this.metricsSystem = metricsSystem;

    this.headers = headers;
    this.receipts = new HashMap<>();
    completeEmptyReceipts(headers);
  }

  public static GetReceiptsForHeadersTask forHeaders(
      final EthContext ethContext,
      final List<BlockHeader> headers,
      final int maxRetries,
      final MetricsSystem metricsSystem) {
    return new GetReceiptsForHeadersTask(ethContext, headers, maxRetries, metricsSystem);
  }

  public static GetReceiptsForHeadersTask forHeaders(
      final EthContext ethContext,
      final List<BlockHeader> headers,
      final MetricsSystem metricsSystem) {
    return new GetReceiptsForHeadersTask(ethContext, headers, DEFAULT_RETRIES, metricsSystem);
  }

  private void completeEmptyReceipts(final List<BlockHeader> headers) {
    headers.stream()
        .filter(header -> header.getReceiptsRoot().equals(Hash.EMPTY_TRIE_HASH))
        .forEach(header -> receipts.put(header, emptyList()));
  }

  @Override
  protected CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> executePeerTask(
      final Optional<EthPeer> assignedPeer) {
    return requestReceipts(assignedPeer).thenCompose(this::processResponse);
  }

  private CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> requestReceipts(
      final Optional<EthPeer> assignedPeer) {
    final List<BlockHeader> incompleteHeaders = incompleteHeaders();
    if (incompleteHeaders.isEmpty()) {
      return CompletableFuture.completedFuture(emptyMap());
    }
    LOG.debug(
        "Requesting bodies to complete {} blocks, starting with {}.",
        incompleteHeaders.size(),
        incompleteHeaders.get(0).getNumber());
    return executeSubTask(
        () -> {
          final GetReceiptsFromPeerTask task =
              GetReceiptsFromPeerTask.forHeaders(ethContext, incompleteHeaders, metricsSystem);
          assignedPeer.ifPresent(task::assignPeer);
          return task.run().thenApply(PeerTaskResult::getResult);
        });
  }

  private CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> processResponse(
      final Map<BlockHeader, List<TransactionReceipt>> responseData) {
    receipts.putAll(responseData);

    if (isComplete()) {
      result.complete(receipts);
    }

    return CompletableFuture.completedFuture(responseData);
  }

  private List<BlockHeader> incompleteHeaders() {
    return headers.stream().filter(h -> receipts.get(h) == null).collect(Collectors.toList());
  }

  private boolean isComplete() {
    return headers.stream().allMatch(header -> receipts.get(header) != null);
  }
}