RocksDBSnapshotTransaction.java

/*
 * Copyright Hyperledger Besu Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 */
package org.hyperledger.besu.plugin.services.storage.rocksdb.segmented;

import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbIterator;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Stream;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.tuweni.bytes.Bytes;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Transaction;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** The Rocks db snapshot transaction. */
public class RocksDBSnapshotTransaction
    implements SegmentedKeyValueStorageTransaction, AutoCloseable {
  private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnapshotTransaction.class);
  private static final String NO_SPACE_LEFT_ON_DEVICE = "No space left on device";
  private final RocksDBMetrics metrics;
  private final OptimisticTransactionDB db;
  private final Function<SegmentIdentifier, ColumnFamilyHandle> columnFamilyMapper;
  private final Transaction snapTx;
  private final RocksDBSnapshot snapshot;
  private final WriteOptions writeOptions;
  private final ReadOptions readOptions;
  private final AtomicBoolean isClosed = new AtomicBoolean(false);

  /**
   * Instantiates a new RocksDb snapshot transaction.
   *
   * @param db the db
   * @param columnFamilyMapper mapper from segment identifier to column family handle
   * @param metrics the metrics
   */
  RocksDBSnapshotTransaction(
      final OptimisticTransactionDB db,
      final Function<SegmentIdentifier, ColumnFamilyHandle> columnFamilyMapper,
      final RocksDBMetrics metrics) {
    this.metrics = metrics;
    this.db = db;
    this.columnFamilyMapper = columnFamilyMapper;
    this.snapshot = new RocksDBSnapshot(db);
    this.writeOptions = new WriteOptions();
    this.snapTx = db.beginTransaction(writeOptions);
    this.readOptions =
        new ReadOptions().setVerifyChecksums(false).setSnapshot(snapshot.markAndUseSnapshot());
  }

  private RocksDBSnapshotTransaction(
      final OptimisticTransactionDB db,
      final Function<SegmentIdentifier, ColumnFamilyHandle> columnFamilyMapper,
      final RocksDBMetrics metrics,
      final RocksDBSnapshot snapshot,
      final Transaction snapTx,
      final ReadOptions readOptions) {
    this.metrics = metrics;
    this.db = db;
    this.columnFamilyMapper = columnFamilyMapper;
    this.snapshot = snapshot;
    this.writeOptions = new WriteOptions();
    this.readOptions = readOptions;
    this.snapTx = snapTx;
  }

  /**
   * Get data against given key.
   *
   * @param segmentId the segment id
   * @param key the key
   * @return the optional data
   */
  public Optional<byte[]> get(final SegmentIdentifier segmentId, final byte[] key) {
    throwIfClosed();

    try (final OperationTimer.TimingContext ignored = metrics.getReadLatency().startTimer()) {
      return Optional.ofNullable(snapTx.get(columnFamilyMapper.apply(segmentId), readOptions, key));
    } catch (final RocksDBException e) {
      throw new StorageException(e);
    }
  }

  @Override
  public void put(final SegmentIdentifier segmentId, final byte[] key, final byte[] value) {
    throwIfClosed();

    try (final OperationTimer.TimingContext ignored = metrics.getWriteLatency().startTimer()) {
      snapTx.put(columnFamilyMapper.apply(segmentId), key, value);
    } catch (final RocksDBException e) {
      if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
        LOG.error(e.getMessage());
        System.exit(0);
      }
      throw new StorageException(e);
    }
  }

  @Override
  public void remove(final SegmentIdentifier segmentId, final byte[] key) {
    throwIfClosed();

    try (final OperationTimer.TimingContext ignored = metrics.getRemoveLatency().startTimer()) {
      snapTx.delete(columnFamilyMapper.apply(segmentId), key);
    } catch (final RocksDBException e) {
      if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
        LOG.error(e.getMessage());
        System.exit(0);
      }
      throw new StorageException(e);
    }
  }

  /**
   * get a RocksIterator that reads through the transaction to represent the current state.
   *
   * <p>be sure to close this iterator, like in a try-with-resources block, otherwise a native
   * memory leak might occur.
   *
   * @param segmentId id for the segment to iterate over.
   * @return RocksIterator
   */
  public RocksIterator getIterator(final SegmentIdentifier segmentId) {
    return snapTx.getIterator(readOptions, columnFamilyMapper.apply(segmentId));
  }

  /**
   * Stream.
   *
   * @param segmentId the segment id
   * @return the stream
   */
  public Stream<Pair<byte[], byte[]>> stream(final SegmentIdentifier segmentId) {
    throwIfClosed();

    final RocksIterator rocksIterator =
        db.newIterator(columnFamilyMapper.apply(segmentId), readOptions);
    rocksIterator.seekToFirst();
    return RocksDbIterator.create(rocksIterator).toStream();
  }

  /**
   * Stream keys.
   *
   * @param segmentId the segment id
   * @return the stream
   */
  public Stream<byte[]> streamKeys(final SegmentIdentifier segmentId) {
    throwIfClosed();

    final RocksIterator rocksIterator =
        db.newIterator(columnFamilyMapper.apply(segmentId), readOptions);
    rocksIterator.seekToFirst();
    return RocksDbIterator.create(rocksIterator).toStreamKeys();
  }

  /**
   * Returns a stream of key-value pairs starting from the specified key. This method is used to
   * retrieve a stream of data reading through the transaction, starting from the given key. If no
   * data is available from the specified key onwards, an empty stream is returned.
   *
   * @param segment The segment identifier whose keys we want to stream.
   * @param startKey The key from which the stream should start.
   * @return A stream of key-value pairs starting from the specified key.
   */
  public Stream<Pair<byte[], byte[]>> streamFromKey(
      final SegmentIdentifier segment, final byte[] startKey) {
    throwIfClosed();

    final RocksIterator rocksIterator =
        db.newIterator(columnFamilyMapper.apply(segment), readOptions);
    rocksIterator.seek(startKey);
    return RocksDbIterator.create(rocksIterator).toStream();
  }

  /**
   * Returns a stream of key-value pairs starting from the specified key, ending at the specified
   * key. This method is used to retrieve a stream of data reading through the transaction, starting
   * from the given key. If no data is available from the specified key onwards, an empty stream is
   * returned.
   *
   * @param segment The segment identifier whose keys we want to stream.
   * @param startKey The key from which the stream should start.
   * @param endKey The key at which the stream should stop.
   * @return A stream of key-value pairs starting from the specified key.
   */
  public Stream<Pair<byte[], byte[]>> streamFromKey(
      final SegmentIdentifier segment, final byte[] startKey, final byte[] endKey) {
    throwIfClosed();
    final Bytes endKeyBytes = Bytes.wrap(endKey);

    final RocksIterator rocksIterator =
        db.newIterator(columnFamilyMapper.apply(segment), readOptions);
    rocksIterator.seek(startKey);
    return RocksDbIterator.create(rocksIterator)
        .toStream()
        .takeWhile(e -> endKeyBytes.compareTo(Bytes.wrap(e.getKey())) >= 0);
  }

  @Override
  public void commit() throws StorageException {
    // no-op
  }

  @Override
  public void rollback() {
    throwIfClosed();

    try {
      snapTx.rollback();
      metrics.getRollbackCount().inc();
    } catch (final RocksDBException e) {
      if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
        LOG.error(e.getMessage());
        System.exit(0);
      }
      throw new StorageException(e);
    } finally {
      close();
    }
  }

  /**
   * Copy.
   *
   * @return the rocks db snapshot transaction
   */
  public RocksDBSnapshotTransaction copy() {
    throwIfClosed();
    try {
      var copyReadOptions = new ReadOptions().setSnapshot(snapshot.markAndUseSnapshot());
      var copySnapTx = db.beginTransaction(writeOptions);
      copySnapTx.rebuildFromWriteBatch(snapTx.getWriteBatch().getWriteBatch());
      return new RocksDBSnapshotTransaction(
          db, columnFamilyMapper, metrics, snapshot, copySnapTx, copyReadOptions);
    } catch (Exception ex) {
      LOG.error("Failed to copy snapshot transaction", ex);
      snapshot.unMarkSnapshot();
      throw new StorageException(ex);
    }
  }

  @Override
  public void close() {
    snapTx.close();
    writeOptions.close();
    readOptions.close();
    snapshot.unMarkSnapshot();
    isClosed.set(true);
  }

  private void throwIfClosed() {
    if (isClosed.get()) {
      LOG.error("Attempting to use a closed RocksDBSnapshotTransaction");
      throw new StorageException("Storage has already been closed");
    }
  }
}