MetricsHttpService.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.metrics.prometheus;
import static com.google.common.base.Preconditions.checkArgument;
import org.hyperledger.besu.metrics.MetricsService;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.prometheus.client.exporter.common.TextFormat;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.net.HostAndPort;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** The Metrics http service. */
public class MetricsHttpService implements MetricsService {
private static final Logger LOG = LoggerFactory.getLogger(MetricsHttpService.class);
private static final InetSocketAddress EMPTY_SOCKET_ADDRESS = new InetSocketAddress("0.0.0.0", 0);
private final Vertx vertx;
private final MetricsConfiguration config;
private final MetricsSystem metricsSystem;
private HttpServer httpServer;
/**
* Instantiates a new Metrics http service.
*
* @param vertx the vertx
* @param configuration the configuration
* @param metricsSystem the metrics system
*/
public MetricsHttpService(
final Vertx vertx,
final MetricsConfiguration configuration,
final MetricsSystem metricsSystem) {
validateConfig(configuration);
this.vertx = vertx;
this.config = configuration;
this.metricsSystem = metricsSystem;
}
private void validateConfig(final MetricsConfiguration config) {
checkArgument(config.getPort() >= 0 && config.getPort() < 65535, "Invalid port configuration.");
checkArgument(config.getHost() != null, "Required host is not configured.");
checkArgument(
!(config.isEnabled() && config.isPushEnabled()),
"Metrics Http Service cannot run concurrent with push metrics.");
}
@Override
public CompletableFuture<?> start() {
LOG.info("Starting metrics http service on {}:{}", config.getHost(), config.getPort());
// Create the HTTP server and a router object.
httpServer =
vertx.createHttpServer(
new HttpServerOptions()
.setHost(config.getHost())
.setPort(config.getPort())
.setIdleTimeout(config.getIdleTimeout())
.setHandle100ContinueAutomatically(true)
.setCompressionSupported(true));
final Router router = Router.router(vertx);
// Verify Host header.
router.route().handler(checkAllowlistHostHeader());
// Endpoint for AWS health check.
router.route("/").method(HttpMethod.GET).handler(this::handleEmptyRequest);
// Endpoint for Prometheus metrics monitoring.
router.route("/metrics").method(HttpMethod.GET).handler(this::metricsRequest);
final CompletableFuture<?> resultFuture = new CompletableFuture<>();
httpServer
.requestHandler(router)
.listen(
res -> {
if (!res.failed()) {
resultFuture.complete(null);
final int actualPort = httpServer.actualPort();
config.setActualPort(actualPort);
LOG.info(
"Metrics service started and listening on {}:{}", config.getHost(), actualPort);
return;
}
httpServer = null;
final Throwable cause = res.cause();
if (cause instanceof SocketException) {
resultFuture.completeExceptionally(
new RuntimeException(
String.format(
"Failed to bind metrics listener to %s:%s (actual port %s): %s",
config.getHost(),
config.getPort(),
config.getActualPort(),
cause.getMessage())));
return;
}
resultFuture.completeExceptionally(cause);
});
return resultFuture;
}
private Handler<RoutingContext> checkAllowlistHostHeader() {
return event -> {
final Optional<String> hostHeader = getAndValidateHostHeader(event);
if (config.getHostsAllowlist().contains("*")
|| (hostHeader.isPresent() && hostIsInAllowlist(hostHeader.get()))) {
event.next();
} else {
final HttpServerResponse response = event.response();
if (!response.closed()) {
response
.setStatusCode(403)
.putHeader("Content-Type", "application/json; charset=utf-8")
.end("{\"message\":\"Host not authorized.\"}");
}
}
};
}
private Optional<String> getAndValidateHostHeader(final RoutingContext event) {
return Optional.ofNullable(event.request().authority()).map(HostAndPort::host);
}
private boolean hostIsInAllowlist(final String hostHeader) {
if (config.getHostsAllowlist().stream()
.anyMatch(
allowlistEntry ->
allowlistEntry
.toLowerCase(Locale.ROOT)
.equals(hostHeader.toLowerCase(Locale.ROOT)))) {
return true;
} else {
LOG.trace("Host not in allowlist: '{}'", hostHeader);
return false;
}
}
@Override
public CompletableFuture<?> stop() {
if (httpServer == null) {
return CompletableFuture.completedFuture(null);
}
final CompletableFuture<?> resultFuture = new CompletableFuture<>();
httpServer.close(
res -> {
if (res.failed()) {
resultFuture.completeExceptionally(res.cause());
} else {
httpServer = null;
resultFuture.complete(null);
}
});
return resultFuture;
}
private void metricsRequest(final RoutingContext routingContext) {
final Set<String> names = new TreeSet<>(routingContext.queryParam("name[]"));
final HttpServerResponse response = routingContext.response();
vertx.<String>executeBlocking(
future -> {
try {
final ByteArrayOutputStream metrics = new ByteArrayOutputStream(16 * 1024);
final OutputStreamWriter osw = new OutputStreamWriter(metrics, StandardCharsets.UTF_8);
TextFormat.write004(
osw,
((PrometheusMetricsSystem) (metricsSystem))
.getRegistry()
.filteredMetricFamilySamples(names));
osw.flush();
osw.close();
metrics.flush();
metrics.close();
future.complete(metrics.toString(StandardCharsets.UTF_8.name()));
} catch (final IOException ioe) {
future.fail(ioe);
}
},
false,
(res) -> {
if (response.closed()) {
// Request for metrics closed before response was generated
return;
}
if (res.failed()) {
LOG.error("Request for metrics failed", res.cause());
response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
} else {
response.setStatusCode(HttpResponseStatus.OK.code());
response.putHeader("Content-Type", TextFormat.CONTENT_TYPE_004);
response.end(res.result());
}
});
}
/**
* Socket address inet socket address.
*
* @return the inet socket address
*/
InetSocketAddress socketAddress() {
if (httpServer == null) {
return EMPTY_SOCKET_ADDRESS;
}
return new InetSocketAddress(config.getHost(), httpServer.actualPort());
}
@Override
public Optional<Integer> getPort() {
if (httpServer == null) {
return Optional.empty();
}
return Optional.of(httpServer.actualPort());
}
// Facilitate remote health-checks in AWS, inter alia.
private void handleEmptyRequest(final RoutingContext routingContext) {
routingContext.response().setStatusCode(201).end();
}
}