OpenTelemetrySystem.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.metrics.opentelemetry;

import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.ObservableMetricsSystem;
import org.hyperledger.besu.metrics.Observation;
import org.hyperledger.besu.metrics.StandardMetricCategory;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledGauge;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.plugin.services.metrics.MetricCategory;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import java.util.stream.Stream;

import com.google.common.collect.ImmutableSet;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.trace.TracerProvider;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.data.SummaryPointData;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.semconv.ResourceAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Metrics system relying on the native OpenTelemetry format. */
public class OpenTelemetrySystem implements ObservableMetricsSystem {

  private static final Logger LOG = LoggerFactory.getLogger(OpenTelemetrySystem.class);

  private static final String TYPE_LABEL_KEY = "type";
  private static final String AREA_LABEL_KEY = "area";
  private static final String POOL_LABEL_KEY = "pool";
  private static final String USED = "used";
  private static final String COMMITTED = "committed";
  private static final String MAX = "max";
  private static final String HEAP = "heap";
  private static final String NON_HEAP = "non_heap";

  private final Set<MetricCategory> enabledCategories;
  private final boolean timersEnabled;
  private final Map<String, LabelledMetric<Counter>> cachedCounters = new ConcurrentHashMap<>();
  private final Map<String, LabelledMetric<OperationTimer>> cachedTimers =
      new ConcurrentHashMap<>();
  private final SdkMeterProvider sdkMeterProvider;
  private final DebugMetricReader debugMetricReader;
  private final SdkTracerProvider sdkTracerProvider;

  /**
   * Instantiates a new Open telemetry system.
   *
   * @param enabledCategories the enabled categories
   * @param timersEnabled the timers enabled
   * @param jobName the job name
   * @param setAsGlobal the set as global
   */
  public OpenTelemetrySystem(
      final Set<MetricCategory> enabledCategories,
      final boolean timersEnabled,
      final String jobName,
      final boolean setAsGlobal) {
    LOG.info("Starting OpenTelemetry metrics system");
    this.enabledCategories = ImmutableSet.copyOf(enabledCategories);
    this.timersEnabled = timersEnabled;
    this.debugMetricReader = new DebugMetricReader();
    Resource resource =
        Resource.getDefault()
            .merge(
                Resource.create(
                    Attributes.builder().put(ResourceAttributes.SERVICE_NAME, jobName).build()));
    AutoConfiguredOpenTelemetrySdkBuilder autoSdkBuilder =
        AutoConfiguredOpenTelemetrySdk.builder()
            .addMeterProviderCustomizer(
                (provider, config) ->
                    provider.setResource(resource).registerMetricReader(debugMetricReader))
            .addTracerProviderCustomizer((provider, config) -> provider.setResource(resource));

    if (setAsGlobal) {
      autoSdkBuilder.setResultAsGlobal();
    }

    OpenTelemetrySdk sdk = autoSdkBuilder.build().getOpenTelemetrySdk();
    this.sdkMeterProvider = sdk.getSdkMeterProvider();
    this.sdkTracerProvider = sdk.getSdkTracerProvider();
  }

  @Override
  public Stream<Observation> streamObservations(final MetricCategory category) {
    return streamObservations().filter(metricData -> metricData.getCategory().equals(category));
  }

  @Override
  public Stream<Observation> streamObservations() {
    Collection<MetricData> metricsList = this.debugMetricReader.getAllMetrics();
    return metricsList.stream().flatMap(this::convertToObservations);
  }

  private Stream<Observation> convertToObservations(final MetricData metricData) {
    List<Observation> observations = new ArrayList<>();
    MetricCategory category =
        categoryNameToMetricCategory(metricData.getInstrumentationScopeInfo().getName());
    if (category == null) {
      return Stream.empty();
    }
    Collection<?> points;
    switch (metricData.getType()) {
      case DOUBLE_GAUGE:
        points = metricData.getDoubleGaugeData().getPoints();
        break;
      case DOUBLE_SUM:
        points = metricData.getDoubleSumData().getPoints();
        break;
      case SUMMARY:
        points = metricData.getData().getPoints();
        break;
      case LONG_SUM:
        points = metricData.getLongSumData().getPoints();
        break;
      case HISTOGRAM:
        points = metricData.getData().getPoints();
        break;
      case LONG_GAUGE:
        points = metricData.getLongGaugeData().getPoints();
        break;
      default:
        throw new UnsupportedOperationException("Unsupported type " + metricData.getType().name());
    }

    for (Object ptObj : points) {
      PointData point = (PointData) ptObj;
      List<String> labels = new ArrayList<>();
      point.getAttributes().forEach((k, v) -> labels.add(v.toString()));
      observations.add(
          new Observation(
              category, metricData.getName(), extractValue(metricData.getType(), point), labels));
    }
    return observations.stream();
  }

  private MetricCategory categoryNameToMetricCategory(final String name) {
    Set<MetricCategory> categories =
        ImmutableSet.<MetricCategory>builder()
            .addAll(EnumSet.allOf(BesuMetricCategory.class))
            .addAll(EnumSet.allOf(StandardMetricCategory.class))
            .build();
    for (MetricCategory category : categories) {
      if (category.getName().equals(name)) {
        return category;
      }
    }
    return null;
  }

  private Object extractValue(final MetricDataType type, final PointData point) {
    switch (type) {
      case LONG_GAUGE:
      case LONG_SUM:
        return ((LongPointData) point).getValue();
      case DOUBLE_GAUGE:
        return ((DoublePointData) point).getValue();
      case SUMMARY:
        return ((SummaryPointData) point).getValues();
      case HISTOGRAM:
        return ((HistogramPointData) point).getCounts();
      default:
        throw new UnsupportedOperationException("Unsupported type " + type);
    }
  }

  @Override
  public LabelledMetric<Counter> createLabelledCounter(
      final MetricCategory category,
      final String name,
      final String help,
      final String... labelNames) {
    LOG.trace("Creating a counter {}", name);
    return cachedCounters.computeIfAbsent(
        name,
        (k) -> {
          if (isCategoryEnabled(category)) {
            final Meter meter = sdkMeterProvider.get(category.getName());

            final LongCounter counter = meter.counterBuilder(name).setDescription(help).build();
            return new OpenTelemetryCounter(counter, labelNames);
          } else {
            return NoOpMetricsSystem.getCounterLabelledMetric(labelNames.length);
          }
        });
  }

  @Override
  public LabelledMetric<OperationTimer> createLabelledTimer(
      final MetricCategory category,
      final String name,
      final String help,
      final String... labelNames) {
    LOG.trace("Creating a timer {}", name);
    return cachedTimers.computeIfAbsent(
        name,
        (k) -> {
          if (timersEnabled && isCategoryEnabled(category)) {
            final Meter meter = sdkMeterProvider.get(category.getName());
            return new OpenTelemetryTimer(name, help, meter, labelNames);
          } else {
            return NoOpMetricsSystem.getOperationTimerLabelledMetric(labelNames.length);
          }
        });
  }

  @Override
  public void createGauge(
      final MetricCategory category,
      final String name,
      final String help,
      final DoubleSupplier valueSupplier) {
    LOG.trace("Creating a gauge {}", name);
    if (isCategoryEnabled(category)) {
      final Meter meter = sdkMeterProvider.get(category.getName());
      meter
          .gaugeBuilder(name)
          .setDescription(help)
          .buildWithCallback(res -> res.record(valueSupplier.getAsDouble(), Attributes.empty()));
    }
  }

  @Override
  public LabelledGauge createLabelledGauge(
      final MetricCategory category,
      final String name,
      final String help,
      final String... labelNames) {
    LOG.trace("Creating a labelled gauge {}", name);
    if (isCategoryEnabled(category)) {
      return new OpenTelemetryGauge(
          name, help, sdkMeterProvider.get(category.getName()), List.of(labelNames));
    }
    return NoOpMetricsSystem.getLabelledGauge(labelNames.length);
  }

  @Override
  public Set<MetricCategory> getEnabledCategories() {
    return enabledCategories;
  }

  /** Init defaults. */
  public void initDefaults() {
    if (isCategoryEnabled(StandardMetricCategory.JVM)) {
      collectGC();
    }
  }

  private void collectGC() {
    final List<GarbageCollectorMXBean> garbageCollectors =
        ManagementFactory.getGarbageCollectorMXBeans();
    final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
    final List<MemoryPoolMXBean> poolBeans = ManagementFactory.getMemoryPoolMXBeans();
    final Meter meter = sdkMeterProvider.get(StandardMetricCategory.JVM.getName());
    final List<Attributes> labelSets = new ArrayList<>(garbageCollectors.size());
    for (final GarbageCollectorMXBean gc : garbageCollectors) {
      labelSets.add(Attributes.of(AttributeKey.stringKey("gc"), gc.getName()));
    }
    meter
        .gaugeBuilder("jvm.gc.collection")
        .setDescription("Time spent in a given JVM garbage collector in milliseconds.")
        .setUnit("ms")
        .buildWithCallback(
            resultLongObserver -> {
              for (int i = 0; i < garbageCollectors.size(); i++) {
                resultLongObserver.record(
                    (double) garbageCollectors.get(i).getCollectionTime(), labelSets.get(i));
              }
            });
    final AttributeKey<String> typeKey = AttributeKey.stringKey(TYPE_LABEL_KEY);
    final AttributeKey<String> areaKey = AttributeKey.stringKey(AREA_LABEL_KEY);
    final Attributes usedHeap = Attributes.of(typeKey, USED, areaKey, HEAP);
    final Attributes usedNonHeap = Attributes.of(typeKey, USED, areaKey, NON_HEAP);
    final Attributes committedHeap = Attributes.of(typeKey, COMMITTED, areaKey, HEAP);
    final Attributes committedNonHeap = Attributes.of(typeKey, COMMITTED, areaKey, NON_HEAP);
    // TODO: Decide if max is needed or not. May be derived with some approximation from max(used).
    final Attributes maxHeap = Attributes.of(typeKey, MAX, areaKey, HEAP);
    final Attributes maxNonHeap = Attributes.of(typeKey, MAX, areaKey, NON_HEAP);
    meter
        .upDownCounterBuilder("jvm.memory.area")
        .setDescription("Bytes of a given JVM memory area.")
        .setUnit("By")
        .buildWithCallback(
            resultLongObserver -> {
              MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
              MemoryUsage nonHeapUsage = memoryBean.getNonHeapMemoryUsage();
              resultLongObserver.record(heapUsage.getUsed(), usedHeap);
              resultLongObserver.record(nonHeapUsage.getUsed(), usedNonHeap);
              resultLongObserver.record(heapUsage.getUsed(), committedHeap);
              resultLongObserver.record(nonHeapUsage.getUsed(), committedNonHeap);
              resultLongObserver.record(heapUsage.getUsed(), maxHeap);
              resultLongObserver.record(nonHeapUsage.getUsed(), maxNonHeap);
            });
    final List<Attributes> usedLabelSets = new ArrayList<>(poolBeans.size());
    final List<Attributes> committedLabelSets = new ArrayList<>(poolBeans.size());
    final List<Attributes> maxLabelSets = new ArrayList<>(poolBeans.size());
    final AttributeKey<String> poolKey = AttributeKey.stringKey(POOL_LABEL_KEY);
    for (final MemoryPoolMXBean pool : poolBeans) {
      usedLabelSets.add(Attributes.of(typeKey, USED, poolKey, pool.getName()));
      committedLabelSets.add(Attributes.of(typeKey, COMMITTED, poolKey, pool.getName()));
      maxLabelSets.add(Attributes.of(typeKey, MAX, poolKey, pool.getName()));
    }

    meter
        .upDownCounterBuilder("jvm.memory.pool")
        .setDescription("Bytes of a given JVM memory pool.")
        .setUnit("By")
        .buildWithCallback(
            resultLongObserver -> {
              for (int i = 0; i < poolBeans.size(); i++) {
                MemoryUsage poolUsage = poolBeans.get(i).getUsage();
                resultLongObserver.record(poolUsage.getUsed(), usedLabelSets.get(i));
                resultLongObserver.record(poolUsage.getCommitted(), committedLabelSets.get(i));
                // TODO: Decide if max is needed or not. May be derived with some approximation from
                //  max(used).
                resultLongObserver.record(poolUsage.getMax(), maxLabelSets.get(i));
              }
            });
  }

  /** Shuts down the OpenTelemetry exporters, blocking until they have completed orderly. */
  public void shutdown() {
    final CompletableResultCode result =
        CompletableResultCode.ofAll(
            Arrays.asList(this.sdkMeterProvider.shutdown(), this.sdkTracerProvider.shutdown()));
    result.join(5000, TimeUnit.SECONDS);
  }

  /**
   * Gets tracer provider.
   *
   * @return the tracer provider
   */
  public TracerProvider getTracerProvider() {
    return sdkTracerProvider;
  }
}