// AbstractEthTask.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.task;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import java.util.Collection;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
/**
 * Base class for {@link EthTask} implementations.
 *
 * <p>It ensures the task body is started at most once (whether through {@link #run()} or {@link
 * #runAsync(ExecutorService)}), times the execution of the task body, and cancels every sub-task
 * future registered through {@link #executeSubTask(Supplier)} once the result of a started task
 * completes.
 *
 * <p>The monitor of {@link #result} guards the interaction between sub-task registration,
 * cancellation and cleanup, so that a sub-task cannot be registered concurrently with the pass
 * that cancels outstanding sub-tasks.
 *
 * @param <T> the type of the value produced by the task
 */
public abstract class AbstractEthTask<T> implements EthTask<T> {

  /**
   * Duration in seconds of the last completed {@link #executeTaskTimed()} call, or {@code -1.0} if
   * none has finished yet. Volatile because {@link #runAsync(ExecutorService)} writes it from an
   * executor thread while {@link #getTaskTimeInSec()} may be called from any thread.
   */
  private volatile double taskTimeInSec = -1.0D;

  private final OperationTimer taskTimer;

  /** Future exposing the outcome of this task; its monitor also serves as the task's lock. */
  protected final CompletableFuture<T> result = new CompletableFuture<>();

  /** Flipped to {@code true} by the first call that actually starts the task. */
  private final AtomicBoolean started = new AtomicBoolean(false);

  /** Sub-task futures that are still outstanding and must be cancelled on cleanup. */
  private final Collection<CompletableFuture<?>> subTaskFutures = new ConcurrentLinkedDeque<>();

  /**
   * Creates a task whose timer is obtained from the given metrics system, labelled with the simple
   * name of the concrete task class.
   *
   * @param metricsSystem the metrics system used to create the task timer
   */
  protected AbstractEthTask(final MetricsSystem metricsSystem) {
    this.taskTimer = buildOperationTimer(metricsSystem, getClass().getSimpleName());
  }

  /**
   * Creates a task that uses the supplied timer.
   *
   * @param taskTimer the timer used to time the task body
   */
  protected AbstractEthTask(final OperationTimer taskTimer) {
    this.taskTimer = taskTimer;
  }

  /**
   * Builds the timer used to time the task body.
   *
   * @param metricsSystem the metrics system from which the labelled timer is created
   * @param taskName the label value identifying the task
   * @return the labelled timer for {@code taskName}, or a stopwatch-backed timer when the metrics
   *     system hands out the no-op labelled timer
   */
  private static OperationTimer buildOperationTimer(
      final MetricsSystem metricsSystem, final String taskName) {
    final LabelledMetric<OperationTimer> ethTasksTimer =
        metricsSystem.createLabelledTimer(
            BesuMetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName");
    if (ethTasksTimer == NoOpMetricsSystem.NO_OP_LABELLED_1_OPERATION_TIMER) {
      // The no-op timer is replaced by a stopwatch so that stopTimer() still reports the elapsed
      // time (in seconds, with millisecond resolution).
      return () ->
          new OperationTimer.TimingContext() {
            final Stopwatch stopwatch = Stopwatch.createStarted();

            @Override
            public double stopTimer() {
              return stopwatch.elapsed(TimeUnit.MILLISECONDS) / 1000.0;
            }
          };
    } else {
      return ethTasksTimer.labels(taskName);
    }
  }

  /**
   * Executes the task body on the calling thread, unless the result is already done or the task
   * has already been started.
   *
   * @return the future holding the result of this task
   */
  @Override
  public final CompletableFuture<T> run() {
    if (!result.isDone() && started.compareAndSet(false, true)) {
      executeTaskTimed();
      result.whenComplete((r, t) -> cleanup());
    }
    return result;
  }

  /**
   * Submits the task body to the given executor, unless the result is already done or the task has
   * already been started.
   *
   * @param executor the executor on which the task body is run
   * @return the future holding the result of this task
   */
  @Override
  public final CompletableFuture<T> runAsync(final ExecutorService executor) {
    if (!result.isDone() && started.compareAndSet(false, true)) {
      executor.execute(this::executeTaskTimed);
      result.whenComplete((r, t) -> cleanup());
    }
    return result;
  }

  /**
   * Cancels the result of this task.
   *
   * <p>The cancellation is performed while holding the monitor of {@link #result}, the same lock
   * taken by {@link #executeSubTask(Supplier)}. A sub-task is therefore either registered before
   * the cancellation (and cancelled by the cleanup it triggers) or rejected because the task is
   * already cancelled; it can no longer slip in between the check and the registration.
   */
  @Override
  public final void cancel() {
    synchronized (result) {
      result.cancel(false);
    }
  }

  /**
   * Reports whether the result has completed in any way.
   *
   * @return {@code true} if the result future is done
   */
  @VisibleForTesting
  public final boolean isDone() {
    return result.isDone();
  }

  /**
   * Reports whether the result has completed exceptionally.
   *
   * @return {@code true} if the result future completed exceptionally
   */
  public final boolean isFailed() {
    return result.isCompletedExceptionally();
  }

  /**
   * Reports whether the result has been cancelled.
   *
   * @return {@code true} if the result future was cancelled
   */
  @VisibleForTesting
  public final boolean isCancelled() {
    return result.isCancelled();
  }

  /**
   * Utility for executing completable futures that handles cleanup if this EthTask is cancelled.
   *
   * <p>If this task is already cancelled the supplier is not invoked and a future failed with a
   * {@link CancellationException} is returned instead.
   *
   * @param subTask a subTask to execute
   * @param <S> the type of data returned from the CompletableFuture
   * @return The completableFuture that was executed
   */
  protected final <S> CompletableFuture<S> executeSubTask(
      final Supplier<CompletableFuture<S>> subTask) {
    // The check and the registration must be atomic with respect to cancel() and cleanup().
    synchronized (result) {
      if (!isCancelled()) {
        final CompletableFuture<S> subTaskFuture = subTask.get();
        subTaskFutures.add(subTaskFuture);
        // Stop tracking the sub-task as soon as it finishes on its own.
        subTaskFuture.whenComplete((r, t) -> subTaskFutures.remove(subTaskFuture));
        return subTaskFuture;
      } else {
        return CompletableFuture.failedFuture(new CancellationException());
      }
    }
  }

  /**
   * Helper method for sending subTask to worker that will clean up if this EthTask is cancelled.
   *
   * @param scheduler the scheduler that will run worker task
   * @param subTask a subTask to execute
   * @param <S> the type of data returned from the CompletableFuture
   * @return The completableFuture that was executed
   */
  protected final <S> CompletableFuture<S> executeWorkerSubTask(
      final EthScheduler scheduler, final Supplier<CompletableFuture<S>> subTask) {
    return executeSubTask(() -> scheduler.scheduleSyncWorkerTask(subTask));
  }

  /** Execute core task logic. */
  protected abstract void executeTask();

  /**
   * Executes the task while timed by a timer. The elapsed time is recorded even when {@link
   * #executeTask()} throws.
   */
  public void executeTaskTimed() {
    final OperationTimer.TimingContext timingContext = taskTimer.startTimer();
    try {
      executeTask();
    } finally {
      taskTimeInSec = timingContext.stopTimer();
    }
  }

  /**
   * Returns the time taken by the task body.
   *
   * @return the duration in seconds reported by the timer for the last completed {@link
   *     #executeTaskTimed()} call, or {@code -1.0} if none has completed yet
   */
  public double getTaskTimeInSec() {
    return taskTimeInSec;
  }

  /**
   * Cleanup any resources when task completes.
   *
   * <p>Cancels every sub-task future that is still tracked. This runs under the monitor of {@link
   * #result} so that it cannot interleave with a registration in {@link
   * #executeSubTask(Supplier)}: a sub-task being registered concurrently is fully added before
   * this pass starts and is therefore cancelled as well.
   */
  protected void cleanup() {
    synchronized (result) {
      for (final CompletableFuture<?> subTaskFuture : subTaskFutures) {
        subTaskFuture.cancel(false);
      }
    }
  }
}