BackwardSyncStep.java

/*
 * Copyright Hyperledger Besu Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetHeadersEndingAtFromPeerByHashTask;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A single step of the backward sync header download.
 *
 * <p>Starting from a given header, the step first re-links any ancestors that the {@link
 * BackwardChain} can already provide, then obtains the next headers (from the local blockchain if
 * the wanted hash is already there, otherwise from peers) and prepends them to the backward chain.
 */
public class BackwardSyncStep {
  private static final Logger LOG = LoggerFactory.getLogger(BackwardSyncStep.class);
  private final BackwardSyncContext context;
  private final BackwardChain backwardChain;

  /**
   * Creates a step operating on the given context and backward chain.
   *
   * @param context source of the protocol context, eth context, batch size, metrics and status
   * @param backwardChain chain that receives the headers obtained by this step
   */
  public BackwardSyncStep(final BackwardSyncContext context, final BackwardChain backwardChain) {
    this.context = context;
    this.backwardChain = backwardChain;
  }

  /**
   * Runs the step asynchronously: restore known ancestors, request the next headers, save them.
   *
   * <p>The pipeline is started with {@link CompletableFuture#supplyAsync(java.util.function.Supplier)}
   * without an explicit executor.
   *
   * @param firstHeader header whose ancestors should be obtained
   * @return a future that completes with {@code null} once the obtained headers have been saved
   */
  public CompletableFuture<Void> executeAsync(final BlockHeader firstHeader) {
    return CompletableFuture.supplyAsync(() -> firstHeader)
        .thenApply(this::possibleRestoreOldNodes)
        .thenCompose(this::requestHeaders)
        .thenApply(this::saveHeaders);
  }

  /**
   * Walks parent hashes starting at {@code firstAncestor}, prepending every header that the
   * backward chain can already provide, and stops at the first parent hash it cannot resolve.
   *
   * @param firstAncestor header from which the walk starts (its parent is the first one looked up)
   * @return the first parent hash for which {@code backwardChain.getHeader} returned empty
   */
  @VisibleForTesting
  protected Hash possibleRestoreOldNodes(final BlockHeader firstAncestor) {
    Hash lastHash = firstAncestor.getParentHash();
    Optional<BlockHeader> iterator = backwardChain.getHeader(lastHash);
    while (iterator.isPresent()) {
      // NOTE(review): the meaning of the boolean flag is defined by BackwardChain, not visible
      // here - confirm against prependAncestorsHeader(BlockHeader, boolean).
      backwardChain.prependAncestorsHeader(iterator.get(), true);
      lastHash = iterator.get().getParentHash();
      iterator = backwardChain.getHeader(lastHash);
    }
    return lastHash;
  }

  /**
   * Obtains the headers ending at {@code hash}.
   *
   * <p>If the local blockchain already has a header for {@code hash}, that single header is
   * returned immediately and no peer is contacted. Otherwise a {@link
   * RetryingGetHeadersEndingAtFromPeerByHashTask} is scheduled as a sync worker task, asking for
   * up to {@code context.getBatchSize()} headers.
   *
   * @param hash hash of the block at which the requested headers end
   * @return a future holding the obtained headers
   */
  @VisibleForTesting
  protected CompletableFuture<List<BlockHeader>> requestHeaders(final Hash hash) {
    final Optional<BlockHeader> blockHeader =
        context.getProtocolContext().getBlockchain().getBlockHeader(hash);
    if (blockHeader.isPresent()) {
      LOG.debug(
          "Hash {} already present in local blockchain no need to request headers from peers",
          hash);
      return CompletableFuture.completedFuture(List.of(blockHeader.get()));
    }

    final int batchSize = context.getBatchSize();
    LOG.trace("Requesting headers for hash {}, with batch size {}", hash, batchSize);

    // NOTE(review): the literal 0 and the trailing peer count are passed positionally; their
    // meaning (presumably a block-number hint and a retry limit) should be confirmed against
    // RetryingGetHeadersEndingAtFromPeerByHashTask.endingAtHash.
    final RetryingGetHeadersEndingAtFromPeerByHashTask
        retryingGetHeadersEndingAtFromPeerByHashTask =
            RetryingGetHeadersEndingAtFromPeerByHashTask.endingAtHash(
                context.getProtocolSchedule(),
                context.getEthContext(),
                hash,
                0,
                batchSize,
                context.getMetricsSystem(),
                context.getEthContext().getEthPeers().peerCount());
    return context
        .getEthContext()
        .getScheduler()
        .scheduleSyncWorkerTask(retryingGetHeadersEndingAtFromPeerByHashTask::run)
        .thenApply(
            blockHeaders -> {
              // The receivers of the bound method references below are evaluated eagerly, even
              // when DEBUG is disabled, so an unguarded get(0) on an empty list would throw from
              // code that only exists for logging. saveHeaders already tolerates an empty list,
              // so the log statement must too.
              if (blockHeaders.isEmpty()) {
                LOG.debug("Got no headers for hash {}", hash);
              } else {
                LOG.atDebug()
                    .setMessage("Got headers {} -> {}")
                    .addArgument(blockHeaders.get(0)::getNumber)
                    .addArgument(blockHeaders.get(blockHeaders.size() - 1)::getNumber)
                    .log();
              }
              return blockHeaders;
            });
  }

  /**
   * Prepends a single header to the backward chain.
   *
   * @param blockHeader header to prepend
   * @return always {@code null}, so the method can be used where a {@code Void} result is needed
   */
  @VisibleForTesting
  protected Void saveHeader(final BlockHeader blockHeader) {
    backwardChain.prependAncestorsHeader(blockHeader);
    return null;
  }

  /**
   * Prepends every header of the list, in list order, to the backward chain and then logs
   * progress based on the number of the last header in the list. An empty list is a no-op.
   *
   * @param blockHeaders headers to save
   * @return always {@code null}, so the method can be used as a {@code thenApply} stage
   */
  @VisibleForTesting
  protected Void saveHeaders(final List<BlockHeader> blockHeaders) {
    for (BlockHeader blockHeader : blockHeaders) {
      saveHeader(blockHeader);
    }

    if (!blockHeaders.isEmpty()) {
      logProgress(blockHeaders.get(blockHeaders.size() - 1).getNumber());
    }
    return null;
  }

  /**
   * Logs the progress of the header download at INFO level.
   *
   * <p>While the computed percentage is below 100 a message is emitted only when {@code
   * context.getStatus().progressLogDue()} returns true; otherwise the completion message is
   * always emitted.
   *
   * @param currLowestDownloadedHeight block number of the last header of the batch just saved
   */
  private void logProgress(final long currLowestDownloadedHeight) {
    final long targetHeight = context.getStatus().getTargetChainHeight();
    final long initialHeight = context.getStatus().getInitialChainHeight();
    final long estimatedTotal = targetHeight - initialHeight;
    final long downloaded = targetHeight - currLowestDownloadedHeight;

    // Float division: a zero estimatedTotal yields NaN or an infinity rather than throwing.
    // NaN and +Infinity both fail the "< 100" test below and are reported as completed.
    final float completedPercentage = 100.0f * downloaded / estimatedTotal;

    if (completedPercentage < 100.0f) {
      if (context.getStatus().progressLogDue()) {
        LOG.info(
            String.format(
                "Backward sync phase 1 of 2, %.2f%% completed, downloaded %d headers of at least %d. Peers: %d",
                completedPercentage,
                downloaded,
                estimatedTotal,
                context.getEthContext().getEthPeers().peerCount()));
      }
    } else {
      LOG.info(
          String.format(
              "Backward sync phase 1 of 2 completed, downloaded a total of %d headers. Peers: %d",
              downloaded, context.getEthContext().getEthPeers().peerCount()));
    }
  }
}