WorldDownloadState.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.worldstate;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import org.hyperledger.besu.services.tasks.Task;
import org.hyperledger.besu.services.tasks.TasksPriorityProvider;
import org.hyperledger.besu.util.ExceptionUtils;
import java.time.Clock;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class WorldDownloadState<REQUEST extends TasksPriorityProvider> {
private static final Logger LOG = LoggerFactory.getLogger(WorldDownloadState.class);
private boolean downloadWasResumed;
protected final InMemoryTasksPriorityQueues<REQUEST> pendingRequests;
protected final int maxRequestsWithoutProgress;
private final Clock clock;
private final Set<EthTask<?>> outstandingRequests =
Collections.newSetFromMap(new ConcurrentHashMap<>());
protected CompletableFuture<Void> internalFuture;
private CompletableFuture<Void> downloadFuture;
// Volatile so monitoring can access it without having to synchronize.
protected volatile int requestsSinceLastProgress = 0;
private final long minMillisBeforeStalling;
private volatile long timestampOfLastProgress;
protected Bytes rootNodeData;
protected final WorldStateStorageCoordinator worldStateStorageCoordinator;
protected WorldStateDownloadProcess worldStateDownloadProcess;
public WorldDownloadState(
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final InMemoryTasksPriorityQueues<REQUEST> pendingRequests,
final int maxRequestsWithoutProgress,
final long minMillisBeforeStalling,
final Clock clock) {
this.worldStateStorageCoordinator = worldStateStorageCoordinator;
this.minMillisBeforeStalling = minMillisBeforeStalling;
this.timestampOfLastProgress = clock.millis();
this.downloadWasResumed = !pendingRequests.isEmpty();
this.pendingRequests = pendingRequests;
this.maxRequestsWithoutProgress = maxRequestsWithoutProgress;
this.clock = clock;
this.internalFuture = new CompletableFuture<>();
this.downloadFuture = new CompletableFuture<>();
this.internalFuture.whenComplete(this::cleanup);
this.downloadFuture.exceptionally(
error -> {
// Propagate cancellation back to our internal future.
if (error instanceof CancellationException) {
this.internalFuture.cancel(true);
}
return null;
});
}
public void reset() {
this.timestampOfLastProgress = clock.millis();
this.requestsSinceLastProgress = 0;
this.downloadWasResumed = true;
this.internalFuture = new CompletableFuture<>();
this.downloadFuture = new CompletableFuture<>();
this.internalFuture.whenComplete(this::cleanup);
this.downloadFuture.exceptionally(
error -> {
// Propagate cancellation back to our internal future.
if (error instanceof CancellationException) {
this.internalFuture.cancel(true);
}
return null;
});
}
private synchronized void cleanup(final Void result, final Throwable error) {
// Handle cancellations
if (internalFuture.isCancelled()) {
LOG.info("World state download cancelled");
} else if (error != null) {
if (!(ExceptionUtils.rootCause(error) instanceof StalledDownloadException)) {
LOG.info("World state download failed. ", error);
}
}
cleanupQueues();
if (error != null) {
if (worldStateDownloadProcess != null) {
worldStateDownloadProcess.abort();
}
downloadFuture.completeExceptionally(error);
} else {
downloadFuture.complete(result);
}
}
protected synchronized void cleanupQueues() {
for (final EthTask<?> outstandingRequest : outstandingRequests) {
outstandingRequest.cancel();
}
pendingRequests.clear();
}
public boolean downloadWasResumed() {
return downloadWasResumed;
}
public void addOutstandingTask(final EthTask<?> task) {
outstandingRequests.add(task);
}
public void removeOutstandingTask(final EthTask<?> task) {
outstandingRequests.remove(task);
}
public int getOutstandingTaskCount() {
return outstandingRequests.size();
}
public CompletableFuture<Void> getDownloadFuture() {
return downloadFuture;
}
public synchronized void enqueueRequest(final REQUEST request) {
if (!internalFuture.isDone()) {
pendingRequests.add(request);
notifyAll();
}
}
public synchronized void enqueueRequests(final Stream<REQUEST> requests) {
if (!internalFuture.isDone()) {
requests.forEach(pendingRequests::add);
notifyAll();
}
}
public synchronized Task<REQUEST> dequeueRequestBlocking() {
while (!internalFuture.isDone()) {
Task<REQUEST> task = pendingRequests.remove();
if (task != null) {
return task;
}
try {
wait();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
return null;
}
public synchronized void setRootNodeData(final Bytes rootNodeData) {
this.rootNodeData = rootNodeData;
}
public synchronized void requestComplete(
final boolean madeProgress, final long minMillisBeforeStalling) {
if (madeProgress) {
requestsSinceLastProgress = 0;
timestampOfLastProgress = clock.millis();
} else {
requestsSinceLastProgress++;
if (requestsSinceLastProgress >= maxRequestsWithoutProgress
&& timestampOfLastProgress + minMillisBeforeStalling < clock.millis()) {
markAsStalled(maxRequestsWithoutProgress);
}
}
}
public synchronized void requestComplete(final boolean madeProgress) {
requestComplete(madeProgress, minMillisBeforeStalling);
}
public int getRequestsSinceLastProgress() {
return requestsSinceLastProgress;
}
protected synchronized void markAsStalled(final int maxNodeRequestRetries) {
final String message =
"Download stalled due to too many failures to retrieve node data (>"
+ maxNodeRequestRetries
+ " requests without making progress)";
final WorldStateDownloaderException e = new StalledDownloadException(message);
internalFuture.completeExceptionally(e);
}
public synchronized boolean isDownloading() {
return !internalFuture.isDone();
}
public synchronized void notifyTaskAvailable() {
notifyAll();
}
public CompletableFuture<Void> startDownload(
final WorldStateDownloadProcess worldStateDownloadProcess, final EthScheduler ethScheduler) {
this.worldStateDownloadProcess = worldStateDownloadProcess;
final CompletableFuture<Void> processFuture = worldStateDownloadProcess.start(ethScheduler);
processFuture.whenComplete(
(result, error) -> {
if (error != null
&& !(ExceptionUtils.rootCause(error) instanceof CancellationException)) {
// The pipeline is only ever cancelled by us or shutdown closing the EthScheduler
// In either case we don't want to consider the download failed as we either already
// dealing with it or it's just a normal shutdown. Hence, don't propagate
// CancellationException
internalFuture.completeExceptionally(error);
}
});
return downloadFuture;
}
public void setWorldStateDownloadProcess(
final WorldStateDownloadProcess worldStateDownloadProcess) {
this.worldStateDownloadProcess = worldStateDownloadProcess;
}
public abstract boolean checkCompletion(final BlockHeader header);
}