SyncTargetRangeSource.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.range;
import static java.util.Collections.emptyList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
private static final Logger LOG = LoggerFactory.getLogger(SyncTargetRangeSource.class);
private static final Duration RETRY_DELAY_DURATION = Duration.ofSeconds(2);
private final RangeHeadersFetcher fetcher;
private final SyncTargetChecker syncTargetChecker;
private final EthPeer peer;
private final EthScheduler ethScheduler;
private final int rangeTimeoutsPermitted;
private final Duration newHeaderWaitDuration;
private final SyncTerminationCondition terminationCondition;
private final Queue<SyncTargetRange> retrievedRanges = new ArrayDeque<>();
private BlockHeader lastRangeEnd;
private boolean reachedEndOfRanges = false;
private Optional<CompletableFuture<List<BlockHeader>>> pendingRequests = Optional.empty();
private int requestFailureCount = 0;
public SyncTargetRangeSource(
final RangeHeadersFetcher fetcher,
final SyncTargetChecker syncTargetChecker,
final EthScheduler ethScheduler,
final EthPeer peer,
final BlockHeader commonAncestor,
final int rangeTimeoutsPermitted,
final SyncTerminationCondition terminationCondition) {
this(
fetcher,
syncTargetChecker,
ethScheduler,
peer,
commonAncestor,
rangeTimeoutsPermitted,
Duration.ofSeconds(5),
terminationCondition);
}
public SyncTargetRangeSource(
final RangeHeadersFetcher fetcher,
final SyncTargetChecker syncTargetChecker,
final EthScheduler ethScheduler,
final EthPeer peer,
final BlockHeader commonAncestor,
final int rangeTimeoutsPermitted,
final Duration newHeaderWaitDuration,
final SyncTerminationCondition terminationCondition) {
this.fetcher = fetcher;
this.syncTargetChecker = syncTargetChecker;
this.ethScheduler = ethScheduler;
this.peer = peer;
this.lastRangeEnd = commonAncestor;
this.rangeTimeoutsPermitted = rangeTimeoutsPermitted;
this.newHeaderWaitDuration = newHeaderWaitDuration;
this.terminationCondition = terminationCondition;
}
@Override
public boolean hasNext() {
return terminationCondition.shouldContinueDownload()
&& (!retrievedRanges.isEmpty()
|| (requestFailureCount < rangeTimeoutsPermitted
&& syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, lastRangeEnd)
&& !reachedEndOfRanges));
}
@Override
public SyncTargetRange next() {
if (!retrievedRanges.isEmpty()) {
return retrievedRanges.poll();
}
if (pendingRequests.isPresent()) {
return getRangeFromPendingRequest();
}
if (reachedEndOfRanges) {
return null;
}
if (fetcher.nextRangeEndsAtChainHead(peer, lastRangeEnd)) {
reachedEndOfRanges = true;
return new SyncTargetRange(peer, lastRangeEnd);
}
pendingRequests = Optional.of(getNextRangeHeaders());
return getRangeFromPendingRequest();
}
private CompletableFuture<List<BlockHeader>> getNextRangeHeaders() {
return fetcher
.getNextRangeHeaders(peer, lastRangeEnd)
.exceptionally(
error -> {
LOG.debug("Failed to retrieve range headers", error);
return emptyList();
})
.thenCompose(range -> range.isEmpty() ? pauseBriefly() : completedFuture(range));
}
/**
* Pause after failing to get new range to prevent requesting new range headers in a tight loop.
*
* @return a future that after the pause completes with an empty list.
*/
private CompletableFuture<List<BlockHeader>> pauseBriefly() {
return ethScheduler.scheduleFutureTask(
() -> completedFuture(emptyList()), RETRY_DELAY_DURATION);
}
private SyncTargetRange getRangeFromPendingRequest() {
final CompletableFuture<List<BlockHeader>> pendingRequest = this.pendingRequests.get();
try {
final List<BlockHeader> newHeaders =
pendingRequest.get(newHeaderWaitDuration.toMillis(), MILLISECONDS);
this.pendingRequests = Optional.empty();
if (newHeaders.isEmpty()) {
requestFailureCount++;
} else {
requestFailureCount = 0;
}
for (final BlockHeader header : newHeaders) {
retrievedRanges.add(new SyncTargetRange(peer, lastRangeEnd, header));
lastRangeEnd = header;
}
return retrievedRanges.poll();
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting for new range headers", e);
return null;
} catch (final ExecutionException e) {
LOG.debug("Failed to retrieve new range headers", e);
this.pendingRequests = Optional.empty();
requestFailureCount++;
return null;
} catch (final TimeoutException e) {
return null;
}
}
public interface SyncTargetChecker {
boolean shouldContinueDownloadingFromSyncTarget(EthPeer peer, BlockHeader lastRangeHeader);
}
}