DynamicPivotBlockSelector.java
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DynamicPivotBlockSelector is responsible for dynamically selecting the pivot block for Snapsync.
* It uses an algorithm to find the most suitable pivot block based on the current network
* conditions.
*/
public class DynamicPivotBlockSelector {
private static final Duration DEFAULT_CHECK_INTERVAL = Duration.ofSeconds(60);
public static final BiConsumer<BlockHeader, Boolean> doNothingOnPivotChange = (___, __) -> {};
private static final Logger LOG = LoggerFactory.getLogger(DynamicPivotBlockSelector.class);
private final AtomicBoolean isTimeToCheckAgain = new AtomicBoolean(true);
private final EthContext ethContext;
private final FastSyncActions syncActions;
private final SnapSyncProcessState syncState;
private final int pivotBlockWindowValidity;
private final int pivotBlockDistanceBeforeCaching;
private Optional<BlockHeader> lastPivotBlockFound;
public DynamicPivotBlockSelector(
final EthContext ethContext,
final FastSyncActions fastSyncActions,
final SnapSyncProcessState fastSyncState,
final int pivotBlockWindowValidity,
final int pivotBlockDistanceBeforeCaching) {
this.ethContext = ethContext;
this.syncActions = fastSyncActions;
this.syncState = fastSyncState;
this.pivotBlockWindowValidity = pivotBlockWindowValidity;
this.pivotBlockDistanceBeforeCaching = pivotBlockDistanceBeforeCaching;
this.lastPivotBlockFound = Optional.empty();
}
public void check(final BiConsumer<BlockHeader, Boolean> onNewPivotBlock) {
if (isTimeToCheckAgain.compareAndSet(true, false)) {
AtomicBoolean delayNextCheck = new AtomicBoolean(false);
syncState
.getPivotBlockNumber()
.ifPresent(
currentPivotBlockNumber -> {
final long bestChainHeight = syncActions.getBestChainHeight();
final long distanceNextPivotBlock =
bestChainHeight
- lastPivotBlockFound
.map(ProcessableBlockHeader::getNumber)
.orElse(currentPivotBlockNumber);
final CompletableFuture<Void> searchForNewPivot;
if (distanceNextPivotBlock > pivotBlockDistanceBeforeCaching) {
LOG.atDebug()
.setMessage(
"Searching for a new pivot: current pivot {} best chain height {} distance next pivot {} last pivot block found {}")
.addArgument(currentPivotBlockNumber)
.addArgument(bestChainHeight)
.addArgument(distanceNextPivotBlock)
.addArgument(this::logLastPivotBlockFound)
.log();
searchForNewPivot =
CompletableFuture.completedFuture(FastSyncState.EMPTY_SYNC_STATE)
.thenCompose(syncActions::selectPivotBlock)
.thenCompose(
fss -> {
if (isSamePivotBlock(fss)) {
LOG.atDebug()
.setMessage(
"New pivot {} is equal to last found {}, nothing to do")
.addArgument(fss::getPivotBlockHash)
.addArgument(this::logLastPivotBlockFound)
.log();
return CompletableFuture.completedFuture(null);
}
return downloadNewPivotBlock(fss);
})
.whenComplete(
(unused, throwable) -> {
if (throwable != null) {
LOG.debug("Error while searching for a new pivot", throwable);
}
});
} else {
searchForNewPivot = CompletableFuture.completedFuture(null);
}
try {
searchForNewPivot
.thenRun(
() -> {
final long distance = bestChainHeight - currentPivotBlockNumber;
if (distance > pivotBlockWindowValidity) {
LOG.atDebug()
.setMessage(
"Switch to new pivot: current pivot {} is distant {} from current best chain height {} last pivot block found {}")
.addArgument(currentPivotBlockNumber)
.addArgument(distance)
.addArgument(bestChainHeight)
.addArgument(this::logLastPivotBlockFound)
.log();
switchToNewPivotBlock(onNewPivotBlock);
}
// delay next check only if we are successful
delayNextCheck.set(true);
})
.get();
} catch (InterruptedException | ExecutionException e) {
LOG.debug("Exception while searching for new pivot", e);
}
});
scheduleNextCheck(delayNextCheck.get());
}
}
private CompletableFuture<Void> downloadNewPivotBlock(final FastSyncState fss) {
return syncActions
.downloadPivotBlockHeader(fss)
.thenAccept(
fssWithHeader -> {
lastPivotBlockFound = fssWithHeader.getPivotBlockHeader();
LOG.atDebug()
.setMessage("Found new pivot block {}")
.addArgument(this::logLastPivotBlockFound)
.log();
})
.orTimeout(5, TimeUnit.MINUTES);
}
private boolean isSamePivotBlock(final FastSyncState fss) {
return lastPivotBlockFound.isPresent()
&& fss.hasPivotBlockHash()
&& lastPivotBlockFound.get().getHash().equals(fss.getPivotBlockHash().get());
}
private void scheduleNextCheck(final boolean delayNextCheck) {
if (delayNextCheck) {
ethContext
.getScheduler()
.scheduleFutureTask(
() -> {
LOG.debug("Is time to check the pivot again");
isTimeToCheckAgain.set(true);
},
DEFAULT_CHECK_INTERVAL);
} else {
isTimeToCheckAgain.set(true);
}
}
public void switchToNewPivotBlock(final BiConsumer<BlockHeader, Boolean> onSwitchDone) {
lastPivotBlockFound.ifPresentOrElse(
blockHeader -> {
if (syncState.getPivotBlockHeader().filter(blockHeader::equals).isEmpty()) {
LOG.atDebug()
.setMessage("Setting new pivot block {} with state root {}")
.addArgument(blockHeader::toLogString)
.addArgument(blockHeader.getStateRoot())
.log();
syncState.setCurrentHeader(blockHeader);
lastPivotBlockFound = Optional.empty();
}
onSwitchDone.accept(blockHeader, true);
},
() -> onSwitchDone.accept(syncState.getPivotBlockHeader().orElseThrow(), false));
}
public boolean isBlockchainBehind() {
return syncState
.getPivotBlockHeader()
.map(pivot -> syncActions.isBlockchainBehind(pivot.getNumber()))
.orElse(false);
}
private String logLastPivotBlockFound() {
return lastPivotBlockFound.map(BlockHeader::toLogString).orElse("empty");
}
}