Runner.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu;

import org.hyperledger.besu.controller.BesuController;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLHttpService;
import org.hyperledger.besu.ethereum.api.jsonrpc.EngineJsonRpcService;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcHttpService;
import org.hyperledger.besu.ethereum.api.jsonrpc.ipc.JsonRpcIpcService;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketService;
import org.hyperledger.besu.ethereum.api.query.cache.AutoTransactionLogBloomCachingService;
import org.hyperledger.besu.ethereum.api.query.cache.TransactionLogBloomCacher;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolEvictionService;
import org.hyperledger.besu.ethereum.p2p.network.NetworkRunner;
import org.hyperledger.besu.ethereum.stratum.StratumServer;
import org.hyperledger.besu.ethstats.EthStatsService;
import org.hyperledger.besu.metrics.MetricsService;
import org.hyperledger.besu.nat.NatService;
import org.hyperledger.besu.plugin.data.EnodeURL;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.annotations.VisibleForTesting;
import io.vertx.core.Vertx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** The Runner controls various Besu services lifecycle. */
public class Runner implements AutoCloseable {

  private static final Logger LOG = LoggerFactory.getLogger(Runner.class);

  private final Vertx vertx;
  // Counted down from the vertx.close(...) callback in stop().
  private final CountDownLatch vertxShutdownLatch = new CountDownLatch(1);
  // Counted down by stop(); awaitStop() blocks on it.
  private final CountDownLatch shutdown = new CountDownLatch(1);

  private final NatService natService;
  private final NetworkRunner networkRunner;
  // Optional services below are only started/stopped when present.
  private final Optional<EthStatsService> ethStatsService;
  private final Optional<GraphQLHttpService> graphQLHttp;
  private final Optional<JsonRpcHttpService> jsonRpc;
  private final Optional<EngineJsonRpcService> engineJsonRpc;
  private final Optional<MetricsService> metrics;
  private final Optional<JsonRpcIpcService> ipcJsonRpc;
  // When present, the file the current process id is written to by writePidFile().
  private final Optional<Path> pidPath;
  private final Optional<WebSocketService> webSocketRpc;
  // Built in the constructor from the vertx instance and the controller's transaction pool.
  private final TransactionPoolEvictionService transactionPoolEvictionService;

  private final BesuController besuController;
  // Directory in which createBesuFile() writes the besu.<name> files.
  private final Path dataDir;
  private final Optional<StratumServer> stratumServer;
  // Present only when a TransactionLogBloomCacher was supplied to the constructor.
  private final Optional<AutoTransactionLogBloomCachingService>
      autoTransactionLogBloomCachingService;

  /**
   * Creates a Runner from already constructed services. Nothing is started here; see {@link
   * #startExternalServices()} and {@link #startEthereumMainLoop()}.
   *
   * @param vertx the Vert.x instance; closed by {@link #stop()} and handed to the transaction pool
   *     eviction service
   * @param networkRunner the network runner started and stopped by this runner
   * @param natService the NAT service, also queried for the addresses written to the networks file
   * @param jsonRpc the optional JSON-RPC HTTP service
   * @param engineJsonRpc the optional engine JSON-RPC service
   * @param graphQLHttp the optional GraphQL HTTP service
   * @param webSocketRpc the optional WebSocket RPC service
   * @param ipcJsonRpc the optional IPC JSON-RPC service
   * @param stratumServer the optional Stratum server
   * @param metrics the optional metrics service
   * @param ethStatsService the optional EthStats service
   * @param besuController the controller providing the synchronizer, mining coordinator and
   *     transaction pool
   * @param dataDir the directory the {@code besu.*} files are written to
   * @param pidPath the optional path of the PID file
   * @param transactionLogBloomCacher when present, wrapped together with {@code blockchain} in an
   *     {@link AutoTransactionLogBloomCachingService}
   * @param blockchain the blockchain passed to the bloom caching service
   */
  Runner(
      final Vertx vertx,
      final NetworkRunner networkRunner,
      final NatService natService,
      final Optional<JsonRpcHttpService> jsonRpc,
      final Optional<EngineJsonRpcService> engineJsonRpc,
      final Optional<GraphQLHttpService> graphQLHttp,
      final Optional<WebSocketService> webSocketRpc,
      final Optional<JsonRpcIpcService> ipcJsonRpc,
      final Optional<StratumServer> stratumServer,
      final Optional<MetricsService> metrics,
      final Optional<EthStatsService> ethStatsService,
      final BesuController besuController,
      final Path dataDir,
      final Optional<Path> pidPath,
      final Optional<TransactionLogBloomCacher> transactionLogBloomCacher,
      final Blockchain blockchain) {
    // Core infrastructure.
    this.vertx = vertx;
    this.networkRunner = networkRunner;
    this.natService = natService;
    this.besuController = besuController;

    // Optional API endpoints and auxiliary services.
    this.jsonRpc = jsonRpc;
    this.engineJsonRpc = engineJsonRpc;
    this.graphQLHttp = graphQLHttp;
    this.webSocketRpc = webSocketRpc;
    this.ipcJsonRpc = ipcJsonRpc;
    this.stratumServer = stratumServer;
    this.metrics = metrics;
    this.ethStatsService = ethStatsService;

    // Filesystem locations.
    this.dataDir = dataDir;
    this.pidPath = pidPath;

    // Services derived from the arguments.
    this.transactionPoolEvictionService =
        new TransactionPoolEvictionService(vertx, besuController.getTransactionPool());
    this.autoTransactionLogBloomCachingService =
        transactionLogBloomCacher.map(
            bloomCacher -> new AutoTransactionLogBloomCachingService(blockchain, bloomCacher));
  }

  /**
   * Starts every configured external service in turn, blocking until each asynchronous start has
   * completed before moving on to the next one.
   */
  public void startExternalServices() {
    LOG.info("Starting external services ... ");
    metrics.ifPresent(
        metricsService -> {
          final CompletableFuture<?> started = metricsService.start();
          waitForServiceToStart("metrics", started);
        });

    jsonRpc.ifPresent(
        httpService -> {
          final CompletableFuture<?> started = httpService.start();
          waitForServiceToStart("jsonRpc", started);
        });
    engineJsonRpc.ifPresent(
        engineService -> {
          final CompletableFuture<?> started = engineService.start();
          waitForServiceToStart("engineJsonRpc", started);
        });
    graphQLHttp.ifPresent(
        graphQLService -> {
          final CompletableFuture<?> started = graphQLService.start();
          waitForServiceToStart("graphQLHttp", started);
        });
    webSocketRpc.ifPresent(
        wsService -> {
          final CompletableFuture<?> started = wsService.start();
          waitForServiceToStart("websocketRpc", started);
        });
    ipcJsonRpc.ifPresent(
        ipcService -> {
          final CompletableFuture<?> started =
              ipcService.start().toCompletionStage().toCompletableFuture();
          waitForServiceToStart("ipcJsonRpc", started);
        });
    stratumServer.ifPresent(
        stratum -> {
          final CompletableFuture<?> started =
              stratum.start().toCompletionStage().toCompletableFuture();
          waitForServiceToStart("stratum", started);
        });
    // The bloom caching service is started without waiting on a future.
    autoTransactionLogBloomCachingService.ifPresent(cachingService -> cachingService.start());
  }

  /** Starts the EthStats service, if configured; invoked once the main loop has been started. */
  private void startExternalServicePostMainLoop() {
    ethStatsService.ifPresent(statsService -> statsService.start());
  }

  /**
   * Starts the NAT service, the network, the synchronizer (only when P2P is enabled), the mining
   * coordinator and the transaction pool eviction service, then writes the runtime files and
   * starts the services that depend on the main loop.
   *
   * @throws IllegalStateException wrapping any exception raised during startup
   */
  public void startEthereumMainLoop() {
    try {
      LOG.info("Starting Ethereum main loop ... ");
      natService.start();
      networkRunner.start();
      final boolean p2pEnabled = networkRunner.getNetwork().isP2pEnabled();
      if (p2pEnabled) {
        besuController.getSynchronizer().start();
      }
      besuController.getMiningCoordinator().start();
      transactionPoolEvictionService.start();

      LOG.info("Ethereum main loop is up.");
      writeRuntimeFiles();

      // start external service that depends on information from main loop
      startExternalServicePostMainLoop();
    } catch (final Exception startupFailure) {
      LOG.error("unable to start main loop", startupFailure);
      throw new IllegalStateException("Startup failed", startupFailure);
    }
  }

  /**
   * Writes the ports, networks and PID files. The ports and networks values are written to disk
   * so that they can be accessed during the acceptance tests.
   */
  private void writeRuntimeFiles() {
    writeBesuPortsToFile();
    writeBesuNetworksToFile();
    writePidFile();
  }

  /**
   * Stops the services managed by this runner and then releases callers blocked in {@link
   * #awaitStop()}.
   *
   * <p>The shutdown sequence is order-sensitive: the RPC endpoints are stopped first, then the
   * transaction pool is disabled, followed by metrics, EthStats, mining, Stratum, the synchronizer
   * (only when P2P is enabled), the network, the bloom caching service, NAT, the controller and
   * finally Vert.x.
   *
   * <p>The shutdown latch is released in a {@code finally} block so that an exception thrown by
   * any step still propagates to the caller but cannot leave {@link #awaitStop()} blocked forever.
   */
  public void stop() {
    try {
      transactionPoolEvictionService.stop();
      jsonRpc.ifPresent(service -> waitForServiceToStop("jsonRpc", service.stop()));
      engineJsonRpc.ifPresent(service -> waitForServiceToStop("engineJsonRpc", service.stop()));
      graphQLHttp.ifPresent(service -> waitForServiceToStop("graphQLHttp", service.stop()));
      webSocketRpc.ifPresent(service -> waitForServiceToStop("websocketRpc", service.stop()));
      ipcJsonRpc.ifPresent(
          service ->
              waitForServiceToStop(
                  "ipcJsonRpc", service.stop().toCompletionStage().toCompletableFuture()));
      waitForServiceToStop("Transaction Pool", besuController.getTransactionPool().setDisabled());
      metrics.ifPresent(service -> waitForServiceToStop("metrics", service.stop()));
      ethStatsService.ifPresent(EthStatsService::stop);
      besuController.getMiningCoordinator().stop();
      waitForServiceToStop("Mining Coordinator", besuController.getMiningCoordinator()::awaitStop);
      stratumServer.ifPresent(server -> waitForServiceToStop("Stratum", server::stop));
      if (networkRunner.getNetwork().isP2pEnabled()) {
        besuController.getSynchronizer().stop();
        waitForServiceToStop("Synchronizer", besuController.getSynchronizer()::awaitStop);
      }

      networkRunner.stop();
      waitForServiceToStop("Network", networkRunner::awaitStop);
      autoTransactionLogBloomCachingService.ifPresent(AutoTransactionLogBloomCachingService::stop);
      natService.stop();
      besuController.close();
      // Vert.x closes asynchronously; the callback releases the latch awaited on the next line.
      vertx.close((res) -> vertxShutdownLatch.countDown());
      waitForServiceToStop("Vertx", vertxShutdownLatch::await);
    } finally {
      // Always release awaitStop() callers, even when one of the steps above throws.
      shutdown.countDown();
    }
  }

  /**
   * Blocks the calling thread until {@link #stop()} has released the shutdown latch. If the
   * thread is interrupted while waiting, the method returns early with the interrupt flag
   * restored.
   */
  public void awaitStop() {
    try {
      shutdown.await();
    } catch (final InterruptedException interrupted) {
      LOG.debug("Interrupted, exiting", interrupted);
      Thread.currentThread().interrupt();
    }
  }

  /** Stops all services via {@link #stop()} and then waits on {@link #awaitStop()}. */
  @Override
  public void close() {
    stop();
    awaitStop();
  }

  /**
   * Waits up to 30 seconds for an asynchronous service shutdown to complete. Failures are logged
   * rather than propagated so that the remaining shutdown steps still run.
   *
   * @param serviceName the name of the service, used in log messages
   * @param stopFuture the future that completes when the service has stopped
   */
  private void waitForServiceToStop(
      final String serviceName, final CompletableFuture<?> stopFuture) {
    try {
      stopFuture.get(30, TimeUnit.SECONDS);
    } catch (final InterruptedException e) {
      LOG.debug("Interrupted while waiting for service to complete", e);
      // Restore the interrupt flag for the caller.
      Thread.currentThread().interrupt();
    } catch (final ExecutionException e) {
      // The trailing throwable is logged as the exception, not substituted into the message.
      LOG.error("Service {} failed to shutdown", serviceName, e);
    } catch (final TimeoutException e) {
      LOG.error("Service {} did not shut down cleanly", serviceName);
    }
  }

  /**
   * Runs a blocking shutdown wait. An interruption is logged and the interrupt flag is restored
   * instead of propagating the exception.
   *
   * @param serviceName the name of the service, used in log messages
   * @param shutdown the blocking wait to run
   */
  private void waitForServiceToStop(final String serviceName, final SynchronousShutdown shutdown) {
    try {
      shutdown.await();
    } catch (final InterruptedException e) {
      // The trailing throwable is logged as the exception, not substituted into the message.
      LOG.debug("Interrupted while waiting for service {} to stop", serviceName, e);
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Blocks until the given start future completes, logging a warning every 60 seconds while it
   * is still pending.
   *
   * <p>After a timeout the loop always goes back to {@code get(...)} rather than checking {@code
   * isDone()}, so a future that fails right after a timeout is still reported as a failure
   * instead of being mistaken for a successful start.
   *
   * @param serviceName the name of the service, used in log and exception messages
   * @param startFuture the future that completes when the service has started
   * @throws IllegalStateException if the start fails or the waiting thread is interrupted
   */
  private void waitForServiceToStart(
      final String serviceName, final CompletableFuture<?> startFuture) {
    while (true) {
      try {
        startFuture.get(60, TimeUnit.SECONDS);
        return;
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for service to start", e);
      } catch (final ExecutionException e) {
        throw new IllegalStateException("Service " + serviceName + " failed to start", e);
      } catch (final TimeoutException e) {
        // Still pending: warn and wait again.
        LOG.warn("Service {} is taking an unusually long time to start", serviceName);
      }
    }
  }

  /**
   * Collects the ports of the local enode (only when P2P is enabled) and of each configured
   * service, then stores them in the {@code besu.ports} file.
   */
  private void writeBesuPortsToFile() {
    final Properties ports = new Properties();
    if (networkRunner.getNetwork().isP2pEnabled()) {
      networkRunner
          .getNetwork()
          .getLocalEnode()
          .ifPresent(
              localEnode -> {
                localEnode
                    .getDiscoveryPort()
                    .ifPresent(udpPort -> ports.setProperty("discovery", String.valueOf(udpPort)));
                localEnode
                    .getListeningPort()
                    .ifPresent(tcpPort -> ports.setProperty("p2p", String.valueOf(tcpPort)));
              });
    }

    // A service that is not configured simply contributes no entry.
    getJsonRpcPort().ifPresent(p -> ports.setProperty("json-rpc", String.valueOf(p)));
    getGraphQLHttpPort().ifPresent(p -> ports.setProperty("graphql-http", String.valueOf(p)));
    getWebSocketPort().ifPresent(p -> ports.setProperty("ws-rpc", String.valueOf(p)));
    getMetricsPort().ifPresent(p -> ports.setProperty("metrics", String.valueOf(p)));
    getEngineJsonRpcPort().ifPresent(p -> ports.setProperty("engine-json-rpc", String.valueOf(p)));

    // create besu.ports file
    createBesuFile(
        ports, "ports", "This file contains the ports used by the running instance of Besu");
  }

  /**
   * Stores the global and local IP addresses reported by the NAT service for the local enode in
   * the {@code besu.networks} file. When P2P is disabled or no local enode is available, the file
   * is written without address entries.
   */
  private void writeBesuNetworksToFile() {
    final Properties addresses = new Properties();
    if (networkRunner.getNetwork().isP2pEnabled()) {
      networkRunner
          .getNetwork()
          .getLocalEnode()
          .ifPresent(
              localEnode -> {
                addresses.setProperty(
                    "global-ip", natService.queryExternalIPAddress(localEnode.getIpAsString()));
                addresses.setProperty(
                    "local-ip", natService.queryLocalIPAddress(localEnode.getIpAsString()));
              });
    }
    // create besu.networks file
    createBesuFile(
        addresses,
        "networks",
        "This file contains the IP Addresses (global and local) used by the running instance of Besu");
  }

  /**
   * Writes the current process id to the configured PID file, if one was configured, and
   * registers the file for deletion on JVM exit. Failures are logged and not propagated.
   */
  private void writePidFile() {
    if (!pidPath.isPresent()) {
      return;
    }
    final Path pidFile = pidPath.get();
    final String pid = currentPidOrEmpty();
    try {
      Files.write(
          pidFile,
          pid.getBytes(StandardCharsets.UTF_8),
          StandardOpenOption.CREATE,
          StandardOpenOption.TRUNCATE_EXISTING,
          StandardOpenOption.WRITE);
      pidFile.toFile().deleteOnExit();
    } catch (IOException e) {
      LOG.error("Error writing PID file", e);
    }
  }

  /** Returns the current process id as a string, or an empty string if it cannot be read. */
  private static String currentPidOrEmpty() {
    try {
      return Long.toString(ProcessHandle.current().pid());
    } catch (Throwable t) {
      LOG.error("Error retrieving PID", t);
      return "";
    }
  }

  /**
   * Returns the port of the JSON-RPC HTTP service's socket address.
   *
   * @return the port, or empty when no JSON-RPC HTTP service is configured
   */
  public Optional<Integer> getJsonRpcPort() {
    return jsonRpc.map(httpService -> httpService.socketAddress().getPort());
  }

  /**
   * Returns the port of the engine JSON-RPC service's socket address.
   *
   * @return the port, or empty when no engine JSON-RPC service is configured
   */
  public Optional<Integer> getEngineJsonRpcPort() {
    return engineJsonRpc.map(engineService -> engineService.socketAddress().getPort());
  }

  /**
   * Returns the port of the GraphQL HTTP service's socket address.
   *
   * @return the port, or empty when no GraphQL HTTP service is configured
   */
  public Optional<Integer> getGraphQLHttpPort() {
    return graphQLHttp.map(graphQLService -> graphQLService.socketAddress().getPort());
  }

  /**
   * Returns the port of the WebSocket RPC service's socket address.
   *
   * @return the port, or empty when no WebSocket RPC service is configured
   */
  public Optional<Integer> getWebSocketPort() {
    return webSocketRpc.map(wsService -> wsService.socketAddress().getPort());
  }

  /**
   * Returns the port reported by the metrics service.
   *
   * @return the port reported by the metrics service, or empty when no metrics service is
   *     configured
   */
  public Optional<Integer> getMetricsPort() {
    return metrics.flatMap(MetricsService::getPort);
  }

  /**
   * Returns the local enode as reported by the network runner's network.
   *
   * @return the local enode, or empty when the network does not provide one
   */
  @VisibleForTesting
  Optional<EnodeURL> getLocalEnode() {
    final Optional<EnodeURL> localEnode = networkRunner.getNetwork().getLocalEnode();
    return localEnode;
  }

  /**
   * A blocking wait for a service to finish shutting down. It lets {@code waitForServiceToStop}
   * accept method references such as {@code networkRunner::awaitStop} that may throw {@link
   * InterruptedException}.
   */
  @FunctionalInterface
  private interface SynchronousShutdown {
    /**
     * Blocks until the shutdown has completed.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    void await() throws InterruptedException;
  }

  /**
   * Stores the given properties in {@code besu.<fileName>} inside the data directory and registers
   * the file for deletion on JVM exit. Any failure is logged as a warning and not propagated.
   *
   * @param properties the properties to store
   * @param fileName the suffix appended to {@code besu.} to form the file name
   * @param fileHeader the first sentence of the comment written at the top of the file
   */
  private void createBesuFile(
      final Properties properties, final String fileName, final String fileHeader) {
    final File target = new File(dataDir.toFile(), String.format("besu.%s", fileName));
    target.deleteOnExit();
    final String comment =
        String.format("%s. This file will be deleted after the node is shutdown.", fileHeader);
    try (final FileOutputStream out = new FileOutputStream(target)) {
      properties.store(out, comment);
    } catch (final Exception e) {
      LOG.warn(String.format("Error writing %s file", fileName), e);
    }
  }
}