BesuEventsImpl.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.services;

import static java.util.stream.Collectors.toUnmodifiableList;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.api.query.LogsQuery;
import org.hyperledger.besu.ethereum.chain.BadBlockManager;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.LogWithMetadata;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.evm.log.LogTopic;
import org.hyperledger.besu.plugin.data.AddedBlockContext;
import org.hyperledger.besu.plugin.data.BlockHeader;
import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
import org.hyperledger.besu.plugin.services.BesuEvents;

import java.util.List;
import java.util.function.Supplier;

import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.units.bigints.UInt256;

/** A concrete implementation of BesuEvents used in Besu plugin framework. */
public class BesuEventsImpl implements BesuEvents {
  private final Blockchain blockchain;
  private final BlockBroadcaster blockBroadcaster;
  private final TransactionPool transactionPool;
  private final SyncState syncState;
  private final BadBlockManager badBlockManager;

  /**
   * Constructor for BesuEventsImpl
   *
   * @param blockchain An instance of Blockchain
   * @param blockBroadcaster An instance of BlockBroadcaster
   * @param transactionPool An instance of TransactionPool
   * @param syncState An instance of SyncState
   * @param badBlockManager A cache of bad blocks encountered on the network
   */
  public BesuEventsImpl(
      final Blockchain blockchain,
      final BlockBroadcaster blockBroadcaster,
      final TransactionPool transactionPool,
      final SyncState syncState,
      final BadBlockManager badBlockManager) {
    this.blockchain = blockchain;
    this.blockBroadcaster = blockBroadcaster;
    this.transactionPool = transactionPool;
    this.syncState = syncState;
    this.badBlockManager = badBlockManager;
  }

  @Override
  public long addBlockPropagatedListener(final BlockPropagatedListener listener) {
    return blockBroadcaster.subscribePropagateNewBlocks(
        (block, totalDifficulty) ->
            listener.onBlockPropagated(
                blockPropagatedContext(block::getHeader, block::getBody, () -> totalDifficulty)));
  }

  @Override
  public void removeBlockPropagatedListener(final long listenerIdentifier) {
    blockBroadcaster.unsubscribePropagateNewBlocks(listenerIdentifier);
  }

  @Override
  public long addBlockAddedListener(final BlockAddedListener listener) {
    return blockchain.observeBlockAdded(
        event ->
            listener.onBlockAdded(
                blockAddedContext(
                    event.getBlock()::getHeader,
                    event.getBlock()::getBody,
                    event::getTransactionReceipts)));
  }

  @Override
  public void removeBlockAddedListener(final long listenerIdentifier) {
    blockchain.removeObserver(listenerIdentifier);
  }

  @Override
  public long addBlockReorgListener(final BlockReorgListener listener) {
    return blockchain.observeChainReorg(
        (blockWithReceipts, chain) ->
            listener.onBlockReorg(
                blockAddedContext(
                    blockWithReceipts::getHeader,
                    blockWithReceipts.getBlock()::getBody,
                    blockWithReceipts::getReceipts)));
  }

  @Override
  public void removeBlockReorgListener(final long listenerIdentifier) {
    blockchain.removeObserver(listenerIdentifier);
  }

  @Override
  public long addTransactionAddedListener(final TransactionAddedListener listener) {
    return transactionPool.subscribePendingTransactions(listener::onTransactionAdded);
  }

  @Override
  public void removeTransactionAddedListener(final long listenerIdentifier) {
    transactionPool.unsubscribePendingTransactions(listenerIdentifier);
  }

  @Override
  public long addTransactionDroppedListener(
      final TransactionDroppedListener transactionDroppedListener) {
    return transactionPool.subscribeDroppedTransactions(
        transactionDroppedListener::onTransactionDropped);
  }

  @Override
  public void removeTransactionDroppedListener(final long listenerIdentifier) {
    transactionPool.unsubscribeDroppedTransactions(listenerIdentifier);
  }

  @Override
  public long addSyncStatusListener(final SyncStatusListener syncStatusListener) {
    return syncState.subscribeSyncStatus(syncStatusListener);
  }

  @Override
  public void removeSyncStatusListener(final long listenerIdentifier) {
    syncState.unsubscribeSyncStatus(listenerIdentifier);
  }

  @Override
  public long addLogListener(
      final List<Address> addresses,
      final List<List<Bytes32>> topics,
      final LogListener logListener) {
    final List<List<LogTopic>> besuTopics =
        topics.stream()
            .map(subList -> subList.stream().map(LogTopic::wrap).collect(toUnmodifiableList()))
            .collect(toUnmodifiableList());

    final LogsQuery logsQuery = new LogsQuery(addresses, besuTopics);

    return blockchain.observeLogs(
        logWithMetadata -> {
          if (logsQuery.matches(LogWithMetadata.fromPlugin(logWithMetadata))) {
            logListener.onLogEmitted(logWithMetadata);
          }
        });
  }

  @Override
  public void removeLogListener(final long listenerIdentifier) {
    blockchain.removeObserver(listenerIdentifier);
  }

  @Override
  public long addBadBlockListener(final BadBlockListener listener) {
    return badBlockManager.subscribeToBadBlocks(listener);
  }

  @Override
  public void removeBadBlockListener(final long listenerIdentifier) {
    badBlockManager.unsubscribeFromBadBlocks(listenerIdentifier);
  }

  private static PropagatedBlockContext blockPropagatedContext(
      final Supplier<BlockHeader> blockHeaderSupplier,
      final Supplier<BlockBody> blockBodySupplier,
      final Supplier<Difficulty> totalDifficultySupplier) {
    return new PropagatedBlockContext() {
      @Override
      public BlockHeader getBlockHeader() {
        return blockHeaderSupplier.get();
      }

      @Override
      public BlockBody getBlockBody() {
        return blockBodySupplier.get();
      }

      @Override
      public UInt256 getTotalDifficulty() {
        return totalDifficultySupplier.get().toUInt256();
      }
    };
  }

  private static AddedBlockContext blockAddedContext(
      final Supplier<BlockHeader> blockHeaderSupplier,
      final Supplier<BlockBody> blockBodySupplier,
      final Supplier<List<TransactionReceipt>> transactionReceiptsSupplier) {
    return new AddedBlockContext() {
      @Override
      public BlockHeader getBlockHeader() {
        return blockHeaderSupplier.get();
      }

      @Override
      public BlockBody getBlockBody() {
        return blockBodySupplier.get();
      }

      @Override
      public List<TransactionReceipt> getTransactionReceipts() {
        return transactionReceiptsSupplier.get();
      }
    };
  }
}