// AbstractPeerRequestTask.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.task;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.PeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.PendingPeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.PeerBreachedProtocolException;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.ExceptionUtils;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class for peer tasks that issue a single request and wait for a response to it.
 *
 * <p>Subclasses provide the request through {@link #sendRequest()} and decide, through {@link
 * #processResponse(boolean, MessageData, EthPeer)}, whether an incoming message completes the
 * task. This class wires the two together: it hands the pending request's messages to {@code
 * processResponse}, asks the scheduler to fail the request after the configured timeout, and
 * publishes the outcome on the inherited {@code result} future.
 *
 * @param <R> the type produced by {@link #processResponse(boolean, MessageData, EthPeer)}
 */
public abstract class AbstractPeerRequestTask<R> extends AbstractPeerTask<R> {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractPeerRequestTask.class);

  /** Timeout applied when {@link #setTimeout(Duration)} has not been called. */
  private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(5);

  private Duration timeout = DEFAULT_TIMEOUT;

  /** Request code reported to the peer when a request timeout is recorded. */
  private final int requestCode;

  /** The in-flight request; stays {@code null} until {@link #executeTask()} has sent it. */
  private volatile PendingPeerRequest responseStream;

  /**
   * Creates the task.
   *
   * @param ethContext the eth context passed to the superclass and used to reach peers/scheduler
   * @param requestCode the code passed to {@code recordRequestTimeout} when the request times out
   * @param metricsSystem the metrics system passed to the superclass
   */
  protected AbstractPeerRequestTask(
      final EthContext ethContext, final int requestCode, final MetricsSystem metricsSystem) {
    super(ethContext, metricsSystem);
    this.requestCode = requestCode;
  }

  /**
   * Overrides the timeout handed to the scheduler once the request has been sent.
   *
   * @param timeout the timeout to use instead of the 5 second default
   * @return this task, to allow call chaining
   */
  public AbstractPeerRequestTask<R> setTimeout(final Duration timeout) {
    this.timeout = timeout;
    return this;
  }

  /**
   * Sends the request and connects its outcome to the task's {@code result} future.
   *
   * <p>An intermediate promise collects either the processed response or a failure; when it
   * completes, the pending request is aborted and the outcome is forwarded to {@code result}.
   */
  @Override
  protected final void executeTask() {
    final CompletableFuture<R> promise = new CompletableFuture<>();
    responseStream = sendRequest();
    responseStream.then(
        stream -> {
          // Start the timeout now that the request has actually been sent
          ethContext.getScheduler().failAfterTimeout(promise, timeout);

          stream.then(
              (streamClosed, message, peer1) ->
                  handleMessage(promise, streamClosed, message, peer1));
        },
        promise::completeExceptionally);

    promise.whenComplete(
        (r, t) -> {
          // Named differently from the field so the two are not confused inside this lambda.
          final Optional<RequestManager.ResponseStream> abortedStream =
              this.responseStream.abort();
          if (t != null) {
            // Report the root cause rather than any wrapper added by the future machinery.
            final Throwable cause = ExceptionUtils.rootCause(t);
            if (cause instanceof TimeoutException && abortedStream.isPresent()) {
              abortedStream.get().getPeer().recordRequestTimeout(requestCode);
            }
            result.completeExceptionally(cause);
          } else if (r != null) {
            // If we got a response we must have had a response stream...
            result.complete(new PeerTaskResult<>(abortedStream.get().getPeer(), r));
          }
        });
  }

  /**
   * Submits {@code request} through the eth peers, passing along this task's assigned peer.
   *
   * @param request the request to execute
   * @param minimumBlockNumber forwarded unchanged to {@code executePeerRequest}
   * @return the pending request returned by {@code executePeerRequest}
   */
  public PendingPeerRequest sendRequestToPeer(
      final PeerRequest request, final long minimumBlockNumber) {
    return ethContext.getEthPeers().executePeerRequest(request, minimumBlockNumber, assignedPeer);
  }

  /**
   * Feeds one incoming message to {@link #processResponse(boolean, MessageData, EthPeer)} and
   * completes {@code promise} if a response was produced.
   *
   * <p>An {@link RLPException} raised while processing is treated as malformed data: the peer is
   * disconnected and the promise fails with {@link PeerBreachedProtocolException}.
   *
   * @param promise the intermediate promise created by {@link #executeTask()}
   * @param streamClosed passed through to {@code processResponse}
   * @param message the received message, passed through to {@code processResponse}
   * @param peer the peer the message came from
   */
  private void handleMessage(
      final CompletableFuture<R> promise,
      final boolean streamClosed,
      final MessageData message,
      final EthPeer peer) {
    if (promise.isDone()) {
      // We've already got our response, don't pass on the stream closed event.
      return;
    }
    try {
      // Named "response" to avoid shadowing the inherited "result" future.
      final Optional<R> response = processResponse(streamClosed, message, peer);
      response.ifPresent(
          r -> {
            promise.complete(r);
            peer.recordUsefulResponse();
          });
    } catch (final RLPException e) {
      // Peer sent us malformed data - disconnect
      LOG.debug(
          "Disconnecting with BREACH_OF_PROTOCOL due to malformed message: {}",
          peer.getLoggableId(),
          e);
      LOG.trace("Peer {} Malformed message data: {}", peer, message.getData());
      peer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
      promise.completeExceptionally(new PeerBreachedProtocolException());
    }
  }

  /**
   * Runs the superclass cleanup, then aborts the pending request and closes any response stream
   * that the abort returns.
   *
   * <p>The pending request is only assigned inside {@link #executeTask()}, so it is still {@code
   * null} if cleanup runs before the request was sent (or if {@link #sendRequest()} threw). In
   * that case there is nothing to abort and the step is skipped instead of throwing.
   */
  @Override
  protected void cleanup() {
    super.cleanup();
    // Read the volatile field once so the null check and the call see the same value.
    final PendingPeerRequest pending = responseStream;
    if (pending != null) {
      pending.abort().ifPresent(RequestManager.ResponseStream::close);
    }
  }

  /**
   * Sends the request this task is waiting on.
   *
   * @return the pending request whose messages are fed to {@link #processResponse(boolean,
   *     MessageData, EthPeer)}
   */
  protected abstract PendingPeerRequest sendRequest();

  /**
   * Inspects a message received for the pending request.
   *
   * @param streamClosed flag supplied by the response stream alongside the message
   * @param message the received message
   * @param peer the peer the message came from
   * @return the processed response if this message completes the task, otherwise empty; an {@link
   *     RLPException} thrown here causes the peer to be disconnected
   */
  protected abstract Optional<R> processResponse(
      boolean streamClosed, MessageData message, EthPeer peer);
}