PendingPeerRequest.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.ethereum.eth.manager.RequestManager.ResponseStream;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.PeerDisconnectedException;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;

import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

public class PendingPeerRequest {
  private final EthPeers ethPeers;
  private final PeerRequest request;
  private final CompletableFuture<ResponseStream> result = new CompletableFuture<>();
  private final long minimumBlockNumber;
  private final Optional<EthPeer> peer;

  PendingPeerRequest(
      final EthPeers ethPeers,
      final PeerRequest request,
      final long minimumBlockNumber,
      final Optional<EthPeer> peer) {
    this.ethPeers = ethPeers;
    this.request = request;
    this.minimumBlockNumber = minimumBlockNumber;
    this.peer = peer;
  }

  /**
   * Attempts to find an available peer and execute the peer request.
   *
   * @return true if the request should be removed from the pending list, otherwise false.
   */
  public boolean attemptExecution() {
    if (result.isDone()) {
      return true;
    }
    final Optional<EthPeer> maybePeer = getPeerToUse();
    if (maybePeer.isEmpty()) {
      // No peers have the required height.
      result.completeExceptionally(new NoAvailablePeersException());
      return true;
    } else {
      // At least one peer has the required height, but we are not able to use it if it's busy
      final Optional<EthPeer> maybePeerWithCapacity =
          maybePeer.filter(EthPeer::hasAvailableRequestCapacity);

      maybePeerWithCapacity.ifPresent(this::sendRequest);
      return maybePeerWithCapacity.isPresent();
    }
  }

  private synchronized void sendRequest(final EthPeer peer) {
    // Recheck if we should send the request now we're inside the synchronized block
    if (!result.isDone()) {
      try {
        final ResponseStream responseStream = request.sendRequest(peer);
        result.complete(responseStream);
      } catch (final PeerNotConnected e) {
        result.completeExceptionally(new PeerDisconnectedException(peer));
      }
    }
  }

  private Optional<EthPeer> getPeerToUse() {
    // return the assigned peer if still valid, otherwise switch to another peer
    return peer.filter(p -> !p.isDisconnected()).isPresent()
        ? peer
        : ethPeers
            .streamAvailablePeers()
            .filter(peer -> peer.chainState().getEstimatedHeight() >= minimumBlockNumber)
            .min(EthPeers.LEAST_TO_MOST_BUSY);
  }

  /**
   * Register callbacks for when the request is made or
   *
   * @param onSuccess handler for when a peer becomes available and the request is sent
   * @param onError handler for when there is no peer with sufficient height or the request fails to
   *     send
   */
  public void then(final Consumer<ResponseStream> onSuccess, final Consumer<Throwable> onError) {
    result.whenComplete(
        (result, error) -> {
          if (error != null) {
            onError.accept(error);
          } else {
            onSuccess.accept(result);
          }
        });
  }

  /**
   * Abort this request.
   *
   * @return the response stream if the request has already been sent, otherwise empty.
   */
  public synchronized Optional<ResponseStream> abort() {
    try {
      result.cancel(false);
      return Optional.ofNullable(result.getNow(null));
    } catch (final CancellationException | CompletionException e) {
      return Optional.empty();
    }
  }

  public Optional<EthPeer> getAssignedPeer() {
    return peer;
  }
}