FastSyncActions.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import static java.util.concurrent.CompletableFuture.completedFuture;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByHashTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Asynchronous steps used while fast syncing: selecting a pivot block, retrieving the pivot block
 * header and creating the chain downloader.
 *
 * <p>On construction the class registers two metrics in the {@code SYNCHRONIZER} category: the
 * counter {@code fast_sync_pivot_block_selected_count} and the gauge {@code
 * fast_sync_pivot_block_current}. Both are updated each time {@link
 * #downloadPivotBlockHeader(FastSyncState)} completes successfully.
 */
public class FastSyncActions {

  private static final Logger LOG = LoggerFactory.getLogger(FastSyncActions.class);

  /** Delay before pivot block selection is attempted again after the selector returned nothing. */
  private static final Duration PIVOT_SELECTION_RETRY_DELAY = Duration.ofSeconds(5);

  protected final SynchronizerConfiguration syncConfig;
  protected final WorldStateStorageCoordinator worldStateStorageCoordinator;
  protected final ProtocolSchedule protocolSchedule;
  protected final ProtocolContext protocolContext;
  protected final EthContext ethContext;
  protected final SyncState syncState;
  protected final PivotBlockSelector pivotBlockSelector;
  protected final MetricsSystem metricsSystem;

  /** Incremented by {@code updateStats} after every successful pivot header download step. */
  protected final Counter pivotBlockSelectionCounter;

  /** Backing value of the pivot block gauge; holds the number of the latest pivot block header. */
  protected final AtomicLong pivotBlockGauge = new AtomicLong(0);

  /**
   * Stores the collaborators and registers the pivot block counter and gauge with the metrics
   * system.
   *
   * @param syncConfig synchronizer configuration (peer count and pivot distance are read from it)
   * @param worldStateStorageCoordinator passed on to the chain downloader
   * @param protocolSchedule protocol schedule handed to the header retrieval tasks
   * @param protocolContext gives access to the local blockchain
   * @param ethContext provides the scheduler and is handed to the peer and header tasks
   * @param syncState sync state exposed through {@link #getSyncState()}
   * @param pivotBlockSelector strategy used to pick the pivot block
   * @param metricsSystem metrics system used to register the counter and the gauge
   */
  public FastSyncActions(
      final SynchronizerConfiguration syncConfig,
      final WorldStateStorageCoordinator worldStateStorageCoordinator,
      final ProtocolSchedule protocolSchedule,
      final ProtocolContext protocolContext,
      final EthContext ethContext,
      final SyncState syncState,
      final PivotBlockSelector pivotBlockSelector,
      final MetricsSystem metricsSystem) {
    this.syncConfig = syncConfig;
    this.worldStateStorageCoordinator = worldStateStorageCoordinator;
    this.protocolSchedule = protocolSchedule;
    this.protocolContext = protocolContext;
    this.ethContext = ethContext;
    this.syncState = syncState;
    this.pivotBlockSelector = pivotBlockSelector;
    this.metricsSystem = metricsSystem;

    pivotBlockSelectionCounter =
        metricsSystem.createCounter(
            BesuMetricCategory.SYNCHRONIZER,
            "fast_sync_pivot_block_selected_count",
            "Number of times a fast sync pivot block has been selected");
    metricsSystem.createLongGauge(
        BesuMetricCategory.SYNCHRONIZER,
        "fast_sync_pivot_block_current",
        "The current fast sync pivot block",
        pivotBlockGauge::get);
  }

  /**
   * @return the sync state supplied at construction
   */
  public SyncState getSyncState() {
    return syncState;
  }

  /**
   * @return the best chain height as reported by the pivot block selector
   */
  public long getBestChainHeight() {
    return pivotBlockSelector.getBestChainHeight();
  }

  /**
   * Returns the given state unchanged when it already carries a pivot block header; otherwise asks
   * the pivot block selector for a new pivot, retrying until one is available.
   *
   * @param fastSyncState the current fast sync state
   * @return a future completing with a state that designates a pivot block
   */
  public CompletableFuture<FastSyncState> selectPivotBlock(final FastSyncState fastSyncState) {
    return fastSyncState.hasPivotBlockHeader()
        ? completedFuture(fastSyncState)
        : selectNewPivotBlock();
  }

  /** Asks the selector for a pivot; schedules a delayed retry when it returns none. */
  private CompletableFuture<FastSyncState> selectNewPivotBlock() {
    return pivotBlockSelector
        .selectNewPivotBlock()
        .map(CompletableFuture::completedFuture)
        .orElseGet(this::retrySelectPivotBlockAfterDelay);
  }

  /**
   * Schedules a future-producing task on the scheduler of the {@code EthContext}.
   *
   * @param future supplier of the future to run once the delay has elapsed
   * @param duration delay before the supplier is invoked
   * @param <T> result type of the scheduled future
   * @return the future returned by the scheduler for the delayed task
   */
  <T> CompletableFuture<T> scheduleFutureTask(
      final Supplier<CompletableFuture<T>> future, final Duration duration) {
    return ethContext.getScheduler().scheduleFutureTask(future, duration);
  }

  /**
   * Waits for the retry delay, lets the selector prepare for the retry and then attempts the
   * selection again.
   */
  private CompletableFuture<FastSyncState> retrySelectPivotBlockAfterDelay() {
    // Go through the class's own scheduling helper rather than reaching into the scheduler again.
    return scheduleFutureTask(pivotBlockSelector::prepareRetry, PIVOT_SELECTION_RETRY_DELAY)
        .thenCompose(ignore -> selectNewPivotBlock());
  }

  /**
   * Ensures the returned state carries the pivot block header, downloading it when necessary, and
   * then updates the pivot block metrics.
   *
   * <p>The metrics are updated on every successful completion, including when the given state
   * already contained the header.
   *
   * @param currentState the current fast sync state
   * @return a future completing with a state for the pivot block header
   */
  public CompletableFuture<FastSyncState> downloadPivotBlockHeader(
      final FastSyncState currentState) {
    return internalDownloadPivotBlockHeader(currentState).thenApply(this::updateStats);
  }

  /**
   * Returns the state as is when it already has the header. Otherwise waits for one peer and then
   * downloads the header by hash when the state has a pivot block hash, or through a {@code
   * PivotBlockRetriever} using the pivot block number when it does not.
   */
  private CompletableFuture<FastSyncState> internalDownloadPivotBlockHeader(
      final FastSyncState currentState) {
    if (currentState.hasPivotBlockHeader()) {
      LOG.debug("Initial sync state {} already contains the block header", currentState);
      return completedFuture(currentState);
    }

    return waitForPeers(1)
        .thenCompose(
            unused ->
                currentState
                    .getPivotBlockHash()
                    .map(this::downloadPivotBlockHeader)
                    .orElseGet(
                        () ->
                            new PivotBlockRetriever(
                                    protocolSchedule,
                                    ethContext,
                                    metricsSystem,
                                    // NOTE(review): presumably fails the returned future when the
                                    // state has neither hash nor number - confirm callers always
                                    // provide one of them.
                                    currentState.getPivotBlockNumber().getAsLong(),
                                    syncConfig.getFastSyncMinimumPeerCount(),
                                    syncConfig.getFastSyncPivotDistance())
                                .downloadPivotBlockHeader()));
  }

  /** Increments the selection counter and, when a header is present, records its block number. */
  private FastSyncState updateStats(final FastSyncState fastSyncState) {
    pivotBlockSelectionCounter.inc();
    fastSyncState
        .getPivotBlockHeader()
        .ifPresent(blockHeader -> pivotBlockGauge.set(blockHeader.getNumber()));
    return fastSyncState;
  }

  /**
   * Creates the fast sync chain downloader for the given state.
   *
   * @param currentState the fast sync state handed to the downloader
   * @return the chain downloader built by {@code FastSyncChainDownloader.create}
   */
  public ChainDownloader createChainDownloader(final FastSyncState currentState) {
    return FastSyncChainDownloader.create(
        syncConfig,
        worldStateStorageCoordinator,
        protocolSchedule,
        protocolContext,
        ethContext,
        syncState,
        metricsSystem,
        currentState);
  }

  /**
   * Downloads the pivot block header identified by {@code hash} and wraps it in a new {@code
   * FastSyncState}. The outcome is logged at debug level; failures are not handled here and
   * propagate through the returned future.
   */
  private CompletableFuture<FastSyncState> downloadPivotBlockHeader(final Hash hash) {
    LOG.debug("Downloading pivot block header by hash {}", hash);
    return RetryingGetHeaderFromPeerByHashTask.byHash(
            protocolSchedule,
            ethContext,
            hash,
            pivotBlockSelector.getMinRequiredBlockNumber(),
            metricsSystem)
        .getHeader()
        .whenComplete(
            (blockHeader, throwable) -> {
              if (throwable != null) {
                // Pass the throwable as the trailing argument so the failure cause is logged too.
                LOG.debug("Error downloading block header by hash {}", hash, throwable);
              } else {
                LOG.atDebug()
                    .setMessage("Successfully downloaded pivot block header by hash {}")
                    .addArgument(blockHeader::toLogString)
                    .log();
              }
            })
        .thenApply(FastSyncState::new);
  }

  /**
   * @param blockNumber the block number to compare against
   * @return {@code true} when the local chain head number is strictly lower than {@code
   *     blockNumber}
   */
  public boolean isBlockchainBehind(final long blockNumber) {
    return protocolContext.getBlockchain().getChainHeadHeader().getNumber() < blockNumber;
  }

  /** Runs a {@code WaitForPeersTask} for the requested number of peers. */
  private CompletableFuture<Void> waitForPeers(final int count) {
    final WaitForPeersTask waitForPeersTask =
        WaitForPeersTask.create(ethContext, count, metricsSystem);
    return waitForPeersTask.run();
  }
}