TransactionLogBloomCacher.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 */

package org.hyperledger.besu.ethereum.api.query.cache;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.fasterxml.jackson.annotation.JsonGetter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionLogBloomCacher {

  private static final Logger LOG = LoggerFactory.getLogger(TransactionLogBloomCacher.class);
  private static final String NO_SPACE_LEFT_ON_DEVICE = "No space left on device";

  public static final int BLOCKS_PER_BLOOM_CACHE = 100_000;
  public static final int BLOOM_BITS_LENGTH = 256;
  private static final int EXPECTED_BLOOM_FILE_SIZE = BLOCKS_PER_BLOOM_CACHE * BLOOM_BITS_LENGTH;
  public static final String CURRENT = "current";
  private final Map<Long, Boolean> cachedSegments;

  private final Lock submissionLock = new ReentrantLock();

  private final EthScheduler scheduler;
  private final Blockchain blockchain;

  private final Path cacheDir;

  private final CachingStatus cachingStatus = new CachingStatus();

  public TransactionLogBloomCacher(
      final Blockchain blockchain, final Path cacheDir, final EthScheduler scheduler) {
    this.blockchain = blockchain;
    this.cacheDir = cacheDir;
    this.scheduler = scheduler;
    this.cachedSegments = new TreeMap<>();
  }

  public CachingStatus getCachingStatus() {
    return cachingStatus;
  }

  void cacheAll() {
    ensurePreviousSegmentsArePresent(blockchain.getChainHeadBlockNumber(), false);
  }

  private static File calculateCacheFileName(final String name, final Path cacheDir) {
    return cacheDir.resolve("logBloom-" + name + ".cache").toFile();
  }

  private static File calculateCacheFileName(final long blockNumber, final Path cacheDir) {
    return calculateCacheFileName(Long.toString(blockNumber / BLOCKS_PER_BLOOM_CACHE), cacheDir);
  }

  public CachingStatus generateLogBloomCache(final long start, final long stop) {
    checkArgument(
        start % BLOCKS_PER_BLOOM_CACHE == 0, "Start block must be at the beginning of a file");

    if (!cachingStatus.isCaching()) {
      try {
        cachingStatus.cachingCount.incrementAndGet();
        LOG.debug(
            "Generating transaction log bloom cache from block {} to block {} in {}",
            start,
            stop,
            cacheDir);
        if (!Files.isDirectory(cacheDir) && !cacheDir.toFile().mkdirs()) {
          LOG.error("Cache directory '{}' does not exist and could not be made.", cacheDir);
          return cachingStatus;
        }
        for (long blockNum = start; blockNum < stop; blockNum += BLOCKS_PER_BLOOM_CACHE) {
          LOG.trace("Caching segment at {}", blockNum);
          final File cacheFile = calculateCacheFileName(blockNum, cacheDir);
          blockchain
              .getBlockHeader(blockNum)
              .ifPresent(
                  blockHeader ->
                      cacheLogsBloomForBlockHeader(
                          blockHeader, Optional.empty(), Optional.of(cacheFile)));
          fillCacheFile(blockNum, blockNum + BLOCKS_PER_BLOOM_CACHE, cacheFile);
        }
      } catch (final Exception e) {
        LOG.error("Unhandled caching exception", e);
      } finally {
        cachingStatus.cachingCount.decrementAndGet();
        LOG.trace("Caching request complete");
      }
    }

    return cachingStatus;
  }

  private void fillCacheFile(final long startBlock, final long stopBlock, final File currentFile)
      throws IOException {
    long blockNum = startBlock;
    try (final OutputStream out = new FileOutputStream(currentFile)) {
      while (blockNum < stopBlock) {
        final Optional<BlockHeader> maybeHeader = blockchain.getBlockHeader(blockNum);
        if (maybeHeader.isEmpty()) {
          break;
        }
        fillCacheFileWithBlock(maybeHeader.get(), out);
        cachingStatus.currentBlock = blockNum;
        blockNum++;
      }
    } catch (final IOException e) {
      if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
        LOG.error(e.getMessage());
        System.exit(0);
      }
      throw e;
    }
  }

  void cacheLogsBloomForBlockHeader(
      final BlockHeader blockHeader,
      final Optional<BlockHeader> commonAncestorBlockHeader,
      final Optional<File> reusedCacheFile) {
    try {
      if (cachingStatus.cachingCount.incrementAndGet() != 1) {
        return;
      }
      final long blockNumber = blockHeader.getNumber();
      LOG.atTrace()
          .setMessage("Caching logs bloom for block {}")
          .addArgument(() -> "0x" + Long.toHexString(blockNumber))
          .log();
      final File cacheFile = reusedCacheFile.orElse(calculateCacheFileName(blockNumber, cacheDir));
      if (cacheFile.exists()) {
        try {
          final Optional<Long> ancestorBlockNumber =
              commonAncestorBlockHeader.map(ProcessableBlockHeader::getNumber);
          if (ancestorBlockNumber.isPresent()) {
            // walk through the blocks from the common ancestor to the received block in order to
            // reload the cache in case of reorg
            for (long number = ancestorBlockNumber.get() + 1;
                number < blockHeader.getNumber();
                number++) {
              final Optional<BlockHeader> ancestorBlockHeader = blockchain.getBlockHeader(number);
              if (ancestorBlockHeader.isPresent()) {
                cacheSingleBlock(ancestorBlockHeader.get(), cacheFile, true);
              }
            }
          }
          cacheSingleBlock(blockHeader, cacheFile, true);
        } catch (final InvalidCacheException e) {
          populateLatestSegment(blockNumber);
        }
      } else {
        populateLatestSegment(blockNumber);
      }
    } catch (final IOException e) {
      if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
        LOG.error(e.getMessage());
        System.exit(0);
      }
      LOG.error("Unhandled caching exception.", e);
    } finally {
      cachingStatus.cachingCount.decrementAndGet();
    }
  }

  private void cacheSingleBlock(
      final BlockHeader blockHeader, final File cacheFile, final boolean isCheckSizeNeeded)
      throws IOException, InvalidCacheException {
    try (final RandomAccessFile writer = new RandomAccessFile(cacheFile, "rw")) {

      final long nbCachedBlocks = cacheFile.length() / BLOOM_BITS_LENGTH;
      final long blockIndex = (blockHeader.getNumber() % BLOCKS_PER_BLOOM_CACHE);
      final long offset = blockIndex * BLOOM_BITS_LENGTH;
      if (isCheckSizeNeeded && blockIndex > nbCachedBlocks) {
        throw new InvalidCacheException();
      }
      writer.seek(offset);
      writer.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray()));

      // remove invalid logs when there was a reorg
      final long validCacheSize = offset + BLOOM_BITS_LENGTH;

      if (writer.length() > validCacheSize) {
        writer.setLength(validCacheSize);
      }
    }
  }

  private boolean populateLatestSegment(final long eventBlockNumber) {
    try {
      final File currentFile = calculateCacheFileName(CURRENT, cacheDir);
      final long segmentNumber = eventBlockNumber / BLOCKS_PER_BLOOM_CACHE;
      long blockNumber =
          Math.min((segmentNumber + 1) * BLOCKS_PER_BLOOM_CACHE - 1, eventBlockNumber);
      fillCacheFile(segmentNumber * BLOCKS_PER_BLOOM_CACHE, blockNumber, currentFile);
      while (blockNumber <= eventBlockNumber && (blockNumber % BLOCKS_PER_BLOOM_CACHE != 0)) {
        Optional<BlockHeader> blockHeader = blockchain.getBlockHeader(blockNumber);
        if (blockHeader.isPresent()) {
          cacheSingleBlock(blockHeader.get(), currentFile, false);
        }
        blockNumber++;
      }
      Files.move(
          currentFile.toPath(),
          calculateCacheFileName(blockNumber, cacheDir).toPath(),
          StandardCopyOption.REPLACE_EXISTING,
          StandardCopyOption.ATOMIC_MOVE);
      return true;
    } catch (final IOException | InvalidCacheException e) {
      LOG.error("Unhandled caching exception.", e);
    }

    return false;
  }

  public void removeSegments(final Long startBlock, final Long stopBlock) {
    if (!cachingStatus.isCaching()) {
      LOG.info(
          "Deleting transaction log bloom cache from block {} to block {} in {}",
          startBlock,
          stopBlock,
          cacheDir);

      for (long blockNum = startBlock; blockNum <= stopBlock; blockNum += BLOCKS_PER_BLOOM_CACHE) {
        try {
          final long segmentNumber = blockNum / BLOCKS_PER_BLOOM_CACHE;
          final long fromBlock = segmentNumber * BLOCKS_PER_BLOOM_CACHE;
          final File cacheFile = calculateCacheFileName(fromBlock, cacheDir);
          cachedSegments.remove(segmentNumber);
          if (Files.deleteIfExists(cacheFile.toPath())) {
            LOG.info(
                "Deleted transaction log bloom cache file: {}/{}", cacheDir, cacheFile.getName());
          } else {
            LOG.info(
                "Unable to delete transaction log bloom cache file: {}/{}",
                cacheDir,
                cacheFile.getName());
          }
        } catch (final IOException e) {
          if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
            LOG.error(e.getMessage());
            System.exit(0);
          }
          LOG.error(
              String.format("Unhandled exception removing cache for block number %d", blockNum), e);
        }
      }
    }
  }

  public void ensurePreviousSegmentsArePresent(
      final long blockNumber, final boolean overrideCacheCheck) {
    if (!cachingStatus.isCaching()) {
      scheduler.scheduleFutureTask(
          () ->
              scheduler.scheduleComputationTask(
                  () -> {
                    long currentSegment = (blockNumber / BLOCKS_PER_BLOOM_CACHE) - 1;
                    while (currentSegment >= 0) {
                      try {
                        if (overrideCacheCheck
                            || !cachedSegments.getOrDefault(currentSegment, false)) {
                          final long startBlock = currentSegment * BLOCKS_PER_BLOOM_CACHE;
                          final File cacheFile = calculateCacheFileName(startBlock, cacheDir);
                          if (overrideCacheCheck
                              || !cacheFile.isFile()
                              || cacheFile.length() != EXPECTED_BLOOM_FILE_SIZE) {
                            generateLogBloomCache(startBlock, startBlock + BLOCKS_PER_BLOOM_CACHE);
                          }
                          cachedSegments.put(currentSegment, true);
                        }
                      } finally {
                        currentSegment--;
                      }
                    }
                    return null;
                  }),
          Duration.ofSeconds(1));
    }
  }

  private void fillCacheFileWithBlock(final BlockHeader blockHeader, final OutputStream fos)
      throws IOException {
    fos.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray()));
  }

  private byte[] ensureBloomBitsAreCorrectLength(final byte[] logs) {
    checkNotNull(logs);
    checkState(logs.length == BLOOM_BITS_LENGTH, "BloomBits are not the correct length");
    return logs;
  }

  public CachingStatus requestCaching(final long fromBlock, final long toBlock) {
    boolean requestAccepted = false;
    try {
      if ((fromBlock < toBlock) && submissionLock.tryLock(100, TimeUnit.MILLISECONDS)) {
        try {
          if (!cachingStatus.isCaching()) {
            requestAccepted = true;
            cachingStatus.startBlock = fromBlock;
            cachingStatus.endBlock = toBlock;
            scheduler.scheduleComputationTask(
                () ->
                    generateLogBloomCache(
                        fromBlock - (fromBlock % BLOCKS_PER_BLOOM_CACHE), toBlock));
          }
        } finally {
          submissionLock.unlock();
        }
      }
    } catch (final InterruptedException e) {
      // ignore
    }
    cachingStatus.requestAccepted = requestAccepted;
    return cachingStatus;
  }

  EthScheduler getScheduler() {
    return scheduler;
  }

  Path getCacheDir() {
    return cacheDir;
  }

  public static final class CachingStatus {
    long startBlock;
    long endBlock;
    volatile long currentBlock;
    AtomicInteger cachingCount = new AtomicInteger(0);
    boolean requestAccepted;

    @JsonGetter
    public String getStartBlock() {
      return "0x" + Long.toHexString(startBlock);
    }

    @JsonGetter
    public String getEndBlock() {
      return endBlock == Long.MAX_VALUE ? "latest" : "0x" + Long.toHexString(endBlock);
    }

    @JsonGetter
    public String getCurrentBlock() {
      return "0x" + Long.toHexString(currentBlock);
    }

    @JsonGetter
    public boolean isCaching() {
      return cachingCount.get() > 0;
    }

    @JsonGetter
    public boolean isRequestAccepted() {
      return requestAccepted;
    }
  }

  public static class InvalidCacheException extends Exception {}
}