MockExecutorService.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.testutil;

import static org.mockito.Mockito.spy;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/** The Mock executor service. */
public class MockExecutorService implements ExecutorService {

  private boolean autoRun = true;

  private final List<ExecutorTask<?>> tasks = new ArrayList<>();

  /**
   * Gets futures.
   *
   * @return the futures
   */
  // Test utility for inspecting executor's futures
  public List<Future<?>> getFutures() {
    return tasks.stream().map(ExecutorTask::getFuture).collect(Collectors.toList());
  }

  /**
   * Sets auto run.
   *
   * @param shouldAutoRunTasks the should auto run tasks
   */
  public void setAutoRun(final boolean shouldAutoRunTasks) {
    this.autoRun = shouldAutoRunTasks;
  }

  /** Run pending futures. */
  public void runPendingFutures() {
    final List<ExecutorTask<?>> currentTasks = new ArrayList<>(tasks);
    currentTasks.forEach(ExecutorTask::run);
  }

  /**
   * Gets pending futures count.
   *
   * @return the pending futures count
   */
  public long getPendingFuturesCount() {
    return tasks.stream().filter(ExecutorTask::isPending).count();
  }

  /**
   * Run pending futures in separate threads.
   *
   * @param executorService the executor service
   */
  public void runPendingFuturesInSeparateThreads(final ExecutorService executorService) {
    final List<ExecutorTask<?>> currentTasks = new ArrayList<>(tasks);
    currentTasks.forEach(task -> executorService.execute(task::run));
  }

  @Override
  public void shutdown() {}

  @Override
  public List<Runnable> shutdownNow() {
    return Collections.emptyList();
  }

  @Override
  public boolean isShutdown() {
    return false;
  }

  @Override
  public boolean isTerminated() {
    return false;
  }

  @Override
  public boolean awaitTermination(final long timeout, final TimeUnit unit)
      throws InterruptedException {
    return false;
  }

  @Override
  public <T> Future<T> submit(final Callable<T> task) {
    ExecutorTask<T> execTask = new ExecutorTask<>(task::call);
    tasks.add(execTask);
    if (autoRun) {
      execTask.run();
    }

    return execTask.getFuture();
  }

  @Override
  public <T> Future<T> submit(final Runnable task, final T result) {
    return submit(
        () -> {
          task.run();
          return result;
        });
  }

  @Override
  public Future<?> submit(final Runnable task) {
    return submit(
        () -> {
          task.run();
          return null;
        });
  }

  @Override
  public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks)
      throws InterruptedException {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> List<Future<T>> invokeAll(
      final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit)
      throws InterruptedException {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> T invokeAny(final Collection<? extends Callable<T>> tasks)
      throws InterruptedException, ExecutionException {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> T invokeAny(
      final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void execute(final Runnable command) {
    submit(command);
  }

  private static class ExecutorTask<T> {
    private final CompletableFuture<T> future;
    private final Callable<T> taskRunner;
    private boolean isPending = true;

    private ExecutorTask(final Callable<T> taskRunner) {
      this.future = spy(new CompletableFuture<>());
      this.taskRunner = taskRunner;
    }

    /** Run. */
    public void run() {
      if (!isPending) {
        return;
      }

      isPending = false;
      try {
        T result = taskRunner.call();
        future.complete(result);
      } catch (final Exception e) {
        future.completeExceptionally(e);
      }
    }

    /**
     * Gets future.
     *
     * @return the future
     */
    public CompletableFuture<T> getFuture() {
      return future;
    }

    /**
     * Is pending.
     *
     * @return the boolean
     */
    public boolean isPending() {
      return isPending;
    }
  }
}