DefaultBlockchain.java
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.chain;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.BlockchainStorage.Updater;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.LogWithMetadata;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.prometheus.PrometheusMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.InvalidConfigurationException;
import org.hyperledger.besu.util.Subscribers;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import io.prometheus.client.guava.cache.CacheMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link MutableBlockchain} implementation that persists chain data through a {@link
 * BlockchainStorage} and keeps the current chain head (header, total difficulty, transaction and
 * ommer counts) in volatile fields for cheap reads.
 *
 * <p>Optionally, the most recently appended/stored blocks are kept in bounded in-memory caches
 * (headers, bodies, receipts and total difficulty) when {@code numberOfBlocksToCache} is non-zero.
 */
public class DefaultBlockchain implements MutableBlockchain {
  private static final Logger LOG = LoggerFactory.getLogger(DefaultBlockchain.class);

  /** Default block choice rule: the header with the larger total difficulty wins. */
  private final Comparator<BlockHeader> heaviestChainBlockChoiceRule =
      Comparator.comparing(this::calculateTotalDifficulty);

  protected final BlockchainStorage blockchainStorage;

  private final Subscribers<BlockAddedObserver> blockAddedObservers = Subscribers.create();
  private final Subscribers<ChainReorgObserver> blockReorgObservers = Subscribers.create();

  /** Reorgs deeper than this many blocks (on either side) are logged at WARN level. */
  private final long reorgLoggingThreshold;

  // In-memory view of the current chain head.
  private volatile BlockHeader chainHeader;
  private volatile Difficulty totalDifficulty;
  private volatile int chainHeadTransactionCount;
  private volatile int chainHeadOmmerCount;

  private Comparator<BlockHeader> blockChoiceRule;

  /** Maximum size of each block-data cache; {@code 0} disables caching. */
  private final int numberOfBlocksToCache;

  private final Optional<Cache<Hash, BlockHeader>> blockHeadersCache;
  private final Optional<Cache<Hash, BlockBody>> blockBodiesCache;
  private final Optional<Cache<Hash, List<TransactionReceipt>>> transactionReceiptsCache;
  private final Optional<Cache<Hash, Difficulty>> totalDifficultyCache;

  /** Creates a blockchain without a data directory hint and with block caching disabled. */
  private DefaultBlockchain(
      final Optional<Block> genesisBlock,
      final BlockchainStorage blockchainStorage,
      final MetricsSystem metricsSystem,
      final long reorgLoggingThreshold) {
    this(genesisBlock, blockchainStorage, metricsSystem, reorgLoggingThreshold, null, 0);
  }

  /**
   * Initializes the blockchain from storage, registers chain-head gauges and sets up the optional
   * block-data caches.
   *
   * @param genesisBlock genesis block to initialize or validate storage with, if supplied
   * @param blockchainStorage backing storage; must already contain a chain head when no genesis
   *     block is supplied
   * @param metricsSystem metrics system the gauges (and cache collector) are registered with
   * @param reorgLoggingThreshold minimum reorg depth that triggers a WARN log
   * @param dataDirectory only used in the genesis-mismatch error message; may be {@code null}
   * @param numberOfBlocksToCache maximum entries per block-data cache, {@code 0} to disable
   */
  private DefaultBlockchain(
      final Optional<Block> genesisBlock,
      final BlockchainStorage blockchainStorage,
      final MetricsSystem metricsSystem,
      final long reorgLoggingThreshold,
      final String dataDirectory,
      final int numberOfBlocksToCache) {
    checkNotNull(genesisBlock);
    checkNotNull(blockchainStorage);
    checkNotNull(metricsSystem);

    this.blockchainStorage = blockchainStorage;
    genesisBlock.ifPresent(block -> this.setGenesis(block, dataDirectory));

    // Load the chain head data from storage into the in-memory fields.
    final Hash chainHead = blockchainStorage.getChainHead().get();
    chainHeader = blockchainStorage.getBlockHeader(chainHead).get();
    totalDifficulty = blockchainStorage.getTotalDifficulty(chainHead).get();
    final BlockBody chainHeadBody = blockchainStorage.getBlockBody(chainHead).get();
    chainHeadTransactionCount = chainHeadBody.getTransactions().size();
    chainHeadOmmerCount = chainHeadBody.getOmmers().size();

    metricsSystem.createLongGauge(
        BesuMetricCategory.ETHEREUM,
        "blockchain_height",
        "The current height of the canonical chain",
        this::getChainHeadBlockNumber);
    metricsSystem.createGauge(
        BesuMetricCategory.BLOCKCHAIN,
        "difficulty_total",
        "Total difficulty of the chainhead",
        () -> this.getChainHead().getTotalDifficulty().toBigInteger().doubleValue());
    metricsSystem.createLongGauge(
        BesuMetricCategory.BLOCKCHAIN,
        "chain_head_timestamp",
        "Timestamp from the current chain head",
        () -> getChainHeadHeader().getTimestamp());
    metricsSystem.createLongGauge(
        BesuMetricCategory.BLOCKCHAIN,
        "chain_head_gas_used",
        "Gas used by the current chain head block",
        () -> getChainHeadHeader().getGasUsed());
    metricsSystem.createLongGauge(
        BesuMetricCategory.BLOCKCHAIN,
        "chain_head_gas_limit",
        "Block gas limit of the current chain head block",
        () -> getChainHeadHeader().getGasLimit());
    metricsSystem.createIntegerGauge(
        BesuMetricCategory.BLOCKCHAIN,
        "chain_head_transaction_count",
        "Number of transactions in the current chain head block",
        () -> chainHeadTransactionCount);
    metricsSystem.createIntegerGauge(
        BesuMetricCategory.BLOCKCHAIN,
        "chain_head_ommer_count",
        "Number of ommers in the current chain head block",
        () -> chainHeadOmmerCount);

    this.reorgLoggingThreshold = reorgLoggingThreshold;
    this.blockChoiceRule = heaviestChainBlockChoiceRule;
    this.numberOfBlocksToCache = numberOfBlocksToCache;

    if (numberOfBlocksToCache != 0) {
      blockHeadersCache =
          Optional.of(
              CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
      blockBodiesCache =
          Optional.of(
              CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
      transactionReceiptsCache =
          Optional.of(
              CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());
      totalDifficultyCache =
          Optional.of(
              CacheBuilder.newBuilder().recordStats().maximumSize(numberOfBlocksToCache).build());

      final CacheMetricsCollector cacheMetrics = new CacheMetricsCollector();
      cacheMetrics.addCache("blockHeaders", blockHeadersCache.get());
      cacheMetrics.addCache("blockBodies", blockBodiesCache.get());
      cacheMetrics.addCache("transactionReceipts", transactionReceiptsCache.get());
      cacheMetrics.addCache("totalDifficulty", totalDifficultyCache.get());
      // Cache statistics are only exported when the metrics system is Prometheus-based.
      if (metricsSystem instanceof PrometheusMetricsSystem prometheusMetricsSystem) {
        prometheusMetricsSystem.addCollector(BesuMetricCategory.BLOCKCHAIN, () -> cacheMetrics);
      }
    } else {
      blockHeadersCache = Optional.empty();
      blockBodiesCache = Optional.empty();
      transactionReceiptsCache = Optional.empty();
      totalDifficultyCache = Optional.empty();
    }
  }

  /**
   * Creates a mutable blockchain with block caching disabled and no data directory hint.
   *
   * @param genesisBlock the genesis block; must not be {@code null}
   * @param blockchainStorage the backing storage
   * @param metricsSystem the metrics system to register gauges with
   * @param reorgLoggingThreshold minimum reorg depth that triggers a WARN log
   * @return the new blockchain
   */
  public static MutableBlockchain createMutable(
      final Block genesisBlock,
      final BlockchainStorage blockchainStorage,
      final MetricsSystem metricsSystem,
      final long reorgLoggingThreshold) {
    checkNotNull(genesisBlock);
    return new DefaultBlockchain(
        Optional.of(genesisBlock),
        blockchainStorage,
        metricsSystem,
        reorgLoggingThreshold,
        null,
        0);
  }

  /**
   * Creates a mutable blockchain with block caching disabled.
   *
   * @param genesisBlock the genesis block; must not be {@code null}
   * @param blockchainStorage the backing storage
   * @param metricsSystem the metrics system to register gauges with
   * @param reorgLoggingThreshold minimum reorg depth that triggers a WARN log
   * @param dataDirectory data directory reported in the genesis-mismatch error message
   * @return the new blockchain
   */
  public static MutableBlockchain createMutable(
      final Block genesisBlock,
      final BlockchainStorage blockchainStorage,
      final MetricsSystem metricsSystem,
      final long reorgLoggingThreshold,
      final String dataDirectory) {
    checkNotNull(genesisBlock);
    return new DefaultBlockchain(
        Optional.of(genesisBlock),
        blockchainStorage,
        metricsSystem,
        reorgLoggingThreshold,
        dataDirectory,
        0);
  }

  /**
   * Creates a mutable blockchain, optionally caching the most recently added blocks.
   *
   * @param genesisBlock the genesis block; must not be {@code null}
   * @param blockchainStorage the backing storage
   * @param metricsSystem the metrics system to register gauges with
   * @param reorgLoggingThreshold minimum reorg depth that triggers a WARN log
   * @param dataDirectory data directory reported in the genesis-mismatch error message
   * @param numberOfBlocksToCache maximum entries per block-data cache, {@code 0} to disable
   * @return the new blockchain
   */
  public static MutableBlockchain createMutable(
      final Block genesisBlock,
      final BlockchainStorage blockchainStorage,
      final MetricsSystem metricsSystem,
      final long reorgLoggingThreshold,
      final String dataDirectory,
      final int numberOfBlocksToCache) {
    checkNotNull(genesisBlock);
    return new DefaultBlockchain(
        Optional.of(genesisBlock),
        blockchainStorage,
        metricsSystem,
        reorgLoggingThreshold,
        dataDirectory,
        numberOfBlocksToCache);
  }

  /**
   * Creates a blockchain view over storage that has already been initialized.
   *
   * @param blockchainStorage the backing storage; must contain a chain head, its total difficulty
   *     and a genesis block hash
   * @param metricsSystem the metrics system to register gauges with
   * @param reorgLoggingThreshold minimum reorg depth that triggers a WARN log
   * @return the new blockchain
   * @throws IllegalArgumentException if the storage looks empty
   */
  public static Blockchain create(
      final BlockchainStorage blockchainStorage,
      final MetricsSystem metricsSystem,
      final long reorgLoggingThreshold) {
    checkArgument(
        validateStorageNonEmpty(blockchainStorage), "Cannot create Blockchain from empty storage");
    return new DefaultBlockchain(
        Optional.empty(), blockchainStorage, metricsSystem, reorgLoggingThreshold);
  }

  /** Runs a few basic checks to make sure data looks available and consistent. */
  private static boolean validateStorageNonEmpty(final BlockchainStorage blockchainStorage) {
    return blockchainStorage
            .getChainHead()
            .flatMap(blockchainStorage::getTotalDifficulty)
            .isPresent()
        && blockchainStorage.getBlockHash(BlockHeader.GENESIS_BLOCK_NUMBER).isPresent();
  }

  @Override
  public ChainHead getChainHead() {
    // Read the volatile header once so the header and the number describe the same block.
    final BlockHeader head = chainHeader;
    return new ChainHead(head, totalDifficulty, head.getNumber());
  }

  @Override
  public Optional<Hash> getFinalized() {
    return blockchainStorage.getFinalized();
  }

  @Override
  public Optional<Hash> getSafeBlock() {
    return blockchainStorage.getSafeBlock();
  }

  @Override
  public Hash getChainHeadHash() {
    return chainHeader.getHash();
  }

  @Override
  public long getChainHeadBlockNumber() {
    return chainHeader.getNumber();
  }

  @Override
  public BlockHeader getChainHeadHeader() {
    return chainHeader;
  }

  @Override
  public Block getChainHeadBlock() {
    // Read the volatile header once so the header and the body lookup use the same block.
    final BlockHeader head = chainHeader;
    return new Block(head, blockchainStorage.getBlockBody(head.getHash()).get());
  }

  @Override
  public Optional<BlockHeader> getBlockHeader(final long blockNumber) {
    return blockchainStorage.getBlockHash(blockNumber).flatMap(this::getBlockHeader);
  }

  /** Looks the header up in the cache first (when enabled), then in storage. */
  @Override
  public Optional<BlockHeader> getBlockHeader(final Hash blockHeaderHash) {
    return blockHeadersCache
        .map(
            cache ->
                Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
                    .or(() -> blockchainStorage.getBlockHeader(blockHeaderHash)))
        .orElseGet(() -> blockchainStorage.getBlockHeader(blockHeaderHash));
  }

  /** Same lookup as {@link #getBlockHeader(Hash)}, performed while holding this object's lock. */
  @Override
  public synchronized Optional<BlockHeader> getBlockHeaderSafe(final Hash blockHeaderHash) {
    return getBlockHeader(blockHeaderHash);
  }

  /** Looks the body up in the cache first (when enabled), then in storage. */
  @Override
  public Optional<BlockBody> getBlockBody(final Hash blockHeaderHash) {
    return blockBodiesCache
        .map(
            cache ->
                Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
                    .or(() -> blockchainStorage.getBlockBody(blockHeaderHash)))
        .orElseGet(() -> blockchainStorage.getBlockBody(blockHeaderHash));
  }

  /** Looks the receipts up in the cache first (when enabled), then in storage. */
  @Override
  public Optional<List<TransactionReceipt>> getTxReceipts(final Hash blockHeaderHash) {
    return transactionReceiptsCache
        .map(
            cache ->
                Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
                    .or(() -> blockchainStorage.getTransactionReceipts(blockHeaderHash)))
        .orElseGet(() -> blockchainStorage.getTransactionReceipts(blockHeaderHash));
  }

  @Override
  public Optional<Hash> getBlockHashByNumber(final long number) {
    return blockchainStorage.getBlockHash(number);
  }

  /** Looks the total difficulty up in the cache first (when enabled), then in storage. */
  @Override
  public Optional<Difficulty> getTotalDifficultyByHash(final Hash blockHeaderHash) {
    return totalDifficultyCache
        .map(
            cache ->
                Optional.ofNullable(cache.getIfPresent(blockHeaderHash))
                    .or(() -> blockchainStorage.getTotalDifficulty(blockHeaderHash)))
        .orElseGet(() -> blockchainStorage.getTotalDifficulty(blockHeaderHash));
  }

  /** Resolves a transaction through its indexed location (block hash + index in the body). */
  @Override
  public Optional<Transaction> getTransactionByHash(final Hash transactionHash) {
    return blockchainStorage
        .getTransactionLocation(transactionHash)
        .flatMap(
            l ->
                blockchainStorage
                    .getBlockBody(l.getBlockHash())
                    .map(b -> b.getTransactions().get(l.getTransactionIndex())));
  }

  @Override
  public Optional<TransactionLocation> getTransactionLocation(final Hash transactionHash) {
    return blockchainStorage.getTransactionLocation(transactionHash);
  }

  @Override
  public Comparator<BlockHeader> getBlockChoiceRule() {
    return blockChoiceRule;
  }

  @Override
  public void setBlockChoiceRule(final Comparator<BlockHeader> blockChoiceRule) {
    this.blockChoiceRule = blockChoiceRule;
  }

  /** Stores the block and updates the canonical chain data (new head, reorg or fork). */
  @Override
  public synchronized void appendBlock(final Block block, final List<TransactionReceipt> receipts) {
    if (numberOfBlocksToCache != 0) {
      cacheBlockData(block, receipts);
    }
    appendBlockHelper(new BlockWithReceipts(block, receipts), false);
  }

  /** Stores the block without touching the canonical chain data. */
  @Override
  public synchronized void storeBlock(final Block block, final List<TransactionReceipt> receipts) {
    if (numberOfBlocksToCache != 0) {
      cacheBlockData(block, receipts);
    }
    appendBlockHelper(new BlockWithReceipts(block, receipts), true);
  }

  /**
   * Puts the block's header, body, receipts and total difficulty into the in-memory caches.
   *
   * <p>The total difficulty entry is the cumulative difficulty (parent total + this block), so it
   * agrees with what storage returns for the same hash. It is skipped when the parent's total
   * difficulty is not available; lookups then fall through to storage.
   */
  private void cacheBlockData(final Block block, final List<TransactionReceipt> receipts) {
    final Hash blockHash = block.getHash();
    blockHeadersCache.ifPresent(cache -> cache.put(blockHash, block.getHeader()));
    blockBodiesCache.ifPresent(cache -> cache.put(blockHash, block.getBody()));
    transactionReceiptsCache.ifPresent(cache -> cache.put(blockHash, receipts));
    totalDifficultyCache.ifPresent(
        cache ->
            findTotalDifficulty(block.getHeader()).ifPresent(td -> cache.put(blockHash, td)));
  }

  /**
   * Decides whether a block still needs to be stored.
   *
   * @return {@code false} if the block is already tracked, {@code true} otherwise
   * @throws IllegalArgumentException if the receipt count does not match the transaction count or
   *     the block's parent is unknown
   */
  private boolean blockShouldBeProcessed(
      final Block block, final List<TransactionReceipt> receipts) {
    checkArgument(
        block.getBody().getTransactions().size() == receipts.size(),
        "Supplied receipts do not match block transactions.");
    if (blockIsAlreadyTracked(block)) {
      return false;
    }
    checkArgument(blockIsConnected(block), "Attempt to append non-connected block.");
    return true;
  }

  /**
   * Persists the block data and, unless {@code storeOnly} is set, updates the canonical chain.
   * Block-added observers are notified after the storage update has been committed.
   */
  private void appendBlockHelper(
      final BlockWithReceipts blockWithReceipts, final boolean storeOnly) {
    if (!blockShouldBeProcessed(blockWithReceipts.getBlock(), blockWithReceipts.getReceipts())) {
      return;
    }

    final Block block = blockWithReceipts.getBlock();
    final List<TransactionReceipt> receipts = blockWithReceipts.getReceipts();
    final Hash hash = block.getHash();
    final Difficulty td = calculateTotalDifficulty(block.getHeader());

    final BlockchainStorage.Updater updater = blockchainStorage.updater();
    updater.putBlockHeader(hash, block.getHeader());
    updater.putBlockBody(hash, block.getBody());
    updater.putTransactionReceipts(hash, receipts);
    updater.putTotalDifficulty(hash, td);

    final BlockAddedEvent blockAddedEvent;
    if (storeOnly) {
      blockAddedEvent = handleStoreOnly(blockWithReceipts);
    } else {
      blockAddedEvent = updateCanonicalChainData(updater, blockWithReceipts);
      if (blockAddedEvent.isNewCanonicalHead()) {
        updateCacheForNewCanonicalHead(block, td);
      }
    }

    updater.commit();
    blockAddedObservers.forEach(observer -> observer.onBlockAdded(blockAddedEvent));
  }

  /**
   * Writes the block straight into storage as canonical for its number, without any connectivity
   * checks, chain-head update or observer notification.
   *
   * @param block the block to import
   * @param transactionReceipts the receipts for the block's transactions
   * @param maybeTotalDifficulty total difficulty to store for the block, if known
   */
  @Override
  public synchronized void unsafeImportBlock(
      final Block block,
      final List<TransactionReceipt> transactionReceipts,
      final Optional<Difficulty> maybeTotalDifficulty) {
    final BlockchainStorage.Updater updater = blockchainStorage.updater();
    final Hash hash = block.getHash();
    updater.putBlockHeader(hash, block.getHeader());
    updater.putBlockHash(block.getHeader().getNumber(), hash);
    updater.putBlockBody(hash, block.getBody());
    // A transaction location is (block hash, index in block); it must point at the containing
    // block so that getTransactionByHash can find the block body again.
    indexTransactionForBlock(updater, hash, block.getBody().getTransactions());
    updater.putTransactionReceipts(hash, transactionReceipts);
    maybeTotalDifficulty.ifPresent(
        totalDifficulty -> updater.putTotalDifficulty(hash, totalDifficulty));
    updater.commit();
  }

  /**
   * Sets the chain head (in memory and in storage) to the supplied header without any validation.
   * Only the header and total difficulty are updated here; the chain-head transaction and ommer
   * counts are left unchanged.
   */
  @Override
  public synchronized void unsafeSetChainHead(
      final BlockHeader blockHeader, final Difficulty totalDifficulty) {
    final BlockchainStorage.Updater updater = blockchainStorage.updater();
    this.chainHeader = blockHeader;
    this.totalDifficulty = totalDifficulty;
    updater.setChainHead(blockHeader.getBlockHash());
    updater.commit();
  }

  /**
   * Computes the total difficulty for a header: its own difficulty for the genesis block, otherwise
   * its difficulty plus the parent's stored total difficulty.
   *
   * @return the total difficulty, or empty if the parent's total difficulty is not in storage
   */
  private Optional<Difficulty> findTotalDifficulty(final BlockHeader blockHeader) {
    if (blockHeader.getNumber() == BlockHeader.GENESIS_BLOCK_NUMBER) {
      return Optional.of(blockHeader.getDifficulty());
    }
    return blockchainStorage
        .getTotalDifficulty(blockHeader.getParentHash())
        .map(parentTotalDifficulty -> blockHeader.getDifficulty().add(parentTotalDifficulty));
  }

  /**
   * Computes the total difficulty for a header.
   *
   * @throws IllegalStateException if the parent's total difficulty is missing from storage
   */
  private Difficulty calculateTotalDifficulty(final BlockHeader blockHeader) {
    return findTotalDifficulty(blockHeader)
        .orElseThrow(
            () -> new IllegalStateException("Blockchain is missing total difficulty data."));
  }

  /**
   * Classifies the new block as a head advancement, a chain reorganization or a fork, and applies
   * the corresponding index updates to {@code updater}.
   *
   * @throws IllegalStateException if the chain head or other expected data is missing
   */
  private BlockAddedEvent updateCanonicalChainData(
      final BlockchainStorage.Updater updater, final BlockWithReceipts blockWithReceipts) {
    final Block newBlock = blockWithReceipts.getBlock();
    final Hash chainHead = blockchainStorage.getChainHead().orElse(null);
    if (newBlock.getHeader().getNumber() != BlockHeader.GENESIS_BLOCK_NUMBER && chainHead == null) {
      throw new IllegalStateException("Blockchain is missing chain head.");
    }

    try {
      if (newBlock.getHeader().getParentHash().equals(chainHead) || chainHead == null) {
        return handleNewHead(updater, blockWithReceipts);
      } else if (blockChoiceRule.compare(newBlock.getHeader(), chainHeader) > 0) {
        // New block represents a chain reorganization
        return handleChainReorg(updater, blockWithReceipts);
      } else {
        // New block represents a fork
        return handleFork(updater, newBlock);
      }
    } catch (final NoSuchElementException e) {
      // Any Optional.get() calls in this block should be present, missing data means data
      // corruption or a bug.
      updater.rollback();
      throw new IllegalStateException("Blockchain is missing data that should be present.", e);
    }
  }

  private BlockAddedEvent handleStoreOnly(final BlockWithReceipts blockWithReceipts) {
    return BlockAddedEvent.createForStoredOnly(blockWithReceipts.getBlock());
  }

  /** Handles a block that extends the current chain head: updates head, number index and txs. */
  private BlockAddedEvent handleNewHead(
      final Updater updater, final BlockWithReceipts blockWithReceipts) {
    // This block advances the chain, update the chain head
    final Hash newBlockHash = blockWithReceipts.getHash();
    updater.putBlockHash(blockWithReceipts.getNumber(), newBlockHash);
    updater.setChainHead(newBlockHash);
    indexTransactionForBlock(
        updater, newBlockHash, blockWithReceipts.getBlock().getBody().getTransactions());
    return BlockAddedEvent.createForHeadAdvancement(
        blockWithReceipts.getBlock(),
        LogWithMetadata.generate(
            blockWithReceipts.getBlock(), blockWithReceipts.getReceipts(), false),
        blockWithReceipts.getReceipts());
  }

  /** Handles a block that is neither the new head nor heavier than it: tracks it as a fork head. */
  private BlockAddedEvent handleFork(final BlockchainStorage.Updater updater, final Block fork) {
    final Collection<Hash> forkHeads = blockchainStorage.getForkHeads();

    // If this block advances an existing fork it replaces its parent as that fork's head.
    forkHeads.remove(fork.getHeader().getParentHash());
    forkHeads.add(fork.getHash());

    updater.setForkHeads(forkHeads);
    return BlockAddedEvent.createForFork(fork);
  }

  /**
   * Switches the canonical chain to end at {@code newChainHeadWithReceipts}.
   *
   * <p>Walks both the old and the new chain back to their common ancestor, re-pointing the
   * number-to-hash index, re-indexing transactions, collecting added/removed logs and updating the
   * tracked fork heads along the way. The in-memory chain head is not modified here.
   */
  private BlockAddedEvent handleChainReorg(
      final BlockchainStorage.Updater updater, final BlockWithReceipts newChainHeadWithReceipts) {
    final BlockWithReceipts oldChainWithReceipts = getBlockWithReceipts(chainHeader).get();
    BlockWithReceipts currentOldChainWithReceipts = oldChainWithReceipts;
    BlockWithReceipts currentNewChainWithReceipts = newChainHeadWithReceipts;

    // Update chain head
    updater.setChainHead(currentNewChainWithReceipts.getHeader().getHash());

    // Track transactions and logs to be added and removed
    final Map<Hash, List<Transaction>> newTransactions = new HashMap<>();
    final List<Transaction> removedTransactions = new ArrayList<>();
    final List<LogWithMetadata> addedLogsWithMetadata = new ArrayList<>();
    final List<LogWithMetadata> removedLogsWithMetadata = new ArrayList<>();

    while (currentNewChainWithReceipts.getNumber() > currentOldChainWithReceipts.getNumber()) {
      // If new chain is longer than old chain, walk back until we meet the old chain by number
      // adding indexing for new chain along the way.
      final Hash blockHash = currentNewChainWithReceipts.getHash();
      updater.putBlockHash(currentNewChainWithReceipts.getNumber(), blockHash);

      newTransactions.put(
          blockHash, currentNewChainWithReceipts.getBlock().getBody().getTransactions());
      addAddedLogsWithMetadata(addedLogsWithMetadata, currentNewChainWithReceipts);
      notifyChainReorgBlockAdded(currentNewChainWithReceipts);
      currentNewChainWithReceipts = getParentBlockWithReceipts(currentNewChainWithReceipts);
    }

    while (currentOldChainWithReceipts.getNumber() > currentNewChainWithReceipts.getNumber()) {
      // If oldChain is longer than new chain, walk back until we meet the new chain by number,
      // updating as we go.
      updater.removeBlockHash(currentOldChainWithReceipts.getNumber());

      removedTransactions.addAll(
          currentOldChainWithReceipts.getBlock().getBody().getTransactions());
      addRemovedLogsWithMetadata(removedLogsWithMetadata, currentOldChainWithReceipts);

      currentOldChainWithReceipts = getParentBlockWithReceipts(currentOldChainWithReceipts);
    }

    while (!currentOldChainWithReceipts.getHash().equals(currentNewChainWithReceipts.getHash())) {
      // Walk back until we meet the common ancestor between the two chains, updating as we go.
      final Hash newBlockHash = currentNewChainWithReceipts.getHash();
      updater.putBlockHash(currentNewChainWithReceipts.getNumber(), newBlockHash);

      newTransactions.put(
          newBlockHash, currentNewChainWithReceipts.getBlock().getBody().getTransactions());
      removedTransactions.addAll(
          currentOldChainWithReceipts.getBlock().getBody().getTransactions());
      addAddedLogsWithMetadata(addedLogsWithMetadata, currentNewChainWithReceipts);
      addRemovedLogsWithMetadata(removedLogsWithMetadata, currentOldChainWithReceipts);

      currentNewChainWithReceipts = getParentBlockWithReceipts(currentNewChainWithReceipts);
      currentOldChainWithReceipts = getParentBlockWithReceipts(currentOldChainWithReceipts);
    }
    final BlockWithReceipts commonAncestorWithReceipts = currentNewChainWithReceipts;

    // Update indexed transactions
    newTransactions.forEach(
        (blockHash, transactionsInBlock) -> {
          indexTransactionForBlock(updater, blockHash, transactionsInBlock);
          // Don't remove transactions that are being re-indexed.
          removedTransactions.removeAll(transactionsInBlock);
        });
    clearIndexedTransactionsForBlock(updater, removedTransactions);

    // Update tracked forks
    final Collection<Hash> forks = blockchainStorage.getForkHeads();
    // Old head is now a fork
    forks.add(oldChainWithReceipts.getHash());
    // Remove new chain head's parent if it was tracked as a fork
    forks.remove(newChainHeadWithReceipts.getHeader().getParentHash());
    updater.setForkHeads(forks);

    maybeLogReorg(newChainHeadWithReceipts, oldChainWithReceipts, commonAncestorWithReceipts);

    return BlockAddedEvent.createForChainReorg(
        newChainHeadWithReceipts.getBlock(),
        newTransactions.values().stream().flatMap(Collection::stream).collect(toList()),
        removedTransactions,
        newChainHeadWithReceipts.getReceipts(),
        Stream.concat(removedLogsWithMetadata.stream(), addedLogsWithMetadata.stream())
            .collect(Collectors.toUnmodifiableList()),
        commonAncestorWithReceipts.getBlock().getHash());
  }

  /**
   * Logs a WARN message describing the reorg when either side of it (new or old chain, measured
   * from the common ancestor) is deeper than {@code reorgLoggingThreshold}.
   */
  private void maybeLogReorg(
      final BlockWithReceipts newChainHeadWithReceipts,
      final BlockWithReceipts oldChainWithReceipts,
      final BlockWithReceipts commonAncestorWithReceipts) {
    if ((newChainHeadWithReceipts.getNumber() - commonAncestorWithReceipts.getNumber()
                > reorgLoggingThreshold
            || oldChainWithReceipts.getNumber() - commonAncestorWithReceipts.getNumber()
                > reorgLoggingThreshold)
        && LOG.isWarnEnabled()) {
      LOG.warn(
          "Chain Reorganization +{} new / -{} old\n{}",
          newChainHeadWithReceipts.getNumber() - commonAncestorWithReceipts.getNumber(),
          oldChainWithReceipts.getNumber() - commonAncestorWithReceipts.getNumber(),
          Streams.zip(
                  Stream.of("Old", "New", "Ancestor"),
                  Stream.of(
                          oldChainWithReceipts,
                          newChainHeadWithReceipts,
                          commonAncestorWithReceipts)
                      .map(
                          blockWithReceipts ->
                              String.format(
                                  "hash: %s, height: %s",
                                  blockWithReceipts.getHash(), blockWithReceipts.getNumber())),
                  (label, values) -> String.format("%10s - %s", label, values))
              .collect(joining("\n")));
    }
  }

  /**
   * Rewinds the chain head to the canonical block at the given number.
   *
   * @return {@code false} if no block hash is indexed for that number, {@code true} otherwise
   */
  @Override
  public boolean rewindToBlock(final long blockNumber) {
    return blockchainStorage.getBlockHash(blockNumber).map(this::rewindToBlock).orElse(false);
  }

  /**
   * Rewinds the chain head to the block with the given hash by performing a chain reorg onto it.
   *
   * @return {@code true} once the rewind has been committed
   * @throws IllegalStateException if required block data is missing from storage
   */
  @Override
  public boolean rewindToBlock(final Hash blockHash) {
    final BlockchainStorage.Updater updater = blockchainStorage.updater();
    try {
      final BlockHeader oldBlockHeader = blockchainStorage.getBlockHeader(blockHash).get();
      final BlockWithReceipts blockWithReceipts = getBlockWithReceipts(oldBlockHeader).get();
      final Block block = blockWithReceipts.getBlock();

      final BlockAddedEvent reorgEvent = handleChainReorg(updater, blockWithReceipts);
      final Difficulty newTotalDifficulty = calculateTotalDifficulty(block.getHeader());
      updater.commit();

      // Refresh the in-memory head before notifying, so observers that query the chain head see
      // the rewound state (same ordering as appendBlock and forwardToBlock).
      updateCacheForNewCanonicalHead(block, newTotalDifficulty);
      blockAddedObservers.forEach(o -> o.onBlockAdded(reorgEvent));
      return true;
    } catch (final NoSuchElementException e) {
      // Any Optional.get() calls in this block should be present, missing data means data
      // corruption or a bug.
      updater.rollback();
      throw new IllegalStateException("Blockchain is missing data that should be present.", e);
    }
  }

  /**
   * Advances the chain head to an already-stored block that is a direct child of the current head.
   *
   * @return {@code true} once the new head has been committed
   * @throws IllegalArgumentException if the header is not a child of the current chain head
   * @throws IllegalStateException if required block data is missing from storage
   */
  @Override
  public boolean forwardToBlock(final BlockHeader blockHeader) {
    checkArgument(
        chainHeader.getHash().equals(blockHeader.getParentHash()),
        "Supplied block header is not a child of the current chain head.");

    final BlockchainStorage.Updater updater = blockchainStorage.updater();
    try {
      final BlockWithReceipts blockWithReceipts = getBlockWithReceipts(blockHeader).get();

      final BlockAddedEvent newHeadEvent = handleNewHead(updater, blockWithReceipts);
      updateCacheForNewCanonicalHead(
          blockWithReceipts.getBlock(), calculateTotalDifficulty(blockHeader));
      updater.commit();

      blockAddedObservers.forEach(observer -> observer.onBlockAdded(newHeadEvent));
      return true;
    } catch (final NoSuchElementException e) {
      // Any Optional.get() calls in this block should be present, missing data means data
      // corruption or a bug.
      updater.rollback();
      throw new IllegalStateException("Blockchain is missing data that should be present.", e);
    }
  }

  @Override
  public void setFinalized(final Hash blockHash) {
    final var updater = blockchainStorage.updater();
    updater.setFinalized(blockHash);
    updater.commit();
  }

  @Override
  public void setSafeBlock(final Hash blockHash) {
    final var updater = blockchainStorage.updater();
    updater.setSafeBlock(blockHash);
    updater.commit();
  }

  /** Refreshes the in-memory chain head fields from the given block. */
  private void updateCacheForNewCanonicalHead(final Block block, final Difficulty uInt256) {
    chainHeader = block.getHeader();
    totalDifficulty = uInt256;
    chainHeadTransactionCount = block.getBody().getTransactions().size();
    chainHeadOmmerCount = block.getBody().getOmmers().size();
  }

  /** Indexes each transaction by hash to its location: (containing block hash, index in block). */
  private static void indexTransactionForBlock(
      final BlockchainStorage.Updater updater, final Hash hash, final List<Transaction> txs) {
    for (int i = 0; i < txs.size(); i++) {
      final Hash txHash = txs.get(i).getHash();
      final TransactionLocation loc = new TransactionLocation(hash, i);
      updater.putTransactionLocation(txHash, loc);
    }
  }

  /** Removes the location index entries of the given transactions. */
  private static void clearIndexedTransactionsForBlock(
      final BlockchainStorage.Updater updater, final List<Transaction> txs) {
    for (final Transaction tx : txs) {
      updater.removeTransactionLocation(tx.getHash());
    }
  }

  @VisibleForTesting
  Set<Hash> getForks() {
    return new HashSet<>(blockchainStorage.getForkHeads());
  }

  /**
   * Initializes empty storage with the genesis block, or verifies that the supplied genesis block
   * matches the one already stored.
   *
   * @throws IllegalArgumentException if the block's number is not the genesis block number
   * @throws IllegalStateException if storage has a chain head but no genesis block hash
   * @throws InvalidConfigurationException if the stored genesis hash differs from the supplied one
   */
  private void setGenesis(final Block genesisBlock, final String dataDirectory) {
    checkArgument(
        genesisBlock.getHeader().getNumber() == BlockHeader.GENESIS_BLOCK_NUMBER,
        "Invalid genesis block.");
    final Optional<Hash> maybeHead = blockchainStorage.getChainHead();
    if (maybeHead.isEmpty()) {
      // Initialize blockchain store with genesis block.
      final BlockchainStorage.Updater updater = blockchainStorage.updater();
      final Hash hash = genesisBlock.getHash();
      updater.putBlockHeader(hash, genesisBlock.getHeader());
      updater.putBlockBody(hash, genesisBlock.getBody());
      updater.putTransactionReceipts(hash, emptyList());
      updater.putTotalDifficulty(hash, calculateTotalDifficulty(genesisBlock.getHeader()));
      updater.putBlockHash(genesisBlock.getHeader().getNumber(), hash);
      updater.setChainHead(hash);
      updater.commit();
    } else {
      // Verify genesis block is consistent with stored blockchain.
      final Optional<Hash> genesisHash = getBlockHashByNumber(BlockHeader.GENESIS_BLOCK_NUMBER);
      if (genesisHash.isEmpty()) {
        throw new IllegalStateException("Blockchain is missing genesis block data.");
      }
      if (!genesisHash.get().equals(genesisBlock.getHash())) {
        throw new InvalidConfigurationException(
            "Supplied genesis block does not match chain data stored in "
                + dataDirectory
                + ".\n"
                + "Please specify a different data directory with --data-path, specify the original genesis file with "
                + "--genesis-file or supply a testnet/mainnet option with --network.");
      }
    }
  }

  private boolean blockIsAlreadyTracked(final Block block) {
    if (block.getHeader().getParentHash().equals(chainHeader.getHash())) {
      // If this block builds on our chain head it would have a higher TD and be the chain head
      // but since it isn't we mustn't have imported it yet.
      // Saves a db read for the most common case
      return false;
    }
    return blockchainStorage.getBlockHeader(block.getHash()).isPresent();
  }

  /** A block is connected when its parent header is present in storage. */
  private boolean blockIsConnected(final Block block) {
    return blockchainStorage.getBlockHeader(block.getHeader().getParentHash()).isPresent();
  }

  /**
   * Prepends the block's logs to the list. Reorgs visit new-chain blocks from newest to oldest, so
   * inserting at the front leaves the list ordered oldest block first.
   */
  private void addAddedLogsWithMetadata(
      final List<LogWithMetadata> logsWithMetadata, final BlockWithReceipts blockWithReceipts) {
    logsWithMetadata.addAll(
        0,
        LogWithMetadata.generate(
            blockWithReceipts.getBlock(), blockWithReceipts.getReceipts(), false));
  }

  /** Appends the block's logs, flagged as removed and in reverse order, to the list. */
  private void addRemovedLogsWithMetadata(
      final List<LogWithMetadata> logsWithMetadata, final BlockWithReceipts blockWithReceipts) {
    logsWithMetadata.addAll(
        Lists.reverse(
            LogWithMetadata.generate(
                blockWithReceipts.getBlock(), blockWithReceipts.getReceipts(), true)));
  }

  /** Loads body and receipts for the header from storage; empty if either one is missing. */
  private Optional<BlockWithReceipts> getBlockWithReceipts(final BlockHeader blockHeader) {
    return blockchainStorage
        .getBlockBody(blockHeader.getHash())
        .map(body -> new Block(blockHeader, body))
        .flatMap(
            block ->
                blockchainStorage
                    .getTransactionReceipts(blockHeader.getHash())
                    .map(receipts -> new BlockWithReceipts(block, receipts)));
  }

  /**
   * Loads the parent block (with receipts) from storage.
   *
   * @throws NoSuchElementException if the parent's header, body or receipts are missing
   */
  private BlockWithReceipts getParentBlockWithReceipts(final BlockWithReceipts blockWithReceipts) {
    return blockchainStorage
        .getBlockHeader(blockWithReceipts.getHeader().getParentHash())
        .flatMap(this::getBlockWithReceipts)
        .get();
  }

  @Override
  public long observeBlockAdded(final BlockAddedObserver observer) {
    checkNotNull(observer);
    return blockAddedObservers.subscribe(observer);
  }

  @Override
  public boolean removeObserver(final long observerId) {
    return blockAddedObservers.unsubscribe(observerId);
  }

  @Override
  public long observeChainReorg(final ChainReorgObserver observer) {
    checkNotNull(observer);
    return blockReorgObservers.subscribe(observer);
  }

  @Override
  public boolean removeChainReorgObserver(final long observerId) {
    return blockReorgObservers.unsubscribe(observerId);
  }

  @VisibleForTesting
  int observerCount() {
    return blockAddedObservers.getSubscriberCount();
  }

  private void notifyChainReorgBlockAdded(final BlockWithReceipts blockWithReceipts) {
    blockReorgObservers.forEach(observer -> observer.onBlockAdded(blockWithReceipts, this));
  }

  /** Returns the block header cache, or empty when caching is disabled. */
  public Optional<Cache<Hash, BlockHeader>> getBlockHeadersCache() {
    return blockHeadersCache;
  }

  /** Returns the block body cache, or empty when caching is disabled. */
  public Optional<Cache<Hash, BlockBody>> getBlockBodiesCache() {
    return blockBodiesCache;
  }

  /** Returns the transaction receipts cache, or empty when caching is disabled. */
  public Optional<Cache<Hash, List<TransactionReceipt>>> getTransactionReceiptsCache() {
    return transactionReceiptsCache;
  }

  /** Returns the total difficulty cache, or empty when caching is disabled. */
  public Optional<Cache<Hash, Difficulty>> getTotalDifficultyCache() {
    return totalDifficultyCache;
  }
}