MonitoredExecutors.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.ethereum.eth.manager.bounded.BoundedQueue;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class MonitoredExecutors {
public static ExecutorService newFixedThreadPool(
final String name,
final int minWorkerCount,
final int workerCount,
final MetricsSystem metricsSystem) {
return newFixedThreadPool(
name, minWorkerCount, workerCount, new LinkedBlockingQueue<>(), metricsSystem);
}
public static ExecutorService newBoundedThreadPool(
final String name,
final int workerCount,
final int queueSize,
final MetricsSystem metricsSystem) {
return newBoundedThreadPool(name, 1, workerCount, queueSize, metricsSystem);
}
public static ExecutorService newBoundedThreadPool(
final String name,
final int minWorkerCount,
final int maxWorkerCount,
final int queueSize,
final MetricsSystem metricsSystem) {
return newFixedThreadPool(
name,
minWorkerCount,
maxWorkerCount,
new BoundedQueue(queueSize, toMetricName(name), metricsSystem),
metricsSystem);
}
private static ExecutorService newFixedThreadPool(
final String name,
final int minWorkerCount,
final int maxWorkerCount,
final BlockingQueue<Runnable> workingQueue,
final MetricsSystem metricsSystem) {
return newMonitoredExecutor(
name,
metricsSystem,
(rejectedExecutionHandler, threadFactory) ->
new ThreadPoolExecutor(
minWorkerCount,
maxWorkerCount,
60L,
TimeUnit.SECONDS,
workingQueue,
threadFactory,
rejectedExecutionHandler));
}
public static ExecutorService newCachedThreadPool(
final String name, final MetricsSystem metricsSystem) {
return newCachedThreadPool(name, 0, metricsSystem);
}
public static ExecutorService newCachedThreadPool(
final String name, final int corePoolSize, final MetricsSystem metricsSystem) {
return newMonitoredExecutor(
name,
metricsSystem,
(rejectedExecutionHandler, threadFactory) ->
new ThreadPoolExecutor(
corePoolSize,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory,
rejectedExecutionHandler));
}
public static ScheduledExecutorService newScheduledThreadPool(
final String name, final int corePoolSize, final MetricsSystem metricsSystem) {
return newMonitoredExecutor(
name,
metricsSystem,
(rejectedExecutionHandler, threadFactory) ->
new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, rejectedExecutionHandler));
}
public static ExecutorService newSingleThreadExecutor(
final String name, final MetricsSystem metricsSystem) {
return newFixedThreadPool(name, 1, 1, metricsSystem);
}
private static <T extends ThreadPoolExecutor> T newMonitoredExecutor(
final String name,
final MetricsSystem metricsSystem,
final BiFunction<RejectedExecutionHandler, ThreadFactory, T> creator) {
final String metricName = toMetricName(name);
final T executor =
creator.apply(
new CountingAbortPolicy(metricName, metricsSystem),
new ThreadFactoryBuilder().setNameFormat(name + "-%d").build());
metricsSystem.createIntegerGauge(
BesuMetricCategory.EXECUTORS,
metricName + "_queue_length_current",
"Current number of tasks awaiting execution",
executor.getQueue()::size);
metricsSystem.createIntegerGauge(
BesuMetricCategory.EXECUTORS,
metricName + "_active_threads_current",
"Current number of threads executing tasks",
executor::getActiveCount);
metricsSystem.createIntegerGauge(
BesuMetricCategory.EXECUTORS,
metricName + "_pool_size_current",
"Current number of threads in the thread pool",
executor::getPoolSize);
metricsSystem.createLongGauge(
BesuMetricCategory.EXECUTORS,
metricName + "_completed_tasks_total",
"Total number of tasks executed",
executor::getCompletedTaskCount);
metricsSystem.createLongGauge(
BesuMetricCategory.EXECUTORS,
metricName + "_submitted_tasks_total",
"Total number of tasks executed",
executor::getTaskCount);
return executor;
}
private static String toMetricName(final String name) {
return name.toLowerCase(Locale.US).replace('-', '_');
}
private static class CountingAbortPolicy extends AbortPolicy {
private final Counter rejectedTaskCounter;
public CountingAbortPolicy(final String metricName, final MetricsSystem metricsSystem) {
this.rejectedTaskCounter =
metricsSystem.createCounter(
BesuMetricCategory.EXECUTORS,
metricName + "_rejected_tasks_total",
"Total number of tasks rejected by this executor");
}
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor e) {
rejectedTaskCounter.inc();
super.rejectedExecution(r, e);
}
}
}