// AutoTransactionLogBloomCachingService.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.api.query.cache;

import static org.hyperledger.besu.ethereum.api.query.cache.LogBloomCacheMetadata.DEFAULT_VERSION;

import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Service that keeps the transaction log bloom cache populated automatically.
 *
 * <p>On {@link #start()} the service:
 *
 * <ol>
 *   <li>makes sure the cache directory reported by the {@link TransactionLogBloomCacher} exists;
 *   <li>deletes every regular file below that directory and rewrites the metadata file when the
 *       stored metadata version is lower than {@code DEFAULT_VERSION};
 *   <li>registers a block-added observer on the {@link Blockchain} that caches the logs bloom of
 *       each block that becomes the new canonical head;
 *   <li>schedules a {@code cacheAll()} run through the cacher's scheduler.
 * </ol>
 *
 * <p>{@link #stop()} removes the block-added observer again.
 */
public class AutoTransactionLogBloomCachingService {
  private static final Logger LOG =
      LoggerFactory.getLogger(AutoTransactionLogBloomCachingService.class);
  private final Blockchain blockchain;
  private final TransactionLogBloomCacher transactionLogBloomCacher;
  // Id returned by Blockchain#observeBlockAdded; empty while no observer is registered.
  private OptionalLong blockAddedSubscriptionId = OptionalLong.empty();

  /**
   * Creates the service. Nothing is subscribed or scheduled until {@link #start()} is called.
   *
   * @param blockchain the blockchain whose block-added events are observed
   * @param transactionLogBloomCacher the cacher that owns the cache directory and the scheduler
   */
  public AutoTransactionLogBloomCachingService(
      final Blockchain blockchain, final TransactionLogBloomCacher transactionLogBloomCacher) {
    this.blockchain = blockchain;
    this.transactionLogBloomCacher = transactionLogBloomCacher;
  }

  /**
   * Prepares the cache directory, subscribes to block-added events and schedules a full cache
   * run.
   *
   * <p>An {@link IOException} raised while preparing the directory or its metadata is logged and
   * not rethrown; in that case no observer is registered and nothing is scheduled.
   */
  public void start() {
    try {
      LOG.info("Starting auto transaction log bloom caching service.");
      final Path cacheDir = transactionLogBloomCacher.getCacheDir();
      if (!Files.isDirectory(cacheDir)) {
        // createDirectories also creates missing parent directories, which createDirectory
        // would reject with NoSuchFileException.
        Files.createDirectories(cacheDir);
      }
      final LogBloomCacheMetadata logBloomCacheMetadata =
          LogBloomCacheMetadata.lookUpFrom(cacheDir);
      if (logBloomCacheMetadata.getVersion() < DEFAULT_VERSION) {
        // Outdated cache layout: wipe the existing files, then record the current version.
        try (final Stream<Path> walk = Files.walk(cacheDir)) {
          walk.filter(Files::isRegularFile)
              .map(Path::toFile)
              .forEach(AutoTransactionLogBloomCachingService::deleteCacheFile);
        } catch (final Exception e) {
          // Best effort: keep going, but pass the throwable so the stack trace is logged too.
          LOG.error("Failed to update cache {}", e.getMessage(), e);
        }
        new LogBloomCacheMetadata(DEFAULT_VERSION).writeToDirectory(cacheDir);
      }

      // Drop an observer left over from an earlier start() so observers do not accumulate.
      blockAddedSubscriptionId.ifPresent(blockchain::removeObserver);
      blockAddedSubscriptionId =
          OptionalLong.of(
              blockchain.observeBlockAdded(
                  event -> {
                    if (event.isNewCanonicalHead()) {
                      final BlockHeader eventBlockHeader = event.getBlock().getHeader();
                      final Optional<BlockHeader> commonAncestorBlockHeader =
                          blockchain.getBlockHeader(event.getCommonAncestorHash());
                      transactionLogBloomCacher.cacheLogsBloomForBlockHeader(
                          eventBlockHeader, commonAncestorBlockHeader, Optional.empty());
                    }
                  }));

      transactionLogBloomCacher
          .getScheduler()
          .scheduleFutureTask(
              () ->
                  // run long tasks in the computation executor
                  transactionLogBloomCacher
                      .getScheduler()
                      .scheduleComputationTask(
                          () -> {
                            transactionLogBloomCacher.cacheAll();
                            return null;
                          }),
              Duration.ofMinutes(1));
    } catch (final IOException e) {
      LOG.error("Unhandled caching exception.", e);
    }
  }

  /**
   * Removes the block-added observer registered by {@link #start()}, if there is one. Calling
   * this method again without an intervening {@link #start()} does nothing beyond logging.
   */
  public void stop() {
    LOG.info("Shutting down Auto transaction logs caching service.");
    blockAddedSubscriptionId.ifPresent(blockchain::removeObserver);
    // Forget the id so the same observer is not removed a second time.
    blockAddedSubscriptionId = OptionalLong.empty();
  }

  /** Deletes one outdated cache file and logs a warning when the deletion fails. */
  private static void deleteCacheFile(final File file) {
    if (!file.delete()) {
      LOG.warn("Unable to delete outdated log bloom cache file {}", file);
    }
  }
}