SyncState.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.state;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.ChainHead;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.DefaultSyncStatus;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.core.Synchronizer.InSyncListener;
import org.hyperledger.besu.ethereum.eth.manager.ChainHeadEstimate;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloadStatus;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents.InitialSyncCompletionListener;
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
import org.hyperledger.besu.plugin.services.BesuEvents.TTDReachedListener;
import org.hyperledger.besu.util.Subscribers;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public class SyncState {
private final Blockchain blockchain;
private final EthPeers ethPeers;
private final AtomicLong inSyncSubscriberId = new AtomicLong();
private final Map<Long, InSyncTracker> inSyncTrackers = new ConcurrentHashMap<>();
private final Subscribers<SyncStatusListener> syncStatusListeners = Subscribers.create();
private final Subscribers<TTDReachedListener> ttdReachedListeners = Subscribers.create();
private final Subscribers<InitialSyncCompletionListener> completionListenerSubscribers =
Subscribers.create();
private volatile long chainHeightListenerId;
private volatile Optional<SyncTarget> syncTarget = Optional.empty();
private Optional<WorldStateDownloadStatus> worldStateDownloadStatus = Optional.empty();
private Optional<Long> newPeerListenerId;
private Optional<Boolean> reachedTerminalDifficulty = Optional.empty();
private final Optional<Checkpoint> checkpoint;
private volatile boolean isInitialSyncPhaseDone;
private volatile boolean isResyncNeeded;
private Optional<Address> maybeAccountToRepair = Optional.empty();
public SyncState(final Blockchain blockchain, final EthPeers ethPeers) {
this(blockchain, ethPeers, false, Optional.empty());
}
public SyncState(
final Blockchain blockchain,
final EthPeers ethPeers,
final boolean hasInitialSyncPhase,
final Optional<Checkpoint> checkpoint) {
this.blockchain = blockchain;
this.ethPeers = ethPeers;
isInitialSyncPhaseDone = !hasInitialSyncPhase;
blockchain.observeBlockAdded(
event -> {
if (event.isNewCanonicalHead()) {
checkInSync();
}
});
// Add new peer listener to prevent permissioned PoA network stalling on start-up.
// https://github.com/hyperledger/besu/issues/528
newPeerListenerId =
Optional.of(
ethPeers.subscribeConnect(
newPeer -> {
if (newPeer.readyForRequests()) {
checkInSync();
}
}));
this.checkpoint = checkpoint;
}
/**
* Add a listener that will be notified when this node's sync status changes. A node is considered
* in-sync if the local chain height is no more than {@code SYNC_TOLERANCE} behind the highest
* estimated remote chain height.
*
* @param listener The callback to invoke when the sync status changes
* @return An {@code Unsubscriber} that can be used to stop listening for these events
*/
public long subscribeInSync(final InSyncListener listener) {
return subscribeInSync(listener, Synchronizer.DEFAULT_IN_SYNC_TOLERANCE);
}
/**
* Add a listener that will be notified when this node's sync status changes. A node is considered
* in-sync if the local chain height is no more than {@code syncTolerance} behind the highest
* estimated remote chain height.
*
* @param listener The callback to invoke when the sync status changes
* @param syncTolerance The tolerance used to determine whether this node is in-sync. A value of
* zero means that the node is considered in-sync only when the local chain height is greater
* than or equal to the best estimated remote chain height.
* @return An {@code Unsubscriber} that can be used to stop listening for these events
*/
public long subscribeInSync(final InSyncListener listener, final long syncTolerance) {
final InSyncTracker inSyncTracker = InSyncTracker.create(listener, syncTolerance);
final long id = inSyncSubscriberId.incrementAndGet();
inSyncTrackers.put(id, inSyncTracker);
return id;
}
public boolean unsubscribeInSync(final long subscriberId) {
return inSyncTrackers.remove(subscriberId) != null;
}
public long subscribeSyncStatus(final SyncStatusListener listener) {
return syncStatusListeners.subscribe(listener);
}
public long subscribeTTDReached(final TTDReachedListener listener) {
return ttdReachedListeners.subscribe(listener);
}
public long subscribeCompletionReached(final InitialSyncCompletionListener listener) {
return completionListenerSubscribers.subscribe(listener);
}
public boolean unsubscribeSyncStatus(final long listenerId) {
return syncStatusListeners.unsubscribe(listenerId);
}
public boolean unsubscribeTTDReached(final long listenerId) {
return ttdReachedListeners.unsubscribe(listenerId);
}
public boolean unsubscribeInitialConditionReached(final long listenerId) {
return completionListenerSubscribers.unsubscribe(listenerId);
}
public Optional<SyncStatus> syncStatus() {
return syncStatus(syncTarget);
}
public Optional<SyncTarget> syncTarget() {
return syncTarget;
}
public void setSyncTarget(final EthPeer peer, final BlockHeader commonAncestor) {
final SyncTarget syncTarget = new SyncTarget(peer, commonAncestor);
replaceSyncTarget(Optional.of(syncTarget));
}
public void setWorldStateDownloadStatus(final WorldStateDownloadStatus worldStateDownloadStatus) {
this.worldStateDownloadStatus = Optional.ofNullable(worldStateDownloadStatus);
}
public boolean isInSync() {
return isInSync(Synchronizer.DEFAULT_IN_SYNC_TOLERANCE);
}
public boolean isInSync(final long syncTolerance) {
return isInSync(
getLocalChainHead(), getSyncTargetChainHead(), getBestPeerChainHead(), syncTolerance);
}
public void setReachedTerminalDifficulty(final boolean stoppedAtTerminalDifficulty) {
this.reachedTerminalDifficulty = Optional.of(stoppedAtTerminalDifficulty);
ttdReachedListeners.forEach(listener -> listener.onTTDReached(stoppedAtTerminalDifficulty));
}
public Optional<Boolean> hasReachedTerminalDifficulty() {
if (isInitialSyncPhaseDone) {
return reachedTerminalDifficulty;
}
return Optional.of(Boolean.FALSE);
}
private boolean isInSync(
final ChainHead localChain,
final Optional<ChainHeadEstimate> syncTargetChain,
final Optional<ChainHeadEstimate> bestPeerChain,
final long syncTolerance) {
return isInitialSyncPhaseDone
&& reachedTerminalDifficulty.orElse(true)
// Sync target may be temporarily empty while we switch sync targets during a sync, so
// check both the sync target and our best peer to determine if we're in sync or not
&& isInSync(localChain, syncTargetChain, syncTolerance)
&& isInSync(localChain, bestPeerChain, syncTolerance);
}
private boolean isInSync(
final ChainHead localChain,
final Optional<ChainHeadEstimate> remoteChain,
final long syncTolerance) {
return remoteChain
.map(remoteState -> InSyncTracker.isInSync(localChain, remoteState, syncTolerance))
.orElse(true);
}
private ChainHead getLocalChainHead() {
return blockchain.getChainHead();
}
private Optional<ChainHeadEstimate> getSyncTargetChainHead() {
return syncTarget.map(SyncTarget::peer).map(EthPeer::chainStateSnapshot);
}
private Optional<ChainHeadEstimate> getBestPeerChainHead() {
return ethPeers.bestPeerWithHeightEstimate().map(EthPeer::chainStateSnapshot);
}
public void disconnectSyncTarget(final DisconnectReason reason) {
syncTarget.ifPresent(syncTarget -> syncTarget.peer().disconnect(reason));
}
public void clearSyncTarget() {
replaceSyncTarget(Optional.empty());
}
private synchronized void replaceSyncTarget(final Optional<SyncTarget> newTarget) {
if (syncTarget.equals(newTarget)) {
// Nothing to do
return;
}
syncTarget.ifPresent(this::removeEstimatedHeightListener);
syncTarget = newTarget;
newTarget.ifPresent(this::addEstimatedHeightListener);
publishSyncStatus(newTarget);
checkInSync();
}
private void publishSyncStatus(final Optional<SyncTarget> newTarget) {
final Optional<SyncStatus> syncStatus = syncStatus(newTarget);
syncStatusListeners.forEach(c -> c.onSyncStatusChanged(syncStatus));
}
private Optional<SyncStatus> syncStatus(final Optional<SyncTarget> maybeTarget) {
return maybeTarget.map(
target -> {
final long chainHeadBlockNumber = blockchain.getChainHeadBlockNumber();
final long commonAncestor = target.commonAncestor().getNumber();
final long highestKnownBlock = bestChainHeight(chainHeadBlockNumber);
return new DefaultSyncStatus(
commonAncestor,
chainHeadBlockNumber,
highestKnownBlock,
worldStateDownloadStatus.flatMap(WorldStateDownloadStatus::getPulledStates),
worldStateDownloadStatus.flatMap(WorldStateDownloadStatus::getKnownStates));
});
}
private void removeEstimatedHeightListener(final SyncTarget target) {
target.removePeerChainEstimatedHeightListener(chainHeightListenerId);
}
private void addEstimatedHeightListener(final SyncTarget target) {
chainHeightListenerId =
target.addPeerChainEstimatedHeightListener(estimatedHeight -> checkInSync());
}
public long getLocalChainHeight() {
return blockchain.getChainHeadBlockNumber();
}
public long bestChainHeight() {
final long localChainHeight = blockchain.getChainHeadBlockNumber();
return bestChainHeight(localChainHeight);
}
public long bestChainHeight(final long localChainHeight) {
return Math.max(
localChainHeight,
ethPeers
.bestPeerWithHeightEstimate()
.map(p -> p.chainState().getEstimatedHeight())
.orElse(localChainHeight));
}
private synchronized void checkInSync() {
final ChainHead localChain = getLocalChainHead();
final Optional<ChainHeadEstimate> syncTargetChain = getSyncTargetChainHead();
final Optional<ChainHeadEstimate> bestPeerChain = getBestPeerChainHead();
// Remove listener when we've found a peer.
newPeerListenerId.ifPresent(
listenerId -> {
ethPeers.unsubscribeConnect(listenerId);
newPeerListenerId = Optional.empty();
});
inSyncTrackers
.values()
.forEach(
(syncTracker) -> syncTracker.checkState(localChain, syncTargetChain, bestPeerChain));
}
public Optional<Checkpoint> getCheckpoint() {
return checkpoint;
}
public boolean isInitialSyncPhaseDone() {
return isInitialSyncPhaseDone;
}
public void markInitialSyncPhaseAsDone() {
isInitialSyncPhaseDone = true;
isResyncNeeded = false;
completionListenerSubscribers.forEach(InitialSyncCompletionListener::onInitialSyncCompleted);
}
public boolean isResyncNeeded() {
return isResyncNeeded;
}
public void markResyncNeeded() {
isResyncNeeded = true;
}
public Optional<Address> getAccountToRepair() {
return maybeAccountToRepair;
}
public void markAccountToRepair(final Optional<Address> address) {
maybeAccountToRepair = address;
}
public void markInitialSyncRestart() {
isInitialSyncPhaseDone = false;
completionListenerSubscribers.forEach(InitialSyncCompletionListener::onInitialSyncRestart);
}
}