Pipeline.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.services.pipeline;
import static java.util.stream.Collectors.toList;
import org.hyperledger.besu.services.pipeline.exception.AsyncOperationException;
import org.hyperledger.besu.util.ExceptionUtils;
import org.hyperledger.besu.util.log.LogUtil;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The Pipeline.
*
* @param <I> the type parameter
*/
public class Pipeline<I> {
private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
private final Pipe<I> inputPipe;
private final Collection<Stage> stages;
private final Collection<Pipe<?>> pipes;
private final CompleterStage<?> completerStage;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Tracer tracer =
GlobalOpenTelemetry.getTracer("org.hyperledger.besu.services.pipeline", "1.0.0");
/**
* Flags that the pipeline is being completed so that when we abort we can close the streams
* without the completion stage then marking the future successful before we finish the abort
* process and mark it as exceptionally completed. We can't just use synchronized because it winds
* up being the same thread coming in via a callback so already has the lock.
*/
private final AtomicBoolean completing = new AtomicBoolean(false);
private final CompletableFuture<Void> overallFuture = new CompletableFuture<>();
private final String name;
private final boolean tracingEnabled;
private volatile List<Future<?>> futures;
/**
* Instantiates a new Pipeline.
*
* @param inputPipe the input pipe
* @param name the name
* @param tracingEnabled the tracing enabled
* @param stages the stages
* @param pipes the pipes
* @param completerStage the completer stage
*/
Pipeline(
final Pipe<I> inputPipe,
final String name,
final boolean tracingEnabled,
final Collection<Stage> stages,
final Collection<Pipe<?>> pipes,
final CompleterStage<?> completerStage) {
this.inputPipe = inputPipe;
this.tracingEnabled = tracingEnabled;
this.name = name;
this.stages = stages;
this.pipes = pipes;
this.completerStage = completerStage;
}
/**
* Get the input pipe for this pipeline.
*
* @return the input pipe.
*/
public Pipe<I> getInputPipe() {
return inputPipe;
}
/**
* Starts execution of the pipeline. Each stage in the pipeline requires a dedicated thread from
* the supplied executor service.
*
* @param executorService the {@link ExecutorService} to execute each stage in.
* @return a future that will be completed when the pipeline completes. If the pipeline fails or
* is aborted the returned future will be completed exceptionally.
*/
public synchronized CompletableFuture<Void> start(final ExecutorService executorService) {
if (!started.compareAndSet(false, true)) {
return overallFuture;
}
futures =
Stream.concat(stages.stream(), Stream.of(completerStage))
.map(task -> runWithErrorHandling(executorService, task))
.collect(toList());
completerStage
.getFuture()
.whenComplete(
(result, error) -> {
if (completing.compareAndSet(false, true)) {
if (error != null) {
overallFuture.completeExceptionally(error);
} else {
overallFuture.complete(null);
}
}
});
overallFuture.exceptionally(
error -> {
if (ExceptionUtils.rootCause(error) instanceof CancellationException) {
abort();
}
return null;
});
return overallFuture;
}
/**
* Abort execution of this pipeline. The future returned by {@link #start(ExecutorService)} will
* be completed with a {@link CancellationException}.
*
* <p>A best effort is made to halt all processing by the pipeline immediately by interrupting
* each execution thread and pipes connecting each stage will no longer accept or provide further
* items.
*/
public void abort() {
final CancellationException exception = new CancellationException("Pipeline aborted");
abort(exception);
}
private Future<?> runWithErrorHandling(final ExecutorService executorService, final Stage task) {
return executorService.submit(
() -> {
Span taskSpan = null;
if (tracingEnabled) {
taskSpan =
tracer
.spanBuilder(task.getName())
.setAttribute("pipeline", name)
.setSpanKind(SpanKind.INTERNAL)
.startSpan();
}
final Thread thread = Thread.currentThread();
final String originalName = thread.getName();
try {
thread.setName(originalName + " (" + task.getName() + ")");
task.run();
} catch (final Throwable t) {
if (tracingEnabled) {
taskSpan.setStatus(StatusCode.ERROR);
}
if (t instanceof CompletionException
|| t instanceof CancellationException
|| t instanceof AsyncOperationException) {
LOG.trace("Unhandled exception in pipeline. Aborting.", t);
} else {
LOG.info(
LogUtil.summarizeBesuStackTrace(
"Unexpected exception in pipeline. Aborting.", t));
LOG.debug("Unexpected exception in pipeline. Aborting.", t);
}
try {
abort(t);
} catch (final Throwable t2) {
// Seems excessive but exceptions that propagate out of this method won't be logged
// because the executor just completes the future exceptionally, and we never
// need to call get on it which would normally expose the error.
LOG.error("Failed to abort pipeline after error", t2);
}
} finally {
if (tracingEnabled) {
taskSpan.end();
}
thread.setName(originalName);
}
});
}
private synchronized void abort(final Throwable error) {
if (completing.compareAndSet(false, true)) {
inputPipe.abort();
pipes.forEach(Pipe::abort);
futures.forEach(future -> future.cancel(true));
overallFuture.completeExceptionally(error);
}
}
}