PivotBlockConfirmer.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeerTask;
import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByNumberTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.FutureUtils;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This task will query {@code numberOfPeersToQuery} peers for a particular block number. If any
 * peers disagree on the block at this number, the task fails with a {@code
 * ContestedPivotBlockException}. The task will succeed only if {@code numberOfPeersToQuery}
 * distinct peers all return matching block headers for the specified block number.
 *
 * <p>An instance is single-use: the first call to {@link #confirmPivotBlock()} starts the queries
 * and every call returns the same future.
 *
 * <p>Query callbacks may run concurrently, so shared state is kept in concurrent collections and
 * atomics. Cancellation and registration of new per-peer tasks are both performed while holding
 * the monitor of {@code runningQueries}, so that no new task is registered once cancellation has
 * begun.
 */
class PivotBlockConfirmer {
  private static final Logger LOG = LoggerFactory.getLogger(PivotBlockConfirmer.class);

  private final EthContext ethContext;
  private final MetricsSystem metricsSystem;
  private final ProtocolSchedule protocolSchedule;

  // The number of peers we need to query to confirm our pivot block
  private final int numberOfPeersToQuery;
  // The number of the pivot block this instance tries to confirm (fixed for its lifetime)
  private final long pivotBlockNumber;
  // The number of times to retry if a peer fails to return an answer to our query
  private final int numberOfRetriesPerPeer;

  // Completed with the confirmed pivot block, or exceptionally if confirmation fails
  private final CompletableFuture<FastSyncState> result = new CompletableFuture<>();
  // One entry per requested confirmation; its monitor also guards cancellation/registration
  private final Collection<CompletableFuture<?>> runningQueries = new ConcurrentLinkedQueue<>();
  // Every peer a header task has been created for; entries are never removed, so a peer is
  // queried at most once by this instance
  private final Map<Bytes, RetryingGetHeaderFromPeerByNumberTask> pivotBlockQueriesByPeerId =
      new ConcurrentHashMap<>();
  // Number of peers that returned each distinct header
  private final Map<BlockHeader, AtomicInteger> pivotBlockVotes = new ConcurrentHashMap<>();

  private final AtomicBoolean isStarted = new AtomicBoolean(false);
  private final AtomicBoolean isCancelled = new AtomicBoolean(false);

  PivotBlockConfirmer(
      final ProtocolSchedule protocolSchedule,
      final EthContext ethContext,
      final MetricsSystem metricsSystem,
      final long pivotBlockNumber,
      final int numberOfPeersToQuery,
      final int numberOfRetriesPerPeer) {
    this.protocolSchedule = protocolSchedule;
    this.ethContext = ethContext;
    this.metricsSystem = metricsSystem;
    this.pivotBlockNumber = pivotBlockNumber;
    this.numberOfPeersToQuery = numberOfPeersToQuery;
    this.numberOfRetriesPerPeer = numberOfRetriesPerPeer;
  }

  /**
   * Starts the confirmation on the first invocation; later invocations only return the same
   * future.
   *
   * @return a future completed with the confirmed pivot block state, or completed exceptionally
   *     (e.g. with a {@link ContestedPivotBlockException}) if confirmation fails
   */
  public CompletableFuture<FastSyncState> confirmPivotBlock() {
    if (isStarted.compareAndSet(false, true)) {
      LOG.info(
          "Confirm pivot block {} with at least {} peers.", pivotBlockNumber, numberOfPeersToQuery);
      queryPeers(pivotBlockNumber);
    }
    return result;
  }

  /**
   * Launches {@code numberOfPeersToQuery} header queries for the given block number and records
   * them so they can be cancelled later.
   *
   * @param blockNumber the block number to request from each peer
   */
  private void queryPeers(final long blockNumber) {
    // Hold the lock while launching so a concurrent cancelQueries() sees all launched queries
    synchronized (runningQueries) {
      for (int i = 0; i < numberOfPeersToQuery; i++) {
        final CompletableFuture<?> query =
            executePivotQuery(blockNumber).whenComplete(this::processReceivedHeader);
        runningQueries.add(query);
      }
    }
  }

  /**
   * Handles the outcome of a single query: fails the overall result on error or on conflicting
   * headers, and completes it once enough matching headers have been received.
   *
   * @param blockHeader the header returned by a peer, or {@code null} if the query failed
   * @param throwable the failure of the query, or {@code null} if it succeeded
   */
  private void processReceivedHeader(final BlockHeader blockHeader, final Throwable throwable) {
    if (throwable != null) {
      cancelQueries();
      LOG.error("Encountered error while requesting pivot block header", throwable);
      result.completeExceptionally(throwable);
      return;
    }

    // Update votes: create the counter on first sight of this header and bump it in one step
    final int votes =
        pivotBlockVotes
            .computeIfAbsent(blockHeader, header -> new AtomicInteger(0))
            .incrementAndGet();

    if (pivotBlockVotes.size() > 1) {
      // We've received conflicting signals for the target block, confirmation has failed
      cancelQueries();
      // Take a single snapshot so the log line and the exception message always agree
      final String votesSummary = votesToString();
      LOG.info(
          "Failed to confirm pivot block {}. Received conflicting headers: {}",
          pivotBlockNumber,
          votesSummary);
      result.completeExceptionally(
          new ContestedPivotBlockException(pivotBlockNumber, votesSummary));
    } else if (votes >= numberOfPeersToQuery) {
      // We've received the required number of votes and have selected our pivot block
      LOG.info("Confirmed pivot block at {}: {}", pivotBlockNumber, blockHeader.getHash());
      result.complete(new FastSyncState(blockHeader));
    } else {
      LOG.info(
          "Received {} confirmation(s) for pivot block header {}: {}",
          votes,
          pivotBlockNumber,
          blockHeader.getHash());
    }
  }

  /** Marks this confirmer as cancelled and cancels all recorded queries and per-peer tasks. */
  private void cancelQueries() {
    synchronized (runningQueries) {
      isCancelled.set(true);
      runningQueries.forEach(f -> f.cancel(true));
      pivotBlockQueriesByPeerId.values().forEach(EthTask::cancel);
    }
  }

  /**
   * Renders the current votes as a comma-separated list of {@code <hash> (<count>)} entries.
   *
   * @return a textual summary of the headers received so far
   */
  private String votesToString() {
    return pivotBlockVotes.entrySet().stream()
        .map(e -> e.getKey().getHash() + " (" + e.getValue().get() + ")")
        .collect(Collectors.joining(","));
  }

  /**
   * Requests the header at {@code blockNumber} from a peer that has not been queried yet. If the
   * request fails, or no suitable peer is available, the query is re-attempted until a header is
   * obtained or this confirmer is cancelled / its result is done.
   *
   * @param blockNumber the block number to request
   * @return a future completed with the header, or failed with a {@link CancellationException}
   *     once this confirmer is cancelled or done
   */
  private CompletableFuture<BlockHeader> executePivotQuery(final long blockNumber) {
    if (isCancelled.get() || result.isDone()) {
      // Stop loop if this task is done
      return CompletableFuture.failedFuture(new CancellationException());
    }

    final Optional<RetryingGetHeaderFromPeerByNumberTask> query = createPivotQuery(blockNumber);
    if (query.isPresent()) {
      final CompletableFuture<BlockHeader> headerQuery = query.get().getHeader();
      // On failure, start over; the failed peer stays registered and is therefore skipped
      return FutureUtils.exceptionallyCompose(
          headerQuery, (error) -> executePivotQuery(blockNumber));
    }

    // We were unable to find a peer to query, wait and try again
    LOG.debug("No peer currently available to query for block {}.", blockNumber);
    return ethContext
        .getScheduler()
        .timeout(WaitForPeerTask.create(ethContext, metricsSystem), Duration.ofSeconds(5))
        // Ignore both outcome and failure (including timeout) of the wait; we retry regardless
        .handle((ignoredResult, ignoredError) -> null)
        .thenCompose(res -> executePivotQuery(blockNumber));
  }

  /**
   * Picks the first peer from {@code streamBestPeers()} that reports an estimated height of at
   * least {@code blockNumber}, is fully validated and has no task registered yet, and creates a
   * header task for it.
   *
   * @param blockNumber the block number to request
   * @return the task to run, or empty if no suitable peer exists or the task could not be
   *     registered
   */
  private Optional<RetryingGetHeaderFromPeerByNumberTask> createPivotQuery(final long blockNumber) {
    return ethContext
        .getEthPeers()
        .streamBestPeers()
        .filter(p -> p.chainState().getEstimatedHeight() >= blockNumber)
        .filter(EthPeer::isFullyValidated)
        .filter(p -> !pivotBlockQueriesByPeerId.containsKey(p.nodeId()))
        .findFirst()
        .flatMap((peer) -> createGetHeaderTask(peer, blockNumber));
  }

  /**
   * Creates a header task assigned to {@code peer} and registers it under the peer's node id.
   *
   * @param peer the peer to query
   * @param blockNumber the block number to request
   * @return the registered task, or empty if this confirmer is already cancelled or a task is
   *     already registered for the peer
   */
  Optional<RetryingGetHeaderFromPeerByNumberTask> createGetHeaderTask(
      final EthPeer peer, final long blockNumber) {
    final RetryingGetHeaderFromPeerByNumberTask task =
        RetryingGetHeaderFromPeerByNumberTask.forSingleNumber(
            protocolSchedule, ethContext, metricsSystem, blockNumber, numberOfRetriesPerPeer);
    task.assignPeer(peer);

    // Try adding our task; same lock as cancelQueries() so we never register after cancellation
    synchronized (runningQueries) {
      if (isCancelled.get()) {
        // Don't run a new query if this task is already cancelled
        return Optional.empty();
      }
      final RetryingGetHeaderFromPeerByNumberTask preexistingTask =
          pivotBlockQueriesByPeerId.putIfAbsent(peer.nodeId(), task);
      if (preexistingTask != null) {
        // We already have a task for this peer, try again later
        return Optional.empty();
      }
    }

    LOG.debug("Query peer {} for block {}.", peer.getLoggableId(), blockNumber);
    return Optional.of(task);
  }

  /** Signals that peers returned differing headers for the pivot block number. */
  public static class ContestedPivotBlockException extends RuntimeException {
    private final long blockNumber;

    /**
     * @param blockNumber the contested pivot block number
     * @param message the exception message (a summary of the conflicting votes when thrown by
     *     {@link PivotBlockConfirmer})
     */
    public ContestedPivotBlockException(final long blockNumber, final String message) {
      super(message);
      this.blockNumber = blockNumber;
    }

    /**
     * @return the block number on which peers disagreed
     */
    public long getBlockNumber() {
      return blockNumber;
    }
  }
}