PendingBlocksManager.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.eth.sync.state;

import static java.util.Collections.newSetFromMap;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.cache.ImmutablePendingBlock;
import org.hyperledger.besu.ethereum.eth.sync.state.cache.PendingBlockCache;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.tuweni.bytes.Bytes;

public class PendingBlocksManager {

  private final PendingBlockCache pendingBlocks;

  private final Map<Hash, Set<Hash>> pendingBlocksByParentHash = new ConcurrentHashMap<>();

  public PendingBlocksManager(final SynchronizerConfiguration synchronizerConfiguration) {

    pendingBlocks =
        new PendingBlockCache(
            (Math.abs(synchronizerConfiguration.getBlockPropagationRange().lowerEndpoint())
                + Math.abs(synchronizerConfiguration.getBlockPropagationRange().upperEndpoint())));
  }

  /**
   * Track the given block.
   *
   * @param block the block to track
   * @param nodeId node that sent the block
   * @return true if the block was added (was not previously present)
   */
  public boolean registerPendingBlock(final Block block, final Bytes nodeId) {

    final ImmutablePendingBlock previousValue =
        this.pendingBlocks.putIfAbsent(
            block.getHash(), ImmutablePendingBlock.builder().block(block).nodeId(nodeId).build());
    if (previousValue != null) {
      return false;
    }

    pendingBlocksByParentHash
        .computeIfAbsent(
            block.getHeader().getParentHash(),
            h -> {
              final Set<Hash> set = newSetFromMap(new ConcurrentHashMap<>());
              // Go ahead and add our value at construction, so that we don't set an empty set which
              // could be removed in deregisterPendingBlock
              set.add(block.getHash());
              return set;
            })
        .add(block.getHash());

    return true;
  }

  /**
   * Stop tracking the given block.
   *
   * @param block the block that is no longer pending
   * @return true if this block was removed
   */
  public boolean deregisterPendingBlock(final Block block) {
    final Hash parentHash = block.getHeader().getParentHash();
    final ImmutablePendingBlock removed = pendingBlocks.remove(block.getHash());
    final Set<Hash> blocksForParent = pendingBlocksByParentHash.get(parentHash);
    if (blocksForParent != null) {
      blocksForParent.remove(block.getHash());
      pendingBlocksByParentHash.remove(parentHash, Collections.emptySet());
    }
    return removed != null;
  }

  public void purgeBlocksOlderThan(final long blockNumber) {
    pendingBlocks.values().stream()
        .filter(b -> b.block().getHeader().getNumber() < blockNumber)
        .map(ImmutablePendingBlock::block)
        .forEach(this::deregisterPendingBlock);
  }

  public boolean contains(final Hash blockHash) {
    return pendingBlocks.containsKey(blockHash);
  }

  public int size() {
    return pendingBlocks.size();
  }

  public List<Block> childrenOf(final Hash parentBlock) {
    final Set<Hash> blocksByParent = pendingBlocksByParentHash.get(parentBlock);
    if (blocksByParent == null || blocksByParent.size() == 0) {
      return Collections.emptyList();
    }
    return blocksByParent.stream()
        .map(pendingBlocks::get)
        .filter(Objects::nonNull)
        .map(ImmutablePendingBlock::block)
        .collect(Collectors.toList());
  }

  public Optional<BlockHeader> lowestAnnouncedBlock() {
    return pendingBlocks.values().stream()
        .map(ImmutablePendingBlock::block)
        .map(Block::getHeader)
        .min(Comparator.comparing(BlockHeader::getNumber));
  }

  /**
   * Get the lowest pending ancestor block saved for a block
   *
   * @param block target block
   * @return An optional with the lowest ancestor pending block
   */
  public Optional<Block> pendingAncestorBlockOf(final Block block) {
    Block ancestor = block;
    int ancestorLevel = 0;
    while (pendingBlocks.containsKey(ancestor.getHeader().getParentHash())
        && ancestorLevel++ < pendingBlocks.size()) {
      ancestor = pendingBlocks.get(ancestor.getHeader().getParentHash()).block();
    }
    return Optional.of(ancestor);
  }

  @Override
  public String toString() {
    return "PendingBlocksManager{"
        + "pendingBlocks ["
        + pendingBlocks.values().stream()
            .map(ImmutablePendingBlock::block)
            .map(b -> b.getHeader().getNumber() + " (" + b.getHash() + ")")
            .collect(Collectors.joining(", "))
        + "], pendingBlocksByParentHash="
        + pendingBlocksByParentHash
        + '}';
  }
}