PivotBlockRetriever.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.eth.sync.fastsync;

import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.ExceptionUtils;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This task attempts to find a non-controversial pivot block by confirming the pivot block number
 * with a minimum number of peers. If a particular pivot block cannot be confirmed, the pivot block
 * number is pushed back and the confirmation process repeats. If a maximum number of retries is
 * reached, the task fails with a {@code FastSyncException} containing {@code
 * FastSyncError.PIVOT_BLOCK_HEADER_MISMATCH}.
 */
public class PivotBlockRetriever {

  private static final Logger LOG = LoggerFactory.getLogger(PivotBlockRetriever.class);
  public static final int MAX_QUERY_RETRIES_PER_PEER = 4;
  private static final int DEFAULT_MAX_PIVOT_BLOCK_RESETS = 250;
  private static final int SUSPICIOUS_NUMBER_OF_RETRIES = 5;

  private final EthContext ethContext;
  private final MetricsSystem metricsSystem;
  private final ProtocolSchedule protocolSchedule;

  // The number of peers we need to query to confirm our pivot block
  private final int peersToQuery;
  // The max times to push the pivot block number back when peers can't agree on a pivot
  private final int maxPivotBlockResets;
  // How far to push back the pivot block when we retry on pivot disagreement
  private final long pivotBlockNumberResetDelta;

  // The current pivot block number, gets pushed back if peers disagree on the pivot block
  AtomicLong pivotBlockNumber;

  private final CompletableFuture<FastSyncState> result = new CompletableFuture<>();
  private final Map<Long, PivotBlockConfirmer> confirmationTasks = new ConcurrentHashMap<>();

  private final AtomicBoolean isStarted = new AtomicBoolean(false);

  PivotBlockRetriever(
      final ProtocolSchedule protocolSchedule,
      final EthContext ethContext,
      final MetricsSystem metricsSystem,
      final long pivotBlockNumber,
      final int peersToQuery,
      final long pivotBlockNumberResetDelta,
      final int maxPivotBlockResets) {
    this.protocolSchedule = protocolSchedule;
    this.ethContext = ethContext;
    this.metricsSystem = metricsSystem;
    this.pivotBlockNumber = new AtomicLong(pivotBlockNumber);
    this.peersToQuery = peersToQuery;
    this.pivotBlockNumberResetDelta = pivotBlockNumberResetDelta;
    this.maxPivotBlockResets = maxPivotBlockResets;
  }

  public PivotBlockRetriever(
      final ProtocolSchedule protocolSchedule,
      final EthContext ethContext,
      final MetricsSystem metricsSystem,
      final long pivotBlockNumber,
      final int peersToQuery,
      final long pivotBlockNumberResetDelta) {
    this(
        protocolSchedule,
        ethContext,
        metricsSystem,
        pivotBlockNumber,
        peersToQuery,
        pivotBlockNumberResetDelta,
        DEFAULT_MAX_PIVOT_BLOCK_RESETS);
  }

  public CompletableFuture<FastSyncState> downloadPivotBlockHeader() {
    if (isStarted.compareAndSet(false, true)) {
      LOG.info("Retrieve a pivot block that can be confirmed by at least {} peers.", peersToQuery);
      confirmBlock(pivotBlockNumber.get());
    }

    return result;
  }

  private void confirmBlock(final long blockNumber) {
    final PivotBlockConfirmer pivotBlockConfirmationTask =
        new PivotBlockConfirmer(
            protocolSchedule,
            ethContext,
            metricsSystem,
            pivotBlockNumber.get(),
            peersToQuery,
            MAX_QUERY_RETRIES_PER_PEER);
    final PivotBlockConfirmer preexistingTask =
        confirmationTasks.putIfAbsent(blockNumber, pivotBlockConfirmationTask);
    if (preexistingTask != null) {
      // We already set up a task to confirm this block
      return;
    }

    pivotBlockConfirmationTask.confirmPivotBlock().whenComplete(this::handleConfirmationResult);
  }

  private void handleConfirmationResult(final FastSyncState fastSyncState, final Throwable error) {
    if (error != null) {
      final Throwable rootCause = ExceptionUtils.rootCause(error);
      if (rootCause instanceof PivotBlockConfirmer.ContestedPivotBlockException) {
        final long blockNumber =
            ((PivotBlockConfirmer.ContestedPivotBlockException) error).getBlockNumber();
        handleContestedPivotBlock(blockNumber);
      } else {
        LOG.error("Encountered error while requesting pivot block header", error);
        result.completeExceptionally(error);
      }
      return;
    }

    result.complete(fastSyncState);
  }

  private void handleContestedPivotBlock(final long contestedBlockNumber) {
    if (pivotBlockNumber.compareAndSet(
        contestedBlockNumber, contestedBlockNumber - pivotBlockNumberResetDelta)) {
      LOG.info("Received conflicting pivot blocks for {}.", contestedBlockNumber);

      final int retryCount = confirmationTasks.size();

      if ((retryCount % SUSPICIOUS_NUMBER_OF_RETRIES) == 0) {
        LOG.warn("{} attempts have failed to find a fast sync pivot block", retryCount);
      }

      if (retryCount > maxPivotBlockResets
          || pivotBlockNumber.get() <= BlockHeader.GENESIS_BLOCK_NUMBER) {
        LOG.info("Max retries reached, cancel pivot block download.");
        // Pivot block selection has failed
        result.completeExceptionally(new SyncException(SyncError.PIVOT_BLOCK_HEADER_MISMATCH));
        return;
      } else {
        LOG.info("Move pivot block back to {} and retry.", pivotBlockNumber);
      }

      confirmBlock(pivotBlockNumber.get());
    }
  }
}