RlpBlockImporter.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.chainimport;

import static java.util.concurrent.TimeUnit.SECONDS;

import org.hyperledger.besu.controller.BesuController;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions;
import org.hyperledger.besu.ethereum.core.BlockImporter;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.mainnet.BlockHeaderValidator;
import org.hyperledger.besu.ethereum.mainnet.BlockImportResult;
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.util.RawBlockIterator;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Tool for importing rlp-encoded block data from files. */
public class RlpBlockImporter implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(RlpBlockImporter.class);

  private final Semaphore blockBacklog = new Semaphore(2);

  private final ExecutorService validationExecutor = Executors.newCachedThreadPool();
  private final ExecutorService importExecutor = Executors.newSingleThreadExecutor();

  private long cumulativeGas;
  private long segmentGas;
  private final Stopwatch cumulativeTimer = Stopwatch.createUnstarted();
  private final Stopwatch segmentTimer = Stopwatch.createUnstarted();
  private static final long SEGMENT_SIZE = 1000;

  /**
   * Imports blocks that are stored as concatenated RLP sections in the given file into Besu's block
   * storage.
   *
   * @param blocks Path to the file containing the blocks
   * @param besuController the BesuController that defines blockchain behavior
   * @param skipPowValidation Skip proof of work validation (correct mix hash and difficulty)
   * @return the import result
   * @throws IOException On Failure
   */
  public RlpBlockImporter.ImportResult importBlockchain(
      final Path blocks, final BesuController besuController, final boolean skipPowValidation)
      throws IOException {
    return importBlockchain(blocks, besuController, skipPowValidation, 0L, Long.MAX_VALUE);
  }

  /**
   * Import blockchain.
   *
   * @param blocks the blocks
   * @param besuController the besu controller
   * @param skipPowValidation the skip pow validation
   * @param startBlock the start block
   * @param endBlock the end block
   * @return the rlp block importer - import result
   * @throws IOException the io exception
   */
  public RlpBlockImporter.ImportResult importBlockchain(
      final Path blocks,
      final BesuController besuController,
      final boolean skipPowValidation,
      final long startBlock,
      final long endBlock)
      throws IOException {
    final ProtocolSchedule protocolSchedule = besuController.getProtocolSchedule();
    final ProtocolContext context = besuController.getProtocolContext();
    final MutableBlockchain blockchain = context.getBlockchain();
    int count = 0;
    final BlockHeaderFunctions blockHeaderFunctions =
        ScheduleBasedBlockHeaderFunctions.create(protocolSchedule);
    try (final RawBlockIterator iterator = new RawBlockIterator(blocks, blockHeaderFunctions)) {
      BlockHeader previousHeader = null;
      CompletableFuture<Void> previousBlockFuture = null;
      final AtomicReference<Throwable> threadedException = new AtomicReference<>();
      while (iterator.hasNext()) {
        final Block block = iterator.next();
        final BlockHeader header = block.getHeader();
        final long blockNumber = header.getNumber();
        if (blockNumber == BlockHeader.GENESIS_BLOCK_NUMBER
            || blockNumber < startBlock
            || blockNumber >= endBlock) {
          continue;
        }
        if (blockchain.contains(header.getHash())) {
          continue;
        }
        if (previousHeader == null) {
          previousHeader = lookupPreviousHeader(blockchain, header);
        }
        final ProtocolSpec protocolSpec = protocolSchedule.getByBlockHeader(header);
        final BlockHeader lastHeader = previousHeader;

        final CompletableFuture<Void> validationFuture =
            CompletableFuture.runAsync(
                () -> validateBlock(protocolSpec, context, lastHeader, header, skipPowValidation),
                validationExecutor);

        final CompletableFuture<Void> extractingFuture =
            CompletableFuture.runAsync(() -> extractSignatures(block));

        final CompletableFuture<Void> calculationFutures;
        if (previousBlockFuture == null) {
          calculationFutures = extractingFuture;
        } else {
          calculationFutures = CompletableFuture.allOf(extractingFuture, previousBlockFuture);
        }

        try {
          do {
            final Throwable t = (Exception) threadedException.get();
            if (t != null) {
              throw new RuntimeException("Error importing block " + header.getNumber(), t);
            }
          } while (!blockBacklog.tryAcquire(1, SECONDS));
        } catch (final InterruptedException e) {
          LOG.error("Interrupted adding to backlog.", e);
          break;
        }
        previousBlockFuture =
            validationFuture.runAfterBothAsync(
                calculationFutures,
                () ->
                    evaluateBlock(
                        context,
                        block,
                        header,
                        protocolSchedule.getByBlockHeader(header),
                        skipPowValidation),
                importExecutor);
        previousBlockFuture.exceptionally(
            exception -> {
              threadedException.set(exception);
              return null;
            });

        ++count;
        previousHeader = header;
      }
      if (previousBlockFuture != null) {
        previousBlockFuture.join();
      }
      logProgress(blockchain.getChainHeadBlockNumber());
      return new RlpBlockImporter.ImportResult(
          blockchain.getChainHead().getTotalDifficulty(), count);
    }
  }

  private void extractSignatures(final Block block) {
    final List<CompletableFuture<Void>> futures =
        new ArrayList<>(block.getBody().getTransactions().size());
    for (final Transaction tx : block.getBody().getTransactions()) {
      futures.add(CompletableFuture.runAsync(tx::getSender, validationExecutor));
    }
    for (final CompletableFuture<Void> future : futures) {
      future.join();
    }
  }

  private void validateBlock(
      final ProtocolSpec protocolSpec,
      final ProtocolContext context,
      final BlockHeader previousHeader,
      final BlockHeader header,
      final boolean skipPowValidation) {
    final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator();
    final boolean validHeader =
        blockHeaderValidator.validateHeader(
            header,
            previousHeader,
            context,
            skipPowValidation
                ? HeaderValidationMode.LIGHT_DETACHED_ONLY
                : HeaderValidationMode.DETACHED_ONLY);
    if (!validHeader) {
      throw new IllegalStateException("Invalid header at block number " + header.getNumber() + ".");
    }
  }

  private void evaluateBlock(
      final ProtocolContext context,
      final Block block,
      final BlockHeader header,
      final ProtocolSpec protocolSpec,
      final boolean skipPowValidation) {
    try {
      cumulativeTimer.start();
      segmentTimer.start();
      final BlockImporter blockImporter = protocolSpec.getBlockImporter();
      final BlockImportResult blockImported =
          blockImporter.importBlock(
              context,
              block,
              skipPowValidation
                  ? HeaderValidationMode.LIGHT_SKIP_DETACHED
                  : HeaderValidationMode.SKIP_DETACHED,
              skipPowValidation ? HeaderValidationMode.LIGHT : HeaderValidationMode.FULL);
      if (!blockImported.isImported()) {
        throw new IllegalStateException(
            "Invalid block at block number " + header.getNumber() + ".");
      }
    } finally {
      blockBacklog.release();
      cumulativeTimer.stop();
      segmentTimer.stop();
      final long thisGas = block.getHeader().getGasUsed();
      cumulativeGas += thisGas;
      segmentGas += thisGas;
      if (header.getNumber() % SEGMENT_SIZE == 0) {
        logProgress(header.getNumber());
      }
    }
  }

  private void logProgress(final long blockNum) {
    final long elapseMicros = segmentTimer.elapsed(TimeUnit.MICROSECONDS);
    //noinspection PlaceholderCountMatchesArgumentCount
    LOG.info(
        "Import at block {} / {} gas {} micros / Mgps {} segment {} cumulative",
        blockNum,
        segmentGas,
        elapseMicros,
        segmentGas / (double) elapseMicros,
        cumulativeGas / (double) cumulativeTimer.elapsed(TimeUnit.MICROSECONDS));
    segmentGas = 0;
    segmentTimer.reset();
  }

  private BlockHeader lookupPreviousHeader(
      final MutableBlockchain blockchain, final BlockHeader header) {
    return blockchain
        .getBlockHeader(header.getParentHash())
        .orElseThrow(
            () ->
                new IllegalStateException(
                    String.format(
                        "Block %s does not connect to the existing chain. Current chain head %s",
                        header.getNumber(), blockchain.getChainHeadBlockNumber())));
  }

  @Override
  public void close() {
    validationExecutor.shutdownNow();
    try {
      //noinspection ResultOfMethodCallIgnored
      validationExecutor.awaitTermination(5, SECONDS);
    } catch (final Exception e) {
      LOG.error("Error shutting down validatorExecutor.", e);
    }

    importExecutor.shutdownNow();
    try {
      //noinspection ResultOfMethodCallIgnored
      importExecutor.awaitTermination(5, SECONDS);
    } catch (final Exception e) {
      LOG.error("Error shutting down importExecutor", e);
    }
  }

  /** The Import result. */
  public static final class ImportResult {

    /** The difficulty. */
    public final Difficulty td;

    /** The Count. */
    final int count;

    /**
     * Instantiates a new Import result.
     *
     * @param td the td
     * @param count the count
     */
    ImportResult(final Difficulty td, final int count) {
      this.td = td;
      this.count = count;
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(this).add("td", td).add("count", count).toString();
    }
  }
}