RequestManager.java

/*
 * Copyright ConsenSys AG.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import org.hyperledger.besu.ethereum.rlp.RLPException;

import java.math.BigInteger;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RequestManager {

  private static final Logger LOG = LoggerFactory.getLogger(RequestManager.class);
  private final AtomicLong requestIdCounter =
      new AtomicLong(1); // some clients have issues encoding zero
  private final Map<BigInteger, ResponseStream> responseStreams = new ConcurrentHashMap<>();
  private final EthPeer peer;
  private final boolean supportsRequestId;
  private final String protocolName;

  private final AtomicInteger outstandingRequests = new AtomicInteger(0);

  public RequestManager(
      final EthPeer peer, final boolean supportsRequestId, final String protocolName) {
    this.peer = peer;
    this.supportsRequestId = supportsRequestId;
    this.protocolName = protocolName;
  }

  public int outstandingRequests() {
    return outstandingRequests.get();
  }

  public String getProtocolName() {
    return protocolName;
  }

  public ResponseStream dispatchRequest(final RequestSender sender, final MessageData messageData)
      throws PeerNotConnected {
    outstandingRequests.incrementAndGet();
    final BigInteger requestId = BigInteger.valueOf(requestIdCounter.getAndIncrement());
    final ResponseStream stream = createStream(requestId);
    sender.send(supportsRequestId ? messageData.wrapMessageData(requestId) : messageData);
    return stream;
  }

  public void dispatchResponse(final EthMessage ethMessage) {
    final Collection<ResponseStream> streams = List.copyOf(responseStreams.values());
    final int count = outstandingRequests.decrementAndGet();
    try {
      if (supportsRequestId) {
        // If there's a requestId, find the specific stream it belongs to
        final Map.Entry<BigInteger, MessageData> requestIdAndEthMessage =
            ethMessage.getData().unwrapMessageData();
        Optional.ofNullable(responseStreams.get(requestIdAndEthMessage.getKey()))
            .ifPresentOrElse(
                responseStream -> responseStream.processMessage(requestIdAndEthMessage.getValue()),
                // Consider incorrect requestIds to be a useless response; too
                // many of these and we will disconnect.
                () -> peer.recordUselessResponse("Request ID incorrect"));

      } else {
        // otherwise iterate through all of them
        streams.forEach(stream -> stream.processMessage(ethMessage.getData()));
      }
    } catch (final RLPException e) {
      LOG.debug(
          "Received malformed message {} (BREACH_OF_PROTOCOL), disconnecting: {}",
          ethMessage.getData(),
          peer,
          e);

      peer.disconnect(
          DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
    }

    if (count == 0) {
      // No possibility of any remaining outstanding messages
      closeOutstandingStreams(streams);
    }
  }

  public void close() {
    closeOutstandingStreams(responseStreams.values());
  }

  private ResponseStream createStream(final BigInteger requestId) {
    final ResponseStream stream = new ResponseStream(peer, () -> deregisterStream(requestId));
    responseStreams.put(requestId, stream);
    return stream;
  }

  /** Close all current streams. This will be called when the peer disconnects. */
  private void closeOutstandingStreams(final Collection<ResponseStream> outstandingStreams) {
    outstandingStreams.forEach(ResponseStream::close);
  }

  private void deregisterStream(final BigInteger id) {
    responseStreams.remove(id);
  }

  @FunctionalInterface
  public interface RequestSender {
    void send(final MessageData messageData) throws PeerNotConnected;
  }

  @FunctionalInterface
  public interface ResponseCallback {

    /**
     * Process a potential message response
     *
     * @param streamClosed True if the ResponseStream is being shut down and will no longer deliver
     *     messages.
     * @param message the message to be processed
     * @param peer the peer that owns this response stream
     */
    void exec(boolean streamClosed, MessageData message, EthPeer peer);
  }

  @FunctionalInterface
  public interface DeregistrationProcessor {
    void exec();
  }

  private static class Response {
    final boolean closed;
    final MessageData message;

    private Response(final boolean closed, final MessageData message) {
      this.closed = closed;
      this.message = message;
    }
  }

  public static class ResponseStream {
    private final EthPeer peer;
    private final DeregistrationProcessor deregisterCallback;
    private final Queue<Response> bufferedResponses = new ConcurrentLinkedQueue<>();
    private volatile boolean closed = false;
    private volatile ResponseCallback responseCallback = null;

    public ResponseStream(final EthPeer peer, final DeregistrationProcessor deregisterCallback) {
      this.peer = peer;
      this.deregisterCallback = deregisterCallback;
    }

    public ResponseStream then(final ResponseCallback callback) {
      if (responseCallback != null) {
        // For now just manage a single callback for simplicity.  We could expand this to support
        // multiple listeners in the future.
        throw new IllegalStateException("Response streams expect only a single callback");
      }
      responseCallback = callback;
      dispatchBufferedResponses();
      return this;
    }

    public void close() {
      if (closed) {
        return;
      }
      closed = true;
      deregisterCallback.exec();
      bufferedResponses.add(new Response(true, null));
      dispatchBufferedResponses();
    }

    public EthPeer getPeer() {
      return peer;
    }

    private void processMessage(final MessageData message) {
      if (closed) {
        return;
      }
      bufferedResponses.add(new Response(false, message));
      dispatchBufferedResponses();
    }

    private void dispatchBufferedResponses() {
      if (responseCallback == null) {
        return;
      }
      Response response = bufferedResponses.poll();
      while (response != null) {
        responseCallback.exec(response.closed, response.message, peer);
        response = bufferedResponses.poll();
      }
    }
  }
}