KeyValueStoragePrefixedKeyBlockchainStorage.java
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.storage.keyvalue;
import static org.hyperledger.besu.ethereum.chain.VariablesStorage.Keys.CHAIN_HEAD_HASH;
import static org.hyperledger.besu.ethereum.chain.VariablesStorage.Keys.FINALIZED_BLOCK_HASH;
import static org.hyperledger.besu.ethereum.chain.VariablesStorage.Keys.FORK_HEADS;
import static org.hyperledger.besu.ethereum.chain.VariablesStorage.Keys.SAFE_BLOCK_HASH;
import static org.hyperledger.besu.ethereum.chain.VariablesStorage.Keys.SEQ_NO_STORE;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.BlockchainStorage;
import org.hyperledger.besu.ethereum.chain.TransactionLocation;
import org.hyperledger.besu.ethereum.chain.VariablesStorage;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.units.bigints.UInt256;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KeyValueStoragePrefixedKeyBlockchainStorage implements BlockchainStorage {
private static final Logger LOG =
LoggerFactory.getLogger(KeyValueStoragePrefixedKeyBlockchainStorage.class);
@Deprecated(since = "23.4.2", forRemoval = true)
private static final Bytes VARIABLES_PREFIX = Bytes.of(1);
private static final Bytes BLOCK_HEADER_PREFIX = Bytes.of(2);
private static final Bytes BLOCK_BODY_PREFIX = Bytes.of(3);
private static final Bytes TRANSACTION_RECEIPTS_PREFIX = Bytes.of(4);
private static final Bytes BLOCK_HASH_PREFIX = Bytes.of(5);
private static final Bytes TOTAL_DIFFICULTY_PREFIX = Bytes.of(6);
private static final Bytes TRANSACTION_LOCATION_PREFIX = Bytes.of(7);
final KeyValueStorage blockchainStorage;
final VariablesStorage variablesStorage;
final BlockHeaderFunctions blockHeaderFunctions;
final boolean receiptCompaction;
public KeyValueStoragePrefixedKeyBlockchainStorage(
final KeyValueStorage blockchainStorage,
final VariablesStorage variablesStorage,
final BlockHeaderFunctions blockHeaderFunctions,
final boolean receiptCompaction) {
this.blockchainStorage = blockchainStorage;
this.variablesStorage = variablesStorage;
this.blockHeaderFunctions = blockHeaderFunctions;
this.receiptCompaction = receiptCompaction;
migrateVariables();
}
@Override
public Optional<Hash> getChainHead() {
return variablesStorage.getChainHead();
}
@Override
public Collection<Hash> getForkHeads() {
return variablesStorage.getForkHeads();
}
@Override
public Optional<Hash> getFinalized() {
return variablesStorage.getFinalized();
}
@Override
public Optional<Hash> getSafeBlock() {
return variablesStorage.getSafeBlock();
}
@Override
public Optional<BlockHeader> getBlockHeader(final Hash blockHash) {
return get(BLOCK_HEADER_PREFIX, blockHash)
.map(b -> BlockHeader.readFrom(RLP.input(b), blockHeaderFunctions));
}
@Override
public Optional<BlockBody> getBlockBody(final Hash blockHash) {
return get(BLOCK_BODY_PREFIX, blockHash)
.map(bytes -> BlockBody.readWrappedBodyFrom(RLP.input(bytes), blockHeaderFunctions));
}
@Override
public Optional<List<TransactionReceipt>> getTransactionReceipts(final Hash blockHash) {
return get(TRANSACTION_RECEIPTS_PREFIX, blockHash).map(this::rlpDecodeTransactionReceipts);
}
@Override
public Optional<Hash> getBlockHash(final long blockNumber) {
return get(BLOCK_HASH_PREFIX, UInt256.valueOf(blockNumber)).map(this::bytesToHash);
}
@Override
public Optional<Difficulty> getTotalDifficulty(final Hash blockHash) {
return get(TOTAL_DIFFICULTY_PREFIX, blockHash).map(b -> Difficulty.wrap(Bytes32.wrap(b, 0)));
}
@Override
public Optional<TransactionLocation> getTransactionLocation(final Hash transactionHash) {
return get(TRANSACTION_LOCATION_PREFIX, transactionHash)
.map(bytes -> TransactionLocation.readFrom(RLP.input(bytes)));
}
@Override
public Updater updater() {
return new Updater(
blockchainStorage.startTransaction(), variablesStorage.updater(), receiptCompaction);
}
private List<TransactionReceipt> rlpDecodeTransactionReceipts(final Bytes bytes) {
return RLP.input(bytes).readList(TransactionReceipt::readFrom);
}
private Hash bytesToHash(final Bytes bytes) {
return Hash.wrap(Bytes32.wrap(bytes, 0));
}
Optional<Bytes> get(final Bytes prefix, final Bytes key) {
return blockchainStorage.get(Bytes.concatenate(prefix, key).toArrayUnsafe()).map(Bytes::wrap);
}
/**
* One time migration of variables from the blockchain storage to the dedicated variable storage.
* To avoid state inconsistency in case of a downgrade done without running the storage
* revert-variables subcommand it fails giving the possibility to retry the downgrade procedure.
*/
private void migrateVariables() {
final var blockchainUpdater = updater();
final var variablesUpdater = variablesStorage.updater();
get(VARIABLES_PREFIX, CHAIN_HEAD_HASH.getBytes())
.map(this::bytesToHash)
.ifPresent(
bch ->
variablesStorage
.getChainHead()
.ifPresentOrElse(
vch -> {
if (!vch.equals(bch)) {
logInconsistencyAndFail(CHAIN_HEAD_HASH, bch, vch);
}
},
() -> {
variablesUpdater.setChainHead(bch);
LOG.info("Migrated key {} to variables storage", CHAIN_HEAD_HASH);
}));
get(VARIABLES_PREFIX, FINALIZED_BLOCK_HASH.getBytes())
.map(this::bytesToHash)
.ifPresent(
bfh -> {
variablesStorage
.getFinalized()
.ifPresentOrElse(
vfh -> {
if (!vfh.equals(bfh)) {
logInconsistencyAndFail(FINALIZED_BLOCK_HASH, bfh, vfh);
}
},
() -> {
variablesUpdater.setFinalized(bfh);
LOG.info("Migrated key {} to variables storage", FINALIZED_BLOCK_HASH);
});
});
get(VARIABLES_PREFIX, SAFE_BLOCK_HASH.getBytes())
.map(this::bytesToHash)
.ifPresent(
bsh -> {
variablesStorage
.getSafeBlock()
.ifPresentOrElse(
vsh -> {
if (!vsh.equals(bsh)) {
logInconsistencyAndFail(SAFE_BLOCK_HASH, bsh, vsh);
}
},
() -> {
variablesUpdater.setSafeBlock(bsh);
LOG.info("Migrated key {} to variables storage", SAFE_BLOCK_HASH);
});
});
get(VARIABLES_PREFIX, FORK_HEADS.getBytes())
.map(bytes -> RLP.input(bytes).readList(in -> this.bytesToHash(in.readBytes32())))
.ifPresent(
bfh -> {
final var vfh = variablesStorage.getForkHeads();
if (vfh.isEmpty()) {
variablesUpdater.setForkHeads(bfh);
LOG.info("Migrated key {} to variables storage", FORK_HEADS);
} else if (!List.copyOf(vfh).equals(bfh)) {
logInconsistencyAndFail(FORK_HEADS, bfh, vfh);
}
});
get(Bytes.EMPTY, SEQ_NO_STORE.getBytes())
.ifPresent(
bsns -> {
variablesStorage
.getLocalEnrSeqno()
.ifPresentOrElse(
vsns -> {
if (!vsns.equals(bsns)) {
logInconsistencyAndFail(SEQ_NO_STORE, bsns, vsns);
}
},
() -> {
variablesUpdater.setLocalEnrSeqno(bsns);
LOG.info("Migrated key {} to variables storage", SEQ_NO_STORE);
});
});
blockchainUpdater.removeVariables();
variablesUpdater.commit();
blockchainUpdater.commit();
}
private static void logInconsistencyAndFail(
final VariablesStorage.Keys key, final Object bch, final Object vch) {
LOG.error(
"Inconsistency found when migrating {} to variables storage,"
+ " probably this is due to a downgrade done without running the `storage revert-variables`"
+ " subcommand first, see https://github.com/hyperledger/besu/pull/5471",
key);
throw new IllegalStateException(
key + " mismatch: blockchain storage value=" + bch + ", variables storage value=" + vch);
}
public static class Updater implements BlockchainStorage.Updater {
private final KeyValueStorageTransaction blockchainTransaction;
private final VariablesStorage.Updater variablesUpdater;
private final boolean receiptCompaction;
Updater(
final KeyValueStorageTransaction blockchainTransaction,
final VariablesStorage.Updater variablesUpdater,
final boolean receiptCompaction) {
this.blockchainTransaction = blockchainTransaction;
this.variablesUpdater = variablesUpdater;
this.receiptCompaction = receiptCompaction;
}
@Override
public void putBlockHeader(final Hash blockHash, final BlockHeader blockHeader) {
set(BLOCK_HEADER_PREFIX, blockHash, RLP.encode(blockHeader::writeTo));
}
@Override
public void putBlockBody(final Hash blockHash, final BlockBody blockBody) {
set(BLOCK_BODY_PREFIX, blockHash, RLP.encode(blockBody::writeWrappedBodyTo));
}
@Override
public void putTransactionLocation(
final Hash transactionHash, final TransactionLocation transactionLocation) {
set(TRANSACTION_LOCATION_PREFIX, transactionHash, RLP.encode(transactionLocation::writeTo));
}
@Override
public void putTransactionReceipts(
final Hash blockHash, final List<TransactionReceipt> transactionReceipts) {
set(TRANSACTION_RECEIPTS_PREFIX, blockHash, rlpEncode(transactionReceipts));
}
@Override
public void putBlockHash(final long blockNumber, final Hash blockHash) {
set(BLOCK_HASH_PREFIX, UInt256.valueOf(blockNumber), blockHash);
}
@Override
public void putTotalDifficulty(final Hash blockHash, final Difficulty totalDifficulty) {
set(TOTAL_DIFFICULTY_PREFIX, blockHash, totalDifficulty);
}
@Override
public void setChainHead(final Hash blockHash) {
variablesUpdater.setChainHead(blockHash);
}
@Override
public void setForkHeads(final Collection<Hash> forkHeadHashes) {
variablesUpdater.setForkHeads(forkHeadHashes);
}
@Override
public void setFinalized(final Hash blockHash) {
variablesUpdater.setFinalized(blockHash);
}
@Override
public void setSafeBlock(final Hash blockHash) {
variablesUpdater.setSafeBlock(blockHash);
}
@Override
public void removeBlockHash(final long blockNumber) {
remove(BLOCK_HASH_PREFIX, UInt256.valueOf(blockNumber));
}
@Override
public void removeBlockHeader(final Hash blockHash) {
remove(BLOCK_HEADER_PREFIX, blockHash);
}
@Override
public void removeBlockBody(final Hash blockHash) {
remove(BLOCK_BODY_PREFIX, blockHash);
}
@Override
public void removeTransactionReceipts(final Hash blockHash) {
remove(TRANSACTION_RECEIPTS_PREFIX, blockHash);
}
@Override
public void removeTransactionLocation(final Hash transactionHash) {
remove(TRANSACTION_LOCATION_PREFIX, transactionHash);
}
@Override
public void removeTotalDifficulty(final Hash blockHash) {
remove(TOTAL_DIFFICULTY_PREFIX, blockHash);
}
@Override
public void commit() {
blockchainTransaction.commit();
variablesUpdater.commit();
}
@Override
public void rollback() {
variablesUpdater.rollback();
blockchainTransaction.rollback();
}
void set(final Bytes prefix, final Bytes key, final Bytes value) {
blockchainTransaction.put(
Bytes.concatenate(prefix, key).toArrayUnsafe(), value.toArrayUnsafe());
}
private void remove(final Bytes prefix, final Bytes key) {
blockchainTransaction.remove(Bytes.concatenate(prefix, key).toArrayUnsafe());
}
private Bytes rlpEncode(final List<TransactionReceipt> receipts) {
return RLP.encode(
o ->
o.writeList(
receipts, (r, rlpOutput) -> r.writeToForStorage(rlpOutput, receiptCompaction)));
}
private void removeVariables() {
remove(VARIABLES_PREFIX, CHAIN_HEAD_HASH.getBytes());
remove(VARIABLES_PREFIX, FINALIZED_BLOCK_HASH.getBytes());
remove(VARIABLES_PREFIX, SAFE_BLOCK_HASH.getBytes());
remove(VARIABLES_PREFIX, FORK_HEADS.getBytes());
remove(Bytes.EMPTY, SEQ_NO_STORE.getBytes());
}
}
}