CheckpointSyncDownloadPipelineFactory.java
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloadPipelineFactory;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.pipeline.Pipeline;
import org.hyperledger.besu.services.pipeline.PipelineBuilder;
import java.util.concurrent.CompletionStage;
public class CheckpointSyncDownloadPipelineFactory extends FastSyncDownloadPipelineFactory {
public CheckpointSyncDownloadPipelineFactory(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final FastSyncState fastSyncState,
final MetricsSystem metricsSystem) {
super(syncConfig, protocolSchedule, protocolContext, ethContext, fastSyncState, metricsSystem);
}
@Override
public CompletionStage<Void> startPipeline(
final EthScheduler scheduler,
final SyncState syncState,
final SyncTarget syncTarget,
final Pipeline<?> pipeline) {
return scheduler
.startPipeline(createDownloadCheckPointPipeline(syncState, syncTarget))
.thenCompose(unused -> scheduler.startPipeline(pipeline));
}
protected Pipeline<Hash> createDownloadCheckPointPipeline(
final SyncState syncState, final SyncTarget target) {
final Checkpoint checkpoint = syncState.getCheckpoint().orElseThrow();
final BlockHeader checkpointBlockHeader = target.peer().getCheckpointHeader().orElseThrow();
final CheckpointSource checkPointSource =
new CheckpointSource(
syncState,
checkpointBlockHeader,
protocolSchedule
.getByBlockHeader(checkpointBlockHeader)
.getBlockHeaderFunctions()
.getCheckPointWindowSize(checkpointBlockHeader));
final CheckpointBlockImportStep checkPointBlockImportStep =
new CheckpointBlockImportStep(
checkPointSource, checkpoint, protocolContext.getBlockchain());
final CheckpointDownloadBlockStep checkPointDownloadBlockStep =
new CheckpointDownloadBlockStep(protocolSchedule, ethContext, checkpoint, metricsSystem);
return PipelineBuilder.createPipelineFrom(
"fetchCheckpoints",
checkPointSource,
1,
metricsSystem.createLabelledCounter(
BesuMetricCategory.SYNCHRONIZER,
"chain_download_pipeline_processed_total",
"Number of header process by each chain download pipeline stage",
"step",
"action"),
true,
"checkpointSync")
.thenProcessAsyncOrdered("downloadBlock", checkPointDownloadBlockStep::downloadBlock, 1)
.andFinishWith("importBlock", checkPointBlockImportStep);
}
@Override
protected BlockHeader getCommonAncestor(final SyncTarget target) {
return target
.peer()
.getCheckpointHeader()
.filter(checkpoint -> checkpoint.getNumber() > target.commonAncestor().getNumber())
.orElse(target.commonAncestor());
}
}