// TraceFilter.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods;

import static org.hyperledger.besu.services.pipeline.PipelineBuilder.createPipelineFrom;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.BlockParameter;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.FilterParameter;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.processor.BlockTracer;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.processor.Tracer;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.processor.TransactionTrace;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.RpcErrorType;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.tracing.flat.FlatTrace;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.tracing.flat.RewardTraceGenerator;
import org.hyperledger.besu.ethereum.api.query.BlockchainQueries;
import org.hyperledger.besu.ethereum.api.util.ArrayNodeWrapper;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.debug.TraceOptions;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.mainnet.MainnetTransactionProcessor;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.vm.DebugOperationTracer;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.metrics.prometheus.PrometheusMetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.services.pipeline.Pipeline;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JSON-RPC method handler registered under {@link RpcMethod#TRACE_FILTER}.
 *
 * <p>Resolves the {@code fromBlock}/{@code toBlock} bounds of a {@link FilterParameter}, collects
 * the blocks found in that range and pushes their transactions through a tracing pipeline whose
 * flat traces are accumulated into an {@link ArrayNodeWrapper}.
 */
public class TraceFilter extends TraceBlock {

  private static final Logger LOG = LoggerFactory.getLogger(TraceFilter.class);

  /** Shared mapper; in this class it is only used to create the (initially empty) result array. */
  private static final ObjectMapper MAPPER = new ObjectMapper();

  /** Upper bound for {@code toBlock - fromBlock}; the check is skipped unless this is positive. */
  private final Long maxRange;

  /**
   * Creates the handler.
   *
   * @param blockTracerSupplier not used by this class; kept so existing callers keep compiling
   * @param protocolSchedule forwarded to the superclass constructor
   * @param blockchainQueries forwarded to the superclass constructor
   * @param maxRange maximum accepted block range; a value of zero or less disables the limit
   */
  public TraceFilter(
      final Supplier<BlockTracer> blockTracerSupplier,
      final ProtocolSchedule protocolSchedule,
      final BlockchainQueries blockchainQueries,
      final Long maxRange) {
    super(protocolSchedule, blockchainQueries);
    this.maxRange = maxRange;
  }

  @Override
  public String getName() {
    return RpcMethod.TRACE_FILTER.getMethodName();
  }

  /**
   * Handles a {@code trace_filter} request.
   *
   * <p>Returns an {@link RpcErrorType#EXCEEDS_RPC_MAX_BLOCK_RANGE} error when the requested range
   * is larger than the configured maximum, an empty success result when {@code fromBlock} is
   * greater than {@code toBlock}, and the pipeline result otherwise.
   *
   * @param requestContext the request; parameter 0 must be a {@link FilterParameter}
   * @return the JSON-RPC response for the request
   */
  @Override
  public JsonRpcResponse response(final JsonRpcRequestContext requestContext) {
    final FilterParameter filterParameter =
        requestContext.getRequiredParameter(0, FilterParameter.class);

    final long fromBlock = resolveBlockNumber(filterParameter.getFromBlock());
    final long toBlock = resolveBlockNumber(filterParameter.getToBlock());
    LOG.trace("Received RPC rpcName={} fromBlock={} toBlock={}", getName(), fromBlock, toBlock);

    if (maxRange > 0 && toBlock - fromBlock > maxRange) {
      LOG.atDebug()
          .setMessage("trace_filter request {} failed:")
          .addArgument(requestContext.getRequest())
          .setCause(
              new IllegalArgumentException(RpcErrorType.EXCEEDS_RPC_MAX_BLOCK_RANGE.getMessage()))
          .log();
      return new JsonRpcErrorResponse(
          requestContext.getRequest().getId(), RpcErrorType.EXCEEDS_RPC_MAX_BLOCK_RANGE);
    }

    final ArrayNodeWrapper resultArrayNode =
        new ArrayNodeWrapper(
            MAPPER.createArrayNode(), filterParameter.getAfter(), filterParameter.getCount());

    // An inverted range selects nothing: answer with the empty array instead of tracing.
    if (fromBlock > toBlock) {
      return new JsonRpcSuccessResponse(
          requestContext.getRequest().getId(), resultArrayNode.getArrayNode());
    }
    return traceFilterWithPipeline(
        requestContext, filterParameter, fromBlock, toBlock, resultArrayNode);
  }

  /**
   * Traces every block found in {@code [fromBlock, toBlock]} through a pipeline and returns the
   * collected traces.
   *
   * @param requestContext the request being answered (used for the response id)
   * @param filterParameter the filter handed to the flat-trace step
   * @param fromBlock first block number of the range
   * @param toBlock last block number of the range (inclusive)
   * @param resultArrayNode accumulator the pipeline writes into
   * @return a success response wrapping the resulting array node
   */
  private JsonRpcResponse traceFilterWithPipeline(
      final JsonRpcRequestContext requestContext,
      final FilterParameter filterParameter,
      final long fromBlock,
      final long toBlock,
      final ArrayNodeWrapper resultArrayNode) {

    // Advance past missing blocks and blocks whose parent hash is all zeroes, but never beyond
    // toBlock; the block at toBlock itself is accepted if it exists.
    long currentBlockNumber = fromBlock;
    Optional<Block> block =
        blockchainQueriesSupplier.get().getBlockchain().getBlockByNumber(currentBlockNumber);
    while ((block.isEmpty() || block.get().getHeader().getParentHash().equals(Bytes32.ZERO))
        && currentBlockNumber < toBlock) {
      currentBlockNumber++;
      block = blockchainQueriesSupplier.get().getBlockchain().getBlockByNumber(currentBlockNumber);
    }
    if (block.isEmpty()) {
      return new JsonRpcSuccessResponse(
          requestContext.getRequest().getId(), resultArrayNode.getArrayNode());
    }
    final Block firstBlock = block.get();
    final BlockHeader header = firstBlock.getHeader();

    final List<Block> blockList = getBlockList(currentBlockNumber, toBlock, firstBlock);

    final ArrayNodeWrapper result =
        Tracer.processTracing(
                getBlockchainQueries(),
                Optional.of(header),
                traceableState -> {
                  final TraceFilterSource traceFilterSource =
                      new TraceFilterSource(blockList, resultArrayNode);
                  final ProtocolSpec protocolSpec = protocolSchedule.getByBlockHeader(header);
                  final MainnetTransactionProcessor transactionProcessor =
                      protocolSpec.getTransactionProcessor();
                  final ChainUpdater chainUpdater = new ChainUpdater(traceableState);
                  // NOTE(review): a fresh metrics system is built for every request just to
                  // obtain this counter; consider injecting a shared one - confirm with owners.
                  final LabelledMetric<Counter> outputCounter =
                      new PrometheusMetricsSystem(
                              BesuMetricCategory.DEFAULT_METRIC_CATEGORIES, false)
                          .createLabelledCounter(
                              BesuMetricCategory.BLOCKCHAIN,
                              "transactions_tracefilter_pipeline_processed_total",
                              "Number of transactions processed for trace_filter",
                              "step",
                              "action");

                  final DebugOperationTracer debugOperationTracer =
                      new DebugOperationTracer(new TraceOptions(false, false, true), false);
                  final ExecuteTransactionStep executeTransactionStep =
                      new ExecuteTransactionStep(
                          chainUpdater,
                          transactionProcessor,
                          getBlockchainQueries().getBlockchain(),
                          debugOperationTracer,
                          protocolSpec);

                  final Function<TransactionTrace, CompletableFuture<Stream<FlatTrace>>>
                      traceFlatTransactionStep =
                          new TraceFlatTransactionStep(
                              protocolSchedule, null, Optional.of(filterParameter));

                  final BuildArrayNodeCompleterStep buildArrayNodeStep =
                      new BuildArrayNodeCompleterStep(resultArrayNode);
                  final Pipeline<TransactionTrace> traceBlockPipeline =
                      createPipelineFrom(
                              "getTransactions",
                              traceFilterSource,
                              4,
                              outputCounter,
                              false,
                              "trace_block_transactions")
                          .thenProcess("executeTransaction", executeTransactionStep)
                          .thenProcessAsyncOrdered(
                              "traceFlatTransaction", traceFlatTransactionStep, 4)
                          .andFinishWith(
                              "buildArrayNode",
                              traceStream -> traceStream.forEachOrdered(buildArrayNodeStep));

                  runPipeline(traceBlockPipeline);
                  return Optional.of(resultArrayNode);
                })
            .orElse(emptyResult());

    return new JsonRpcSuccessResponse(requestContext.getRequest().getId(), result.getArrayNode());
  }

  /**
   * Starts the pipeline and blocks until it has finished.
   *
   * <p>The scheduler exposed by the blockchain queries is used when present; a fallback scheduler
   * is only constructed when none is available.
   *
   * @param pipeline the pipeline to execute
   * @throws RuntimeException wrapping the {@link InterruptedException} or {@link
   *     ExecutionException} raised while waiting; on interruption the thread's interrupt flag is
   *     restored before throwing
   */
  private void runPipeline(final Pipeline<TransactionTrace> pipeline) {
    try {
      // NOTE(review): the fallback scheduler is never shut down here - confirm whether it needs
      // to be stopped once the pipeline completes.
      getBlockchainQueries()
          .getEthScheduler()
          .orElseGet(() -> new EthScheduler(1, 1, 1, 1, new NoOpMetricsSystem()))
          .startPipeline(pipeline)
          .get();
    } catch (final InterruptedException e) {
      // Preserve the interruption for code further up the stack.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (final ExecutionException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Collects the blocks to trace.
   *
   * @param fromBlock number of {@code firstBlock}
   * @param toBlock last block number to look up (inclusive)
   * @param firstBlock the already loaded block, stored as the first element
   * @return {@code firstBlock} followed by every block found for the numbers {@code fromBlock + 1}
   *     to {@code toBlock}; numbers without a block are skipped
   */
  @Nonnull
  private List<Block> getBlockList(
      final long fromBlock, final long toBlock, final Block firstBlock) {
    final List<Block> blockList = new ArrayList<>();
    blockList.add(firstBlock);
    // firstBlock is already stored, so the lookups start at the following number.
    for (long number = fromBlock + 1; number <= toBlock; number++) {
      blockchainQueriesSupplier
          .get()
          .getBlockchain()
          .getBlockByNumber(number)
          .ifPresent(blockList::add);
    }
    return blockList;
  }

  /**
   * Builds a lookup from each transaction to the block whose body lists it.
   *
   * @param blockList the blocks to index
   * @return a new mutable map; if equal transactions occur in several blocks, the block appearing
   *     later in {@code blockList} wins
   */
  public Map<Transaction, Block> createTransactionBlockMap(final List<Block> blockList) {
    final Map<Transaction, Block> transactionBlockMap = new HashMap<>();
    for (final Block block : blockList) {
      for (final Transaction transaction : block.getBody().getTransactions()) {
        transactionBlockMap.put(transaction, block);
      }
    }
    return transactionBlockMap;
  }

  /**
   * Adds the reward traces of {@code block} to {@code resultArrayNode}.
   *
   * <p>Nothing is added when no filter is given or when the filter has at least one from-address.
   * Traces with block number 0 are dropped, and when the filter lists to-addresses only traces
   * whose action author is one of them are kept.
   *
   * @param maybeFilterParameter the optional filter
   * @param block the block whose rewards are generated
   * @param resultArrayNode accumulator receiving the reward traces
   */
  @Override
  protected void generateRewardsFromBlock(
      final Optional<FilterParameter> maybeFilterParameter,
      final Block block,
      final ArrayNodeWrapper resultArrayNode) {
    maybeFilterParameter.ifPresent(
        filterParameter -> {
          final List<Address> fromAddress = filterParameter.getFromAddress();
          if (fromAddress.isEmpty()) {
            final List<Address> toAddress = filterParameter.getToAddress();
            RewardTraceGenerator.generateFromBlock(protocolSchedule, block)
                .map(FlatTrace.class::cast)
                .filter(trace -> trace.getBlockNumber() != 0)
                .filter(
                    trace ->
                        toAddress.isEmpty()
                            || Optional.ofNullable(trace.getAction().getAuthor())
                                .map(Address::fromHexString)
                                .map(toAddress::contains)
                                .orElse(false))
                .forEachOrdered(resultArrayNode::addPOJO);
          }
        });
  }

  /**
   * Converts a block parameter into a concrete block number.
   *
   * @param param the block parameter from the request
   * @return the explicit number if the parameter carries one, otherwise the head block number when
   *     the parameter is "latest"
   * @throws IllegalStateException if the parameter is neither numeric nor "latest"
   */
  private long resolveBlockNumber(final BlockParameter param) {
    if (param.getNumber().isPresent()) {
      return param.getNumber().get();
    } else if (param.isLatest()) {
      return blockchainQueriesSupplier.get().headBlockNumber();
    } else {
      throw new IllegalStateException("Unknown block parameter type.");
    }
  }
}