FilterManager.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.api.jsonrpc.internal.filter;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toUnmodifiableList;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.BlockParameter;
import org.hyperledger.besu.ethereum.api.query.BlockchainQueries;
import org.hyperledger.besu.ethereum.api.query.LogsQuery;
import org.hyperledger.besu.ethereum.api.query.PrivacyQueries;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent;
import org.hyperledger.besu.ethereum.core.LogWithMetadata;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.privacy.PrivateTransactionEvent;
import org.hyperledger.besu.ethereum.privacy.PrivateTransactionObserver;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

import com.google.common.annotations.VisibleForTesting;
import io.vertx.core.AbstractVerticle;

/** Manages JSON-RPC filter events. */
public class FilterManager extends AbstractVerticle implements PrivateTransactionObserver {

  private static final int FILTER_TIMEOUT_CHECK_TIMER = 10000;

  private final FilterIdGenerator filterIdGenerator;
  private final FilterRepository filterRepository;
  private final BlockchainQueries blockchainQueries;
  private final Optional<PrivacyQueries> privacyQueries;
  private final List<PrivateTransactionEvent> removalEvents;

  FilterManager(
      final BlockchainQueries blockchainQueries,
      final TransactionPool transactionPool,
      final Optional<PrivacyQueries> privacyQueries,
      final FilterIdGenerator filterIdGenerator,
      final FilterRepository filterRepository) {
    this.filterIdGenerator = filterIdGenerator;
    this.filterRepository = filterRepository;
    checkNotNull(blockchainQueries.getBlockchain());
    blockchainQueries.getBlockchain().observeBlockAdded(this::recordBlockEvent);
    transactionPool.subscribePendingTransactions(this::recordPendingTransactionEvent);
    this.blockchainQueries = blockchainQueries;
    this.privacyQueries = privacyQueries;
    this.removalEvents = new ArrayList<>();
  }

  @Override
  public void start() {
    startFilterTimeoutTimer();
  }

  @Override
  public void stop() {
    filterRepository.deleteAll();
  }

  private void startFilterTimeoutTimer() {
    vertx.setPeriodic(
        FILTER_TIMEOUT_CHECK_TIMER,
        timerId ->
            vertx.executeBlocking(
                future -> new FilterTimeoutMonitor(filterRepository).checkFilters(), result -> {}));
  }

  /**
   * Installs a new block filter
   *
   * @return the block filter id
   */
  public String installBlockFilter() {
    final String filterId = filterIdGenerator.nextId();
    filterRepository.save(new BlockFilter(filterId));
    return filterId;
  }

  /**
   * Installs a pending transaction filter
   *
   * @return the transaction filter id
   */
  public String installPendingTransactionFilter() {
    final String filterId = filterIdGenerator.nextId();
    filterRepository.save(new PendingTransactionFilter(filterId));
    return filterId;
  }

  /**
   * Installs a new log filter
   *
   * @param fromBlock {@link BlockParameter} Integer block number, or latest/pending/earliest.
   * @param toBlock {@link BlockParameter} Integer block number, or latest/pending/earliest.
   * @param logsQuery {@link LogsQuery} Addresses and/or topics to filter by
   * @return the log filter id
   */
  public String installLogFilter(
      final BlockParameter fromBlock, final BlockParameter toBlock, final LogsQuery logsQuery) {
    final String filterId = filterIdGenerator.nextId();
    filterRepository.save(new LogFilter(filterId, fromBlock, toBlock, logsQuery));
    return filterId;
  }

  /**
   * Installs a new private log filter
   *
   * @param privacyGroupId String privacyGroupId
   * @param privacyUserId String privacyUserId of user creating the filter
   * @param fromBlock {@link BlockParameter} Integer block number, or latest/pending/earliest.
   * @param toBlock {@link BlockParameter} Integer block number, or latest/pending/earliest.
   * @param logsQuery {@link LogsQuery} Addresses and/or topics to filter by
   * @return the log filter id
   */
  public String installPrivateLogFilter(
      final String privacyGroupId,
      final String privacyUserId,
      final BlockParameter fromBlock,
      final BlockParameter toBlock,
      final LogsQuery logsQuery) {
    final String filterId = filterIdGenerator.nextId();
    filterRepository.save(
        new PrivateLogFilter(
            filterId, privacyGroupId, privacyUserId, fromBlock, toBlock, logsQuery));
    return filterId;
  }

  /**
   * Uninstalls the specified filter.
   *
   * @param filterId the id of the filter to remove
   * @return {@code true} if the filter was successfully removed; otherwise {@code false}
   */
  public boolean uninstallFilter(final String filterId) {
    if (filterRepository.exists(filterId)) {
      filterRepository.delete(filterId);
      return true;
    } else {
      return false;
    }
  }

  public void recordBlockEvent(final BlockAddedEvent event) {
    final Hash blockHash = event.getBlock().getHash();
    final Collection<BlockFilter> blockFilters =
        filterRepository.getFiltersOfType(BlockFilter.class);
    blockFilters.forEach(
        filter -> {
          synchronized (filter) {
            filter.addBlockHash(blockHash);
          }
        });

    removalEvents.stream().forEach(removalEvent -> processRemovalEvent(removalEvent));
    removalEvents.clear();

    final List<LogWithMetadata> logsWithMetadata = event.getLogsWithMetadata();
    filterRepository.getFiltersOfType(LogFilter.class).stream()
        .filter(
            // Only keep filters where the "to" block could include the block in the event
            filter -> {
              final Optional<Long> maybeToBlockNumber = filter.getToBlock().getNumber();
              return maybeToBlockNumber.isEmpty()
                  || maybeToBlockNumber.get() >= event.getBlock().getHeader().getNumber();
            })
        .forEach(
            filter -> {
              final LogsQuery logsQuery = filter.getLogsQuery();
              filter.addLogs(
                  // We need to use privacy queries for private log filters but for regular
                  // log filters we already have all the info in the event
                  filter instanceof PrivateLogFilter
                      ? privacyQueries
                          .map(
                              pq ->
                                  pq.matchingLogs(
                                      ((PrivateLogFilter) filter).getPrivacyGroupId(),
                                      blockHash,
                                      logsQuery))
                          .orElse(emptyList())
                      : logsWithMetadata.stream()
                          .filter(logsQuery::matches)
                          .collect(toUnmodifiableList()));
            });
  }

  @Override
  public void onPrivateTransactionProcessed(final PrivateTransactionEvent event) {
    // the list will be processed at the end of the block
    removalEvents.add(event);
  }

  @VisibleForTesting
  void recordPendingTransactionEvent(final Transaction transaction) {
    final Collection<PendingTransactionFilter> pendingTransactionFilters =
        filterRepository.getFiltersOfType(PendingTransactionFilter.class);
    if (pendingTransactionFilters.isEmpty()) {
      return;
    }

    pendingTransactionFilters.forEach(
        filter -> {
          synchronized (filter) {
            filter.addTransactionHash(transaction.getHash());
          }
        });
  }

  @VisibleForTesting
  void processRemovalEvent(final PrivateTransactionEvent event) {
    // when user removed from privacy group, remove all filters created by that user in that group
    filterRepository.getFiltersOfType(PrivateLogFilter.class).stream()
        .filter(
            privateLogFilter ->
                privateLogFilter.getPrivacyGroupId().equals(event.getPrivacyGroupId())
                    && privateLogFilter.getPrivacyUserId().equals(event.getPrivacyUserId()))
        .forEach(
            privateLogFilter -> {
              uninstallFilter(privateLogFilter.getId());
            });
  }

  /**
   * Gets the new block hashes that have occurred since the filter was last checked.
   *
   * @param filterId the id of the filter to get the new blocks for
   * @return the new block hashes that have occurred since the filter was last checked
   */
  public List<Hash> blockChanges(final String filterId) {
    final BlockFilter filter = filterRepository.getFilter(filterId, BlockFilter.class).orElse(null);
    if (filter == null) {
      return null;
    }

    final List<Hash> hashes;
    synchronized (filter) {
      hashes = new ArrayList<>(filter.blockHashes());
      filter.clearBlockHashes();
      filter.resetExpireTime();
    }
    return hashes;
  }

  /**
   * Gets the pending transactions that have occurred since the filter was last checked.
   *
   * @param filterId the id of the filter to get the pending transactions for
   * @return the new pending transaction hashes that have occurred since the filter was last checked
   */
  public List<Hash> pendingTransactionChanges(final String filterId) {
    final PendingTransactionFilter filter =
        filterRepository.getFilter(filterId, PendingTransactionFilter.class).orElse(null);
    if (filter == null) {
      return null;
    }

    final List<Hash> hashes;
    synchronized (filter) {
      hashes = new ArrayList<>(filter.transactionHashes());
      filter.clearTransactionHashes();
      filter.resetExpireTime();
    }
    return hashes;
  }

  public List<LogWithMetadata> logsChanges(final String filterId) {
    final LogFilter filter = filterRepository.getFilter(filterId, LogFilter.class).orElse(null);
    if (filter == null) {
      return null;
    }

    final List<LogWithMetadata> logs;
    synchronized (filter) {
      logs = new ArrayList<>(filter.logs());
      filter.clearLogs();
      filter.resetExpireTime();
    }
    return logs;
  }

  public List<LogWithMetadata> logs(final String filterId) {
    final LogFilter filter = filterRepository.getFilter(filterId, LogFilter.class).orElse(null);
    if (filter == null) {
      return null;
    } else {
      filter.resetExpireTime();
    }

    final long fromBlockNumber =
        filter.getFromBlock().getNumber().orElse(blockchainQueries.headBlockNumber());
    final long toBlockNumber =
        filter.getToBlock().getNumber().orElse(blockchainQueries.headBlockNumber());

    return findLogsWithinRange(filter, fromBlockNumber, toBlockNumber);
  }

  private List<LogWithMetadata> findLogsWithinRange(
      final LogFilter filter, final long fromBlockNumber, final long toBlockNumber) {
    if (filter instanceof PrivateLogFilter) {
      return privacyQueries
          .map(
              pq ->
                  pq.matchingLogs(
                      ((PrivateLogFilter) filter).getPrivacyGroupId(),
                      fromBlockNumber,
                      toBlockNumber,
                      filter.getLogsQuery()))
          .orElse(emptyList());
    } else {
      return blockchainQueries.matchingLogs(
          fromBlockNumber, toBlockNumber, filter.getLogsQuery(), () -> true);
    }
  }
}