// EthScheduler.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.eth.manager;

import static org.hyperledger.besu.util.FutureUtils.propagateResult;

import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.pipeline.Pipeline;
import org.hyperledger.besu.util.ExceptionUtils;

import java.time.Duration;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the executors used for eth-related work and offers helpers for running tasks on them.
 *
 * <p>Six executors are kept: sync workers, a timer ({@code scheduler}), transaction workers,
 * services, computation and block creation. {@link #stop()} shuts all of them down and {@link
 * #awaitStop()} waits for them to terminate.
 */
public class EthScheduler {
  private static final Logger LOG = LoggerFactory.getLogger(EthScheduler.class);

  /** How long {@link #awaitStop()} waits for each individual executor to terminate. */
  private static final long TERMINATION_TIMEOUT_SECONDS = 30;

  private final Duration defaultTimeout = Duration.ofSeconds(5);
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  private final CountDownLatch shutdown = new CountDownLatch(1);
  private static final int TX_WORKER_CAPACITY = 1_000;

  protected final ExecutorService syncWorkerExecutor;
  protected final ScheduledExecutorService scheduler;
  protected final ExecutorService txWorkerExecutor;
  protected final ExecutorService servicesExecutor;
  protected final ExecutorService computationExecutor;
  protected final ExecutorService blockCreationExecutor;

  // Futures that awaitStop() cancels; each entry removes itself once its future completes.
  private final Collection<CompletableFuture<?>> pendingFutures = new ConcurrentLinkedDeque<>();

  /**
   * Creates a scheduler whose transaction worker pool uses the default queue size ({@value
   * #TX_WORKER_CAPACITY}).
   *
   * @param syncWorkerCount value passed to the sync worker pool factory
   * @param txWorkerCount value passed to the transaction worker pool factory
   * @param computationWorkerCount value passed to the computation pool factory
   * @param metricsSystem metrics system handed to every executor factory
   */
  public EthScheduler(
      final int syncWorkerCount,
      final int txWorkerCount,
      final int computationWorkerCount,
      final MetricsSystem metricsSystem) {
    this(syncWorkerCount, txWorkerCount, TX_WORKER_CAPACITY, computationWorkerCount, metricsSystem);
  }

  /**
   * Creates a scheduler with executors built by {@code MonitoredExecutors}; every executor name is
   * prefixed with this class's simple name.
   *
   * <p>NOTE(review): the counts are presumably maximum pool sizes and {@code txWorkerQueueSize}
   * the bound of the transaction work queue; confirm against {@code MonitoredExecutors}.
   *
   * @param syncWorkerCount value passed to the sync worker pool factory
   * @param txWorkerCount value passed to the transaction worker pool factory
   * @param txWorkerQueueSize queue size passed to the transaction worker pool factory
   * @param computationWorkerCount value passed to the computation pool factory
   * @param metricsSystem metrics system handed to every executor factory
   */
  public EthScheduler(
      final int syncWorkerCount,
      final int txWorkerCount,
      final int txWorkerQueueSize,
      final int computationWorkerCount,
      final MetricsSystem metricsSystem) {
    this(
        MonitoredExecutors.newFixedThreadPool(
            EthScheduler.class.getSimpleName() + "-Workers", 1, syncWorkerCount, metricsSystem),
        MonitoredExecutors.newScheduledThreadPool(
            EthScheduler.class.getSimpleName() + "-Timer", 1, metricsSystem),
        MonitoredExecutors.newBoundedThreadPool(
            EthScheduler.class.getSimpleName() + "-Transactions",
            1,
            txWorkerCount,
            txWorkerQueueSize,
            metricsSystem),
        MonitoredExecutors.newCachedThreadPool(
            EthScheduler.class.getSimpleName() + "-Services", metricsSystem),
        MonitoredExecutors.newFixedThreadPool(
            EthScheduler.class.getSimpleName() + "-Computation",
            1,
            computationWorkerCount,
            metricsSystem),
        MonitoredExecutors.newCachedThreadPool(
            EthScheduler.class.getSimpleName() + "-BlockCreation", metricsSystem));
  }

  /**
   * Creates a scheduler on top of caller-supplied executors.
   *
   * @param syncWorkerExecutor executor for sync worker tasks
   * @param scheduler executor for delayed and periodic tasks, and for timeouts
   * @param txWorkerExecutor executor for transaction worker tasks
   * @param servicesExecutor executor for service tasks, pipelines and ordered processors
   * @param computationExecutor executor for computation tasks
   * @param blockCreationExecutor executor for block creation tasks
   */
  protected EthScheduler(
      final ExecutorService syncWorkerExecutor,
      final ScheduledExecutorService scheduler,
      final ExecutorService txWorkerExecutor,
      final ExecutorService servicesExecutor,
      final ExecutorService computationExecutor,
      final ExecutorService blockCreationExecutor) {
    this.syncWorkerExecutor = syncWorkerExecutor;
    this.scheduler = scheduler;
    this.txWorkerExecutor = txWorkerExecutor;
    this.servicesExecutor = servicesExecutor;
    this.computationExecutor = computationExecutor;
    this.blockCreationExecutor = blockCreationExecutor;
  }

  /**
   * Submits work to the sync worker executor and exposes its outcome as a promise.
   *
   * <p>The supplier and the promise are handed to {@code FutureUtils.propagateResult} on a worker
   * thread (presumably it invokes the supplier and forwards the outcome of the supplied future to
   * the promise; confirm against {@code FutureUtils}).
   *
   * @param future supplier of the future whose result should be propagated
   * @param <T> result type
   * @return a promise; cancelling it cancels the submitted worker task without interrupting it
   */
  public <T> CompletableFuture<T> scheduleSyncWorkerTask(
      final Supplier<CompletableFuture<T>> future) {
    final CompletableFuture<T> promise = new CompletableFuture<>();
    final Future<?> workerFuture =
        syncWorkerExecutor.submit(() -> propagateResult(future, promise));
    // If returned promise is cancelled, cancel the worker future
    promise.whenComplete(
        (r, t) -> {
          if (t instanceof CancellationException) {
            workerFuture.cancel(false);
          }
        });
    return promise;
  }

  /**
   * Runs a command on the sync worker executor.
   *
   * @param command the command to execute
   */
  public void scheduleSyncWorkerTask(final Runnable command) {
    syncWorkerExecutor.execute(command);
  }

  /**
   * Starts a task via {@code task.runAsync} with the sync worker executor and tracks the returned
   * future so that {@link #awaitStop()} can cancel it if it is still pending.
   *
   * @param task the task to run
   * @param <T> result type
   * @return the future returned by the task
   */
  public <T> CompletableFuture<T> scheduleSyncWorkerTask(final EthTask<T> task) {
    final CompletableFuture<T> syncFuture = task.runAsync(syncWorkerExecutor);
    pendingFutures.add(syncFuture);
    syncFuture.whenComplete((r, t) -> pendingFutures.remove(syncFuture));
    return syncFuture;
  }

  /**
   * Runs a command on the transaction worker executor.
   *
   * @param command the command to execute
   */
  public void scheduleTxWorkerTask(final Runnable command) {
    txWorkerExecutor.execute(command);
  }

  /**
   * Runs a command on the services executor without returning a handle to it.
   *
   * @param command the command to execute
   */
  public void executeServiceTask(final Runnable command) {
    servicesExecutor.execute(command);
  }

  /**
   * Runs a task on the services executor.
   *
   * @param task the task to run
   * @param <T> unused; retained so the method signature stays unchanged for existing callers
   * @return a future that completes when the task finishes
   */
  public <T> CompletableFuture<Void> scheduleServiceTask(final Runnable task) {
    return CompletableFuture.runAsync(task, servicesExecutor);
  }

  /**
   * Starts a task via {@code task.runAsync} with the services executor and tracks the returned
   * future so that {@link #awaitStop()} can cancel it if it is still pending.
   *
   * @param task the task to run
   * @param <T> result type
   * @return the future returned by the task
   */
  public <T> CompletableFuture<T> scheduleServiceTask(final EthTask<T> task) {
    final CompletableFuture<T> serviceFuture = task.runAsync(servicesExecutor);
    pendingFutures.add(serviceFuture);
    serviceFuture.whenComplete((r, t) -> pendingFutures.remove(serviceFuture));
    return serviceFuture;
  }

  /**
   * Starts a pipeline with the services executor and tracks the returned future so that {@link
   * #awaitStop()} can cancel it if it is still pending.
   *
   * @param pipeline the pipeline to start
   * @return the future returned by {@code pipeline.start}
   */
  public CompletableFuture<Void> startPipeline(final Pipeline<?> pipeline) {
    final CompletableFuture<Void> pipelineFuture = pipeline.start(servicesExecutor);
    pendingFutures.add(pipelineFuture);
    pipelineFuture.whenComplete((r, t) -> pendingFutures.remove(pipelineFuture));
    return pipelineFuture;
  }

  /**
   * Evaluates a supplier on the computation executor.
   *
   * @param computation the computation to evaluate
   * @param <T> result type
   * @return a future completed with the supplier's result
   */
  public <T> CompletableFuture<T> scheduleComputationTask(final Supplier<T> computation) {
    return CompletableFuture.supplyAsync(computation, computationExecutor);
  }

  /**
   * Runs a command on the timer executor after a delay.
   *
   * @param command the command to run
   * @param duration delay before the command runs (millisecond resolution)
   * @return a promise completed with {@code null} once the command returns, or exceptionally with
   *     whatever it threw; cancelling the promise cancels the scheduled task without interrupting
   *     it
   */
  public CompletableFuture<Void> scheduleFutureTask(
      final Runnable command, final Duration duration) {
    final CompletableFuture<Void> promise = new CompletableFuture<>();
    final ScheduledFuture<?> scheduledFuture =
        scheduler.schedule(
            () -> {
              try {
                command.run();
                promise.complete(null);
              } catch (final Throwable t) {
                promise.completeExceptionally(t);
              }
            },
            duration.toMillis(),
            TimeUnit.MILLISECONDS);
    // If returned promise is cancelled, cancel scheduled task
    promise.whenComplete(
        (r, t) -> {
          if (t instanceof CancellationException) {
            scheduledFuture.cancel(false);
          }
        });
    return promise;
  }

  /**
   * Schedules a command on the timer executor to run repeatedly with a fixed delay between the end
   * of one run and the start of the next.
   *
   * @param command the command to run
   * @param initialDelay delay before the first run (millisecond resolution)
   * @param duration delay between runs (millisecond resolution)
   * @return the handle returned by the timer executor
   */
  public ScheduledFuture<?> scheduleFutureTaskWithFixedDelay(
      final Runnable command, final Duration initialDelay, final Duration duration) {
    return scheduler.scheduleWithFixedDelay(
        command, initialDelay.toMillis(), duration.toMillis(), TimeUnit.MILLISECONDS);
  }

  /**
   * Schedules asynchronous work on the timer executor after a delay.
   *
   * <p>After the delay, the supplier and the promise are handed to {@code
   * FutureUtils.propagateResult} (presumably forwarding the supplied future's outcome to the
   * promise; confirm against {@code FutureUtils}).
   *
   * @param future supplier of the future whose result should be propagated
   * @param duration delay before the supplier is used (millisecond resolution)
   * @param <T> result type
   * @return a promise; cancelling it cancels the scheduled task without interrupting it
   */
  public <T> CompletableFuture<T> scheduleFutureTask(
      final Supplier<CompletableFuture<T>> future, final Duration duration) {
    final CompletableFuture<T> promise = new CompletableFuture<>();
    final ScheduledFuture<?> scheduledFuture =
        scheduler.schedule(
            () -> propagateResult(future, promise), duration.toMillis(), TimeUnit.MILLISECONDS);
    // If returned promise is cancelled, cancel scheduled task
    promise.whenComplete(
        (r, t) -> {
          if (t instanceof CancellationException) {
            scheduledFuture.cancel(false);
          }
        });
    return promise;
  }

  /**
   * Runs a task on the block creation executor.
   *
   * @param task the task to run
   * @return a future that completes when the task finishes
   */
  public CompletableFuture<Void> scheduleBlockCreationTask(final Runnable task) {
    return CompletableFuture.runAsync(task, blockCreationExecutor);
  }

  /**
   * Runs a task with the default timeout of 5 seconds.
   *
   * @param task the task to run
   * @param <T> result type
   * @return see {@link #timeout(EthTask, Duration)}
   */
  public <T> CompletableFuture<T> timeout(final EthTask<T> task) {
    return timeout(task, defaultTimeout);
  }

  /**
   * Runs a task and races its future against a timer.
   *
   * <p>If the returned future fails and the root cause is a {@link TimeoutException} or a {@link
   * CancellationException}, the task is cancelled.
   *
   * @param task the task to run
   * @param timeout how long to wait before failing with a {@link TimeoutException}
   * @param <T> result type
   * @return a future completed by whichever finishes first: the task or the timer
   */
  public <T> CompletableFuture<T> timeout(final EthTask<T> task, final Duration timeout) {
    final CompletableFuture<T> future = task.run();
    final CompletableFuture<T> result = timeout(future, timeout);
    result.whenComplete(
        (r, error) -> {
          if (errorIsTimeoutOrCancellation(error)) {
            task.cancel();
          }
        });
    return result;
  }

  // True when the root cause (as reported by ExceptionUtils.rootCause) is a timeout/cancellation.
  private boolean errorIsTimeoutOrCancellation(final Throwable error) {
    final Throwable cause = ExceptionUtils.rootCause(error);
    return cause instanceof TimeoutException || cause instanceof CancellationException;
  }

  // Completes with whichever finishes first: the given future or a timer that fails after delay.
  private <T> CompletableFuture<T> timeout(
      final CompletableFuture<T> future, final Duration delay) {
    final CompletableFuture<T> timeout = failAfterTimeout(delay);
    return future.applyToEither(timeout, Function.identity());
  }

  /**
   * Shuts down every executor owned by this scheduler and releases threads blocked in {@link
   * #awaitStop()}.
   *
   * <p>Only the first call has any effect; later calls just log a trace message.
   */
  public void stop() {
    if (stopped.compareAndSet(false, true)) {
      LOG.trace("Stopping {}", getClass().getSimpleName());
      syncWorkerExecutor.shutdownNow();
      txWorkerExecutor.shutdownNow();
      scheduler.shutdownNow();
      servicesExecutor.shutdownNow();
      computationExecutor.shutdownNow();
      // The block creation executor must be shut down like the others; otherwise its threads
      // and any in-flight block creation task outlive the scheduler.
      blockCreationExecutor.shutdownNow();
      shutdown.countDown();
    } else {
      LOG.trace("Attempted to stop already stopped {}", getClass().getSimpleName());
    }
  }

  /**
   * Blocks until {@link #stop()} has been called, cancels all still-pending tracked futures, then
   * waits up to 30 seconds for each executor in turn to terminate.
   *
   * <p>An executor that fails to terminate in time is reported at error level; the timer executor
   * is additionally asked to shut down again.
   *
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public void awaitStop() throws InterruptedException {
    shutdown.await();
    pendingFutures.forEach(future -> future.cancel(true));
    awaitExecutorTermination(syncWorkerExecutor, "worker executor");
    awaitExecutorTermination(txWorkerExecutor, "transaction worker executor");
    if (!awaitExecutorTermination(scheduler, "scheduler")) {
      scheduler.shutdownNow();
    }
    awaitExecutorTermination(servicesExecutor, "services executor");
    awaitExecutorTermination(computationExecutor, "computation executor");
    awaitExecutorTermination(blockCreationExecutor, "block creation executor");
    LOG.trace("{} stopped.", this.getClass().getSimpleName());
  }

  // Waits for one executor to terminate; logs an error and returns false if it does not in time.
  private boolean awaitExecutorTermination(final ExecutorService executor, final String description)
      throws InterruptedException {
    if (executor.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
      return true;
    }
    LOG.error("{} {} did not shutdown cleanly.", this.getClass().getSimpleName(), description);
    return false;
  }

  // Creates a fresh promise that is failed with a TimeoutException once the timeout elapses.
  private <T> CompletableFuture<T> failAfterTimeout(final Duration timeout) {
    final CompletableFuture<T> promise = new CompletableFuture<>();
    failAfterTimeout(promise, timeout);
    return promise;
  }

  /**
   * Schedules the given promise to be completed exceptionally with a {@link TimeoutException}
   * after the timeout elapses. This has no effect on a promise that is already complete by then.
   *
   * @param promise the promise to fail
   * @param timeout how long to wait before failing the promise (millisecond resolution)
   * @param <T> result type of the promise
   */
  public <T> void failAfterTimeout(final CompletableFuture<T> promise, final Duration timeout) {
    final long delay = timeout.toMillis();
    final TimeUnit unit = TimeUnit.MILLISECONDS;
    scheduler.schedule(
        () -> {
          final TimeoutException ex =
              new TimeoutException("Timeout after " + delay + " " + unit.name());
          return promise.completeExceptionally(ex);
        },
        delay,
        unit);
  }

  /**
   * Creates an {@link OrderedProcessor} that feeds submitted items to the given consumer using
   * this scheduler's services executor.
   *
   * @param processor consumer invoked once per submitted item
   * @param <ITEM> the class of item to be processed
   * @return a new ordered processor
   */
  public <ITEM> OrderedProcessor<ITEM> createOrderedProcessor(final Consumer<ITEM> processor) {
    return new OrderedProcessor<>(processor);
  }

  /**
   * This class is a way to execute a set of tasks, one by one, in a strict order, without blocking
   * the caller in case there are still previous tasks queued
   *
   * @param <ITEM> the class of item to be processed
   */
  public class OrderedProcessor<ITEM> {
    private final Queue<ITEM> blockAddedQueue = new ConcurrentLinkedQueue<>();
    // Held while draining the queue, so that only one thread at a time invokes the processor.
    private final ReentrantLock blockAddedLock = new ReentrantLock();
    private final Consumer<ITEM> processor;

    private OrderedProcessor(final Consumer<ITEM> processor) {
      this.processor = processor;
    }

    /**
     * Queues an item and, unless a thread is already waiting for the processing lock, submits a
     * task to the services executor that drains the queue in order.
     *
     * @param item the item to process
     */
    public void submit(final ITEM item) {
      // add the item to the processing queue
      blockAddedQueue.add(item);

      if (blockAddedLock.hasQueuedThreads()) {
        // another thread is already waiting to process the queue with our item, there is no need to
        // schedule another thread
        LOG.trace(
            "Block added event queue is already being processed and an already queued thread is present, nothing to do");
      } else {
        servicesExecutor.submit(
            () -> {
              blockAddedLock.lock();
              try {
                // now that we have the lock, process as many items as possible
                for (ITEM i = blockAddedQueue.poll(); i != null; i = blockAddedQueue.poll()) {
                  processor.accept(i);
                }
              } finally {
                blockAddedLock.unlock();
              }
            });
      }
    }
  }
}