MergeCoordinator.java
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.consensus.merge.blockcreation;
import static java.util.stream.Collectors.joining;
import static org.hyperledger.besu.consensus.merge.blockcreation.MergeMiningCoordinator.ForkchoiceResult.Status.INVALID;
import org.hyperledger.besu.consensus.merge.MergeContext;
import org.hyperledger.besu.consensus.merge.PayloadWrapper;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.BlockProcessingResult;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.blockcreation.BlockCreator.BlockCreationResult;
import org.hyperledger.besu.ethereum.chain.BadBlockCause;
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.Withdrawal;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BadChainListener;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.AbstractGasLimitSpecification;
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.trie.MerkleTrieException;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** The Merge coordinator. */
public class MergeCoordinator implements MergeMiningCoordinator, BadChainListener {
private static final Logger LOG = LoggerFactory.getLogger(MergeCoordinator.class);
/**
* On PoS you do not need to compete with other nodes for block production, since you have an
* allocated slot for that, so in this case make sense to always try to fill the block, if there
* are enough pending transactions, until the remaining gas is less than the minimum needed for
* the smaller transaction. So for PoS the min-block-occupancy-ratio option is set to always try
* to fill 100% of the block.
*/
private static final double TRY_FILL_BLOCK = 1.0;
private static final long DEFAULT_TARGET_GAS_LIMIT = 30000000L;
/** The Mining parameters. */
protected final MiningParameters miningParameters;
/** The Merge block creator factory. */
protected final MergeBlockCreatorFactory mergeBlockCreatorFactory;
/** The Merge context. */
protected final MergeContext mergeContext;
/** The Protocol context. */
protected final ProtocolContext protocolContext;
/** The Block builder executor. */
protected final EthScheduler ethScheduler;
/** The Backward sync context. */
protected final BackwardSyncContext backwardSyncContext;
/** The Protocol schedule. */
protected final ProtocolSchedule protocolSchedule;
private final Map<PayloadIdentifier, BlockCreationTask> blockCreationTasks =
new ConcurrentHashMap<>();
/**
* Instantiates a new Merge coordinator.
*
* @param protocolContext the protocol context
* @param protocolSchedule the protocol schedule
* @param ethScheduler the block builder executor
* @param transactionPool the pending transactions
* @param miningParams the mining params
* @param backwardSyncContext the backward sync context
* @param depositContractAddress the address of the deposit contract
*/
public MergeCoordinator(
final ProtocolContext protocolContext,
final ProtocolSchedule protocolSchedule,
final EthScheduler ethScheduler,
final TransactionPool transactionPool,
final MiningParameters miningParams,
final BackwardSyncContext backwardSyncContext,
final Optional<Address> depositContractAddress) {
this.protocolContext = protocolContext;
this.protocolSchedule = protocolSchedule;
this.ethScheduler = ethScheduler;
this.mergeContext = protocolContext.getConsensusContext(MergeContext.class);
this.backwardSyncContext = backwardSyncContext;
if (miningParams.getCoinbase().isEmpty()) {
miningParams.setCoinbase(Address.ZERO);
}
if (miningParams.getTargetGasLimit().isEmpty()) {
miningParams.setTargetGasLimit(DEFAULT_TARGET_GAS_LIMIT);
}
miningParams.setMinBlockOccupancyRatio(TRY_FILL_BLOCK);
this.miningParameters = miningParams;
this.mergeBlockCreatorFactory =
(parentHeader, address) -> {
address.ifPresent(miningParams::setCoinbase);
return new MergeBlockCreator(
miningParameters,
parent -> miningParameters.getExtraData(),
transactionPool,
protocolContext,
protocolSchedule,
parentHeader,
depositContractAddress,
ethScheduler);
};
this.backwardSyncContext.subscribeBadChainListener(this);
}
/**
* Instantiates a new Merge coordinator.
*
* @param protocolContext the protocol context
* @param protocolSchedule the protocol schedule
* @param ethScheduler the block builder executor
* @param miningParams the mining params
* @param backwardSyncContext the backward sync context
* @param mergeBlockCreatorFactory the merge block creator factory
*/
public MergeCoordinator(
final ProtocolContext protocolContext,
final ProtocolSchedule protocolSchedule,
final EthScheduler ethScheduler,
final MiningParameters miningParams,
final BackwardSyncContext backwardSyncContext,
final MergeBlockCreatorFactory mergeBlockCreatorFactory) {
this.protocolContext = protocolContext;
this.protocolSchedule = protocolSchedule;
this.ethScheduler = ethScheduler;
this.mergeContext = protocolContext.getConsensusContext(MergeContext.class);
this.backwardSyncContext = backwardSyncContext;
if (miningParams.getTargetGasLimit().isEmpty()) {
miningParams.setTargetGasLimit(DEFAULT_TARGET_GAS_LIMIT);
}
miningParams.setMinBlockOccupancyRatio(TRY_FILL_BLOCK);
this.miningParameters = miningParams;
this.mergeBlockCreatorFactory = mergeBlockCreatorFactory;
this.backwardSyncContext.subscribeBadChainListener(this);
}
@Override
public void start() {}
@Override
public void stop() {}
@Override
public void awaitStop() throws InterruptedException {}
@Override
public boolean enable() {
return false;
}
@Override
public boolean disable() {
return true;
}
@Override
public boolean isMining() {
return true;
}
@Override
public Wei getMinTransactionGasPrice() {
return miningParameters.getMinTransactionGasPrice();
}
@Override
public Wei getMinPriorityFeePerGas() {
return miningParameters.getMinPriorityFeePerGas();
}
@Override
public void setExtraData(final Bytes extraData) {
this.miningParameters.setExtraData(extraData);
}
@Override
public Optional<Address> getCoinbase() {
return miningParameters.getCoinbase();
}
@Override
public Optional<Block> createBlock(
final BlockHeader parentHeader,
final List<Transaction> transactions,
final List<BlockHeader> ommers) {
throw new UnsupportedOperationException("random is required");
}
@Override
public Optional<Block> createBlock(final BlockHeader parentHeader, final long timestamp) {
throw new UnsupportedOperationException("random is required");
}
@Override
public void changeTargetGasLimit(final Long newTargetGasLimit) {
if (AbstractGasLimitSpecification.isValidTargetGasLimit(newTargetGasLimit)) {
this.miningParameters.setTargetGasLimit(newTargetGasLimit);
} else {
throw new IllegalArgumentException("Specified target gas limit is invalid");
}
}
@Override
public PayloadIdentifier preparePayload(
final BlockHeader parentHeader,
final Long timestamp,
final Bytes32 prevRandao,
final Address feeRecipient,
final Optional<List<Withdrawal>> withdrawals,
final Optional<Bytes32> parentBeaconBlockRoot) {
// we assume that preparePayload is always called sequentially, since the RPC Engine calls
// are sequential, if this assumption changes then more synchronization should be added to
// shared data structures
final PayloadIdentifier payloadIdentifier =
PayloadIdentifier.forPayloadParams(
parentHeader.getBlockHash(),
timestamp,
prevRandao,
feeRecipient,
withdrawals,
parentBeaconBlockRoot);
if (blockCreationTasks.containsKey(payloadIdentifier)) {
LOG.debug(
"Block proposal for the same payload id {} already present, nothing to do",
payloadIdentifier);
return payloadIdentifier;
}
// it's a new payloadId so...
cancelAnyExistingBlockCreationTasks(payloadIdentifier);
final MergeBlockCreator mergeBlockCreator =
this.mergeBlockCreatorFactory.forParams(parentHeader, Optional.ofNullable(feeRecipient));
blockCreationTasks.put(payloadIdentifier, new BlockCreationTask(mergeBlockCreator));
// put the empty block in first
final Block emptyBlock =
mergeBlockCreator
.createBlock(
Optional.of(Collections.emptyList()),
prevRandao,
timestamp,
withdrawals,
parentBeaconBlockRoot)
.getBlock();
BlockProcessingResult result = validateProposedBlock(emptyBlock);
if (result.isSuccessful()) {
mergeContext.putPayloadById(
new PayloadWrapper(
payloadIdentifier, new BlockWithReceipts(emptyBlock, result.getReceipts())));
LOG.info(
"Start building proposals for block {} identified by {}",
emptyBlock.getHeader().getNumber(),
payloadIdentifier);
} else {
LOG.warn(
"failed to validate empty block proposal {}, reason {}",
emptyBlock.getHash(),
result.errorMessage);
if (result.causedBy().isPresent()) {
LOG.warn("caused by", result.causedBy().get());
}
}
tryToBuildBetterBlock(
timestamp,
prevRandao,
payloadIdentifier,
mergeBlockCreator,
withdrawals,
parentBeaconBlockRoot);
return payloadIdentifier;
}
private void cancelAnyExistingBlockCreationTasks(final PayloadIdentifier payloadIdentifier) {
if (blockCreationTasks.size() > 0) {
String existingPayloadIdsBeingBuilt =
blockCreationTasks.keySet().stream()
.map(PayloadIdentifier::toHexString)
.collect(joining(","));
LOG.warn(
"New payloadId {} received so cancelling block creation tasks for the following payloadIds: {}",
payloadIdentifier,
existingPayloadIdsBeingBuilt);
blockCreationTasks.keySet().forEach(this::cleanupBlockCreationTask);
}
}
private void cleanupBlockCreationTask(final PayloadIdentifier payloadIdentifier) {
blockCreationTasks.computeIfPresent(
payloadIdentifier,
(pid, blockCreationTask) -> {
blockCreationTask.cancel();
return null;
});
}
@Override
public void finalizeProposalById(final PayloadIdentifier payloadId) {
LOG.debug("Finalizing block proposal for payload id {}", payloadId);
cleanupBlockCreationTask(payloadId);
}
private void tryToBuildBetterBlock(
final Long timestamp,
final Bytes32 random,
final PayloadIdentifier payloadIdentifier,
final MergeBlockCreator mergeBlockCreator,
final Optional<List<Withdrawal>> withdrawals,
final Optional<Bytes32> parentBeaconBlockRoot) {
final Supplier<BlockCreationResult> blockCreator =
() ->
mergeBlockCreator.createBlock(
Optional.empty(), random, timestamp, withdrawals, parentBeaconBlockRoot);
LOG.debug(
"Block creation started for payload id {}, remaining time is {}ms",
payloadIdentifier,
miningParameters.getUnstable().getPosBlockCreationMaxTime());
ethScheduler
.scheduleBlockCreationTask(
() -> retryBlockCreationUntilUseful(payloadIdentifier, blockCreator))
.orTimeout(
miningParameters.getUnstable().getPosBlockCreationMaxTime(), TimeUnit.MILLISECONDS)
.whenComplete(
(unused, throwable) -> {
if (throwable != null) {
LOG.atDebug()
.setMessage("Exception building block for payload id {}, reason {}")
.addArgument(payloadIdentifier)
.addArgument(() -> logException(throwable))
.log();
}
cleanupBlockCreationTask(payloadIdentifier);
});
}
private Void retryBlockCreationUntilUseful(
final PayloadIdentifier payloadIdentifier, final Supplier<BlockCreationResult> blockCreator) {
long lastStartAt;
while (!isBlockCreationCancelled(payloadIdentifier)) {
try {
lastStartAt = System.currentTimeMillis();
recoverableBlockCreation(payloadIdentifier, blockCreator, lastStartAt);
final long lastDuration = System.currentTimeMillis() - lastStartAt;
final long waitBeforeRepetition =
Math.max(
100,
miningParameters.getUnstable().getPosBlockCreationRepetitionMinDuration()
- lastDuration);
LOG.debug("Waiting {}ms before repeating block creation", waitBeforeRepetition);
Thread.sleep(waitBeforeRepetition);
} catch (final CancellationException | InterruptedException ce) {
LOG.atDebug()
.setMessage("Block creation for payload id {} has been cancelled, reason {}")
.addArgument(payloadIdentifier)
.addArgument(() -> logException(ce))
.log();
return null;
} catch (final Throwable e) {
LOG.warn(
"Something went wrong creating block for payload id {}, error {}",
payloadIdentifier,
logException(e));
return null;
}
}
return null;
}
private void recoverableBlockCreation(
final PayloadIdentifier payloadIdentifier,
final Supplier<BlockCreationResult> blockCreator,
final long startedAt) {
try {
evaluateNewBlock(blockCreator.get().getBlock(), payloadIdentifier, startedAt);
} catch (final Throwable throwable) {
if (canRetryBlockCreation(throwable) && !isBlockCreationCancelled(payloadIdentifier)) {
LOG.atDebug()
.setMessage("Retrying block creation for payload id {} after recoverable error {}")
.addArgument(payloadIdentifier)
.addArgument(() -> logException(throwable))
.log();
recoverableBlockCreation(payloadIdentifier, blockCreator, startedAt);
} else {
throw throwable;
}
}
}
private void evaluateNewBlock(
final Block bestBlock, final PayloadIdentifier payloadIdentifier, final long startedAt) {
if (isBlockCreationCancelled(payloadIdentifier)) return;
final var resultBest = validateProposedBlock(bestBlock);
if (resultBest.isSuccessful()) {
if (isBlockCreationCancelled(payloadIdentifier)) return;
mergeContext.putPayloadById(
new PayloadWrapper(
payloadIdentifier, new BlockWithReceipts(bestBlock, resultBest.getReceipts())));
LOG.atDebug()
.setMessage(
"Successfully built block {} for proposal identified by {}, with {} transactions, in {}ms")
.addArgument(bestBlock::toLogString)
.addArgument(payloadIdentifier)
.addArgument(bestBlock.getBody().getTransactions()::size)
.addArgument(() -> System.currentTimeMillis() - startedAt)
.log();
} else {
LOG.warn(
"Block {} built for proposal identified by {}, is not valid reason {}",
bestBlock.getHash(),
payloadIdentifier.toString(),
resultBest.errorMessage);
if (resultBest.causedBy().isPresent()) {
LOG.warn("caused by", resultBest.cause.get());
}
}
}
private boolean canRetryBlockCreation(final Throwable throwable) {
if (throwable instanceof StorageException) {
return true;
} else if (throwable instanceof MerkleTrieException) {
return true;
}
return false;
}
@Override
public Optional<BlockHeader> getOrSyncHeadByHash(final Hash headHash, final Hash finalizedHash) {
final var chain = protocolContext.getBlockchain();
final var maybeHeadHeader = chain.getBlockHeader(headHash);
if (maybeHeadHeader.isPresent()) {
LOG.atDebug()
.setMessage("BlockHeader {} is already present in blockchain")
.addArgument(maybeHeadHeader.get()::toLogString)
.log();
} else {
backwardSyncContext.maybeUpdateTargetHeight(headHash);
backwardSyncContext
.syncBackwardsUntil(headHash)
.thenRun(() -> updateFinalized(finalizedHash));
}
return maybeHeadHeader;
}
private void updateFinalized(final Hash finalizedHash) {
if (mergeContext
.getFinalized()
.map(BlockHeader::getHash)
.map(finalizedHash::equals)
.orElse(Boolean.FALSE)) {
LOG.atDebug()
.setMessage("Finalized block already set to {}, nothing to do")
.addArgument(finalizedHash)
.log();
return;
}
protocolContext
.getBlockchain()
.getBlockHeader(finalizedHash)
.ifPresentOrElse(
finalizedHeader -> {
LOG.atDebug()
.setMessage("Setting finalized block header to {}")
.addArgument(finalizedHeader::toLogString)
.log();
mergeContext.setFinalized(finalizedHeader);
},
() ->
LOG.warn(
"Internal error, backward sync completed but failed to import finalized block {}",
finalizedHash));
}
@Override
public BlockProcessingResult validateBlock(final Block block) {
final var validationResult =
protocolSchedule
.getByBlockHeader(block.getHeader())
.getBlockValidator()
.validateAndProcessBlock(
protocolContext,
block,
HeaderValidationMode.FULL,
HeaderValidationMode.NONE,
false);
return validationResult;
}
private BlockProcessingResult validateProposedBlock(final Block block) {
final var validationResult =
protocolSchedule
.getByBlockHeader(block.getHeader())
.getBlockValidator()
.validateAndProcessBlock(
protocolContext,
block,
HeaderValidationMode.FULL,
HeaderValidationMode.NONE,
false,
false);
return validationResult;
}
@Override
public BlockProcessingResult rememberBlock(final Block block) {
LOG.atDebug().setMessage("Remember block {}").addArgument(block::toLogString).log();
final var chain = protocolContext.getBlockchain();
final var validationResult = validateBlock(block);
validationResult
.getYield()
.ifPresentOrElse(
result -> chain.storeBlock(block, result.getReceipts()),
() -> LOG.debug("empty yield in blockProcessingResult"));
return validationResult;
}
@Override
public ForkchoiceResult updateForkChoice(
final BlockHeader newHead, final Hash finalizedBlockHash, final Hash safeBlockHash) {
MutableBlockchain blockchain = protocolContext.getBlockchain();
final Optional<BlockHeader> newFinalized = blockchain.getBlockHeader(finalizedBlockHash);
if (newHead.getNumber() < blockchain.getChainHeadBlockNumber()
&& isDescendantOf(newHead, blockchain.getChainHeadHeader())) {
LOG.atDebug()
.setMessage("Ignoring update to old head {}")
.addArgument(newHead::toLogString)
.log();
return ForkchoiceResult.withIgnoreUpdateToOldHead(newHead);
}
final Optional<Hash> latestValid = getLatestValidAncestor(newHead);
Optional<BlockHeader> parentOfNewHead = blockchain.getBlockHeader(newHead.getParentHash());
if (parentOfNewHead.isPresent()
&& Long.compareUnsigned(newHead.getTimestamp(), parentOfNewHead.get().getTimestamp())
<= 0) {
return ForkchoiceResult.withFailure(
INVALID, "new head timestamp not greater than parent", latestValid);
}
setNewHead(blockchain, newHead);
// set and persist the new finalized block if it is present
newFinalized.ifPresent(
blockHeader -> {
blockchain.setFinalized(blockHeader.getHash());
mergeContext.setFinalized(blockHeader);
});
blockchain
.getBlockHeader(safeBlockHash)
.ifPresent(
newSafeBlock -> {
blockchain.setSafeBlock(safeBlockHash);
mergeContext.setSafeBlock(newSafeBlock);
});
return ForkchoiceResult.withResult(newFinalized, Optional.of(newHead));
}
private boolean setNewHead(final MutableBlockchain blockchain, final BlockHeader newHead) {
if (newHead.getHash().equals(blockchain.getChainHeadHash())) {
LOG.atDebug()
.setMessage("Nothing to do new head {} is already chain head")
.addArgument(newHead::toLogString)
.log();
return true;
}
if (moveWorldStateTo(newHead)) {
if (newHead.getParentHash().equals(blockchain.getChainHeadHash())) {
LOG.atDebug()
.setMessage(
"Forwarding chain head to the block {} saved from a previous newPayload invocation")
.addArgument(newHead::toLogString)
.log();
return blockchain.forwardToBlock(newHead);
} else {
LOG.atDebug()
.setMessage("New head {} is a chain reorg, rewind chain head to it")
.addArgument(newHead::toLogString)
.log();
return blockchain.rewindToBlock(newHead.getHash());
}
}
LOG.atDebug()
.setMessage("Failed to move the worldstate forward to hash {}, not moving chain head")
.addArgument(newHead::toLogString)
.log();
return false;
}
private boolean moveWorldStateTo(final BlockHeader newHead) {
Optional<MutableWorldState> newWorldState =
protocolContext
.getWorldStateArchive()
.getMutable(newHead.getStateRoot(), newHead.getHash());
newWorldState.ifPresentOrElse(
mutableWorldState ->
LOG.atDebug()
.setMessage(
"World state for state root hash {} and block hash {} persisted successfully")
.addArgument(mutableWorldState::rootHash)
.addArgument(newHead::getHash)
.log(),
() ->
LOG.error(
"Could not persist world for root hash {} and block hash {}",
newHead.getStateRoot(),
newHead.getHash()));
return newWorldState.isPresent();
}
@Override
public Optional<Hash> getLatestValidAncestor(final Hash blockHash) {
final var chain = protocolContext.getBlockchain();
return findValidAncestor(chain, blockHash);
}
@Override
public Optional<Hash> getLatestValidAncestor(final BlockHeader blockHeader) {
final var chain = protocolContext.getBlockchain();
final var self = chain.getBlockHeader(blockHeader.getHash());
if (self.isEmpty()) {
return findValidAncestor(chain, blockHeader.getParentHash());
}
return self.map(BlockHeader::getHash);
}
@Override
public boolean isBackwardSyncing() {
return backwardSyncContext.isSyncing();
}
@Override
public CompletableFuture<Void> appendNewPayloadToSync(final Block newPayload) {
return backwardSyncContext.syncBackwardsUntil(newPayload);
}
@Override
public boolean isMiningBeforeMerge() {
return miningParameters.isMiningEnabled();
}
private Optional<Hash> findValidAncestor(final Blockchain chain, final Hash parentHash) {
// check chain first
return chain
.getBlockHeader(parentHash)
.map(
header -> {
// if block is PoW, return ZERO hash
if (header.getDifficulty().greaterThan(Difficulty.ZERO)) {
return Hash.ZERO;
} else {
return header.getHash();
}
})
.map(Optional::of)
.orElseGet(
() ->
protocolContext
.getBadBlockManager()
.getBadBlock(parentHash)
.map(
badParent ->
findValidAncestor(chain, badParent.getHeader().getParentHash()))
.orElse(Optional.empty()));
}
@Override
public boolean isDescendantOf(final BlockHeader ancestorBlock, final BlockHeader newBlock) {
LOG.atDebug()
.setMessage("checking if block {} is ancestor of {}")
.addArgument(ancestorBlock::toLogString)
.addArgument(newBlock::toLogString)
.log();
// start with self, because descending from yourself is valid
Optional<BlockHeader> parentOf = Optional.of(newBlock);
while (parentOf.isPresent()
&& !parentOf.get().getBlockHash().equals(ancestorBlock.getBlockHash())
&& parentOf.get().getNumber()
>= ancestorBlock.getNumber()) { // if on a fork, don't go further back than ancestor
parentOf = protocolContext.getBlockchain().getBlockHeader(parentOf.get().getParentHash());
}
if (parentOf.isPresent()
&& ancestorBlock.getBlockHash().equals(parentOf.get().getBlockHash())) {
return true;
} else {
LOG.atDebug()
.setMessage("looped all the way back, did not find ancestor {} of child {}")
.addArgument(ancestorBlock::toLogString)
.addArgument(newBlock::toLogString)
.log();
return false;
}
}
@Override
public void onBadChain(
final Block badBlock,
final List<Block> badBlockDescendants,
final List<BlockHeader> badBlockHeaderDescendants) {
LOG.trace("Mark descendents of bad block {} as bad", badBlock.getHash());
final BadBlockManager badBlockManager = protocolContext.getBadBlockManager();
final Optional<BlockHeader> parentHeader =
protocolContext.getBlockchain().getBlockHeader(badBlock.getHeader().getParentHash());
final Optional<Hash> maybeLatestValidHash =
parentHeader.isPresent() && isPoSHeader(parentHeader.get())
? Optional.of(parentHeader.get().getHash())
: Optional.empty();
// Bad block has already been marked, but we need to mark the bad block's descendants
badBlockDescendants.forEach(
block -> {
LOG.trace("Add descendant block {} to bad blocks", block.getHash());
badBlockManager.addBadBlock(block, BadBlockCause.fromBadAncestorBlock(badBlock));
maybeLatestValidHash.ifPresent(
latestValidHash ->
badBlockManager.addLatestValidHash(block.getHash(), latestValidHash));
});
badBlockHeaderDescendants.forEach(
header -> {
LOG.trace("Add descendant header {} to bad blocks", header.getHash());
badBlockManager.addBadHeader(header, BadBlockCause.fromBadAncestorBlock(badBlock));
maybeLatestValidHash.ifPresent(
latestValidHash ->
badBlockManager.addLatestValidHash(header.getHash(), latestValidHash));
});
}
/**
* returns the instance of ethScheduler
*
* @return get the Eth scheduler
*/
@Override
public EthScheduler getEthScheduler() {
return ethScheduler;
}
/** The interface Merge block creator factory. */
@FunctionalInterface
protected interface MergeBlockCreatorFactory {
/**
* Create merge block creator for block header and fee recipient.
*
* @param header the header
* @param feeRecipient the fee recipient
* @return the merge block creator
*/
MergeBlockCreator forParams(BlockHeader header, Optional<Address> feeRecipient);
}
@Override
public boolean isBadBlock(final Hash blockHash) {
final BadBlockManager badBlockManager = protocolContext.getBadBlockManager();
return badBlockManager.isBadBlock(blockHash);
}
@Override
public Optional<Hash> getLatestValidHashOfBadBlock(Hash blockHash) {
return protocolContext.getBadBlockManager().getLatestValidHash(blockHash);
}
private boolean isPoSHeader(final BlockHeader header) {
return header.getDifficulty().equals(Difficulty.ZERO);
}
private String logException(final Throwable throwable) {
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw);
throwable.printStackTrace(pw);
pw.flush();
return sw.toString();
}
@VisibleForTesting
boolean isBlockCreationCancelled(final PayloadIdentifier payloadId) {
final BlockCreationTask job = blockCreationTasks.get(payloadId);
if (job == null) {
return true;
}
return job.cancelled.get();
}
private static class BlockCreationTask {
/** The Block creator. */
final MergeBlockCreator blockCreator;
/** The Cancelled. */
final AtomicBoolean cancelled;
/**
* Instantiates a new Block creation task.
*
* @param blockCreator the block creator
*/
public BlockCreationTask(final MergeBlockCreator blockCreator) {
this.blockCreator = blockCreator;
this.cancelled = new AtomicBoolean(false);
}
/** Cancel. */
public void cancel() {
cancelled.set(true);
blockCreator.cancel();
}
}
}