AbstractRetryingPeerTask.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.eth.manager.task;

import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.PeerBreachedProtocolException;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.PeerDisconnectedException;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.ExceptionUtils;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A task that will retry a fixed number of times before completing the associated CompletableFuture
 * exceptionally with a new {@link MaxRetriesReachedException}. If the future returned from {@link
 * #executePeerTask(Optional)} is complete with a non-empty list the retry counter is reset.
 *
 * @param <T> The type as a typed list that the peer task can get partial or full results in.
 */
public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractRetryingPeerTask.class);
  private final EthContext ethContext;
  private final int maxRetries;
  private final Predicate<T> isEmptyResponse;
  private final MetricsSystem metricsSystem;
  private int retryCount = 0;
  private Optional<EthPeer> assignedPeer = Optional.empty();

  /**
   * @param ethContext The context of the current Eth network we are attached to.
   * @param maxRetries Maximum number of retries to accept before completing exceptionally.
   * @param isEmptyResponse Test if the response received was empty.
   * @param metricsSystem The metrics system used to measure task.
   */
  protected AbstractRetryingPeerTask(
      final EthContext ethContext,
      final int maxRetries,
      final Predicate<T> isEmptyResponse,
      final MetricsSystem metricsSystem) {
    super(metricsSystem);
    this.ethContext = ethContext;
    this.maxRetries = maxRetries;
    this.isEmptyResponse = isEmptyResponse;
    this.metricsSystem = metricsSystem;
  }

  public void assignPeer(final EthPeer peer) {
    assignedPeer = Optional.of(peer);
  }

  public Optional<EthPeer> getAssignedPeer() {
    return assignedPeer;
  }

  @Override
  protected void executeTask() {
    if (result.isDone()) {
      // Return if task is done
      return;
    }
    if (retryCount >= maxRetries) {
      result.completeExceptionally(new MaxRetriesReachedException());
      return;
    }

    retryCount += 1;
    executePeerTask(assignedPeer)
        .whenComplete(
            (peerResult, error) -> {
              if (error != null) {
                handleTaskError(error);
              } else {
                // If we get a partial success, reset the retry counter.
                if (!isEmptyResponse.test(peerResult)) {
                  retryCount = 0;
                }
                executeTaskTimed();
              }
            });
  }

  protected abstract CompletableFuture<T> executePeerTask(Optional<EthPeer> assignedPeer);

  protected void handleTaskError(final Throwable error) {
    final Throwable cause = ExceptionUtils.rootCause(error);
    if (!isRetryableError(cause)) {
      // Complete exceptionally
      result.completeExceptionally(cause);
      return;
    }

    if (cause instanceof NoAvailablePeersException) {
      LOG.debug(
          "No useful peer found, wait max 5 seconds for new peer to connect: current peers {}",
          ethContext.getEthPeers().peerCount());

      final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext, metricsSystem);
      executeSubTask(
          () ->
              ethContext
                  .getScheduler()
                  .timeout(waitTask, Duration.ofSeconds(5))
                  .whenComplete((r, t) -> executeTaskTimed()));
      return;
    }

    LOG.debug(
        "Retrying after recoverable failure from peer task {}: {}",
        this.getClass().getSimpleName(),
        cause.getMessage());
    // Wait before retrying on failure
    executeSubTask(
        () ->
            ethContext
                .getScheduler()
                .scheduleFutureTask(this::executeTaskTimed, Duration.ofSeconds(1)));
  }

  protected boolean isRetryableError(final Throwable error) {
    return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerFailure(error));
  }

  protected boolean isPeerFailure(final Throwable error) {
    return error instanceof PeerBreachedProtocolException
        || error instanceof PeerDisconnectedException
        || error instanceof NoAvailablePeersException;
  }

  protected EthContext getEthContext() {
    return ethContext;
  }

  protected MetricsSystem getMetricsSystem() {
    return metricsSystem;
  }

  public int getRetryCount() {
    return retryCount;
  }

  public int getMaxRetries() {
    return maxRetries;
  }
}