QbftRound.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.consensus.qbft.statemachine;
import static java.util.Collections.emptyList;
import org.hyperledger.besu.consensus.common.bft.BftBlockHashing;
import org.hyperledger.besu.consensus.common.bft.BftBlockHeaderFunctions;
import org.hyperledger.besu.consensus.common.bft.BftBlockInterface;
import org.hyperledger.besu.consensus.common.bft.BftContext;
import org.hyperledger.besu.consensus.common.bft.BftExtraData;
import org.hyperledger.besu.consensus.common.bft.BftExtraDataCodec;
import org.hyperledger.besu.consensus.common.bft.BftHelpers;
import org.hyperledger.besu.consensus.common.bft.ConsensusRoundIdentifier;
import org.hyperledger.besu.consensus.common.bft.RoundTimer;
import org.hyperledger.besu.consensus.common.bft.payload.SignedData;
import org.hyperledger.besu.consensus.qbft.messagewrappers.Commit;
import org.hyperledger.besu.consensus.qbft.messagewrappers.Prepare;
import org.hyperledger.besu.consensus.qbft.messagewrappers.Proposal;
import org.hyperledger.besu.consensus.qbft.network.QbftMessageTransmitter;
import org.hyperledger.besu.consensus.qbft.payload.MessageFactory;
import org.hyperledger.besu.consensus.qbft.payload.PreparePayload;
import org.hyperledger.besu.consensus.qbft.payload.RoundChangePayload;
import org.hyperledger.besu.crypto.SECPSignature;
import org.hyperledger.besu.cryptoservices.NodeKey;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.blockcreation.BlockCreator;
import org.hyperledger.besu.ethereum.chain.MinedBlockObserver;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockImporter;
import org.hyperledger.besu.ethereum.mainnet.BlockImportResult;
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.securitymodule.SecurityModuleException;
import org.hyperledger.besu.util.Subscribers;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** The Qbft round. */
public class QbftRound {
private static final Logger LOG = LoggerFactory.getLogger(QbftRound.class);
private final Subscribers<MinedBlockObserver> observers;
/** The Round state. */
protected final RoundState roundState;
/** The Block creator. */
protected final BlockCreator blockCreator;
/** The Protocol context. */
protected final ProtocolContext protocolContext;
/** The Protocol schedule. */
protected final ProtocolSchedule protocolSchedule;
private final NodeKey nodeKey;
private final MessageFactory messageFactory; // used only to create stored local msgs
private final QbftMessageTransmitter transmitter;
/** The Bft extra data codec. */
protected final BftExtraDataCodec bftExtraDataCodec;
/**
* Instantiates a new Qbft round.
*
* @param roundState the round state
* @param blockCreator the block creator
* @param protocolContext the protocol context
* @param protocolSchedule the protocol schedule
* @param observers the observers
* @param nodeKey the node key
* @param messageFactory the message factory
* @param transmitter the transmitter
* @param roundTimer the round timer
* @param bftExtraDataCodec the bft extra data codec
*/
public QbftRound(
final RoundState roundState,
final BlockCreator blockCreator,
final ProtocolContext protocolContext,
final ProtocolSchedule protocolSchedule,
final Subscribers<MinedBlockObserver> observers,
final NodeKey nodeKey,
final MessageFactory messageFactory,
final QbftMessageTransmitter transmitter,
final RoundTimer roundTimer,
final BftExtraDataCodec bftExtraDataCodec) {
this.roundState = roundState;
this.blockCreator = blockCreator;
this.protocolContext = protocolContext;
this.protocolSchedule = protocolSchedule;
this.observers = observers;
this.nodeKey = nodeKey;
this.messageFactory = messageFactory;
this.transmitter = transmitter;
this.bftExtraDataCodec = bftExtraDataCodec;
roundTimer.startTimer(getRoundIdentifier());
}
/**
* Gets round identifier.
*
* @return the round identifier
*/
public ConsensusRoundIdentifier getRoundIdentifier() {
return roundState.getRoundIdentifier();
}
/**
* Create and send proposal message.
*
* @param headerTimeStampSeconds the header time stamp seconds
*/
public void createAndSendProposalMessage(final long headerTimeStampSeconds) {
LOG.debug("Creating proposed block. round={}", roundState.getRoundIdentifier());
final Block block = blockCreator.createBlock(headerTimeStampSeconds).getBlock();
LOG.trace("Creating proposed block blockHeader={}", block.getHeader());
updateStateWithProposalAndTransmit(block, emptyList(), emptyList());
}
/**
* Start round with.
*
* @param roundChangeArtifacts the round change artifacts
* @param headerTimestamp the header timestamp
*/
public void startRoundWith(
final RoundChangeArtifacts roundChangeArtifacts, final long headerTimestamp) {
final Optional<PreparedCertificate> bestPreparedCertificate =
roundChangeArtifacts.getBestPreparedPeer();
final Block blockToPublish;
if (bestPreparedCertificate.isEmpty()) {
LOG.debug("Sending proposal with new block. round={}", roundState.getRoundIdentifier());
blockToPublish = blockCreator.createBlock(headerTimestamp).getBlock();
} else {
LOG.debug(
"Sending proposal from PreparedCertificate. round={}", roundState.getRoundIdentifier());
blockToPublish = bestPreparedCertificate.get().getBlock();
}
updateStateWithProposalAndTransmit(
blockToPublish,
roundChangeArtifacts.getRoundChanges(),
bestPreparedCertificate.map(PreparedCertificate::getPrepares).orElse(emptyList()));
}
/**
* Update state with proposal and transmit.
*
* @param block the block
* @param roundChanges the round changes
* @param prepares the prepares
*/
protected void updateStateWithProposalAndTransmit(
final Block block,
final List<SignedData<RoundChangePayload>> roundChanges,
final List<SignedData<PreparePayload>> prepares) {
final Proposal proposal;
try {
proposal = messageFactory.createProposal(getRoundIdentifier(), block, roundChanges, prepares);
} catch (final SecurityModuleException e) {
LOG.warn("Failed to create a signed Proposal, waiting for next round.", e);
return;
}
transmitter.multicastProposal(
proposal.getRoundIdentifier(),
proposal.getSignedPayload().getPayload().getProposedBlock(),
roundChanges,
prepares);
updateStateWithProposedBlock(proposal);
sendPrepare(block);
}
/**
* Handle proposal message.
*
* @param msg the msg
*/
public void handleProposalMessage(final Proposal msg) {
LOG.debug(
"Received a proposal message. round={}. author={}",
roundState.getRoundIdentifier(),
msg.getAuthor());
final Block block = msg.getSignedPayload().getPayload().getProposedBlock();
if (updateStateWithProposedBlock(msg)) {
sendPrepare(block);
}
}
private void sendPrepare(final Block block) {
LOG.debug("Sending prepare message. round={}", roundState.getRoundIdentifier());
try {
final Prepare localPrepareMessage =
messageFactory.createPrepare(getRoundIdentifier(), block.getHash());
peerIsPrepared(localPrepareMessage);
transmitter.multicastPrepare(
localPrepareMessage.getRoundIdentifier(), localPrepareMessage.getDigest());
} catch (final SecurityModuleException e) {
LOG.warn("Failed to create a signed Prepare; {}", e.getMessage());
}
}
/**
* Handle prepare message.
*
* @param msg the msg
*/
public void handlePrepareMessage(final Prepare msg) {
LOG.debug(
"Received a prepare message. round={}. author={}",
roundState.getRoundIdentifier(),
msg.getAuthor());
peerIsPrepared(msg);
}
/**
* Handle commit message.
*
* @param msg the msg
*/
public void handleCommitMessage(final Commit msg) {
LOG.debug(
"Received a commit message. round={}. author={}",
roundState.getRoundIdentifier(),
msg.getAuthor());
peerIsCommitted(msg);
}
/**
* Construct prepared certificate.
*
* @return the optional PreparedCertificate
*/
public Optional<PreparedCertificate> constructPreparedCertificate() {
return roundState.constructPreparedCertificate();
}
private boolean updateStateWithProposedBlock(final Proposal msg) {
final boolean wasPrepared = roundState.isPrepared();
final boolean wasCommitted = roundState.isCommitted();
final boolean blockAccepted = roundState.setProposedBlock(msg);
if (blockAccepted) {
final Block block = roundState.getProposedBlock().get();
final SECPSignature commitSeal;
try {
commitSeal = createCommitSeal(block);
} catch (final SecurityModuleException e) {
LOG.warn("Failed to construct commit seal; {}", e.getMessage());
return true;
}
// There are times handling a proposed block is enough to enter prepared.
if (wasPrepared != roundState.isPrepared()) {
LOG.debug("Sending commit message. round={}", roundState.getRoundIdentifier());
transmitter.multicastCommit(getRoundIdentifier(), block.getHash(), commitSeal);
}
// can automatically add _our_ commit message to the roundState
// cannot create a prepare message here, as it may be _our_ proposal, and thus we cannot also
// prepare
try {
final Commit localCommitMessage =
messageFactory.createCommit(
roundState.getRoundIdentifier(), msg.getBlock().getHash(), commitSeal);
roundState.addCommitMessage(localCommitMessage);
} catch (final SecurityModuleException e) {
LOG.warn("Failed to create signed Commit message; {}", e.getMessage());
return true;
}
// It is possible sufficient commit seals are now available and the block should be imported
if (wasCommitted != roundState.isCommitted()) {
importBlockToChain();
}
}
return blockAccepted;
}
private void peerIsPrepared(final Prepare msg) {
final boolean wasPrepared = roundState.isPrepared();
roundState.addPrepareMessage(msg);
if (wasPrepared != roundState.isPrepared()) {
LOG.debug("Sending commit message. round={}", roundState.getRoundIdentifier());
final Block block = roundState.getProposedBlock().get();
try {
transmitter.multicastCommit(getRoundIdentifier(), block.getHash(), createCommitSeal(block));
// Note: the local-node's commit message was added to RoundState on block acceptance
// and thus does not need to be done again here.
} catch (final SecurityModuleException e) {
LOG.warn("Failed to construct a commit seal: {}", e.getMessage());
}
}
}
private void peerIsCommitted(final Commit msg) {
final boolean wasCommitted = roundState.isCommitted();
roundState.addCommitMessage(msg);
if (wasCommitted != roundState.isCommitted()) {
importBlockToChain();
}
}
private void importBlockToChain() {
final Block blockToImport =
BftHelpers.createSealedBlock(
bftExtraDataCodec,
roundState.getProposedBlock().get(),
roundState.getRoundIdentifier().getRoundNumber(),
roundState.getCommitSeals());
final long blockNumber = blockToImport.getHeader().getNumber();
final BftExtraData extraData = bftExtraDataCodec.decode(blockToImport.getHeader());
if (getRoundIdentifier().getRoundNumber() > 0) {
LOG.info(
"Importing proposed block to chain. round={}, hash={}",
getRoundIdentifier(),
blockToImport.getHash());
} else {
LOG.debug(
"Importing proposed block to chain. round={}, hash={}",
getRoundIdentifier(),
blockToImport.getHash());
}
LOG.trace("Importing proposed block with extraData={}", extraData);
final BlockImporter blockImporter =
protocolSchedule.getByBlockHeader(blockToImport.getHeader()).getBlockImporter();
final BlockImportResult result =
blockImporter.importBlock(protocolContext, blockToImport, HeaderValidationMode.FULL);
if (!result.isImported()) {
LOG.error(
"Failed to import proposed block to chain. block={} extraData={} blockHeader={}",
blockNumber,
extraData,
blockToImport.getHeader());
} else {
notifyNewBlockListeners(blockToImport);
}
}
private SECPSignature createCommitSeal(final Block block) {
final Block commitBlock = createCommitBlock(block);
final BlockHeader proposedHeader = commitBlock.getHeader();
final BftExtraData extraData = bftExtraDataCodec.decode(proposedHeader);
final Hash commitHash =
new BftBlockHashing(bftExtraDataCodec)
.calculateDataHashForCommittedSeal(proposedHeader, extraData);
return nodeKey.sign(commitHash);
}
private Block createCommitBlock(final Block block) {
final BftBlockInterface bftBlockInterface =
protocolContext.getConsensusContext(BftContext.class).getBlockInterface();
return bftBlockInterface.replaceRoundInBlock(
block,
getRoundIdentifier().getRoundNumber(),
BftBlockHeaderFunctions.forCommittedSeal(bftExtraDataCodec));
}
private void notifyNewBlockListeners(final Block block) {
observers.forEach(obs -> obs.blockMined(block));
}
}