RocksDBColumnarKeyValueStorage.java

/*
 * Copyright Hyperledger Besu Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.plugin.services.storage.rocksdb.segmented;

import static java.util.stream.Collectors.toUnmodifiableSet;

import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbIterator;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbUtil;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.base.Splitter;
import com.google.common.collect.Streams;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.tuweni.bytes.Bytes;
import org.rocksdb.AbstractRocksIterator;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.Env;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.Status;
import org.rocksdb.TransactionDBOptions;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

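/*
 * A minimal usage sketch (illustrative only): this class is abstract, so a concrete subclass
 * that opens the database and implements getDB() is assumed; openConcreteStorage() and
 * process() below are hypothetical placeholders.
 *
 *   try (final SegmentedKeyValueStorage storage = openConcreteStorage()) {
 *     final Optional<byte[]> maybeValue = storage.get(segment, key);
 *     maybeValue.ifPresent(value -> process(value));
 *   }
 */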
/** The RocksDB columnar key value storage, mapping each segment to its own column family. */
public abstract class RocksDBColumnarKeyValueStorage implements SegmentedKeyValueStorage {

  private static final Logger LOG = LoggerFactory.getLogger(RocksDBColumnarKeyValueStorage.class);
  private static final int ROCKSDB_FORMAT_VERSION = 5;
  private static final long ROCKSDB_BLOCK_SIZE = 32768;

  /** RocksDb block cache size when using the high spec option */
  protected static final long ROCKSDB_BLOCKCACHE_SIZE_HIGH_SPEC = 1_073_741_824L;

  /** RocksDb memtable size when using the high spec option */
  protected static final long ROCKSDB_MEMTABLE_SIZE_HIGH_SPEC = 536_870_912L;

  /** Max total size of all WAL files, after which a flush is triggered */
  protected static final long WAL_MAX_TOTAL_SIZE = 1_073_741_824L;

  /** Expected size of a single WAL file, to determine how many WAL files to keep around */
  protected static final long EXPECTED_WAL_FILE_SIZE = 67_108_864L;

  /** RocksDb number of log files to keep on disk */
  private static final long NUMBER_OF_LOG_FILES_TO_KEEP = 7;

  /** RocksDb Time to roll a log file (1 day = 3600 * 24 seconds) */
  private static final long TIME_TO_ROLL_LOG_FILE = 86_400L;

  static {
    RocksDbUtil.loadNativeLibrary();
  }

  /** Atomic boolean to track if the storage is closed */
  protected final AtomicBoolean closed = new AtomicBoolean(false);

  private final WriteOptions tryDeleteOptions =
      new WriteOptions().setNoSlowdown(true).setIgnoreMissingColumnFamilies(true);
  private final ReadOptions readOptions = new ReadOptions().setVerifyChecksums(false);
  private final MetricsSystem metricsSystem;
  private final RocksDBMetricsFactory rocksDBMetricsFactory;
  private final RocksDBConfiguration configuration;

  /** RocksDB DB options */
  protected DBOptions options;

  /** RocksDb transactionDB options */
  protected TransactionDBOptions txOptions;

  /** RocksDb statistics */
  protected final Statistics stats = new Statistics();

  /** RocksDB metrics */
  protected RocksDBMetrics metrics;

  /** Map of the column handles by segment identifier */
  protected Map<SegmentIdentifier, RocksDbSegmentIdentifier> columnHandlesBySegmentIdentifier;

  /** Column descriptors */
  protected List<ColumnFamilyDescriptor> columnDescriptors;

  /** Column handles */
  protected List<ColumnFamilyHandle> columnHandles;

  /** Segments to open, after removing ignorable segments that do not exist in the database */
  protected List<SegmentIdentifier> trimmedSegments;

  /**
   * Instantiates a new RocksDB columnar key value storage.
   *
   * @param configuration the configuration
   * @param defaultSegments the default segments
   * @param ignorableSegments the ignorable segments
   * @param metricsSystem the metrics system
   * @param rocksDBMetricsFactory the rocks db metrics factory
   * @throws StorageException the storage exception
   */
  public RocksDBColumnarKeyValueStorage(
      final RocksDBConfiguration configuration,
      final List<SegmentIdentifier> defaultSegments,
      final List<SegmentIdentifier> ignorableSegments,
      final MetricsSystem metricsSystem,
      final RocksDBMetricsFactory rocksDBMetricsFactory)
      throws StorageException {

    this.configuration = configuration;
    this.metricsSystem = metricsSystem;
    this.rocksDBMetricsFactory = rocksDBMetricsFactory;

    try {
      trimmedSegments = new ArrayList<>(defaultSegments);
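      // List the column families already present on disk; this static call works without
      // opening the database and returns the raw byte name of each column family.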
      final List<byte[]> existingColumnFamilies =
          RocksDB.listColumnFamilies(new Options(), configuration.getDatabaseDir().toString());
      // Only ignore a segment if it does not currently exist in the database
      ignorableSegments.stream()
          .filter(
              ignorableSegment ->
                  existingColumnFamilies.stream()
                      .noneMatch(existed -> Arrays.equals(existed, ignorableSegment.getId())))
          .forEach(trimmedSegments::remove);
      columnDescriptors =
          trimmedSegments.stream()
              .map(segment -> createColumnDescriptor(segment, configuration))
              .collect(Collectors.toList());

      setGlobalOptions(configuration, stats);

      txOptions = new TransactionDBOptions();
      columnHandles = new ArrayList<>(columnDescriptors.size());
    } catch (RocksDBException e) {
      throw parseRocksDBException(e, defaultSegments, ignorableSegments);
    }
  }

  /**
   * Create a column family descriptor for a given segment. It defines the options to apply to
   * the corresponding column family.
   *
   * @param segment the segment identifier
   * @param configuration RocksDB configuration
   * @return a column family descriptor
   */
  private ColumnFamilyDescriptor createColumnDescriptor(
      final SegmentIdentifier segment, final RocksDBConfiguration configuration) {

    BlockBasedTableConfig basedTableConfig = createBlockBasedTableConfig(segment, configuration);

    final var options =
        new ColumnFamilyOptions()
            .setTtl(0)
            .setCompressionType(CompressionType.LZ4_COMPRESSION)
            .setTableFormatConfig(basedTableConfig);

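    // Segments holding mostly-static data use BlobDB: values of at least the min blob size are
    // stored in separate blob files, sparing large values from repeated compaction rewrites.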
    if (segment.containsStaticData()) {
      options
          .setEnableBlobFiles(true)
          .setEnableBlobGarbageCollection(segment.isStaticDataGarbageCollectionEnabled())
          .setMinBlobSize(100)
          .setBlobCompressionType(CompressionType.LZ4_COMPRESSION);
    }

    return new ColumnFamilyDescriptor(segment.getId(), options);
  }

  /**
   * Create a block-based table configuration for a segment, depending on the configuration in
   * place and the segment itself
   *
   * @param segment The segment related to the column family
   * @param config RocksDB configuration
   * @return block-based table configuration
   */
  private BlockBasedTableConfig createBlockBasedTableConfig(
      final SegmentIdentifier segment, final RocksDBConfiguration config) {
    final LRUCache cache =
        new LRUCache(
            config.isHighSpec() && segment.isEligibleToHighSpecFlag()
                ? ROCKSDB_BLOCKCACHE_SIZE_HIGH_SPEC
                : config.getCacheCapacity());
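    // A 10-bits-per-key bloom filter yields roughly a 1% false-positive rate; partitioned
    // filters split it into cache-sized blocks instead of one monolithic filter per file.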
    return new BlockBasedTableConfig()
        .setFormatVersion(ROCKSDB_FORMAT_VERSION)
        .setBlockCache(cache)
        .setFilterPolicy(new BloomFilter(10, false))
        .setPartitionFilters(true)
        .setCacheIndexAndFilterBlocks(false)
        .setBlockSize(ROCKSDB_BLOCK_SIZE);
  }

  /**
   * Set the global options (DBOptions)
   *
   * @param configuration RocksDB configuration
   * @param stats The statistics object
   */
  private void setGlobalOptions(final RocksDBConfiguration configuration, final Statistics stats) {
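    // WAL sizing: once WAL files exceed WAL_MAX_TOTAL_SIZE (1 GiB) in total, RocksDB flushes
    // the column families holding the oldest data; recycling WAL_MAX_TOTAL_SIZE /
    // EXPECTED_WAL_FILE_SIZE (= 16) log files lets new WALs overwrite pre-allocated files.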
    options = new DBOptions();
    options
        .setCreateIfMissing(true)
        .setMaxOpenFiles(configuration.getMaxOpenFiles())
        .setStatistics(stats)
        .setCreateMissingColumnFamilies(true)
        .setLogFileTimeToRoll(TIME_TO_ROLL_LOG_FILE)
        .setKeepLogFileNum(NUMBER_OF_LOG_FILES_TO_KEEP)
        .setEnv(Env.getDefault().setBackgroundThreads(configuration.getBackgroundThreadCount()))
        .setMaxTotalWalSize(WAL_MAX_TOTAL_SIZE)
        .setRecycleLogFileNum(WAL_MAX_TOTAL_SIZE / EXPECTED_WAL_FILE_SIZE);
  }

  /**
   * Parse RocksDBException and wrap in StorageException
   *
   * @param ex RocksDBException
   * @param defaultSegments segments requested to open
   * @param ignorableSegments segments which are ignorable if not present
   * @return StorageException wrapping the RocksDB Exception
   */
  protected static StorageException parseRocksDBException(
      final RocksDBException ex,
      final List<SegmentIdentifier> defaultSegments,
      final List<SegmentIdentifier> ignorableSegments) {
    String message = ex.getMessage();
    List<SegmentIdentifier> knownSegments =
        Streams.concat(defaultSegments.stream(), ignorableSegments.stream()).distinct().toList();

    // parse out unprintable segment names for a more useful exception:
    String columnExceptionMessagePrefix = "Column families not opened: ";
    if (message.contains(columnExceptionMessagePrefix)) {
      String substring =
          message.substring(
              message.indexOf(columnExceptionMessagePrefix)
                  + columnExceptionMessagePrefix.length());

      List<String> unHandledSegments = new ArrayList<>();
      Splitter.on(", ")
          .splitToStream(substring)
          .forEach(
              part -> {
                byte[] bytes = part.getBytes(StandardCharsets.UTF_8);
                unHandledSegments.add(
                    knownSegments.stream()
                        .filter(seg -> Arrays.equals(seg.getId(), bytes))
                        .findFirst()
                        .map(seg -> new SegmentRecord(seg.getName(), seg.getId()))
                        .orElse(new SegmentRecord(part, bytes))
                        .forDisplay());
              });

      return new StorageException(
          "RocksDBException: Unhandled column families: ["
              + String.join(", ", unHandledSegments)
              + "]");
    } else {
      return new StorageException(ex);
    }
  }

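  /** Creates the RocksDB metrics; must be called after the database has been opened. */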
  void initMetrics() {
    metrics = rocksDBMetricsFactory.create(metricsSystem, configuration, getDB(), stats);
  }

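  /**
   * Builds the segment-to-column-handle map, once the subclass has opened the database and
   * populated {@link #columnHandles}.
   */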
  void initColumnHandles() throws RocksDBException {
    // will not include the DEFAULT column handle, since we do not use it:
    columnHandlesBySegmentIdentifier =
        trimmedSegments.stream()
            .collect(
                Collectors.toMap(
                    segmentId -> segmentId,
                    segment -> {
                      var columnHandle =
                          columnHandles.stream()
                              .filter(
                                  ch -> {
                                    try {
                                      return Arrays.equals(ch.getName(), segment.getId());
                                    } catch (RocksDBException e) {
                                      throw new RuntimeException(e);
                                    }
                                  })
                              .findFirst()
                              .orElseThrow(
                                  () ->
                                      new RuntimeException(
                                          "Column handle not found for segment "
                                              + segment.getName()));
                      return new RocksDbSegmentIdentifier(getDB(), columnHandle);
                    }));
  }

  /**
   * Safe method to map segment identifier to column handle.
   *
   * @param segment segment identifier
   * @return column handle
   */
  protected ColumnFamilyHandle safeColumnHandle(final SegmentIdentifier segment) {
    RocksDbSegmentIdentifier safeRef = columnHandlesBySegmentIdentifier.get(segment);
    if (safeRef == null) {
      throw new RuntimeException("Column handle not found for segment " + segment.getName());
    }
    return safeRef.get();
  }

  @Override
  public Optional<byte[]> get(final SegmentIdentifier segment, final byte[] key)
      throws StorageException {
    throwIfClosed();

    try (final OperationTimer.TimingContext ignored = metrics.getReadLatency().startTimer()) {
      return Optional.ofNullable(getDB().get(safeColumnHandle(segment), readOptions, key));
    } catch (final RocksDBException e) {
      throw new StorageException(e);
    }
  }

  @Override
  public Optional<NearestKeyValue> getNearestTo(
      final SegmentIdentifier segmentIdentifier, final Bytes key) throws StorageException {

    try (final RocksIterator rocksIterator =
        getDB().newIterator(safeColumnHandle(segmentIdentifier))) {
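      // seekForPrev positions the iterator at the largest key less than or equal to the
      // requested key, so an exact match is returned when present.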
      rocksIterator.seekForPrev(key.toArrayUnsafe());
      return Optional.of(rocksIterator)
          .filter(AbstractRocksIterator::isValid)
          .map(it -> new NearestKeyValue(Bytes.of(it.key()), Optional.of(it.value())));
    }
  }

  @Override
  public Stream<Pair<byte[], byte[]>> stream(final SegmentIdentifier segmentIdentifier) {
    final RocksIterator rocksIterator = getDB().newIterator(safeColumnHandle(segmentIdentifier));
    rocksIterator.seekToFirst();
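    // The stream wraps a live RocksDB iterator; callers should close the stream to release
    // the underlying native iterator.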
    return RocksDbIterator.create(rocksIterator).toStream();
  }

  @Override
  public Stream<Pair<byte[], byte[]>> streamFromKey(
      final SegmentIdentifier segmentIdentifier, final byte[] startKey) {
    final RocksIterator rocksIterator = getDB().newIterator(safeColumnHandle(segmentIdentifier));
    rocksIterator.seek(startKey);
    return RocksDbIterator.create(rocksIterator).toStream();
  }

  @Override
  public Stream<Pair<byte[], byte[]>> streamFromKey(
      final SegmentIdentifier segmentIdentifier, final byte[] startKey, final byte[] endKey) {
    final Bytes endKeyBytes = Bytes.wrap(endKey);
    final RocksIterator rocksIterator = getDB().newIterator(safeColumnHandle(segmentIdentifier));
    rocksIterator.seek(startKey);
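    // The end bound is inclusive: iteration stops at the first key greater than endKey.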
    return RocksDbIterator.create(rocksIterator)
        .toStream()
        .takeWhile(e -> endKeyBytes.compareTo(Bytes.wrap(e.getKey())) >= 0);
  }

  @Override
  public Stream<byte[]> streamKeys(final SegmentIdentifier segmentIdentifier) {
    final RocksIterator rocksIterator = getDB().newIterator(safeColumnHandle(segmentIdentifier));
    rocksIterator.seekToFirst();
    return RocksDbIterator.create(rocksIterator).toStreamKeys();
  }

  @Override
  public boolean tryDelete(final SegmentIdentifier segmentIdentifier, final byte[] key) {
    try {
      getDB().delete(safeColumnHandle(segmentIdentifier), tryDeleteOptions, key);
      return true;
    } catch (RocksDBException e) {
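      // With setNoSlowdown(true), a write that would stall fails fast with an Incomplete
      // status; report it as "not deleted" so the caller can retry later.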
      if (e.getStatus().getCode() == Status.Code.Incomplete) {
        return false;
      } else {
        throw new StorageException(e);
      }
    }
  }

  @Override
  public Set<byte[]> getAllKeysThat(
      final SegmentIdentifier segmentIdentifier, final Predicate<byte[]> returnCondition) {
    return stream(segmentIdentifier)
        .filter(pair -> returnCondition.test(pair.getKey()))
        .map(Pair::getKey)
        .collect(toUnmodifiableSet());
  }

  @Override
  public Set<byte[]> getAllValuesFromKeysThat(
      final SegmentIdentifier segmentIdentifier, final Predicate<byte[]> returnCondition) {
    return stream(segmentIdentifier)
        .filter(pair -> returnCondition.test(pair.getKey()))
        .map(Pair::getValue)
        .collect(toUnmodifiableSet());
  }

  @Override
  public void clear(final SegmentIdentifier segmentIdentifier) {
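    // Resetting a segment drops and recreates its column family, which is far cheaper than
    // deleting keys one by one.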
    Optional.ofNullable(columnHandlesBySegmentIdentifier.get(segmentIdentifier))
        .ifPresent(RocksDbSegmentIdentifier::reset);
  }

  @Override
  public void close() {
    if (closed.compareAndSet(false, true)) {
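      // Close options and column family handles before the database itself, as RocksDB
      // requires handles to be released before the owning RocksDB instance is closed.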
      txOptions.close();
      options.close();
      tryDeleteOptions.close();
      columnHandlesBySegmentIdentifier.values().stream()
          .map(RocksDbSegmentIdentifier::get)
          .forEach(ColumnFamilyHandle::close);
      getDB().close();
    }
  }

  @Override
  public boolean isClosed() {
    return closed.get();
  }

  void throwIfClosed() {
    if (closed.get()) {
      LOG.error("Attempting to use a closed RocksDBColumnarKeyValueStorage");
      throw new IllegalStateException("Storage has been closed");
    }
  }

  abstract RocksDB getDB();

  record SegmentRecord(String name, byte[] id) {
    public String forDisplay() {
      return String.format("'%s'(%s)", name, Bytes.of(id).toHexString());
    }
  }
}