SnapWorldDownloadState.java

/*
 * Copyright contributors to Hyperledger Besu
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.eth.sync.snapsync;

import static org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest.createAccountFlatHealingRangeRequest;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest.createAccountTrieNodeDataRequest;
import static org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator.applyForStrategy;

import org.hyperledger.besu.ethereum.chain.BlockAddedObserver;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.context.SnapSyncStatePersistenceManager;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.StorageRangeDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.AccountFlatDatabaseHealingRangeRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.StorageFlatDatabaseHealingRangeRequest;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState;
import org.hyperledger.besu.ethereum.trie.RangeManager;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.worldstate.FlatDbMode;
import org.hyperledger.besu.ethereum.worldstate.WorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.storage.DataStorageFormat;
import org.hyperledger.besu.services.tasks.InMemoryTaskQueue;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import org.hyperledger.besu.services.tasks.Task;
import org.hyperledger.besu.services.tasks.TaskCollection;

import java.time.Clock;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Download state of a snap sync world state download.
 *
 * <p>Incoming {@link SnapDataRequest}s are routed by {@link #enqueueRequest(SnapDataRequest)} to
 * one pending queue per request kind (account ranges, storage ranges, large storage ranges,
 * bytecode, trie nodes and the two flat database healing queues). Worker threads take tasks from
 * these queues through the {@code dequeue...Blocking} methods, which coordinate through the
 * monitor of this instance ({@code wait}/{@code notifyAll}).
 *
 * <p>{@link #checkCompletion(BlockHeader)} drives the transitions between the download, the trie
 * heal, the optional flat database heal and the final completion of the download.
 */
public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest> {

  private static final Logger LOG = LoggerFactory.getLogger(SnapWorldDownloadState.class);

  // One pending queue per kind of snap request (see enqueueRequest for the routing rules)
  protected final InMemoryTaskQueue<SnapDataRequest> pendingAccountRequests =
      new InMemoryTaskQueue<>();
  protected final InMemoryTaskQueue<SnapDataRequest> pendingStorageRequests =
      new InMemoryTaskQueue<>();
  protected final InMemoryTaskQueue<SnapDataRequest> pendingLargeStorageRequests =
      new InMemoryTaskQueue<>();
  protected final InMemoryTaskQueue<SnapDataRequest> pendingCodeRequests =
      new InMemoryTaskQueue<>();
  protected final InMemoryTasksPriorityQueues<SnapDataRequest> pendingTrieNodeRequests =
      new InMemoryTasksPriorityQueues<>();

  protected final InMemoryTasksPriorityQueues<SnapDataRequest>
      pendingAccountFlatDatabaseHealingRequests = new InMemoryTasksPriorityQueues<>();

  protected final InMemoryTasksPriorityQueues<SnapDataRequest>
      pendingStorageFlatDatabaseHealingRequests = new InMemoryTasksPriorityQueues<>();

  // Accounts handed to the account trie node request created when the trie heal starts
  private Set<Bytes> accountsHealingList = new HashSet<>();
  private DynamicPivotBlockSelector pivotBlockSelector;

  private final SnapSyncStatePersistenceManager snapContext;
  private final SnapSyncProcessState snapSyncState;

  // blockchain
  private final Blockchain blockchain;
  // Id of the registered block-added observer; empty while no observer is registered
  private OptionalLong blockObserverId;

  // metrics around the snapsync
  private final SnapSyncMetricsManager metricsManager;

  /**
   * Creates the download state and registers one gauge per pending queue for the account, storage,
   * large storage, code and trie node requests.
   *
   * @param worldStateStorageCoordinator forwarded to the superclass and used to persist the world
   *     state on completion
   * @param snapContext persistence manager for the snap sync tasks and healing list
   * @param blockchain blockchain on which the block-added observer is registered
   * @param snapSyncState shared flags describing the progress of the snap sync
   * @param pendingRequests forwarded to the superclass
   * @param maxRequestsWithoutProgress forwarded to the superclass
   * @param minMillisBeforeStalling forwarded to the superclass
   * @param metricsManager metrics manager used to register gauges and signal completion
   * @param clock forwarded to the superclass
   */
  public SnapWorldDownloadState(
      final WorldStateStorageCoordinator worldStateStorageCoordinator,
      final SnapSyncStatePersistenceManager snapContext,
      final Blockchain blockchain,
      final SnapSyncProcessState snapSyncState,
      final InMemoryTasksPriorityQueues<SnapDataRequest> pendingRequests,
      final int maxRequestsWithoutProgress,
      final long minMillisBeforeStalling,
      final SnapSyncMetricsManager metricsManager,
      final Clock clock) {
    super(
        worldStateStorageCoordinator,
        pendingRequests,
        maxRequestsWithoutProgress,
        minMillisBeforeStalling,
        clock);
    this.snapContext = snapContext;
    this.blockchain = blockchain;
    this.snapSyncState = snapSyncState;
    this.metricsManager = metricsManager;
    this.blockObserverId = OptionalLong.empty();
    metricsManager
        .getMetricsSystem()
        .createLongGauge(
            BesuMetricCategory.SYNCHRONIZER,
            "snap_world_state_pending_account_requests_current",
            "Number of account pending requests for snap sync world state download",
            pendingAccountRequests::size);
    metricsManager
        .getMetricsSystem()
        .createLongGauge(
            BesuMetricCategory.SYNCHRONIZER,
            "snap_world_state_pending_storage_requests_current",
            "Number of storage pending requests for snap sync world state download",
            pendingStorageRequests::size);
    metricsManager
        .getMetricsSystem()
        .createLongGauge(
            BesuMetricCategory.SYNCHRONIZER,
            "snap_world_state_pending_big_storage_requests_current",
            // Distinct help text: this gauge tracks the large storage queue, not the regular one
            "Number of large storage pending requests for snap sync world state download",
            pendingLargeStorageRequests::size);
    metricsManager
        .getMetricsSystem()
        .createLongGauge(
            BesuMetricCategory.SYNCHRONIZER,
            "snap_world_state_pending_code_requests_current",
            "Number of code pending requests for snap sync world state download",
            pendingCodeRequests::size);
    metricsManager
        .getMetricsSystem()
        .createLongGauge(
            BesuMetricCategory.SYNCHRONIZER,
            "snap_world_state_pending_trie_node_requests_current",
            "Number of trie node pending requests for snap sync world state download",
            pendingTrieNodeRequests::size);
  }

  /** Wakes up every thread waiting on this instance in one of the blocking dequeue methods. */
  @Override
  public synchronized void notifyTaskAvailable() {
    notifyAll();
  }

  /**
   * Intentionally does nothing for now.
   *
   * @param maxNodeRequestRetries unused
   */
  @Override
  protected synchronized void markAsStalled(final int maxNodeRequestRetries) {
    // TODO retry when mark as stalled
  }

  /**
   * Checks whether every pending queue has completed all of its tasks and, if so, moves the sync
   * to its next phase.
   *
   * <ul>
   *   <li>If the trie heal is not in progress, the block-added observer is registered (when not
   *       already registered) and the trie heal is started.
   *   <li>If the trie heal is in progress and the blockchain is behind according to the pivot
   *       block selector, the sync is flagged as waiting for the blockchain.
   *   <li>Otherwise the observer is removed. If the flat database heal has not been started and
   *       the flat database mode is {@code FULL}, the flat database heal is started; if not, the
   *       world state is saved for {@code header}, the snap context is cleared and the download
   *       future is completed.
   * </ul>
   *
   * @param header the block header whose hash and state root are saved on completion
   * @return {@code true} only when this call completed the download, {@code false} otherwise
   */
  @Override
  public synchronized boolean checkCompletion(final BlockHeader header) {

    // Check if all snapsync tasks are completed
    if (!internalFuture.isDone()
        && pendingAccountRequests.allTasksCompleted()
        && pendingCodeRequests.allTasksCompleted()
        && pendingStorageRequests.allTasksCompleted()
        && pendingLargeStorageRequests.allTasksCompleted()
        && pendingTrieNodeRequests.allTasksCompleted()
        && pendingAccountFlatDatabaseHealingRequests.allTasksCompleted()
        && pendingStorageFlatDatabaseHealingRequests.allTasksCompleted()) {

      // if all snapsync tasks are completed and the healing process was not running
      if (!snapSyncState.isHealTrieInProgress()) {
        // Register blockchain observer if not already registered
        blockObserverId =
            blockObserverId.isEmpty()
                ? OptionalLong.of(blockchain.observeBlockAdded(createBlockchainObserver()))
                : blockObserverId;
        // Start the healing process
        startTrieHeal();
      }
      // if all snapsync tasks are completed and the healing was running and blockchain is behind
      // the pivot block
      else if (pivotBlockSelector.isBlockchainBehind()) {
        LOG.info("Pausing world state download while waiting for sync to complete");
        // Set the snapsync to wait for the blockchain to catch up
        snapSyncState.setWaitingBlockchain(true);
      }
      // if all snapsync tasks are completed and the healing was running and the blockchain is not
      // behind the pivot block
      else {
        // Remove the blockchain observer and forget its id, so that a stale id is neither removed
        // a second time nor mistaken for a live registration by the registration guard above
        blockObserverId.ifPresent(blockchain::removeObserver);
        blockObserverId = OptionalLong.empty();
        // If the flat database healing process is not in progress and the flat database mode is
        // FULL
        if (!snapSyncState.isHealFlatDatabaseInProgress()
            && worldStateStorageCoordinator.isMatchingFlatMode(FlatDbMode.FULL)) {
          startFlatDatabaseHeal(header);
        }
        // If the flat database healing process is in progress or the flat database mode is not FULL
        else {
          final WorldStateKeyValueStorage.Updater updater = worldStateStorageCoordinator.updater();
          applyForStrategy(
              updater,
              onBonsai -> {
                onBonsai.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData);
              },
              onForest -> {
                onForest.saveWorldState(header.getStateRoot(), rootNodeData);
              });
          updater.commit();

          // Notify that the snap sync has completed
          metricsManager.notifySnapSyncCompleted();
          // Clear the snap context
          snapContext.clear();
          internalFuture.complete(null);
          return true;
        }
      }
    }
    return false;
  }

  /**
   * Clears the queues of the superclass and every pending queue owned by this class, including
   * the two flat database healing queues.
   */
  @Override
  protected synchronized void cleanupQueues() {
    super.cleanupQueues();
    pendingAccountRequests.clear();
    pendingStorageRequests.clear();
    pendingLargeStorageRequests.clear();
    pendingCodeRequests.clear();
    pendingTrieNodeRequests.clear();
    // The healing queues take part in checkCompletion and in the dequeue dependencies, so they
    // must not keep stale tasks once the other queues have been emptied
    pendingAccountFlatDatabaseHealingRequests.clear();
    pendingStorageFlatDatabaseHealingRequests.clear();
  }

  /** Method to start the healing process of the trie */
  public synchronized void startTrieHeal() {
    snapContext.clearAccountRangeTasks();
    snapSyncState.setHealTrieStatus(true);
    // Try to find a new pivot block before starting the healing process
    pivotBlockSelector.switchToNewPivotBlock(
        (blockHeader, newPivotBlockFound) -> {
          snapContext.clearAccountRangeTasks();
          LOG.info(
              "Running world state heal process from peers with pivot block {}",
              blockHeader.getNumber());
          // The heal starts from the root of the account trie of the selected pivot block
          enqueueRequest(
              createAccountTrieNodeDataRequest(
                  blockHeader.getStateRoot(), Bytes.EMPTY, accountsHealingList));
        });
  }

  /** Method to reload the healing process of the trie */
  public synchronized void reloadTrieHeal() {
    // Clear the flat database and trie log from the world state storage if needed
    worldStateStorageCoordinator.applyOnMatchingStrategy(
        DataStorageFormat.BONSAI,
        worldStateKeyValueStorage -> {
          final BonsaiWorldStateKeyValueStorage strategy =
              worldStateStorageCoordinator.getStrategy(BonsaiWorldStateKeyValueStorage.class);
          strategy.clearFlatDatabase();
          strategy.clearTrieLog();
        });
    // Clear pending trie node and code requests
    pendingTrieNodeRequests.clear();
    pendingCodeRequests.clear();

    // Marking the heal as not running lets checkCompletion start it again
    snapSyncState.setHealTrieStatus(false);
    checkCompletion(snapSyncState.getPivotBlockHeader().orElseThrow());
  }

  /**
   * Flags the flat database heal as in progress and enqueues one account flat database healing
   * request for every range returned by {@code RangeManager.generateAllRanges(16)}.
   *
   * @param header the block header whose state root the healing requests target
   */
  public synchronized void startFlatDatabaseHeal(final BlockHeader header) {
    LOG.info("Initiating the healing process for the flat database");
    snapSyncState.setHealFlatDatabaseInProgress(true);
    final Map<Bytes32, Bytes32> ranges = RangeManager.generateAllRanges(16);
    ranges.forEach(
        (key, value) ->
            enqueueRequest(
                createAccountFlatHealingRangeRequest(header.getStateRoot(), key, value)));
  }

  /**
   * Adds the request to the pending queue matching its type and wakes up the waiting threads. The
   * request is ignored when the download future is already done.
   *
   * <p>Storage range requests whose start key hash is not {@code RangeManager.MIN_RANGE} go to
   * the large storage queue; any request type without a dedicated queue goes to the trie node
   * queue.
   *
   * @param request the request to enqueue
   */
  @Override
  public synchronized void enqueueRequest(final SnapDataRequest request) {
    if (!internalFuture.isDone()) {
      if (request instanceof BytecodeRequest) {
        pendingCodeRequests.add(request);
      } else if (request instanceof StorageRangeDataRequest) {
        if (!((StorageRangeDataRequest) request).getStartKeyHash().equals(RangeManager.MIN_RANGE)) {
          pendingLargeStorageRequests.add(request);
        } else {
          pendingStorageRequests.add(request);
        }
      } else if (request instanceof AccountRangeDataRequest) {
        pendingAccountRequests.add(request);
      } else if (request instanceof AccountFlatDatabaseHealingRangeRequest) {
        pendingAccountFlatDatabaseHealingRequests.add(request);
      } else if (request instanceof StorageFlatDatabaseHealingRangeRequest) {
        pendingStorageFlatDatabaseHealingRequests.add(request);
      } else {
        pendingTrieNodeRequests.add(request);
      }
      notifyAll();
    }
  }

  /**
   * Replaces the set of accounts to heal. The given set is stored as is, without copying.
   *
   * @param addAccountToHealingList the new set of accounts to heal
   */
  public synchronized void setAccountsHealingList(final Set<Bytes> addAccountToHealingList) {
    this.accountsHealingList = addAccountToHealingList;
  }

  /**
   * Adds an account to the list of accounts to be repaired during the healing process. If the
   * account is not already in the list, it is added to both the snap context and the internal set
   * of accounts to be repaired.
   *
   * @param account The account to be added for repair.
   */
  public synchronized void addAccountToHealingList(final Bytes account) {
    if (!accountsHealingList.contains(account)) {
      snapContext.addAccountToHealingList(account);
      accountsHealingList.add(account);
    }
  }

  /**
   * Returns the set of accounts to heal. The internal set itself is returned, not a copy.
   *
   * <p>Synchronized so that a set installed through {@link #setAccountsHealingList(Set)} is
   * visible to the calling thread.
   *
   * @return the current set of accounts to heal
   */
  public synchronized Set<Bytes> getAccountsHealingList() {
    return accountsHealingList;
  }

  /**
   * Enqueues every request of the stream through {@link #enqueueRequest(SnapDataRequest)}. The
   * stream is not consumed when the download future is already done.
   *
   * @param requests the requests to enqueue
   */
  @Override
  public synchronized void enqueueRequests(final Stream<SnapDataRequest> requests) {
    if (!internalFuture.isDone()) {
      requests.forEach(this::enqueueRequest);
    }
  }

  /**
   * Waits until every dependency queue has completed all of its tasks, then removes and returns
   * the next task of {@code queue}, waiting for a notification whenever the queue has no task to
   * hand out.
   *
   * @param queueDependencies queues that must have completed all of their tasks before a task is
   *     taken from {@code queue}
   * @param queue the queue to take the task from
   * @param unBlocked callback invoked with {@code null} each time this call resumes after having
   *     waited for the dependencies
   * @return the next task, or {@code null} when the download future is done or the calling thread
   *     was interrupted (the interrupt flag is restored in that case)
   */
  public synchronized Task<SnapDataRequest> dequeueRequestBlocking(
      final List<TaskCollection<SnapDataRequest>> queueDependencies,
      final TaskCollection<SnapDataRequest> queue,
      final Consumer<Void> unBlocked) {
    boolean isWaiting = false;
    while (!internalFuture.isDone()) {
      // Stop waiting on the dependencies as soon as the download is done, otherwise a worker
      // could stay parked here after the download has finished
      while (!internalFuture.isDone()
          && queueDependencies.stream()
              .anyMatch(Predicate.not(TaskCollection::allTasksCompleted))) {
        try {
          isWaiting = true;
          wait();
        } catch (final InterruptedException e) {
          Thread.currentThread().interrupt();
          return null;
        }
      }
      if (internalFuture.isDone()) {
        // The download finished while waiting: do not run the callback against cleaned-up queues
        return null;
      }
      if (isWaiting) {
        unBlocked.accept(null);
      }
      isWaiting = false;
      final Task<SnapDataRequest> task = queue.remove();
      if (task != null) {
        return task;
      }

      // Nothing to hand out yet: wait until a request is enqueued or a task becomes available
      try {
        wait();
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        return null;
      }
    }
    return null;
  }

  /**
   * Takes the next account range task once the storage, large storage and code queues have
   * completed all of their tasks. The pending account tasks are persisted through the snap
   * context each time the call resumes after waiting for those queues.
   *
   * @return the next task, or {@code null} as described in {@link #dequeueRequestBlocking}
   */
  public synchronized Task<SnapDataRequest> dequeueAccountRequestBlocking() {
    return dequeueRequestBlocking(
        List.of(pendingStorageRequests, pendingLargeStorageRequests, pendingCodeRequests),
        pendingAccountRequests,
        unused -> snapContext.updatePersistedTasks(pendingAccountRequests.asList()));
  }

  /**
   * Takes the next large storage range task; this queue has no dependency.
   *
   * @return the next task, or {@code null} as described in {@link #dequeueRequestBlocking}
   */
  public synchronized Task<SnapDataRequest> dequeueLargeStorageRequestBlocking() {
    return dequeueRequestBlocking(Collections.emptyList(), pendingLargeStorageRequests, __ -> {});
  }

  /**
   * Takes the next storage range task; this queue has no dependency.
   *
   * @return the next task, or {@code null} as described in {@link #dequeueRequestBlocking}
   */
  public synchronized Task<SnapDataRequest> dequeueStorageRequestBlocking() {
    return dequeueRequestBlocking(Collections.emptyList(), pendingStorageRequests, __ -> {});
  }

  /**
   * Takes the next bytecode task once the storage queue has completed all of its tasks.
   *
   * @return the next task, or {@code null} as described in {@link #dequeueRequestBlocking}
   */
  public synchronized Task<SnapDataRequest> dequeueCodeRequestBlocking() {
    return dequeueRequestBlocking(List.of(pendingStorageRequests), pendingCodeRequests, __ -> {});
  }

  /**
   * Takes the next trie node task once the account, storage and large storage queues have
   * completed all of their tasks.
   *
   * @return the next task, or {@code null} as described in {@link #dequeueRequestBlocking}
   */
  public synchronized Task<SnapDataRequest> dequeueTrieNodeRequestBlocking() {
    return dequeueRequestBlocking(
        List.of(pendingAccountRequests, pendingStorageRequests, pendingLargeStorageRequests),
        pendingTrieNodeRequests,
        __ -> {});
  }

  /**
   * Takes the next account flat database healing task once the account, storage, large storage,
   * trie node and storage flat database healing queues have completed all of their tasks.
   *
   * @return the next task, or {@code null} as described in {@link #dequeueRequestBlocking}
   */
  public synchronized Task<SnapDataRequest> dequeueAccountFlatDatabaseHealingRequestBlocking() {
    return dequeueRequestBlocking(
        List.of(
            pendingAccountRequests,
            pendingStorageRequests,
            pendingLargeStorageRequests,
            pendingTrieNodeRequests,
            pendingStorageFlatDatabaseHealingRequests),
        pendingAccountFlatDatabaseHealingRequests,
        __ -> {});
  }

  /**
   * Takes the next storage flat database healing task once the account, storage, large storage
   * and trie node queues have completed all of their tasks.
   *
   * @return the next task, or {@code null} as described in {@link #dequeueRequestBlocking}
   */
  public synchronized Task<SnapDataRequest> dequeueStorageFlatDatabaseHealingRequestBlocking() {
    return dequeueRequestBlocking(
        List.of(
            pendingAccountRequests,
            pendingStorageRequests,
            pendingLargeStorageRequests,
            pendingTrieNodeRequests),
        pendingStorageFlatDatabaseHealingRequests,
        __ -> {});
  }

  /**
   * Returns the metrics manager given at construction.
   *
   * @return the snap sync metrics manager
   */
  public SnapSyncMetricsManager getMetricsManager() {
    return metricsManager;
  }

  /**
   * Sets the pivot block selector used by {@link #checkCompletion(BlockHeader)}, {@link
   * #startTrieHeal()} and the blockchain observer. It must be set before any of them runs.
   *
   * <p>Synchronized because every read of the selector inside this class happens under the
   * monitor of this instance.
   *
   * @param pivotBlockSelector the pivot block selector
   */
  public synchronized void setPivotBlockSelector(
      final DynamicPivotBlockSelector pivotBlockSelector) {
    this.pivotBlockSelector = pivotBlockSelector;
  }

  /**
   * Creates the observer registered on the blockchain by {@link #checkCompletion(BlockHeader)}.
   *
   * <p>On each added block the observer asks the pivot block selector to check for a new pivot
   * block. When a new pivot block was found, or when the sync was waiting for the blockchain and
   * the blockchain is no longer behind, it resets the waiting flag and reloads the trie heal.
   *
   * @return the block-added observer
   */
  public BlockAddedObserver createBlockchainObserver() {
    return addedBlockContext -> {
      // Holder needed because the callback below cannot assign a local variable
      final AtomicBoolean foundNewPivotBlock = new AtomicBoolean(false);
      pivotBlockSelector.check(
          (____, isNewPivotBlock) -> {
            if (isNewPivotBlock) {
              foundNewPivotBlock.set(true);
            }
          });

      final boolean isNewPivotBlockFound = foundNewPivotBlock.get();
      final boolean isBlockchainCaughtUp =
          snapSyncState.isWaitingBlockchain() && !pivotBlockSelector.isBlockchainBehind();

      // restart heal if we found a new pivot block or if close to head again
      if (isNewPivotBlockFound || isBlockchainCaughtUp) {
        snapSyncState.setWaitingBlockchain(false);
        reloadTrieHeal();
      }
    };
  }
}