DeterministicEthScheduler.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.testutil;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/** Schedules tasks that run immediately and synchronously for testing. */
public class DeterministicEthScheduler extends EthScheduler {
private final TimeoutPolicy timeoutPolicy;
private final List<MockExecutorService> executors;
private final List<PendingTimeout<?>> pendingTimeouts = new ArrayList<>();
/** Create a new deterministic scheduler that never timeouts */
public DeterministicEthScheduler() {
this(TimeoutPolicy.NEVER_TIMEOUT);
}
/**
* Create a new deterministic scheduler with the provided timeout policy
*
* @param timeoutPolicy the timeout policy
*/
public DeterministicEthScheduler(final TimeoutPolicy timeoutPolicy) {
super(
new MockExecutorService(),
new MockScheduledExecutor(),
new MockExecutorService(),
new MockExecutorService(),
new MockExecutorService(),
new MockExecutorService());
this.timeoutPolicy = timeoutPolicy;
this.executors =
Arrays.asList(
(MockExecutorService) this.syncWorkerExecutor,
(MockExecutorService) this.scheduler,
(MockExecutorService) this.txWorkerExecutor,
(MockExecutorService) this.servicesExecutor,
(MockExecutorService) this.computationExecutor,
(MockExecutorService) this.blockCreationExecutor);
}
/** Test utility for manually running pending futures, when autorun is disabled */
public void runPendingFutures() {
executors.forEach(MockExecutorService::runPendingFutures);
}
/**
* Get the count of pending tasks
*
* @return the count of pending tasks
*/
public long getPendingFuturesCount() {
return executors.stream().mapToLong(MockExecutorService::getPendingFuturesCount).sum();
}
/** Expire all pending timeouts */
public void expirePendingTimeouts() {
final List<PendingTimeout<?>> toExpire = new ArrayList<>(pendingTimeouts);
pendingTimeouts.clear();
toExpire.forEach(PendingTimeout::expire);
}
/** Do not automatically run submitted tasks. Tasks can be later run using runPendingFutures */
public void disableAutoRun() {
executors.forEach(e -> e.setAutoRun(false));
}
/**
* Get the sync worker mock executor
*
* @return the mock executor
*/
public MockExecutorService mockSyncWorkerExecutor() {
return (MockExecutorService) syncWorkerExecutor;
}
/**
* Get the scheduled mock executor
*
* @return the mock executor
*/
public MockScheduledExecutor mockScheduledExecutor() {
return (MockScheduledExecutor) scheduler;
}
/**
* Get the service mock executor
*
* @return the mock executor
*/
public MockExecutorService mockServiceExecutor() {
return (MockExecutorService) servicesExecutor;
}
/**
* Get the block creation mock executor
*
* @return the mock executor
*/
public MockExecutorService mockBlockCreationExecutor() {
return (MockExecutorService) blockCreationExecutor;
}
@Override
public <T> void failAfterTimeout(final CompletableFuture<T> promise, final Duration timeout) {
final PendingTimeout<T> pendingTimeout = new PendingTimeout<>(promise, timeout);
if (timeoutPolicy.shouldTimeout()) {
pendingTimeout.expire();
} else {
this.pendingTimeouts.add(pendingTimeout);
}
}
/** Used to define the timeout behavior of the scheduler */
@FunctionalInterface
public interface TimeoutPolicy {
/** A policy that never timeouts */
TimeoutPolicy NEVER_TIMEOUT = () -> false;
/** A policy that timeouts on every task */
TimeoutPolicy ALWAYS_TIMEOUT = () -> true;
/**
* If it should simulate a timeout when called
*
* @return true if the scheduler should timeouts
*/
boolean shouldTimeout();
/**
* Create a timeout policy that timeouts x times
*
* @param times the number of timeouts
* @return the timeout policy
*/
static TimeoutPolicy timeoutXTimes(final int times) {
final AtomicInteger timeouts = new AtomicInteger(times);
return () -> {
if (timeouts.get() <= 0) {
return false;
}
timeouts.decrementAndGet();
return true;
};
}
}
private static class PendingTimeout<T> {
final CompletableFuture<T> promise;
final Duration timeout;
private PendingTimeout(final CompletableFuture<T> promise, final Duration timeout) {
this.promise = promise;
this.timeout = timeout;
}
public void expire() {
final TimeoutException timeoutException =
new TimeoutException(
"Mocked timeout after " + timeout.toMillis() + " " + TimeUnit.MILLISECONDS);
promise.completeExceptionally(timeoutException);
}
}
}