PostMergeContext.java
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.consensus.merge;
import org.hyperledger.besu.consensus.merge.blockcreation.PayloadIdentifier;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.ConsensusContext;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockValueCalculator;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.util.Subscribers;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.EvictingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** The Post merge context. */
public class PostMergeContext implements MergeContext {
private static final Logger LOG = LoggerFactory.getLogger(PostMergeContext.class);
/** The Max blocks in progress. */
static final int MAX_BLOCKS_IN_PROGRESS = 12;
private static final AtomicReference<PostMergeContext> singleton = new AtomicReference<>();
private static final Comparator<BlockWithReceipts> compareByGasUsedDesc =
Comparator.comparingLong(
(BlockWithReceipts blockWithReceipts) ->
blockWithReceipts.getBlock().getHeader().getGasUsed())
.reversed();
private final AtomicReference<SyncState> syncState;
private final AtomicReference<Difficulty> terminalTotalDifficulty;
// initial postMerge state is indeterminate until it is set:
private final AtomicReference<Optional<Boolean>> isPostMerge =
new AtomicReference<>(Optional.empty());
private final Subscribers<MergeStateHandler> newMergeStateCallbackSubscribers =
Subscribers.create();
private final Subscribers<UnverifiedForkchoiceListener>
newUnverifiedForkchoiceCallbackSubscribers = Subscribers.create();
private final EvictingQueue<PayloadWrapper> blocksInProgress =
EvictingQueue.create(MAX_BLOCKS_IN_PROGRESS);
// latest finalized block
private final AtomicReference<BlockHeader> lastFinalized = new AtomicReference<>();
private final AtomicReference<BlockHeader> lastSafeBlock = new AtomicReference<>();
private final AtomicReference<Optional<BlockHeader>> terminalPoWBlock =
new AtomicReference<>(Optional.empty());
private final BlockValueCalculator blockValueCalculator = new BlockValueCalculator();
private boolean isPostMergeAtGenesis;
/** Instantiates a new Post merge context. */
@VisibleForTesting
PostMergeContext() {
this(Difficulty.ZERO);
}
/**
* Instantiates a new Post merge context.
*
* @param difficulty the difficulty
*/
@VisibleForTesting
PostMergeContext(final Difficulty difficulty) {
this.terminalTotalDifficulty = new AtomicReference<>(difficulty);
this.syncState = new AtomicReference<>();
}
/**
* Get post merge context.
*
* @return the post merge context
*/
public static PostMergeContext get() {
if (singleton.get() == null) {
singleton.compareAndSet(null, new PostMergeContext());
}
return singleton.get();
}
@Override
public <C extends ConsensusContext> C as(final Class<C> klass) {
return klass.cast(this);
}
@Override
public PostMergeContext setTerminalTotalDifficulty(final Difficulty newTerminalTotalDifficulty) {
if (newTerminalTotalDifficulty == null) {
throw new IllegalStateException("cannot set null terminal total difficulty");
}
terminalTotalDifficulty.set(newTerminalTotalDifficulty);
return this;
}
@Override
public void setIsPostMerge(final Difficulty totalDifficulty) {
if (isPostMerge.get().orElse(Boolean.FALSE)) {
// if we have finalized, we never switch back to a pre-merge once we have transitioned
// post-TTD.
return;
}
final boolean newState = terminalTotalDifficulty.get().lessOrEqualThan(totalDifficulty);
final Optional<Boolean> oldState = isPostMerge.getAndSet(Optional.of(newState));
// if we are past TTD, set it:
if (newState)
Optional.ofNullable(syncState.get())
.ifPresent(ss -> ss.setReachedTerminalDifficulty(newState));
if (oldState.isEmpty() || oldState.get() != newState) {
newMergeStateCallbackSubscribers.forEach(
newMergeStateCallback ->
newMergeStateCallback.mergeStateChanged(
newState, oldState, Optional.of(totalDifficulty)));
}
}
@Override
public boolean isPostMerge() {
return isPostMerge.get().orElse(Boolean.FALSE);
}
@Override
public PostMergeContext setSyncState(final SyncState syncState) {
this.syncState.set(syncState);
return this;
}
@Override
public boolean isSyncing() {
return Optional.ofNullable(syncState.get()).map(s -> !s.isInSync()).orElse(Boolean.TRUE)
// this is necessary for when we do not have a sync target yet, like at startup.
// not being stopped at ttd implies we are syncing.
&& Optional.ofNullable(syncState.get())
.map(s -> !(s.hasReachedTerminalDifficulty().orElse(Boolean.FALSE)))
.orElse(Boolean.TRUE);
}
@Override
public void observeNewIsPostMergeState(final MergeStateHandler mergeStateHandler) {
newMergeStateCallbackSubscribers.subscribe(mergeStateHandler);
}
@Override
public long addNewUnverifiedForkchoiceListener(
final UnverifiedForkchoiceListener unverifiedForkchoiceListener) {
return newUnverifiedForkchoiceCallbackSubscribers.subscribe(unverifiedForkchoiceListener);
}
@Override
public void removeNewUnverifiedForkchoiceListener(final long subscriberId) {
newUnverifiedForkchoiceCallbackSubscribers.unsubscribe(subscriberId);
}
@Override
public void fireNewUnverifiedForkchoiceEvent(
final Hash headBlockHash, final Hash safeBlockHash, final Hash finalizedBlockHash) {
final ForkchoiceEvent event =
new ForkchoiceEvent(headBlockHash, safeBlockHash, finalizedBlockHash);
newUnverifiedForkchoiceCallbackSubscribers.forEach(cb -> cb.onNewUnverifiedForkchoice(event));
}
@Override
public Difficulty getTerminalTotalDifficulty() {
return terminalTotalDifficulty.get();
}
@Override
public void setFinalized(final BlockHeader blockHeader) {
lastFinalized.set(blockHeader);
}
@Override
public Optional<BlockHeader> getFinalized() {
return Optional.ofNullable(lastFinalized.get());
}
@Override
public void setSafeBlock(final BlockHeader blockHeader) {
lastSafeBlock.set(blockHeader);
}
@Override
public Optional<BlockHeader> getSafeBlock() {
return Optional.ofNullable(lastSafeBlock.get());
}
@Override
public Optional<BlockHeader> getTerminalPoWBlock() {
return terminalPoWBlock.get();
}
@Override
public void setTerminalPoWBlock(final Optional<BlockHeader> hashAndNumber) {
terminalPoWBlock.set(hashAndNumber);
}
@Override
public boolean validateCandidateHead(final BlockHeader candidateHeader) {
return Optional.ofNullable(lastFinalized.get())
.map(finalized -> candidateHeader.getNumber() >= finalized.getNumber())
.orElse(Boolean.TRUE);
}
@Override
public void putPayloadById(final PayloadWrapper payloadWrapper) {
synchronized (blocksInProgress) {
final Optional<BlockWithReceipts> maybeCurrBestBlock =
retrieveBlockById(payloadWrapper.payloadIdentifier());
maybeCurrBestBlock.ifPresentOrElse(
currBestBlock -> {
if (compareByGasUsedDesc.compare(payloadWrapper.blockWithReceipts(), currBestBlock)
< 0) {
LOG.atDebug()
.setMessage("New proposal for payloadId {} {} is better than the previous one {}")
.addArgument(payloadWrapper.payloadIdentifier())
.addArgument(
() -> logBlockProposal(payloadWrapper.blockWithReceipts().getBlock()))
.addArgument(() -> logBlockProposal(currBestBlock.getBlock()))
.log();
blocksInProgress.removeAll(
retrievePayloadsById(payloadWrapper.payloadIdentifier())
.collect(Collectors.toUnmodifiableList()));
blocksInProgress.add(
new PayloadWrapper(
payloadWrapper.payloadIdentifier(), payloadWrapper.blockWithReceipts()));
logCurrentBestBlock(payloadWrapper.blockWithReceipts());
}
},
() ->
blocksInProgress.add(
new PayloadWrapper(
payloadWrapper.payloadIdentifier(), payloadWrapper.blockWithReceipts())));
}
}
private void logCurrentBestBlock(final BlockWithReceipts blockWithReceipts) {
if (LOG.isDebugEnabled()) {
final Block block = blockWithReceipts.getBlock();
final float gasUsedPerc =
100.0f * block.getHeader().getGasUsed() / block.getHeader().getGasLimit();
final int txsNum = block.getBody().getTransactions().size();
final Wei reward = blockValueCalculator.calculateBlockValue(blockWithReceipts);
LOG.debug(
"Current best proposal for block {}: txs {}, gas used {}%, reward {}",
blockWithReceipts.getNumber(),
txsNum,
String.format("%1.2f", gasUsedPerc),
reward.toHumanReadableString());
}
}
@Override
public Optional<BlockWithReceipts> retrieveBlockById(final PayloadIdentifier payloadId) {
synchronized (blocksInProgress) {
return retrievePayloadsById(payloadId)
.map(payloadWrapper -> payloadWrapper.blockWithReceipts())
.sorted(compareByGasUsedDesc)
.findFirst();
}
}
private Stream<PayloadWrapper> retrievePayloadsById(final PayloadIdentifier payloadId) {
return blocksInProgress.stream().filter(z -> z.payloadIdentifier().equals(payloadId));
}
private String logBlockProposal(final Block block) {
return "block "
+ block.toLogString()
+ " gas used "
+ block.getHeader().getGasUsed()
+ " transactions "
+ block.getBody().getTransactions().size();
}
@Override
public boolean isPostMergeAtGenesis() {
return this.isPostMergeAtGenesis;
}
/**
* Sets whether it is post merge at genesis
*
* @param isPostMergeAtGenesis the is post merge at genesis state
* @return the post merge context
*/
public PostMergeContext setPostMergeAtGenesis(final boolean isPostMergeAtGenesis) {
this.isPostMergeAtGenesis = isPostMergeAtGenesis;
return this;
}
}