BackwardChain.java
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.backwardsync;
import static org.slf4j.LoggerFactory.getLogger;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import org.slf4j.Logger;
public class BackwardChain {
private static final Logger LOG = getLogger(BackwardChain.class);
private static final String FIRST_STORED_ANCESTOR_KEY = "firstStoredAncestor";
private static final String LAST_STORED_PIVOT_KEY = "lastStoredPivot";
private final GenericKeyValueStorageFacade<Hash, BlockHeader> headers;
private final GenericKeyValueStorageFacade<Hash, Block> blocks;
private final GenericKeyValueStorageFacade<Hash, Hash> chainStorage;
private final GenericKeyValueStorageFacade<String, BlockHeader> sessionDataStorage;
private Optional<BlockHeader> firstStoredAncestor;
private Optional<BlockHeader> lastStoredPivot;
private final Queue<Hash> hashesToAppend = new ArrayDeque<>();
public BackwardChain(
final GenericKeyValueStorageFacade<Hash, BlockHeader> headersStorage,
final GenericKeyValueStorageFacade<Hash, Block> blocksStorage,
final GenericKeyValueStorageFacade<Hash, Hash> chainStorage,
final GenericKeyValueStorageFacade<String, BlockHeader> sessionDataStorage) {
this.headers = headersStorage;
this.blocks = blocksStorage;
this.chainStorage = chainStorage;
this.sessionDataStorage = sessionDataStorage;
firstStoredAncestor =
sessionDataStorage
.get(FIRST_STORED_ANCESTOR_KEY)
.map(
header -> {
LOG.atDebug()
.setMessage(FIRST_STORED_ANCESTOR_KEY + " loaded from storage with value {}")
.addArgument(header::toLogString)
.log();
return header;
});
lastStoredPivot =
sessionDataStorage
.get(LAST_STORED_PIVOT_KEY)
.map(
header -> {
LOG.atDebug()
.setMessage(LAST_STORED_PIVOT_KEY + " loaded from storage with value {}")
.addArgument(header::toLogString)
.log();
return header;
});
}
public static BackwardChain from(
final StorageProvider storageProvider, final BlockHeaderFunctions blockHeaderFunctions) {
return new BackwardChain(
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe,
BlocksHeadersConvertor.of(blockHeaderFunctions),
storageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.BACKWARD_SYNC_HEADERS)),
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe,
BlocksConvertor.of(blockHeaderFunctions),
storageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.BACKWARD_SYNC_BLOCKS)),
new GenericKeyValueStorageFacade<>(
Hash::toArrayUnsafe,
new HashConvertor(),
storageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.BACKWARD_SYNC_CHAIN)),
// using BACKWARD_SYNC_CHAIN that contains the sequence of the work to do,
// to also store the session data that will be used to resume
// the backward sync from where it was left before the restart
new GenericKeyValueStorageFacade<>(
key -> key.getBytes(StandardCharsets.UTF_8),
BlocksHeadersConvertor.of(blockHeaderFunctions),
storageProvider.getStorageBySegmentIdentifier(
KeyValueSegmentIdentifier.BACKWARD_SYNC_CHAIN)));
}
public synchronized Optional<BlockHeader> getFirstAncestorHeader() {
return firstStoredAncestor;
}
public synchronized List<BlockHeader> getFirstNAncestorHeaders(final int size) {
List<BlockHeader> result = new ArrayList<>(size);
Optional<BlockHeader> it = firstStoredAncestor;
while (it.isPresent() && result.size() < size) {
result.add(it.get());
it = chainStorage.get(it.get().getHash()).flatMap(headers::get);
}
return result;
}
public synchronized void prependAncestorsHeader(final BlockHeader blockHeader) {
prependAncestorsHeader(blockHeader, false);
}
public synchronized void prependAncestorsHeader(
final BlockHeader blockHeader, final boolean alreadyStored) {
if (!alreadyStored) {
headers.put(blockHeader.getHash(), blockHeader);
}
if (firstStoredAncestor.isEmpty()) {
updateLastStoredPivot(Optional.of(blockHeader));
} else {
final BlockHeader firstHeader = firstStoredAncestor.get();
chainStorage.put(blockHeader.getHash(), firstHeader.getHash());
LOG.atDebug()
.setMessage("Added header {} to backward chain led by pivot {} on height {}")
.addArgument(blockHeader::toLogString)
.addArgument(() -> lastStoredPivot.orElseThrow().toLogString())
.addArgument(firstHeader::getNumber)
.log();
}
updateFirstStoredAncestor(Optional.of(blockHeader));
}
private void updateFirstStoredAncestor(final Optional<BlockHeader> maybeHeader) {
maybeHeader.ifPresentOrElse(
header -> sessionDataStorage.put(FIRST_STORED_ANCESTOR_KEY, header),
() -> sessionDataStorage.drop(FIRST_STORED_ANCESTOR_KEY));
firstStoredAncestor = maybeHeader;
}
private void updateLastStoredPivot(final Optional<BlockHeader> maybeHeader) {
maybeHeader.ifPresentOrElse(
header -> sessionDataStorage.put(LAST_STORED_PIVOT_KEY, header),
() -> sessionDataStorage.drop(LAST_STORED_PIVOT_KEY));
lastStoredPivot = maybeHeader;
}
public synchronized Optional<Block> getPivot() {
if (lastStoredPivot.isEmpty()) {
return Optional.empty();
}
return blocks.get(lastStoredPivot.get().getHash());
}
public synchronized void dropFirstHeader() {
if (firstStoredAncestor.isEmpty()) {
return;
}
headers.drop(firstStoredAncestor.get().getHash());
final Optional<Hash> hash = chainStorage.get(firstStoredAncestor.get().getHash());
chainStorage.drop(firstStoredAncestor.get().getHash());
updateFirstStoredAncestor(hash.flatMap(headers::get));
if (firstStoredAncestor.isEmpty()) {
updateLastStoredPivot(Optional.empty());
}
}
public synchronized void appendTrustedBlock(final Block newPivot) {
LOG.atDebug().setMessage("Appending trusted block {}").addArgument(newPivot::toLogString).log();
headers.put(newPivot.getHash(), newPivot.getHeader());
blocks.put(newPivot.getHash(), newPivot);
if (lastStoredPivot.isEmpty()) {
updateFirstStoredAncestor(Optional.of(newPivot.getHeader()));
} else {
if (newPivot.getHeader().getParentHash().equals(lastStoredPivot.get().getHash())) {
LOG.atDebug()
.setMessage("Added block {} to backward chain led by pivot {} on height {}")
.addArgument(newPivot::toLogString)
.addArgument(lastStoredPivot.get()::toLogString)
.addArgument(firstStoredAncestor.get()::getNumber)
.log();
chainStorage.put(lastStoredPivot.get().getHash(), newPivot.getHash());
} else {
updateFirstStoredAncestor(Optional.of(newPivot.getHeader()));
LOG.atDebug()
.setMessage("Re-pivoting to new target block {}")
.addArgument(newPivot::toLogString)
.log();
}
}
updateLastStoredPivot(Optional.of(newPivot.getHeader()));
}
public synchronized boolean isTrusted(final Hash hash) {
return blocks.get(hash).isPresent();
}
public synchronized Block getTrustedBlock(final Hash hash) {
return blocks.get(hash).orElseThrow();
}
public synchronized void clear() {
blocks.clear();
headers.clear();
chainStorage.clear();
sessionDataStorage.clear();
firstStoredAncestor = Optional.empty();
lastStoredPivot = Optional.empty();
hashesToAppend.clear();
}
public synchronized Optional<Hash> getDescendant(final Hash blockHash) {
return chainStorage.get(blockHash);
}
public synchronized Optional<Block> getBlock(final Hash hash) {
return blocks.get(hash);
}
public synchronized Optional<BlockHeader> getHeader(final Hash hash) {
return headers.get(hash);
}
public synchronized void addNewHash(final Hash newBlockHash) {
if (hashesToAppend.contains(newBlockHash)) {
return;
}
this.hashesToAppend.add(newBlockHash);
}
public synchronized Optional<Hash> getFirstHashToAppend() {
return Optional.ofNullable(hashesToAppend.peek());
}
public synchronized void removeFromHashToAppend(final Hash hashToRemove) {
if (hashesToAppend.contains(hashToRemove)) {
hashesToAppend.remove(hashToRemove);
}
}
}