SnapWorldStateDownloader.java
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncMetricsManager.Step.DOWNLOAD;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest.createAccountRangeDataRequest;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.context.SnapSyncStatePersistenceManager;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader;
import org.hyperledger.besu.ethereum.trie.RangeManager;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.storage.DataStorageFormat;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import java.time.Clock;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.IntSupplier;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SnapWorldStateDownloader implements WorldStateDownloader {
private static final Logger LOG = LoggerFactory.getLogger(SnapWorldStateDownloader.class);
private final long minMillisBeforeStalling;
private final Clock clock;
private final MetricsSystem metricsSystem;
private final EthContext ethContext;
private final SnapSyncStatePersistenceManager snapContext;
private final InMemoryTasksPriorityQueues<SnapDataRequest> snapTaskCollection;
private final SnapSyncConfiguration snapSyncConfiguration;
private final int maxOutstandingRequests;
private final int maxNodeRequestsWithoutProgress;
private final ProtocolContext protocolContext;
private final WorldStateStorageCoordinator worldStateStorageCoordinator;
private final AtomicReference<SnapWorldDownloadState> downloadState = new AtomicReference<>();
public SnapWorldStateDownloader(
final EthContext ethContext,
final SnapSyncStatePersistenceManager snapContext,
final ProtocolContext protocolContext,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final InMemoryTasksPriorityQueues<SnapDataRequest> snapTaskCollection,
final SnapSyncConfiguration snapSyncConfiguration,
final int maxOutstandingRequests,
final int maxNodeRequestsWithoutProgress,
final long minMillisBeforeStalling,
final Clock clock,
final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.protocolContext = protocolContext;
this.worldStateStorageCoordinator = worldStateStorageCoordinator;
this.snapContext = snapContext;
this.snapTaskCollection = snapTaskCollection;
this.snapSyncConfiguration = snapSyncConfiguration;
this.maxOutstandingRequests = maxOutstandingRequests;
this.maxNodeRequestsWithoutProgress = maxNodeRequestsWithoutProgress;
this.minMillisBeforeStalling = minMillisBeforeStalling;
this.clock = clock;
this.metricsSystem = metricsSystem;
metricsSystem.createIntegerGauge(
BesuMetricCategory.SYNCHRONIZER,
"snap_world_state_node_requests_since_last_progress_current",
"Number of world state requests made since the last time new data was returned",
downloadStateValue(SnapWorldDownloadState::getRequestsSinceLastProgress));
metricsSystem.createIntegerGauge(
BesuMetricCategory.SYNCHRONIZER,
"snap_world_state_inflight_requests_current",
"Number of in progress requests for world state data",
downloadStateValue(SnapWorldDownloadState::getOutstandingTaskCount));
}
private IntSupplier downloadStateValue(final Function<SnapWorldDownloadState, Integer> getter) {
return () -> {
final SnapWorldDownloadState state = this.downloadState.get();
return state != null ? getter.apply(state) : 0;
};
}
@Override
public CompletableFuture<Void> run(
final FastSyncActions fastSyncActions, final FastSyncState fastSyncState) {
synchronized (this) {
final SnapWorldDownloadState oldDownloadState = this.downloadState.get();
if (oldDownloadState != null && oldDownloadState.isDownloading()) {
final CompletableFuture<Void> failed = new CompletableFuture<>();
failed.completeExceptionally(
new IllegalStateException(
"Cannot run an already running " + this.getClass().getSimpleName()));
return failed;
}
final SnapSyncProcessState snapSyncState = (SnapSyncProcessState) fastSyncState;
final BlockHeader header = fastSyncState.getPivotBlockHeader().get();
final Hash stateRoot = header.getStateRoot();
LOG.info(
"Downloading world state from peers for pivot block {}. State root {} pending requests {}",
header.toLogString(),
stateRoot,
snapTaskCollection.size());
final SnapSyncMetricsManager snapsyncMetricsManager =
new SnapSyncMetricsManager(metricsSystem, ethContext);
final SnapWorldDownloadState newDownloadState =
new SnapWorldDownloadState(
worldStateStorageCoordinator,
snapContext,
protocolContext.getBlockchain(),
snapSyncState,
snapTaskCollection,
maxNodeRequestsWithoutProgress,
minMillisBeforeStalling,
snapsyncMetricsManager,
clock);
final Map<Bytes32, Bytes32> ranges = RangeManager.generateAllRanges(16);
snapsyncMetricsManager.initRange(ranges);
final List<AccountRangeDataRequest> currentAccountRange =
snapContext.getCurrentAccountRange();
final Set<Bytes> inconsistentAccounts = snapContext.getAccountsHealingList();
if (!currentAccountRange.isEmpty()) { // continue to download worldstate ranges
newDownloadState.setAccountsHealingList(inconsistentAccounts);
snapContext
.getCurrentAccountRange()
.forEach(
snapDataRequest -> {
snapsyncMetricsManager.notifyRangeProgress(
DOWNLOAD, snapDataRequest.getStartKeyHash(), snapDataRequest.getEndKeyHash());
newDownloadState.enqueueRequest(snapDataRequest);
});
} else if (!snapContext.getAccountsHealingList().isEmpty()) { // restart only the heal step
snapSyncState.setHealTrieStatus(true);
worldStateStorageCoordinator.applyOnMatchingStrategy(
DataStorageFormat.BONSAI,
strategy -> {
BonsaiWorldStateKeyValueStorage onBonsai = (BonsaiWorldStateKeyValueStorage) strategy;
onBonsai.clearFlatDatabase();
onBonsai.clearTrieLog();
});
newDownloadState.setAccountsHealingList(inconsistentAccounts);
newDownloadState.enqueueRequest(
SnapDataRequest.createAccountTrieNodeDataRequest(
stateRoot, Bytes.EMPTY, snapContext.getAccountsHealingList()));
} else {
// start from scratch
worldStateStorageCoordinator.clear();
// we have to upgrade to full flat db mode if we are in bonsai mode
if (snapSyncConfiguration.isFlatDbHealingEnabled()) {
worldStateStorageCoordinator.applyOnMatchingStrategy(
DataStorageFormat.BONSAI,
strategy -> {
BonsaiWorldStateKeyValueStorage onBonsai =
(BonsaiWorldStateKeyValueStorage) strategy;
onBonsai.upgradeToFullFlatDbMode();
});
}
ranges.forEach(
(key, value) ->
newDownloadState.enqueueRequest(
createAccountRangeDataRequest(stateRoot, key, value)));
}
Optional<CompleteTaskStep> maybeCompleteTask =
Optional.of(new CompleteTaskStep(snapSyncState, metricsSystem));
final DynamicPivotBlockSelector dynamicPivotBlockManager =
new DynamicPivotBlockSelector(
ethContext,
fastSyncActions,
snapSyncState,
snapSyncConfiguration.getPivotBlockWindowValidity(),
snapSyncConfiguration.getPivotBlockDistanceBeforeCaching());
SnapWorldStateDownloadProcess downloadProcess =
SnapWorldStateDownloadProcess.builder()
.configuration(snapSyncConfiguration)
.maxOutstandingRequests(maxOutstandingRequests)
.dynamicPivotBlockSelector(dynamicPivotBlockManager)
.loadLocalDataStep(
new LoadLocalDataStep(
worldStateStorageCoordinator,
newDownloadState,
snapSyncConfiguration,
metricsSystem,
snapSyncState))
.requestDataStep(
new RequestDataStep(
ethContext,
worldStateStorageCoordinator,
snapSyncState,
newDownloadState,
snapSyncConfiguration,
metricsSystem))
.persistDataStep(
new PersistDataStep(
snapSyncState,
worldStateStorageCoordinator,
newDownloadState,
snapSyncConfiguration))
.completeTaskStep(maybeCompleteTask.get())
.downloadState(newDownloadState)
.fastSyncState(snapSyncState)
.metricsSystem(metricsSystem)
.build();
newDownloadState.setPivotBlockSelector(dynamicPivotBlockManager);
return newDownloadState.startDownload(downloadProcess, ethContext.getScheduler());
}
}
@Override
public void cancel() {
synchronized (this) {
final SnapWorldDownloadState downloadState = this.downloadState.get();
if (downloadState != null) {
downloadState.getDownloadFuture().cancel(true);
}
}
}
@Override
public Optional<Long> getPulledStates() {
return Optional.empty();
}
@Override
public Optional<Long> getKnownStates() {
return Optional.empty();
}
}