FastSyncDownloader.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.StalledDownloadException;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.plugin.services.storage.DataStorageFormat;
import org.hyperledger.besu.services.tasks.TaskCollection;
import org.hyperledger.besu.util.ExceptionUtils;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FastSyncDownloader<REQUEST> {
private static final Duration FAST_SYNC_RETRY_DELAY = Duration.ofSeconds(5);
@SuppressWarnings("PrivateStaticFinalLoggers")
protected final Logger LOG = LoggerFactory.getLogger(getClass());
private final WorldStateStorageCoordinator worldStateStorageCoordinator;
private final WorldStateDownloader worldStateDownloader;
private final TaskCollection<REQUEST> taskCollection;
private final Path fastSyncDataDirectory;
private volatile Optional<TrailingPeerRequirements> trailingPeerRequirements = Optional.empty();
private final AtomicBoolean running = new AtomicBoolean(false);
protected final FastSyncActions fastSyncActions;
protected final FastSyncStateStorage fastSyncStateStorage;
protected FastSyncState initialFastSyncState;
public FastSyncDownloader(
final FastSyncActions fastSyncActions,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final WorldStateDownloader worldStateDownloader,
final FastSyncStateStorage fastSyncStateStorage,
final TaskCollection<REQUEST> taskCollection,
final Path fastSyncDataDirectory,
final FastSyncState initialFastSyncState) {
this.fastSyncActions = fastSyncActions;
this.worldStateStorageCoordinator = worldStateStorageCoordinator;
this.worldStateDownloader = worldStateDownloader;
this.fastSyncStateStorage = fastSyncStateStorage;
this.taskCollection = taskCollection;
this.fastSyncDataDirectory = fastSyncDataDirectory;
this.initialFastSyncState = initialFastSyncState;
}
public CompletableFuture<FastSyncState> start() {
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("SyncDownloader already running");
}
LOG.info("Starting sync");
return start(initialFastSyncState);
}
protected CompletableFuture<FastSyncState> start(final FastSyncState fastSyncState) {
worldStateStorageCoordinator.applyOnMatchingStrategy(
DataStorageFormat.BONSAI,
worldStateKeyValueStorage -> {
BonsaiWorldStateKeyValueStorage onBonsai =
(BonsaiWorldStateKeyValueStorage) worldStateKeyValueStorage;
LOG.info("Clearing bonsai flat account db");
onBonsai.clearFlatDatabase();
onBonsai.clearTrieLog();
});
LOG.debug("Start sync with initial sync state {}", fastSyncState);
return findPivotBlock(fastSyncState, fss -> downloadChainAndWorldState(fastSyncActions, fss));
}
public CompletableFuture<FastSyncState> findPivotBlock(
final FastSyncState fastSyncState,
final Function<FastSyncState, CompletableFuture<FastSyncState>> onNewPivotBlock) {
return exceptionallyCompose(
CompletableFuture.completedFuture(fastSyncState)
.thenCompose(fastSyncActions::selectPivotBlock)
.thenCompose(fastSyncActions::downloadPivotBlockHeader)
.thenApply(this::updateMaxTrailingPeers)
.thenApply(this::storeState)
.thenCompose(onNewPivotBlock),
this::handleFailure);
}
protected CompletableFuture<FastSyncState> handleFailure(final Throwable error) {
trailingPeerRequirements = Optional.empty();
Throwable rootCause = ExceptionUtils.rootCause(error);
if (rootCause instanceof SyncException) {
return CompletableFuture.failedFuture(error);
} else if (rootCause instanceof StalledDownloadException) {
LOG.debug("Stalled sync re-pivoting to newer block.");
return start(FastSyncState.EMPTY_SYNC_STATE);
} else if (rootCause instanceof CancellationException) {
return CompletableFuture.failedFuture(error);
} else if (rootCause instanceof MaxRetriesReachedException) {
LOG.debug(
"A download operation reached the max number of retries, re-pivoting to newer block");
return start(FastSyncState.EMPTY_SYNC_STATE);
} else {
LOG.error(
"Encountered an unexpected error during sync. Restarting sync in "
+ FAST_SYNC_RETRY_DELAY.getSeconds()
+ " seconds.",
error);
return fastSyncActions.scheduleFutureTask(
() -> start(FastSyncState.EMPTY_SYNC_STATE), FAST_SYNC_RETRY_DELAY);
}
}
public void stop() {
synchronized (this) {
if (running.compareAndSet(true, false)) {
LOG.info("Stopping sync");
// Cancelling the world state download will also cause the chain download to be cancelled.
worldStateDownloader.cancel();
}
}
}
public void deleteFastSyncState() {
// Make sure downloader is stopped before we start cleaning up its dependencies
worldStateDownloader.cancel();
try {
taskCollection.close();
if (fastSyncDataDirectory.toFile().exists()) {
// Clean up this data for now (until fast sync resume functionality is in place)
MoreFiles.deleteRecursively(fastSyncDataDirectory, RecursiveDeleteOption.ALLOW_INSECURE);
}
} catch (final IOException e) {
LOG.error("Unable to clean up sync state", e);
}
}
protected FastSyncState updateMaxTrailingPeers(final FastSyncState state) {
if (state.getPivotBlockNumber().isPresent()) {
trailingPeerRequirements =
Optional.of(new TrailingPeerRequirements(state.getPivotBlockNumber().getAsLong(), 0));
} else {
trailingPeerRequirements = Optional.empty();
}
return state;
}
protected FastSyncState storeState(final FastSyncState state) {
fastSyncStateStorage.storeState(state);
return state;
}
protected CompletableFuture<FastSyncState> downloadChainAndWorldState(
final FastSyncActions fastSyncActions, final FastSyncState currentState) {
// Synchronized ensures that stop isn't called while we're in the process of starting a
// world state and chain download. If it did we might wind up starting a new download
// after the stop method had called cancel.
synchronized (this) {
if (!running.get()) {
return CompletableFuture.failedFuture(
new CancellationException("FastSyncDownloader stopped"));
}
final CompletableFuture<Void> worldStateFuture =
worldStateDownloader.run(fastSyncActions, currentState);
final ChainDownloader chainDownloader = fastSyncActions.createChainDownloader(currentState);
final CompletableFuture<Void> chainFuture = chainDownloader.start();
// If either download fails, cancel the other one.
chainFuture.exceptionally(
error -> {
worldStateFuture.cancel(true);
return null;
});
worldStateFuture.exceptionally(
error -> {
chainDownloader.cancel();
return null;
});
return CompletableFuture.allOf(worldStateFuture, chainFuture)
.thenApply(
complete -> {
trailingPeerRequirements = Optional.empty();
return currentState;
});
}
}
public Optional<TrailingPeerRequirements> calculateTrailingPeerRequirements() {
return trailingPeerRequirements;
}
}