PipelineBuilder.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.services.pipeline;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Collections.emptyList;
import static java.util.Collections.singleton;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
/**
 * Supports building a new pipeline. Pipelines are comprised of a source, various processing stages
 * and a consumer, each of which run in their own thread.
 *
 * <p>The pipeline completes when all items from the source have passed through each stage and are
 * received by the consumer. The pipeline will halt immediately if an exception is thrown from any
 * processing stage.
 *
 * <p>Every method that extends the pipeline returns a new builder instance; the fields of an
 * existing builder are never reassigned.
 *
 * @param <I> the type of item input to the very start of this pipeline.
 * @param <T> the output type of the last stage in the pipeline.
 */
public class PipelineBuilder<I, T> {
  private final Pipe<I> inputPipe;
  private final Collection<Stage> stages;
  private final Collection<Pipe<?>> pipes;
  private final String lastStageName;
  private final ReadPipe<T> pipeEnd;
  private final int bufferSize;
  private final LabelledMetric<Counter> outputCounter;
  private final boolean tracingEnabled;
  private final String pipelineName;

  /**
   * Instantiates a new Pipeline builder.
   *
   * @param inputPipe the input pipe
   * @param stages the stages
   * @param pipes the pipes
   * @param lastStageName the last stage name
   * @param pipeEnd the pipe end
   * @param bufferSize the buffer size
   * @param outputCounter the output counter
   * @param tracingEnabled the tracing enabled
   * @param pipelineName the pipeline name
   * @throws IllegalArgumentException if <i>pipes</i> is empty.
   */
  public PipelineBuilder(
      final Pipe<I> inputPipe,
      final Collection<Stage> stages,
      final Collection<Pipe<?>> pipes,
      final String lastStageName,
      final ReadPipe<T> pipeEnd,
      final int bufferSize,
      final LabelledMetric<Counter> outputCounter,
      final boolean tracingEnabled,
      final String pipelineName) {
    checkArgument(!pipes.isEmpty(), "Must have at least one pipe in a pipeline");
    this.lastStageName = lastStageName;
    this.outputCounter = outputCounter;
    this.inputPipe = inputPipe;
    this.stages = stages;
    this.pipes = pipes;
    this.pipeEnd = pipeEnd;
    this.bufferSize = bufferSize;
    this.tracingEnabled = tracingEnabled;
    this.pipelineName = pipelineName;
  }

  /**
   * Create a new pipeline that processes inputs from <i>source</i>. The pipeline completes when
   * <i>source</i> returns <code>false</code> from {@link Iterator#hasNext()} and the last item has
   * reached the end of the pipeline.
   *
   * @param <T> the type of items input into the pipeline.
   * @param sourceName the name of this stage. Used as the label for the output count metric.
   * @param source the source to pull items from for processing.
   * @param bufferSize the number of items to be buffered between each stage in the pipeline.
   * @param itemCounter the counter to increment for each output of a stage. Must accept two labels:
   *     the pipe name (the stage name suffixed with <code>_outputPipe</code>) and the action
   *     (<code>added</code>, <code>removed</code>, <code>aborted</code> or <code>batches</code>).
   * @param tracingEnabled whether this pipeline should be traced
   * @param pipelineName the name of the pipeline for tracing purposes
   * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
   */
  public static <T> PipelineBuilder<T, T> createPipelineFrom(
      final String sourceName,
      final Iterator<T> source,
      final int bufferSize,
      final LabelledMetric<Counter> itemCounter,
      final boolean tracingEnabled,
      final String pipelineName) {
    final Pipe<T> pipe = createPipe(bufferSize, sourceName, itemCounter);
    final IteratorSourceStage<T> sourceStage = new IteratorSourceStage<>(sourceName, source, pipe);
    // The single pipe is simultaneously the pipeline input, the only pipe and the current end.
    return new PipelineBuilder<>(
        pipe,
        singleton(sourceStage),
        singleton(pipe),
        sourceName,
        pipe,
        bufferSize,
        itemCounter,
        tracingEnabled,
        pipelineName);
  }

  /**
   * Create a new pipeline that processes inputs added to <i>pipe</i>. The pipeline completes when
   * <i>pipe</i> is closed and the last item has reached the end of the pipeline.
   *
   * @param <T> the type of items input into the pipeline.
   * @param sourceName the name of this stage. Used as the label for the output count metric.
   * @param bufferSize the number of items to be buffered between each stage in the pipeline.
   * @param outputCounter the counter to increment for each output of a stage. Must accept two
   *     labels: the pipe name (the stage name suffixed with <code>_outputPipe</code>) and the
   *     action (<code>added</code>, <code>removed</code>, <code>aborted</code> or <code>batches
   *     </code>).
   * @param tracingEnabled whether this pipeline should be traced
   * @param pipelineName the name of the pipeline for tracing purposes
   * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
   */
  public static <T> PipelineBuilder<T, T> createPipeline(
      final String sourceName,
      final int bufferSize,
      final LabelledMetric<Counter> outputCounter,
      final boolean tracingEnabled,
      final String pipelineName) {
    final Pipe<T> pipe = createPipe(bufferSize, sourceName, outputCounter);
    // No source stage is registered here: items are added to the input pipe by the caller.
    return new PipelineBuilder<>(
        pipe,
        emptyList(),
        singleton(pipe),
        sourceName,
        pipe,
        bufferSize,
        outputCounter,
        tracingEnabled,
        pipelineName);
  }

  /**
   * Adds a 1-to-1 processing stage to the pipeline. A single thread processes each item in the
   * pipeline with <i>processor</i> outputting its return value to the next stage.
   *
   * @param <O> the output type for this processing step.
   * @param stageName the name of this stage. Used as the label for the output count metric.
   * @param processor the processing to apply to each item.
   * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
   */
  public <O> PipelineBuilder<I, O> thenProcess(
      final String stageName, final Function<T, O> processor) {
    final Processor<T, O> singleStepStage = new MapProcessor<>(processor);
    return addStage(singleStepStage, stageName);
  }

  /**
   * Adds a 1-to-1 processing stage to the pipeline. Multiple threads process items in the pipeline
   * concurrently with <i>processor</i> outputting its return value to the next stage.
   *
   * <p>Note: The order of items is not preserved.
   *
   * @param <O> the output type for this processing step.
   * @param stageName the name of this stage. Used as the label for the output count metric.
   * @param processor the processing to apply to each item.
   * @param numberOfThreads the number of threads to use for processing. Must be greater than 0.
   * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
   * @throws IllegalArgumentException if <i>numberOfThreads</i> is not greater than 0.
   */
  public <O> PipelineBuilder<I, O> thenProcessInParallel(
      final String stageName, final Function<T, O> processor, final int numberOfThreads) {
    return thenProcessInParallel(
        stageName, () -> new MapProcessor<>(processor), numberOfThreads, bufferSize);
  }

  /**
   * Adds a 1-to-1, asynchronous processing stage to the pipeline. A single thread reads items from
   * the input and calls <i>processor</i> to begin processing. While a single thread is used to
   * begin processing, up to <i>maxConcurrency</i> items may be in progress concurrently. When the
   * returned {@link CompletableFuture} completes successfully the result is passed to the next
   * stage.
   *
   * <p>If the returned {@link CompletableFuture} completes exceptionally the pipeline will abort.
   *
   * <p>Note: The order of items is not preserved.
   *
   * @param <O> the output type for this processing step.
   * @param stageName the name of this stage. Used as the label for the output count metric.
   * @param processor the processing to apply to each item.
   * @param maxConcurrency the maximum number of items being processed concurrently.
   * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
   */
  public <O> PipelineBuilder<I, O> thenProcessAsync(
      final String stageName,
      final Function<T, CompletableFuture<O>> processor,
      final int maxConcurrency) {
    return addStage(new AsyncOperationProcessor<>(processor, maxConcurrency, false), stageName);
  }

  /**
   * Adds a 1-to-1, asynchronous processing stage to the pipeline. A single thread reads items from
   * the input and calls <i>processor</i> to begin processing. While a single thread is used to
   * begin processing, up to <i>maxConcurrency</i> items may be in progress concurrently. As each
   * returned {@link CompletableFuture} completes successfully the result is passed to the next
   * stage in order.
   *
   * <p>If the returned {@link CompletableFuture} completes exceptionally the pipeline will abort.
   *
   * <p>Note: While processing may occur concurrently, order is preserved when results are output.
   *
   * @param <O> the output type for this processing step.
   * @param stageName the name of this stage. Used as the label for the output count metric.
   * @param processor the processing to apply to each item.
   * @param maxConcurrency the maximum number of items being processed concurrently.
   * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
   */
  public <O> PipelineBuilder<I, O> thenProcessAsyncOrdered(
      final String stageName,
      final Function<T, CompletableFuture<O>> processor,
      final int maxConcurrency) {
    return addStage(new AsyncOperationProcessor<>(processor, maxConcurrency, true), stageName);
  }

  /**
   * Batches items into groups of at most <i>maximumBatchSize</i>. Batches are created eagerly to
   * minimize delay so may not be full.
   *
   * <p>Order of items is preserved.
   *
   * <p>The output buffer size is reduced to <code>bufferSize / maximumBatchSize</code>, rounded
   * up.
   *
   * @param maximumBatchSize the maximum number of items to include in a batch. Must be greater
   *     than 0.
   * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
   * @throws IllegalArgumentException if <i>maximumBatchSize</i> is not greater than 0.
   */
  public PipelineBuilder<I, List<T>> inBatches(final int maximumBatchSize) {
    checkArgument(maximumBatchSize > 0, "Maximum batch size must be greater than 0");
    // Batching wraps the current pipe end; no new stage or pipe is added to the pipeline.
    return new PipelineBuilder<>(
        inputPipe,
        stages,
        pipes,
        lastStageName,
        new BatchingReadPipe<>(
            pipeEnd,
            maximumBatchSize,
            outputCounter.labels(lastStageName + "_outputPipe", "batches")),
        batchedBufferSize(maximumBatchSize),
        outputCounter,
        tracingEnabled,
        pipelineName);
  }

  /**
   * Batches items into groups of at most <i>maximumBatchSize</i>. Batches are created eagerly to
   * minimize delay so may not be full.
   *
   * <p>Order of items is preserved.
   *
   * <p>The output buffer size is reduced to <code>bufferSize / maximumBatchSize</code>, rounded
   * up.
   *
   * @param maximumBatchSize the maximum number of items to include in a batch. Must be greater
   *     than 0.
   * @param stopBatchCondition the condition before ending the batch
   * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
   * @throws IllegalArgumentException if <i>maximumBatchSize</i> is not greater than 0.
   */
  public PipelineBuilder<I, List<T>> inBatches(
      final int maximumBatchSize, final Function<List<T>, Integer> stopBatchCondition) {
    // Same validation as the single-argument overload: a zero batch size would otherwise divide
    // by zero in the buffer size calculation and a negative one would yield a non-positive size.
    checkArgument(maximumBatchSize > 0, "Maximum batch size must be greater than 0");
    return new PipelineBuilder<>(
        inputPipe,
        stages,
        pipes,
        lastStageName,
        new BatchingReadPipe<>(
            pipeEnd,
            maximumBatchSize,
            outputCounter.labels(lastStageName + "_outputPipe", "batches"),
            stopBatchCondition),
        batchedBufferSize(maximumBatchSize),
        outputCounter,
        tracingEnabled,
        pipelineName);
  }

  /**
   * Adds a 1-to-many processing stage to the pipeline. For each item in the stream, <i>mapper</i>
   * is called and each item of the {@link Stream} it returns is output as an individual item. The
   * returned Stream may be empty to remove an item.
   *
   * <p>This can be used to reverse the effect of {@link #inBatches(int)} with:
   *
   * <pre>thenFlatMap(stageName, List::stream, newBufferSize)</pre>
   *
   * @param <O> the type of items to be output from this stage.
   * @param stageName the name of this stage. Used as the label for the output count metric.
   * @param mapper the function to process each item with.
   * @param newBufferSize the output buffer size to use from this stage onwards.
   * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
   */
  public <O> PipelineBuilder<I, O> thenFlatMap(
      final String stageName, final Function<T, Stream<O>> mapper, final int newBufferSize) {
    return addStage(new FlatMapProcessor<>(mapper), newBufferSize, stageName);
  }

  /**
   * Adds a 1-to-many processing stage to the pipeline. For each item in the stream, <i>mapper</i>
   * is called and each item of the {@link Stream} it returns is output as an individual item. The
   * returned Stream may be empty to remove an item. Multiple threads process items in the pipeline
   * concurrently.
   *
   * <p>This can be used to reverse the effect of {@link #inBatches(int)} with:
   *
   * <pre>thenFlatMapInParallel(stageName, List::stream, numberOfThreads, newBufferSize)</pre>
   *
   * @param <O> the type of items to be output from this stage.
   * @param stageName the name of this stage. Used as the label for the output count metric.
   * @param mapper the function to process each item with.
   * @param numberOfThreads the number of threads to use for processing. Must be greater than 0.
   * @param newBufferSize the output buffer size to use from this stage onwards.
   * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
   * @throws IllegalArgumentException if <i>numberOfThreads</i> is not greater than 0.
   */
  public <O> PipelineBuilder<I, O> thenFlatMapInParallel(
      final String stageName,
      final Function<T, Stream<O>> mapper,
      final int numberOfThreads,
      final int newBufferSize) {
    return thenProcessInParallel(
        stageName, () -> new FlatMapProcessor<>(mapper), numberOfThreads, newBufferSize);
  }

  /**
   * End the pipeline with a {@link Consumer} that is the last stage of the pipeline.
   *
   * @param stageName the name of this stage. Used as the label for the output count metric.
   * @param completer the {@link Consumer} that accepts the final output of the pipeline.
   * @return the constructed pipeline ready to execute.
   */
  public Pipeline<I> andFinishWith(final String stageName, final Consumer<T> completer) {
    return new Pipeline<>(
        inputPipe,
        pipelineName,
        tracingEnabled,
        stages,
        pipes,
        new CompleterStage<>(stageName, pipeEnd, completer));
  }

  /**
   * Adds <i>numberOfThreads</i> processing stages that all read from the current pipe end and
   * write, through one shared write pipe, into a single new output pipe.
   *
   * @param <O> the output type for this processing step.
   * @param stageName the name shared by every created stage and used to label the new pipe.
   * @param createProcessor supplies a separate processor instance for each created stage.
   * @param numberOfThreads the number of stages to create. Must be greater than 0.
   * @param newBufferSize the buffer size of the new output pipe.
   * @return a new builder whose pipe end is the new output pipe.
   * @throws IllegalArgumentException if <i>numberOfThreads</i> is not greater than 0.
   */
  private <O> PipelineBuilder<I, O> thenProcessInParallel(
      final String stageName,
      final Supplier<Processor<T, O>> createProcessor,
      final int numberOfThreads,
      final int newBufferSize) {
    // With no worker stages the loop below would add nothing, leaving no stage to read from the
    // previous pipe end.
    checkArgument(numberOfThreads > 0, "Number of threads must be greater than 0");
    final Pipe<O> newPipeEnd = createPipe(newBufferSize, stageName, outputCounter);
    final WritePipe<O> outputPipe = new SharedWritePipe<>(newPipeEnd, numberOfThreads);
    final List<Stage> newStages = new ArrayList<>(stages);
    for (int i = 0; i < numberOfThreads; i++) {
      final Stage processStage =
          new ProcessingStage<>(stageName, pipeEnd, outputPipe, createProcessor.get());
      newStages.add(processStage);
    }
    return new PipelineBuilder<>(
        inputPipe,
        newStages,
        concat(pipes, newPipeEnd),
        stageName,
        newPipeEnd,
        newBufferSize,
        outputCounter,
        tracingEnabled,
        pipelineName);
  }

  /**
   * Adds a single processing stage whose output pipe uses this builder's current buffer size.
   *
   * @param <O> the output type for this processing step.
   * @param processor the processor run by the new stage.
   * @param stageName the name of the new stage.
   * @return a new builder whose pipe end is the new stage's output pipe.
   */
  private <O> PipelineBuilder<I, O> addStage(
      final Processor<T, O> processor, final String stageName) {
    return addStage(processor, bufferSize, stageName);
  }

  /**
   * Adds a single processing stage that reads from the current pipe end and writes to a newly
   * created output pipe.
   *
   * @param <O> the output type for this processing step.
   * @param processor the processor run by the new stage.
   * @param newBufferSize the buffer size of the new output pipe and of the returned builder.
   * @param stageName the name of the new stage.
   * @return a new builder whose pipe end is the new stage's output pipe.
   */
  private <O> PipelineBuilder<I, O> addStage(
      final Processor<T, O> processor, final int newBufferSize, final String stageName) {
    final Pipe<O> outputPipe = createPipe(newBufferSize, stageName, outputCounter);
    final Stage processStage = new ProcessingStage<>(stageName, pipeEnd, outputPipe, processor);
    final List<Stage> newStages = concat(stages, processStage);
    return new PipelineBuilder<>(
        inputPipe,
        newStages,
        concat(pipes, outputPipe),
        processStage.getName(),
        outputPipe,
        newBufferSize,
        outputCounter,
        tracingEnabled,
        pipelineName);
  }

  /**
   * Computes the buffer size to use after batching: the current buffer size divided by the batch
   * size, rounded up.
   *
   * @param maximumBatchSize the maximum batch size; callers validate that it is greater than 0.
   * @return the buffer size for the builder that follows the batching step.
   */
  private int batchedBufferSize(final int maximumBatchSize) {
    return (int) Math.ceil(((double) bufferSize) / maximumBatchSize);
  }

  /**
   * Returns a new list holding every element of <i>existing</i> followed by <i>newItem</i>. The
   * supplied collection is not modified.
   *
   * @param <X> the element type.
   * @param existing the elements to copy.
   * @param newItem the element to append.
   * @return the new list.
   */
  private static <X> List<X> concat(final Collection<X> existing, final X newItem) {
    final List<X> newList = new ArrayList<>(existing);
    newList.add(newItem);
    return newList;
  }

  /**
   * Creates a pipe whose three counters are obtained from <i>outputCounter</i> using the pipe name
   * <code>stageName + "_outputPipe"</code> and the actions <code>added</code>, <code>removed
   * </code> and <code>aborted</code>.
   *
   * @param <O> the type of items carried by the pipe.
   * @param newBufferSize the buffer size passed to the pipe.
   * @param stageName the name of the stage that writes to the pipe.
   * @param outputCounter the labelled counter to derive the pipe's counters from.
   * @return the new pipe.
   */
  private static <O> Pipe<O> createPipe(
      final int newBufferSize,
      final String stageName,
      final LabelledMetric<Counter> outputCounter) {
    final String labelName = stageName + "_outputPipe";
    return new Pipe<>(
        newBufferSize,
        outputCounter.labels(labelName, "added"),
        outputCounter.labels(labelName, "removed"),
        outputCounter.labels(labelName, "aborted"));
  }
}