StateBackupService.java
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.ethereum.api.query;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import org.hyperledger.besu.config.JsonUtil;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.trie.Node;
import org.hyperledger.besu.ethereum.trie.TrieIterator;
import org.hyperledger.besu.ethereum.trie.TrieIterator.State;
import org.hyperledger.besu.ethereum.trie.forest.storage.ForestWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.patricia.StoredMerklePatriciaTrie;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.util.io.RollingFileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StateBackupService {
private static final Logger LOG = LoggerFactory.getLogger(StateBackupService.class);
private static final Bytes ACCOUNT_END_MARKER;
static {
final BytesValueRLPOutput endMarker = new BytesValueRLPOutput();
endMarker.startList();
endMarker.endList();
ACCOUNT_END_MARKER = endMarker.encoded();
}
private final String besuVersion;
private final Lock submissionLock = new ReentrantLock();
private final EthScheduler scheduler;
private final Blockchain blockchain;
private final ForestWorldStateKeyValueStorage worldStateKeyValueStorage;
private final BackupStatus backupStatus = new BackupStatus();
private Path backupDir;
private RollingFileWriter accountFileWriter;
public StateBackupService(
final String besuVersion,
final Blockchain blockchain,
final Path backupDir,
final EthScheduler scheduler,
final ForestWorldStateKeyValueStorage worldStateKeyValueStorage) {
this.besuVersion = besuVersion;
this.blockchain = blockchain;
this.backupDir = backupDir;
this.scheduler = scheduler;
this.worldStateKeyValueStorage = worldStateKeyValueStorage;
}
public Path getBackupDir() {
return backupDir;
}
public BackupStatus requestBackup(
final long block, final boolean compress, final Optional<Path> backupDir) {
boolean requestAccepted = false;
try {
if (submissionLock.tryLock(100, TimeUnit.MILLISECONDS)) {
try {
if (!backupStatus.isBackingUp()) {
requestAccepted = true;
this.backupDir = backupDir.orElse(this.backupDir);
backupStatus.targetBlock = block;
backupStatus.compressed = compress;
backupStatus.currentAccount = Bytes32.ZERO;
scheduler.scheduleComputationTask(
() -> {
try {
return backup(block, compress);
} catch (final IOException ioe) {
LOG.error("Error writing backups", ioe);
return backupStatus;
}
});
}
} finally {
submissionLock.unlock();
}
}
} catch (final InterruptedException e) {
// ignore
}
backupStatus.requestAccepted = requestAccepted;
return backupStatus;
}
public static Path dataFileToIndex(final Path dataName) {
return Path.of(dataName.toString().replaceAll("(.*)[-.]\\d\\d\\d\\d\\.(.)dat", "$1.$2idx"));
}
public static Path accountFileName(
final Path backupDir,
final long targetBlock,
final int fileNumber,
final boolean compressed) {
return backupDir.resolve(
String.format(
"besu-account-backup-%08d-%04d.%sdat",
targetBlock, fileNumber, compressed ? "c" : "r"));
}
public static Path headerFileName(
final Path backupDir, final int fileNumber, final boolean compressed) {
return backupDir.resolve(
String.format("besu-header-backup-%04d.%sdat", fileNumber, compressed ? "c" : "r"));
}
public static Path bodyFileName(
final Path backupDir, final int fileNumber, final boolean compressed) {
return backupDir.resolve(
String.format("besu-body-backup-%04d.%sdat", fileNumber, compressed ? "c" : "r"));
}
public static Path receiptFileName(
final Path backupDir, final int fileNumber, final boolean compressed) {
return backupDir.resolve(
String.format("besu-receipt-backup-%04d.%sdat", fileNumber, compressed ? "c" : "r"));
}
private Path accountFileName(final int fileNumber, final boolean compressed) {
return accountFileName(backupDir, backupStatus.targetBlock, fileNumber, compressed);
}
private Path headerFileName(final int fileNumber, final boolean compressed) {
return headerFileName(backupDir, fileNumber, compressed);
}
private Path bodyFileName(final int fileNumber, final boolean compressed) {
return bodyFileName(backupDir, fileNumber, compressed);
}
private Path receiptFileName(final int fileNumber, final boolean compressed) {
return receiptFileName(backupDir, fileNumber, compressed);
}
private BackupStatus backup(final long block, final boolean compress) throws IOException {
checkArgument(
block >= 0 && block <= blockchain.getChainHeadBlockNumber(),
"Backup Block must be within blockchain");
backupStatus.targetBlock = block;
backupStatus.compressed = compress;
backupStatus.currentAccount = Bytes32.ZERO;
backupChainData();
backupLeaves();
writeManifest();
return backupStatus;
}
private void writeManifest() throws IOException {
final Map<String, Object> manifest = new HashMap<>();
manifest.put("clientVersion", besuVersion);
manifest.put("compressed", backupStatus.compressed);
manifest.put("targetBlock", backupStatus.targetBlock);
manifest.put("accountCount", backupStatus.accountCount);
Files.write(
backupDir.resolve("besu-backup-manifest.json"),
JsonUtil.getJson(manifest).getBytes(StandardCharsets.UTF_8));
}
private void backupLeaves() throws IOException {
final Optional<BlockHeader> header = blockchain.getBlockHeader(backupStatus.targetBlock);
if (header.isEmpty()) {
backupStatus.currentAccount = null;
return;
}
final Optional<Bytes> worldStateRoot =
worldStateKeyValueStorage.getAccountStateTrieNode(header.get().getStateRoot());
if (worldStateRoot.isEmpty()) {
backupStatus.currentAccount = null;
return;
}
try (final RollingFileWriter accountFileWriter =
new RollingFileWriter(this::accountFileName, backupStatus.compressed)) {
this.accountFileWriter = accountFileWriter;
final StoredMerklePatriciaTrie<Bytes32, Bytes> accountTrie =
new StoredMerklePatriciaTrie<>(
(location, hash) -> worldStateKeyValueStorage.getAccountStateTrieNode(hash),
header.get().getStateRoot(),
Function.identity(),
Function.identity());
accountTrie.visitLeafs(this::visitAccount);
backupStatus.currentAccount = null;
}
}
private TrieIterator.State visitAccount(final Bytes32 nodeKey, final Node<Bytes> node) {
if (node.getValue().isEmpty()) {
return State.CONTINUE;
}
backupStatus.currentAccount = nodeKey;
final Bytes nodeValue = node.getValue().orElse(Hash.EMPTY);
final StateTrieAccountValue account =
StateTrieAccountValue.readFrom(new BytesValueRLPInput(nodeValue, false));
final Bytes code = worldStateKeyValueStorage.getCode(account.getCodeHash()).orElse(Bytes.EMPTY);
backupStatus.codeSize.addAndGet(code.size());
final BytesValueRLPOutput accountOutput = new BytesValueRLPOutput();
accountOutput.startList();
accountOutput.writeBytes(nodeKey); // trie hash
accountOutput.writeBytes(nodeValue); // account rlp
accountOutput.writeBytes(code); // code
accountOutput.endList();
try {
accountFileWriter.writeBytes(accountOutput.encoded().toArrayUnsafe());
} catch (final IOException ioe) {
LOG.error("Failure writing backup", ioe);
return State.STOP;
}
// storage is written for each leaf, otherwise the whole trie would have to fit in memory
final StoredMerklePatriciaTrie<Bytes32, Bytes> storageTrie =
new StoredMerklePatriciaTrie<>(
(location, hash) -> worldStateKeyValueStorage.getAccountStateTrieNode(hash),
account.getStorageRoot(),
Function.identity(),
Function.identity());
storageTrie.visitLeafs(
(storageKey, storageValue) ->
visitAccountStorage(storageKey, storageValue, accountFileWriter));
try {
accountFileWriter.writeBytes(ACCOUNT_END_MARKER.toArrayUnsafe());
} catch (final IOException ioe) {
LOG.error("Failure writing backup", ioe);
return State.STOP;
}
backupStatus.accountCount.incrementAndGet();
return State.CONTINUE;
}
private void backupChainData() throws IOException {
try (final RollingFileWriter headerWriter =
new RollingFileWriter(this::headerFileName, backupStatus.compressed);
final RollingFileWriter bodyWriter =
new RollingFileWriter(this::bodyFileName, backupStatus.compressed);
final RollingFileWriter receiptsWriter =
new RollingFileWriter(this::receiptFileName, backupStatus.compressed)) {
for (long blockNumber = 0; blockNumber <= backupStatus.targetBlock; blockNumber++) {
final Optional<Block> block = blockchain.getBlockByNumber(blockNumber);
checkState(
block.isPresent(), "Block data for %s was not found in the archive", blockNumber);
final Optional<List<TransactionReceipt>> receipts =
blockchain.getTxReceipts(block.get().getHash());
checkState(
receipts.isPresent(), "Receipts for %s was not found in the archive", blockNumber);
final BytesValueRLPOutput headerOutput = new BytesValueRLPOutput();
block.get().getHeader().writeTo(headerOutput);
headerWriter.writeBytes(headerOutput.encoded().toArrayUnsafe());
final BytesValueRLPOutput bodyOutput = new BytesValueRLPOutput();
block.get().getBody().writeWrappedBodyTo(bodyOutput);
bodyWriter.writeBytes(bodyOutput.encoded().toArrayUnsafe());
final BytesValueRLPOutput receiptsOutput = new BytesValueRLPOutput();
receiptsOutput.writeList(receipts.get(), (r, rlpOut) -> r.writeToForStorage(rlpOut, false));
receiptsWriter.writeBytes(receiptsOutput.encoded().toArrayUnsafe());
backupStatus.storedBlock = blockNumber;
}
}
}
private TrieIterator.State visitAccountStorage(
final Bytes32 nodeKey, final Node<Bytes> node, final RollingFileWriter accountFileWriter) {
backupStatus.currentStorage = nodeKey;
final BytesValueRLPOutput output = new BytesValueRLPOutput();
output.startList();
output.writeBytes(nodeKey);
output.writeBytes(node.getValue().orElse(Bytes.EMPTY));
output.endList();
try {
accountFileWriter.writeBytes(output.encoded().toArrayUnsafe());
} catch (final IOException ioe) {
LOG.error("Failure writing backup", ioe);
return State.STOP;
}
backupStatus.storageCount.incrementAndGet();
return State.CONTINUE;
}
public static final class BackupStatus {
long targetBlock;
long storedBlock;
boolean compressed;
Bytes32 currentAccount;
Bytes32 currentStorage;
AtomicLong accountCount = new AtomicLong(0);
AtomicLong codeSize = new AtomicLong(0);
AtomicLong storageCount = new AtomicLong(0);
boolean requestAccepted;
@JsonGetter
public String getTargetBlock() {
return "0x" + Long.toHexString(targetBlock);
}
@JsonGetter
public String getStoredBlock() {
return "0x" + Long.toHexString(storedBlock);
}
@JsonGetter
public String getCurrentAccount() {
return currentAccount.toHexString();
}
@JsonGetter
public String getCurrentStorage() {
return currentStorage.toHexString();
}
@JsonGetter
public boolean isBackingUp() {
return currentAccount != null;
}
@JsonIgnore
public long getAccountCount() {
return accountCount.get();
}
@JsonIgnore
public long getCodeSize() {
return codeSize.get();
}
@JsonIgnore
public long getStorageCount() {
return storageCount.get();
}
@JsonIgnore
public Bytes getCurrentAccountBytes() {
return currentAccount;
}
@JsonIgnore
public long getStoredBlockNum() {
return storedBlock;
}
@JsonIgnore
public long getTargetBlockNum() {
return targetBlock;
}
}
}