RocksDBKeyValueStorageFactory.java

/*
 * Copyright Hyperledger Besu Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.plugin.services.storage.rocksdb;

import static org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.BaseVersionedStorageFormat.BONSAI_WITH_RECEIPT_COMPACTION;
import static org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.BaseVersionedStorageFormat.BONSAI_WITH_VARIABLES;
import static org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.BaseVersionedStorageFormat.FOREST_WITH_RECEIPT_COMPACTION;
import static org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.BaseVersionedStorageFormat.FOREST_WITH_VARIABLES;

import org.hyperledger.besu.plugin.services.BesuConfiguration;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageFactory;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.BaseVersionedStorageFormat;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.DatabaseMetadata;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfigurationBuilder;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBFactoryConfiguration;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.VersionedStorageFormat;
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.OptimisticRocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.RocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.TransactionDBRocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageAdapter;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The Rocks db key value storage factory creates segmented storage and uses a adapter to support
 * unsegmented keyvalue storage.
 */
public class RocksDBKeyValueStorageFactory implements KeyValueStorageFactory {

  private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyValueStorageFactory.class);
  private static final EnumSet<BaseVersionedStorageFormat> SUPPORTED_VERSIONED_FORMATS =
      EnumSet.of(
          FOREST_WITH_VARIABLES,
          FOREST_WITH_RECEIPT_COMPACTION,
          BONSAI_WITH_VARIABLES,
          BONSAI_WITH_RECEIPT_COMPACTION);
  private static final String NAME = "rocksdb";
  private final RocksDBMetricsFactory rocksDBMetricsFactory;
  private DatabaseMetadata databaseMetadata;
  private RocksDBColumnarKeyValueStorage segmentedStorage;
  private RocksDBConfiguration rocksDBConfiguration;

  private final Supplier<RocksDBFactoryConfiguration> configuration;
  private final List<SegmentIdentifier> configuredSegments;
  private final List<SegmentIdentifier> ignorableSegments;

  /**
   * Instantiates a new RocksDb key value storage factory.
   *
   * @param configuration the configuration
   * @param configuredSegments the segments
   * @param ignorableSegments the ignorable segments
   * @param rocksDBMetricsFactory the rocks db metrics factory
   */
  public RocksDBKeyValueStorageFactory(
      final Supplier<RocksDBFactoryConfiguration> configuration,
      final List<SegmentIdentifier> configuredSegments,
      final List<SegmentIdentifier> ignorableSegments,
      final RocksDBMetricsFactory rocksDBMetricsFactory) {
    this.configuration = configuration;
    this.configuredSegments = configuredSegments;
    this.ignorableSegments = ignorableSegments;
    this.rocksDBMetricsFactory = rocksDBMetricsFactory;
  }

  /**
   * Instantiates a new RocksDb key value storage factory.
   *
   * @param configuration the configuration
   * @param configuredSegments the segments
   * @param rocksDBMetricsFactory the rocks db metrics factory
   */
  public RocksDBKeyValueStorageFactory(
      final Supplier<RocksDBFactoryConfiguration> configuration,
      final List<SegmentIdentifier> configuredSegments,
      final RocksDBMetricsFactory rocksDBMetricsFactory) {
    this(configuration, configuredSegments, List.of(), rocksDBMetricsFactory);
  }

  @Override
  public String getName() {
    return NAME;
  }

  @Override
  public KeyValueStorage create(
      final SegmentIdentifier segment,
      final BesuConfiguration commonConfiguration,
      final MetricsSystem metricsSystem)
      throws StorageException {
    return new SegmentedKeyValueStorageAdapter(
        segment, create(List.of(segment), commonConfiguration, metricsSystem));
  }

  @Override
  public SegmentedKeyValueStorage create(
      final List<SegmentIdentifier> segments,
      final BesuConfiguration commonConfiguration,
      final MetricsSystem metricsSystem)
      throws StorageException {
    if (requiresInit()) {
      init(commonConfiguration);
    }

    // safety check to see that segments all exist within configured segments
    if (!configuredSegments.containsAll(segments)) {
      throw new StorageException(
          "Attempted to create storage for segments that are not configured: "
              + segments.stream()
                  .filter(segment -> !configuredSegments.contains(segment))
                  .map(SegmentIdentifier::toString)
                  .collect(Collectors.joining(", ")));
    }

    if (segmentedStorage == null) {
      final List<SegmentIdentifier> segmentsForFormat =
          configuredSegments.stream()
              .filter(
                  segmentId ->
                      segmentId.includeInDatabaseFormat(
                          databaseMetadata.getVersionedStorageFormat().getFormat()))
              .toList();

      // It's probably a good idea for the creation logic to be entirely dependent on the database
      // version. Introducing intermediate booleans that represent database properties and
      // dispatching
      // creation logic based on them is error-prone.
      switch (databaseMetadata.getVersionedStorageFormat().getFormat()) {
        case FOREST -> {
          LOG.debug("FOREST mode detected, using TransactionDB.");
          segmentedStorage =
              new TransactionDBRocksDBColumnarKeyValueStorage(
                  rocksDBConfiguration,
                  segmentsForFormat,
                  ignorableSegments,
                  metricsSystem,
                  rocksDBMetricsFactory);
        }
        case BONSAI -> {
          LOG.debug("BONSAI mode detected, Using OptimisticTransactionDB.");
          segmentedStorage =
              new OptimisticRocksDBColumnarKeyValueStorage(
                  rocksDBConfiguration,
                  segmentsForFormat,
                  ignorableSegments,
                  metricsSystem,
                  rocksDBMetricsFactory);
        }
      }
    }
    return segmentedStorage;
  }

  /**
   * Storage path.
   *
   * @param commonConfiguration the common configuration
   * @return the path
   */
  protected Path storagePath(final BesuConfiguration commonConfiguration) {
    return commonConfiguration.getStoragePath();
  }

  private void init(final BesuConfiguration commonConfiguration) {
    try {
      databaseMetadata = readDatabaseMetadata(commonConfiguration);
    } catch (final IOException e) {
      final String message =
          "Failed to retrieve the RocksDB database meta version: "
              + e.getMessage()
              + " could not be found. You may not have the appropriate permission to access the item.";
      throw new StorageException(message, e);
    }
    rocksDBConfiguration =
        RocksDBConfigurationBuilder.from(configuration.get())
            .databaseDir(storagePath(commonConfiguration))
            .build();
  }

  private boolean requiresInit() {
    return segmentedStorage == null;
  }

  private DatabaseMetadata readDatabaseMetadata(final BesuConfiguration commonConfiguration)
      throws IOException {
    final Path dataDir = commonConfiguration.getDataPath();
    final boolean dataDirExists = dataDir.toFile().exists();
    final boolean databaseExists = commonConfiguration.getStoragePath().toFile().exists();
    final boolean metadataExists = DatabaseMetadata.isPresent(dataDir);
    DatabaseMetadata metadata;
    if (databaseExists && !metadataExists) {
      throw new StorageException(
          "Database exists but metadata file not found, without it there is no safe way to open the database");
    }
    if (metadataExists) {
      metadata = DatabaseMetadata.lookUpFrom(dataDir);

      if (!metadata
          .getVersionedStorageFormat()
          .getFormat()
          .equals(commonConfiguration.getDataStorageConfiguration().getDatabaseFormat())) {
        handleFormatMismatch(commonConfiguration, dataDir, metadata);
      }

      final var runtimeVersion =
          BaseVersionedStorageFormat.defaultForNewDB(
              commonConfiguration.getDataStorageConfiguration());

      if (metadata.getVersionedStorageFormat().getVersion() > runtimeVersion.getVersion()) {
        final var maybeDowngradedMetadata =
            handleVersionDowngrade(dataDir, metadata, runtimeVersion);
        if (maybeDowngradedMetadata.isPresent()) {
          metadata = maybeDowngradedMetadata.get();
          metadata.writeToDirectory(dataDir);
        }
      }

      if (metadata.getVersionedStorageFormat().getVersion() < runtimeVersion.getVersion()) {
        final var maybeUpgradedMetadata = handleVersionUpgrade(dataDir, metadata, runtimeVersion);
        if (maybeUpgradedMetadata.isPresent()) {
          metadata = maybeUpgradedMetadata.get();
          metadata.writeToDirectory(dataDir);
        }
      }

      LOG.info("Existing database at {}. Metadata {}. Processing WAL...", dataDir, metadata);
    } else {

      metadata = DatabaseMetadata.defaultForNewDb(commonConfiguration);
      LOG.info(
          "No existing database at {}. Using default metadata for new db {}", dataDir, metadata);
      if (!dataDirExists) {
        Files.createDirectories(dataDir);
      }
      metadata.writeToDirectory(dataDir);
    }

    if (!isSupportedVersionedFormat(metadata.getVersionedStorageFormat())) {
      final String message = "Unsupported RocksDB metadata: " + metadata;
      LOG.error(message);
      throw new StorageException(message);
    }

    return metadata;
  }

  private static void handleFormatMismatch(
      final BesuConfiguration commonConfiguration,
      final Path dataDir,
      final DatabaseMetadata existingMetadata) {
    String error =
        String.format(
            "Database format mismatch: DB at %s is %s but config expects %s. "
                + "Please check your config.",
            dataDir,
            existingMetadata.getVersionedStorageFormat().getFormat().name(),
            commonConfiguration.getDataStorageConfiguration().getDatabaseFormat());

    throw new StorageException(error);
  }

  private Optional<DatabaseMetadata> handleVersionDowngrade(
      final Path dataDir,
      final DatabaseMetadata existingMetadata,
      final BaseVersionedStorageFormat runtimeVersion) {
    // here we put the code, or the messages, to perform an automated, or manual, downgrade of the
    // database, if supported, otherwise we just prevent Besu from starting since it will not
    // recognize the newer version.
    // In case we do an automated downgrade, then we also need to update the metadata on disk to
    // reflect the change to the runtime version, and return it.

    // Besu supports both formats of receipts so no downgrade is needed
    if (runtimeVersion == BONSAI_WITH_VARIABLES || runtimeVersion == FOREST_WITH_VARIABLES) {
      LOG.warn(
          "Database contains compacted receipts but receipt compaction is not enabled, new receipts  will "
              + "be not stored in the compacted format. If you want to remove compacted receipts from the "
              + "database it is necessary to resync Besu. Besu can support both compacted and non-compacted receipts.");
      return Optional.empty();
    }

    // for the moment there are supported automated downgrades, so we just fail.
    String error =
        String.format(
            "Database unsafe downgrade detect: DB at %s is %s with version %s but version %s is expected. "
                + "Please check your config and review release notes for supported downgrade procedures.",
            dataDir,
            existingMetadata.getVersionedStorageFormat().getFormat().name(),
            existingMetadata.getVersionedStorageFormat().getVersion(),
            runtimeVersion.getVersion());

    throw new StorageException(error);
  }

  private Optional<DatabaseMetadata> handleVersionUpgrade(
      final Path dataDir,
      final DatabaseMetadata existingMetadata,
      final BaseVersionedStorageFormat runtimeVersion) {
    // here we put the code, or the messages, to perform an automated, or manual, upgrade of the
    // database.
    // In case we do an automated upgrade, then we also need to update the metadata on disk to
    // reflect the change to the runtime version, and return it.

    // Besu supports both formats of receipts so no upgrade is needed other than updating metadata
    if (runtimeVersion == BONSAI_WITH_RECEIPT_COMPACTION
        || runtimeVersion == FOREST_WITH_RECEIPT_COMPACTION) {
      final DatabaseMetadata metadata = new DatabaseMetadata(runtimeVersion);
      try {
        metadata.writeToDirectory(dataDir);
        return Optional.of(metadata);
      } catch (IOException e) {
        throw new StorageException("Database upgrade to use receipt compaction failed", e);
      }
    }

    // for the moment there are no planned automated upgrades, so we just fail.
    String error =
        String.format(
            "Database unsafe downgrade detect: DB at %s is %s with version %s but version %s is expected. "
                + "Please check your config and review release notes for supported downgrade procedures.",
            dataDir,
            existingMetadata.getVersionedStorageFormat().getFormat().name(),
            existingMetadata.getVersionedStorageFormat().getVersion(),
            runtimeVersion.getVersion());

    throw new StorageException(error);
  }

  private boolean isSupportedVersionedFormat(final VersionedStorageFormat versionedStorageFormat) {
    return SUPPORTED_VERSIONED_FORMATS.stream()
        .anyMatch(
            vsf ->
                vsf.getFormat().equals(versionedStorageFormat.getFormat())
                    && vsf.getVersion() == versionedStorageFormat.getVersion());
  }

  @Override
  public void close() throws IOException {
    if (segmentedStorage != null) {
      segmentedStorage.close();
    }
  }

  @Override
  public boolean isSegmentIsolationSupported() {
    return true;
  }

  @Override
  public boolean isSnapshotIsolationSupported() {
    return true;
  }
}