// BlockPropagationManager.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync;
import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;
import org.hyperledger.besu.consensus.merge.ForkchoiceEvent;
import org.hyperledger.besu.consensus.merge.UnverifiedForkchoiceListener;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.BadBlockCause;
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent.EventType;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessage;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetBlockFromPeersTask;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockHashesMessage;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockHashesMessage.NewBlockHash;
import org.hyperledger.besu.ethereum.eth.messages.NewBlockMessage;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.tasks.PersistBlockTask;
import org.hyperledger.besu.ethereum.mainnet.BlockHeaderValidator;
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BlockPropagationManager implements UnverifiedForkchoiceListener {
private static final Logger LOG = LoggerFactory.getLogger(BlockPropagationManager.class);

// Collaborators supplied through the constructor.
private final SynchronizerConfiguration config;
private final ProtocolSchedule protocolSchedule;
private final ProtocolContext protocolContext;
private final EthContext ethContext;
private final SyncState syncState;
private final MetricsSystem metricsSystem;
private final BlockBroadcaster blockBroadcaster;

// True between a successful start() and the following stop().
private final AtomicBoolean started = new AtomicBoolean(false);

// Tracks blocks that are currently being requested or imported.
private final ProcessingBlocksManager processingBlocksManager;
// Holds blocks that were received but cannot be imported yet.
private final PendingBlocksManager pendingBlocksManager;

// Timeout applied to a single "get block from peers" attempt (built from a millisecond setting).
private final Duration getBlockTimeoutMillis;

// Subscription ids of the listeners registered by setupListeners(); empty while unsubscribed.
// All three start out empty so that clearListeners() never dereferences a null Optional, even
// when it runs before setupListeners() has assigned every id.
private Optional<Long> onBlockAddedSId = Optional.empty();
private Optional<Long> newBlockSId = Optional.empty();
private Optional<Long> newBlockHashesSId = Optional.empty();
/**
 * Creates a manager backed by a newly created {@link ProcessingBlocksManager}; every other
 * argument is passed through unchanged to the nine-argument constructor.
 */
BlockPropagationManager(
final SynchronizerConfiguration config,
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final SyncState syncState,
final PendingBlocksManager pendingBlocksManager,
final MetricsSystem metricsSystem,
final BlockBroadcaster blockBroadcaster) {
this(
config,
protocolSchedule,
protocolContext,
ethContext,
syncState,
pendingBlocksManager,
metricsSystem,
blockBroadcaster,
new ProcessingBlocksManager());
}
/**
 * Creates a manager that uses the supplied {@link ProcessingBlocksManager} to track in-flight
 * block requests and imports.
 *
 * <p>The get-block timeout is read once from {@code config} here. The manager subscribes itself
 * to TTD-reached notifications of {@code syncState} as the very last construction step.
 */
BlockPropagationManager(
    final SynchronizerConfiguration config,
    final ProtocolSchedule protocolSchedule,
    final ProtocolContext protocolContext,
    final EthContext ethContext,
    final SyncState syncState,
    final PendingBlocksManager pendingBlocksManager,
    final MetricsSystem metricsSystem,
    final BlockBroadcaster blockBroadcaster,
    final ProcessingBlocksManager processingBlocksManager) {
  this.config = config;
  this.protocolSchedule = protocolSchedule;
  this.protocolContext = protocolContext;
  this.ethContext = ethContext;
  this.metricsSystem = metricsSystem;
  this.blockBroadcaster = blockBroadcaster;
  this.syncState = syncState;
  this.pendingBlocksManager = pendingBlocksManager;
  this.getBlockTimeoutMillis =
      Duration.ofMillis(config.getPropagationManagerGetBlockTimeoutMillis());
  this.processingBlocksManager = processingBlocksManager;

  // Subscribe last: the callback can call start(), which uses the fields assigned above. Handing
  // out "this" only after every field is set guarantees the callback never observes a partially
  // constructed manager, and no subscription is left behind if reading the config throws.
  this.syncState.subscribeTTDReached(this::reactToTTDReachedEvent);
}
/**
 * Starts block propagation by flipping the started flag and registering the listeners.
 *
 * @throws IllegalStateException if the manager was already started
 */
public void start() {
  final boolean wasStopped = started.compareAndSet(false, true);
  if (!wasStopped) {
    throw new IllegalStateException(
        "Attempt to start an already started " + this.getClass().getSimpleName() + ".");
  }
  setupListeners();
}
/**
 * Stops block propagation: removes the listeners and then clears the started flag. When the
 * manager is not running this only emits a debug message.
 */
public void stop() {
  if (!started.get()) {
    LOG.debug("Attempted to stop when we are not even running...");
    return;
  }
  clearListeners();
  started.set(false);
}
/**
 * Reports whether this manager is currently started.
 *
 * @return the current value of the {@code started} flag
 */
public boolean isRunning() {
return started.get();
}
/**
 * Registers the block-added observer and the NEW_BLOCK / NEW_BLOCK_HASHES message subscribers,
 * remembering each returned subscription id so it can be removed later.
 */
private void setupListeners() {
  final long blockAddedId =
      protocolContext.getBlockchain().observeBlockAdded(this::onBlockAdded);
  onBlockAddedSId = Optional.of(blockAddedId);

  final long newBlockId =
      ethContext.getEthMessages().subscribe(EthPV62.NEW_BLOCK, this::handleNewBlockFromNetwork);
  newBlockSId = Optional.of(newBlockId);

  final long newBlockHashesId =
      ethContext
          .getEthMessages()
          .subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork);
  newBlockHashesSId = Optional.of(newBlockHashesId);
}
/**
 * Removes every listener whose subscription id is currently stored, then resets all stored ids
 * to empty.
 */
private void clearListeners() {
  onBlockAddedSId.ifPresent(
      observerId -> {
        protocolContext.getBlockchain().removeObserver(observerId);
      });
  newBlockSId.ifPresent(
      subscriptionId -> {
        ethContext.getEthMessages().unsubscribe(subscriptionId, EthPV62.NEW_BLOCK);
      });
  newBlockHashesSId.ifPresent(
      subscriptionId -> {
        ethContext.getEthMessages().unsubscribe(subscriptionId, EthPV62.NEW_BLOCK_HASHES);
      });

  onBlockAddedSId = Optional.empty();
  newBlockSId = Optional.empty();
  newBlockHashesSId = Optional.empty();
}
/**
 * Reacts to a block being added to the chain: imports pending children of that block, or, when
 * there are none, considers fetching blocks that were never announced. When the chain head
 * advanced, pending blocks below the lower end of the propagation range are purged.
 *
 * @param blockAddedEvent the event describing the added block
 */
private void onBlockAdded(final BlockAddedEvent blockAddedEvent) {
  final Block addedBlock = blockAddedEvent.getBlock();
  LOG.atTrace()
      .setMessage("Block added event type {} for block {}. Current status {}")
      .addArgument(blockAddedEvent::getEventType)
      .addArgument(addedBlock::toLogString)
      .addArgument(this)
      .log();

  // Pending children take priority; only without them do we look for non announced blocks.
  final boolean childrenFound = maybeProcessPendingChildrenBlocks(addedBlock);
  if (!childrenFound) {
    LOG.atTrace()
        .setMessage("There are no pending blocks ready to import for block {}")
        .addArgument(addedBlock::toLogString)
        .log();
    maybeProcessNonAnnouncedBlocks(addedBlock);
  }

  if (!blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) {
    return;
  }
  final long chainHead = protocolContext.getBlockchain().getChainHeadBlockNumber();
  final long lowerOffset = config.getBlockPropagationRange().lowerEndpoint();
  pendingBlocksManager.purgeBlocksOlderThan(chainHead + lowerOffset);
}
/**
 * Deregisters the given block from the pending blocks and schedules the import of any pending
 * blocks that are its children.
 *
 * @param block the block whose pending children should be imported
 * @return true if at least one pending child was found for the block
 */
private boolean maybeProcessPendingChildrenBlocks(final Block block) {
  final List<Block> children;
  synchronized (pendingBlocksManager) {
    // The block itself is no longer pending; its children are now candidates for import.
    pendingBlocksManager.deregisterPendingBlock(block);
    children = pendingBlocksManager.childrenOf(block.getHash());
  }

  if (children.isEmpty()) {
    return false;
  }

  LOG.atTrace()
      .setMessage("Ready to import pending blocks found [{}] for block {}")
      .addArgument(
          () -> children.stream().map(Block::toLogString).collect(Collectors.joining(", ")))
      .addArgument(block::toLogString)
      .log();

  final Supplier<CompletableFuture<List<Block>>> importChildren =
      PersistBlockTask.forUnorderedBlocks(
          protocolSchedule,
          protocolContext,
          ethContext,
          children,
          HeaderValidationMode.FULL,
          metricsSystem);

  ethContext
      .getScheduler()
      .scheduleSyncWorkerTask(importChildren)
      .whenComplete(
          (imported, error) -> {
            if (imported != null) {
              LOG.info(
                  "Imported {} pending blocks: {}",
                  imported.size(),
                  imported.stream()
                      .map(b -> b.getHeader().getNumber())
                      .collect(Collectors.toList()));
            }
            if (error != null) {
              LOG.error("Error importing pending blocks", error);
            }
          });
  return true;
}
/**
 * When the given block is above the local chain head and there is a gap between it and the
 * lowest announced pending block, requests the block right after it from peers, provided the
 * lowest announced block is within the upper bound of the propagation range and that block
 * number has not been requested already.
 *
 * @param newBlock the block that was just added
 */
private void maybeProcessNonAnnouncedBlocks(final Block newBlock) {
  final long localHead = protocolContext.getBlockchain().getChainHeadBlockNumber();
  if (newBlock.getHeader().getNumber() <= localHead) {
    return;
  }

  pendingBlocksManager
      .lowestAnnouncedBlock()
      .map(ProcessableBlockHeader::getNumber)
      .ifPresent(
          lowestAnnounced -> {
            final long distance = lowestAnnounced - localHead;
            LOG.trace(
                "Found lowest announced block {} with distance {}", lowestAnnounced, distance);

            final long nextMissing = newBlock.getHeader().getNumber() + 1;
            if (distance < config.getBlockPropagationRange().upperEndpoint()
                && lowestAnnounced > nextMissing
                && processingBlocksManager.addNonAnnouncedBlocks(nextMissing)) {
              retrieveNonAnnouncedBlock(nextMissing);
            }
          });
}
/**
 * Handles a NEW_BLOCK message: updates the sending peer's chain state and, unless the block is
 * out of the import range, already pending or already in the chain, imports it or saves it as
 * pending. A peer sending a message that fails RLP decoding is disconnected.
 *
 * @param message the received NEW_BLOCK message
 */
private void handleNewBlockFromNetwork(final EthMessage message) {
  final Blockchain blockchain = protocolContext.getBlockchain();
  final NewBlockMessage newBlockMessage = NewBlockMessage.readFrom(message.getData());
  final EthPeer peer = message.getPeer();
  try {
    final Block block = newBlockMessage.block(protocolSchedule);
    LOG.atTrace()
        .setMessage("New block from network {} from peer {}. Current status {}")
        .addArgument(block::toLogString)
        .addArgument(peer)
        .addArgument(this)
        .log();

    final Difficulty totalDifficulty = newBlockMessage.totalDifficulty(protocolSchedule);
    peer.chainState().updateForAnnouncedBlock(block.getHeader(), totalDifficulty);

    // Blocks too far from our head or from the best known chain are ignored.
    final long localHeight = protocolContext.getBlockchain().getChainHeadBlockNumber();
    final long bestHeight = syncState.bestChainHeight(localHeight);
    final boolean inRange =
        shouldImportBlockAtHeight(block.getHeader().getNumber(), localHeight, bestHeight);
    if (!inRange) {
      LOG.atTrace()
          .setMessage(
              "Do not import new block from network {}, current chain heights are: local {}, best {}")
          .addArgument(block::toLogString)
          .addArgument(localHeight)
          .addArgument(bestHeight)
          .log();
      return;
    }

    if (pendingBlocksManager.contains(block.getHash())) {
      LOG.atTrace()
          .setMessage("New block from network {} is already pending")
          .addArgument(block::toLogString)
          .log();
      return;
    }

    if (blockchain.contains(block.getHash())) {
      LOG.atTrace()
          .setMessage("New block from network {} is already present")
          .addArgument(block::toLogString)
          .log();
      return;
    }

    importOrSavePendingBlock(block, peer.nodeId());
  } catch (final RLPException e) {
    LOG.debug(
        "Malformed NEW_BLOCK message received from peer (BREACH_OF_PROTOCOL), disconnecting: {}",
        peer,
        e);
    peer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
  }
}
/**
 * Handles a NEW_BLOCK_HASHES message: records the announced hashes and heights on the sending
 * peer, keeps the announcements that are within the import range and not yet pending,
 * importing, imported or requested, and then requests each remaining block. A peer sending a
 * message that fails RLP decoding is disconnected.
 *
 * @param message the received NEW_BLOCK_HASHES message
 */
private void handleNewBlockHashesFromNetwork(final EthMessage message) {
  final Blockchain blockchain = protocolContext.getBlockchain();
  final NewBlockHashesMessage newBlockHashesMessage =
      NewBlockHashesMessage.readFrom(message.getData());
  final EthPeer peer = message.getPeer();
  try {
    final List<NewBlockHash> announcements =
        Lists.newArrayList(newBlockHashesMessage.getNewHashes());
    LOG.atTrace()
        .setMessage("New block hashes from network {} from peer {}. Current status {}")
        .addArgument(() -> toLogString(announcements))
        .addArgument(peer)
        .addArgument(this)
        .log();

    // Remember on the peer which blocks it announced and at which height.
    announcements.forEach(
        announcement -> {
          peer.registerKnownBlock(announcement.hash());
          peer.registerHeight(announcement.hash(), announcement.number());
        });

    // Keep only announcements inside the import range.
    final long localHeight = protocolContext.getBlockchain().getChainHeadBlockNumber();
    final long bestHeight = syncState.bestChainHeight(localHeight);
    final List<NewBlockHash> inRange =
        announcements.stream()
            .filter(a -> shouldImportBlockAtHeight(a.number(), localHeight, bestHeight))
            .collect(Collectors.toList());

    // Of those, keep only the blocks we know nothing about yet and mark them as requested.
    final List<NewBlockHash> toRequest = new ArrayList<>();
    for (final NewBlockHash announcement : inRange) {
      final Hash hash = announcement.hash();
      if (pendingBlocksManager.contains(hash)) {
        LOG.trace("New block hash from network {} is already pending", announcement);
      } else if (processingBlocksManager.alreadyImporting(hash)) {
        LOG.trace("New block hash from network {} is already importing", announcement);
      } else if (blockchain.contains(hash)) {
        LOG.trace("New block hash from network {} was already imported", announcement);
      } else if (processingBlocksManager.addRequestedBlock(hash)) {
        toRequest.add(announcement);
      } else {
        LOG.trace("New block hash from network {} was already requested", announcement);
      }
    }

    // Finally fetch every newly requested block, preferring the announcing peer.
    for (final NewBlockHash announcement : toRequest) {
      processAnnouncedBlock(peer, announcement);
    }
  } catch (final RLPException e) {
    LOG.debug(
        "Malformed NEW_BLOCK_HASHES message received from peer (BREACH_OF_PROTOCOL), disconnecting: {}",
        peer,
        e);
    peer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
  }
}
/**
 * Requests a block that no peer announced, identified only by its number.
 *
 * @param blockNumber the number of the block to retrieve
 * @return the future returned by {@code getBlockFromPeers}
 */
private CompletableFuture<Block> retrieveNonAnnouncedBlock(final long blockNumber) {
  LOG.trace("Retrieve non announced block {} from peers", blockNumber);
  final Optional<EthPeer> noPreferredPeer = Optional.empty();
  final Optional<Hash> unknownHash = Optional.empty();
  return getBlockFromPeers(noPreferredPeer, blockNumber, unknownHash);
}
/**
 * Requests an announced block by number and hash, preferring the peer that announced it.
 *
 * @param peer the peer that announced the block
 * @param blockHash the announced number and hash
 * @return the future returned by {@code getBlockFromPeers}
 */
private CompletableFuture<Block> processAnnouncedBlock(
    final EthPeer peer, final NewBlockHash blockHash) {
  LOG.trace("Retrieve announced block by header {} from peers", blockHash);
  final Optional<EthPeer> announcingPeer = Optional.of(peer);
  final long announcedNumber = blockHash.number();
  final Optional<Hash> announcedHash = Optional.of(blockHash.hash());
  return getBlockFromPeers(announcingPeer, announcedNumber, announcedHash);
}
/**
 * Requests the parent of the given block from peers, unless a request for that parent hash has
 * already been registered.
 *
 * @param block the block whose parent is missing
 */
private void requestParentBlock(final Block block) {
  final BlockHeader header = block.getHeader();
  final Hash parentHash = header.getParentHash();
  if (!processingBlocksManager.addRequestedBlock(parentHash)) {
    LOG.debug("Parent block with hash {} is already requested", parentHash);
    return;
  }
  retrieveParentBlock(header);
}
/**
 * Requests the parent of the given header from peers, by parent hash and by the header's
 * number minus one, without a preferred peer.
 *
 * @param blockHeader the header whose parent should be retrieved
 * @return the future returned by {@code getBlockFromPeers}
 */
private CompletableFuture<Block> retrieveParentBlock(final BlockHeader blockHeader) {
  final Hash parentHash = blockHeader.getParentHash();
  final long parentNumber = blockHeader.getNumber() - 1L;
  LOG.info("Retrieving parent {} of block {}", parentHash, blockHeader.toLogString());
  final Optional<EthPeer> noPreferredPeer = Optional.empty();
  return getBlockFromPeers(noPreferredPeer, parentNumber, Optional.of(parentHash));
}
/**
 * Retrieves a block from peers (with retries) and, once the retrieval finishes, updates the
 * processing-blocks tracker: a retrieved block is registered as received, anything else is
 * registered as a failed request.
 *
 * @param preferredPeer the peer to ask first, if any
 * @param blockNumber the number of the wanted block
 * @param maybeBlockHash the hash of the wanted block, if known
 * @return a future holding the block, or null when retrieval was given up
 */
private CompletableFuture<Block> getBlockFromPeers(
    final Optional<EthPeer> preferredPeer,
    final long blockNumber,
    final Optional<Hash> maybeBlockHash) {
  final CompletableFuture<Block> retrieval =
      repeatableGetBlockFromPeer(preferredPeer, blockNumber, maybeBlockHash);
  return retrieval.whenComplete(
      (retrieved, error) -> {
        if (retrieved != null) {
          LOG.atDebug()
              .setMessage("Successfully retrieved block {}")
              .addArgument(retrieved::toLogString)
              .log();
          processingBlocksManager.registerReceivedBlock(retrieved);
          return;
        }

        if (error == null) {
          // No block and no error: the retry logic decided that retrying makes no sense.
          LOG.atDebug()
              .setMessage("Block {} not retrieved")
              .addArgument(() -> logBlockNumberMaybeHash(blockNumber, maybeBlockHash))
              .log();
        } else {
          LOG.warn(
              "Failed to retrieve block " + logBlockNumberMaybeHash(blockNumber, maybeBlockHash),
              error);
        }
        processingBlocksManager.registerFailedGetBlock(blockNumber, maybeBlockHash);
      });
}
/**
 * Runs one attempt to get the block from peers, turns a failure of that attempt into a null
 * result, and then lets {@code maybeRepeatGetBlock} decide whether to finish or try again.
 *
 * @param preferredPeer the peer to ask first, if any
 * @param blockNumber the number of the wanted block
 * @param maybeBlockHash the hash of the wanted block, if known
 * @return a future for the outcome of this attempt and any follow-up retries
 */
private CompletableFuture<Block> repeatableGetBlockFromPeer(
    final Optional<EthPeer> preferredPeer,
    final long blockNumber,
    final Optional<Hash> maybeBlockHash) {
  final CompletableFuture<Block> attempt =
      scheduleGetBlockFromPeers(preferredPeer, blockNumber, maybeBlockHash);
  final Function<Throwable, CompletionStage<Block>> onError =
      handleGetBlockErrors(blockNumber, maybeBlockHash);
  return exceptionallyCompose(attempt, onError)
      .thenCompose(ignored -> maybeRepeatGetBlock(blockNumber, maybeBlockHash));
}
/**
 * Builds the error handler for a single get-block attempt: it logs the failure at debug level
 * and completes with null instead of propagating the error.
 *
 * @param blockNumber the number of the wanted block (for logging)
 * @param maybeBlockHash the hash of the wanted block, if known (for logging)
 * @return a function mapping any failure to an already completed null result
 */
private Function<Throwable, CompletionStage<Block>> handleGetBlockErrors(
    final long blockNumber, final Optional<Hash> maybeBlockHash) {
  return error -> {
    LOG.atDebug()
        .setMessage("Temporary failure retrieving block {} from peers with error {}")
        .addArgument(() -> logBlockNumberMaybeHash(blockNumber, maybeBlockHash))
        .addArgument(error)
        .log();
    final CompletionStage<Block> noBlock = CompletableFuture.completedFuture(null);
    return noBlock;
  };
}
/**
 * Decides what to do after a get-block attempt: completes with the block when the local chain
 * already contains it, completes with null when the block is outside the import range, and
 * otherwise schedules another attempt without a preferred peer.
 *
 * @param blockNumber the number of the wanted block
 * @param maybeBlockHash the hash of the wanted block, if known
 * @return a future holding the block, null, or the outcome of the next attempt
 */
private CompletableFuture<Block> maybeRepeatGetBlock(
    final long blockNumber, final Optional<Hash> maybeBlockHash) {
  final MutableBlockchain blockchain = protocolContext.getBlockchain();

  // Look the block up by hash when we have one, by number otherwise.
  final Optional<Block> alreadyStored =
      maybeBlockHash.isPresent()
          ? blockchain.getBlockByHash(maybeBlockHash.get())
          : blockchain.getBlockByNumber(blockNumber);

  if (alreadyStored.isPresent()) {
    final Block stored = alreadyStored.get();
    LOG.atDebug()
        .setMessage("No need to retry to get block {} since it is already present")
        .addArgument(stored::toLogString)
        .log();
    return CompletableFuture.completedFuture(stored);
  }

  final long localHeight = blockchain.getChainHeadBlockNumber();
  final long bestHeight = syncState.bestChainHeight(localHeight);
  final boolean stillRelevant = shouldImportBlockAtHeight(blockNumber, localHeight, bestHeight);
  if (!stillRelevant) {
    LOG.atDebug()
        .setMessage("Not retrying to get block {} since we are too far from local chain head {}")
        .addArgument(() -> logBlockNumberMaybeHash(blockNumber, maybeBlockHash))
        .addArgument(blockchain.getChainHead()::toLogString)
        .log();
    return CompletableFuture.completedFuture(null);
  }

  LOG.atDebug()
      .setMessage("Retrying to get block {}")
      .addArgument(() -> logBlockNumberMaybeHash(blockNumber, maybeBlockHash))
      .log();

  return ethContext
      .getScheduler()
      .scheduleSyncWorkerTask(
          () -> repeatableGetBlockFromPeer(Optional.empty(), blockNumber, maybeBlockHash));
}
/**
 * Schedules one retrying get-block task (at least one try, otherwise as many as there are
 * peers), hands the retrieved block to {@code importOrSavePendingBlock} and arms the
 * configured timeout on the resulting future.
 *
 * @param maybePreferredPeer the peer to assign to the task, if any
 * @param blockNumber the number of the wanted block
 * @param maybeBlockHash the hash of the wanted block, if known
 * @return the future of the scheduled retrieval and import
 */
private CompletableFuture<Block> scheduleGetBlockFromPeers(
    final Optional<EthPeer> maybePreferredPeer,
    final long blockNumber,
    final Optional<Hash> maybeBlockHash) {
  final RetryingGetBlockFromPeersTask task =
      RetryingGetBlockFromPeersTask.create(
          protocolSchedule,
          ethContext,
          metricsSystem,
          Math.max(1, ethContext.getEthPeers().peerCount()),
          maybeBlockHash,
          blockNumber);
  maybePreferredPeer.ifPresent(preferred -> task.assignPeer(preferred));

  final CompletableFuture<Block> retrieveAndImport =
      ethContext
          .getScheduler()
          .scheduleSyncWorkerTask(task::run)
          .thenCompose(
              peerResult ->
                  importOrSavePendingBlock(
                      peerResult.getResult(), peerResult.getPeer().nodeId()));
  ethContext.getScheduler().failAfterTimeout(retrieveAndImport, getBlockTimeoutMillis);
  return retrieveAndImport;
}
/**
 * Propagates the block to peers together with its total difficulty, computed as the parent's
 * total difficulty plus the block's own difficulty.
 *
 * @param block the block to broadcast
 * @param parent the header of the block's parent
 * @throws IllegalStateException if the blockchain has no total difficulty for the parent
 */
private void broadcastBlock(final Block block, final BlockHeader parent) {
  final Difficulty totalDifficulty =
      protocolContext
          .getBlockchain()
          .getTotalDifficultyByHash(parent.getHash())
          // Fail with a message naming the blocks involved rather than a bare
          // NoSuchElementException from Optional.get().
          .orElseThrow(
              () ->
                  new IllegalStateException(
                      "Cannot broadcast block "
                          + block.toLogString()
                          + ": total difficulty of parent "
                          + parent.toLogString()
                          + " is not available"))
          .add(block.getHeader().getDifficulty());
  blockBroadcaster.propagate(block, totalDifficulty);
}
/**
 * Imports the block when its parent is already in the local chain, or saves it as pending
 * otherwise.
 *
 * <p>A block whose parent is unknown is registered as pending and, if it was newly registered,
 * resolution of its lowest pending ancestor is attempted. A block that is already being
 * imported or is already in the chain is returned as is. In every other case header validation
 * and import are scheduled on the sync worker.
 *
 * @param block the block to import or save
 * @param nodeId id of the node the block was received from
 * @return a future for the block; already completed unless an import was scheduled
 * @throws IllegalArgumentException if the parent header cannot be read although the chain
 *     reported containing the parent hash
 */
@VisibleForTesting
CompletableFuture<Block> importOrSavePendingBlock(final Block block, final Bytes nodeId) {
  LOG.atTrace()
      .setMessage("Import or save pending block {}")
      .addArgument(block::toLogString)
      .log();

  if (!protocolContext.getBlockchain().contains(block.getHeader().getParentHash())) {
    // Block isn't connected to local chain, save it to pending blocks collection
    if (savePendingBlock(block, nodeId)) {
      // if block is saved as pending, try to resolve it
      maybeProcessPendingBlocks(block);
    }
    return CompletableFuture.completedFuture(block);
  }

  if (!processingBlocksManager.addImportingBlock(block.getHash())) {
    LOG.atTrace()
        .setMessage("We're already importing this block {}")
        .addArgument(block::toLogString)
        .log();
    return CompletableFuture.completedFuture(block);
  }

  // From here on this call owns the "importing" mark for the block. If anything fails before
  // the import task has been scheduled, nobody else would ever clear that mark, and the block
  // would be reported as "already importing" forever. Release the mark on such failures.
  try {
    if (protocolContext.getBlockchain().contains(block.getHash())) {
      LOG.atTrace()
          .setMessage("We've already imported this block {}")
          .addArgument(block::toLogString)
          .log();
      processingBlocksManager.registerBlockImportDone(block.getHash());
      return CompletableFuture.completedFuture(block);
    }

    final BlockHeader parent =
        protocolContext
            .getBlockchain()
            .getBlockHeader(block.getHeader().getParentHash())
            .orElseThrow(
                () ->
                    new IllegalArgumentException(
                        "Incapable of retrieving header from non-existent parent of "
                            + block.toLogString()));
    final ProtocolSpec protocolSpec = protocolSchedule.getByBlockHeader(block.getHeader());
    final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator();
    final BadBlockManager badBlockManager = protocolContext.getBadBlockManager();
    return ethContext
        .getScheduler()
        .scheduleSyncWorkerTask(
            () ->
                validateAndProcessPendingBlock(
                    blockHeaderValidator, block, parent, badBlockManager));
  } catch (final RuntimeException e) {
    processingBlocksManager.registerBlockImportDone(block.getHash());
    throw e;
  }
}
/**
 * Registers the given block as pending, holding the pending blocks manager's monitor while
 * doing so.
 *
 * @param block the block to track
 * @param nodeId node that sent the block
 * @return true if the block was added (was not previously present)
 */
private boolean savePendingBlock(final Block block, final Bytes nodeId) {
  synchronized (pendingBlocksManager) {
    final boolean newlyRegistered = pendingBlocksManager.registerPendingBlock(block, nodeId);
    if (newlyRegistered) {
      LOG.info(
          "Saved announced block for future import {} - {} saved block(s)",
          block.toLogString(),
          pendingBlocksManager.size());
    }
    return newlyRegistered;
  }
}
/**
 * Looks up the lowest pending ancestor of the given block. When that ancestor's parent is
 * missing from the chain the parent is requested; when the parent is already in the chain the
 * pending children of the ancestor are processed.
 *
 * @param block the pending block whose ancestry should be resolved
 */
private void maybeProcessPendingBlocks(final Block block) {
  pendingBlocksManager
      .pendingAncestorBlockOf(block)
      .ifPresent(
          lowestAncestor -> {
            final Hash ancestorParentHash = lowestAncestor.getHeader().getParentHash();
            if (protocolContext.getBlockchain().contains(ancestorParentHash)) {
              LOG.trace("Parent block is already in the chain");
              // The parent is already imported, so continue with the ancestor's children.
              maybeProcessPendingChildrenBlocks(lowestAncestor);
            } else {
              // The chain does not reach the ancestor yet: fetch the missing parent.
              requestParentBlock(lowestAncestor);
            }
          });
}
/**
 * Fully validates the block header against its parent. A valid block is scheduled for
 * broadcast and then imported; an invalid one is released from the importing set, recorded in
 * the bad block manager and returned without being imported.
 *
 * @param blockHeaderValidator the validator to apply to the header
 * @param block the block to validate and import
 * @param parent the header of the block's parent
 * @param badBlockManager receives the block when header validation fails
 * @return the import future for a valid block, otherwise a completed future holding the block
 */
private CompletableFuture<Block> validateAndProcessPendingBlock(
    final BlockHeaderValidator blockHeaderValidator,
    final Block block,
    final BlockHeader parent,
    final BadBlockManager badBlockManager) {
  final HeaderValidationMode validationMode = HeaderValidationMode.FULL;
  final boolean headerIsValid =
      blockHeaderValidator.validateHeader(
          block.getHeader(), parent, protocolContext, validationMode);

  if (!headerIsValid) {
    processingBlocksManager.registerBlockImportDone(block.getHash());
    final String description = String.format("Failed header validation (%s)", validationMode);
    badBlockManager.addBadBlock(block, BadBlockCause.fromValidationFailure(description));
    LOG.warn(
        "Added to bad block manager for invalid header, failed to import announced block {}",
        block.toLogString());
    return CompletableFuture.completedFuture(block);
  }

  ethContext.getScheduler().scheduleSyncWorkerTask(() -> broadcastBlock(block, parent));
  return runImportTask(block);
}
/**
 * Persists the block through a {@code PersistBlockTask} created with header validation mode
 * NONE and, whatever the outcome, removes the block from the set of importing blocks.
 *
 * @param block the block to import
 * @return the future of the import task
 */
private CompletableFuture<Block> runImportTask(final Block block) {
  final PersistBlockTask importTask =
      PersistBlockTask.create(
          protocolSchedule,
          protocolContext,
          ethContext,
          block,
          HeaderValidationMode.NONE,
          metricsSystem);
  return importTask
      .run()
      .whenComplete(
          (result, throwable) -> {
            processingBlocksManager.registerBlockImportDone(block.getHash());
            if (throwable != null) {
              // Pass the throwable along so the cause of the failed import is not lost,
              // as the other failure logs of this class do.
              LOG.warn("Failed to import announced block {}", block.toLogString(), throwable);
            }
          });
}
/**
 * Tells whether a block number lies within the configured propagation range of both the local
 * chain head and the best known chain height.
 *
 * @param blockNumber the number of the candidate block
 * @param localHeight the local chain head number
 * @param bestChainHeight the best known chain height
 * @return true only if both distances are contained in the propagation range
 */
private boolean shouldImportBlockAtHeight(
    final long blockNumber, final long localHeight, final long bestChainHeight) {
  final Range<Long> allowedDistances = config.getBlockPropagationRange();
  if (!allowedDistances.contains(blockNumber - localHeight)) {
    return false;
  }
  return allowedDistances.contains(blockNumber - bestChainHeight);
}
/**
 * Renders the announcements as a bracketed, comma separated list for logging.
 *
 * @param newBlockHashs the announcements to render
 * @return the string form of every element, joined by ", " and wrapped in "[" and "]"
 */
private String toLogString(final Collection<NewBlockHash> newBlockHashs) {
  final StringBuilder rendered = new StringBuilder("[");
  String separator = "";
  for (final NewBlockHash announcement : newBlockHashs) {
    rendered.append(separator).append(announcement.toString());
    separator = ", ";
  }
  return rendered.append("]").toString();
}
/**
 * Callback for TTD-reached notifications. When the manager is running and TTD was reached this
 * only logs; when the manager is not running it is started.
 *
 * @param ttdReached whether the terminal total difficulty has been reached
 */
private void reactToTTDReachedEvent(final boolean ttdReached) {
  if (started.get() && ttdReached) {
    LOG.info("Block propagation was running, then ttd reached");
    return;
  }
  // The flag is deliberately read again here, exactly as in the two-branch form.
  if (!started.get()) {
    start();
  }
}
/**
 * Describes the current state through the processing and pending blocks managers.
 *
 * @return a string containing both managers' string forms
 */
@Override
public String toString() {
  final StringBuilder description = new StringBuilder("BlockPropagationManager{");
  description.append(processingBlocksManager);
  description.append(", pendingBlocksManager=");
  description.append(pendingBlocksManager);
  description.append('}');
  return description.toString();
}
/**
 * Formats a block reference for logging: the number, followed by the hash in parentheses when
 * the hash is known.
 *
 * @param blockNumber the block number
 * @param maybeBlockHash the block hash, if known
 * @return the number alone, or the number followed by " (hash)"
 */
private String logBlockNumberMaybeHash(
    final long blockNumber, final Optional<Hash> maybeBlockHash) {
  final String hashSuffix = maybeBlockHash.map(hash -> " (" + hash + ")").orElse("");
  return blockNumber + hashSuffix;
}
/**
 * Stops block propagation once a forkchoice event carries a valid finalized block hash; other
 * events are ignored.
 *
 * @param event the received forkchoice event
 */
@Override
public void onNewUnverifiedForkchoice(final ForkchoiceEvent event) {
  if (!event.hasValidFinalizedBlockHash()) {
    return;
  }
  stop();
}
/**
 * Keeps track of the blocks that are currently being imported, the block hashes that have been
 * requested from peers, and the numbers of non announced blocks that have been requested. All
 * three sets are backed by {@link ConcurrentHashMap}.
 */
static class ProcessingBlocksManager {
  private final Set<Hash> importingBlocks = ConcurrentHashMap.newKeySet();
  private final Set<Hash> requestedBlocks = ConcurrentHashMap.newKeySet();
  private final Set<Long> requestedNonAnnouncedBlocks = ConcurrentHashMap.newKeySet();

  /**
   * Marks a block hash as requested.
   *
   * @return true if the hash was not already marked as requested
   */
  boolean addRequestedBlock(final Hash hash) {
    final boolean newlyRequested = requestedBlocks.add(hash);
    return newlyRequested;
  }

  /**
   * Marks the number of a non announced block as requested.
   *
   * @return true if the number was not already marked as requested
   */
  public boolean addNonAnnouncedBlocks(final long blockNumber) {
    final boolean newlyRequested = requestedNonAnnouncedBlocks.add(blockNumber);
    return newlyRequested;
  }

  /**
   * Tells whether the block with the given hash is currently marked as importing.
   *
   * @return true if the hash is in the importing set
   */
  public boolean alreadyImporting(final Hash hash) {
    final boolean importing = importingBlocks.contains(hash);
    return importing;
  }

  /** Clears the request marks (by hash and by number) of a block that has been received. */
  public synchronized void registerReceivedBlock(final Block block) {
    final Hash receivedHash = block.getHash();
    requestedBlocks.remove(receivedHash);
    final long receivedNumber = block.getHeader().getNumber();
    requestedNonAnnouncedBlocks.remove(receivedNumber);
  }

  /** Clears the request marks of a block whose retrieval failed or was given up. */
  public synchronized void registerFailedGetBlock(
      final long blockNumber, final Optional<Hash> maybeBlockHash) {
    requestedNonAnnouncedBlocks.remove(blockNumber);
    maybeBlockHash.ifPresent(failedHash -> requestedBlocks.remove(failedHash));
  }

  /**
   * Marks a block hash as importing.
   *
   * @return true if the hash was not already marked as importing
   */
  public boolean addImportingBlock(final Hash hash) {
    final boolean newlyImporting = importingBlocks.add(hash);
    return newlyImporting;
  }

  /** Removes the importing mark of the given block hash. */
  public void registerBlockImportDone(final Hash hash) {
    importingBlocks.remove(hash);
  }

  @Override
  public synchronized String toString() {
    final StringBuilder description = new StringBuilder("ProcessingBlocksManager{");
    description.append("importingBlocks=").append(importingBlocks);
    description.append(", requestedBlocks=").append(requestedBlocks);
    description.append(", requestedNonAnnouncedBlocks=").append(requestedNonAnnouncedBlocks);
    description.append('}');
    return description.toString();
  }
}
}