DeFramer.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.network.exceptions.BreachOfProtocolException;
import org.hyperledger.besu.ethereum.p2p.network.exceptions.IncompatiblePeerException;
import org.hyperledger.besu.ethereum.p2p.network.exceptions.PeerChannelClosedException;
import org.hyperledger.besu.ethereum.p2p.network.exceptions.PeerDisconnectedException;
import org.hyperledger.besu.ethereum.p2p.network.exceptions.UnexpectedPeerConnectionException;
import org.hyperledger.besu.ethereum.p2p.peers.DefaultPeer;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURLImpl;
import org.hyperledger.besu.ethereum.p2p.peers.LocalNode;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnectionEventDispatcher;
import org.hyperledger.besu.ethereum.p2p.rlpx.framing.Framer;
import org.hyperledger.besu.ethereum.p2p.rlpx.framing.FramingException;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.CapabilityMultiplexer;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerInfo;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.HelloMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.WireMessageCodes;
import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class DeFramer extends ByteToMessageDecoder {
private static final Logger LOG = LoggerFactory.getLogger(DeFramer.class);
private final CompletableFuture<PeerConnection> connectFuture;
private final PeerConnectionEventDispatcher connectionEventDispatcher;
private final Framer framer;
private final LocalNode localNode;
// The peer we are expecting to connect to, if such a peer is known
private final Optional<Peer> expectedPeer;
private final List<SubProtocol> subProtocols;
private final boolean inboundInitiated;
private final PeerTable peerTable;
private boolean hellosExchanged;
private final LabelledMetric<Counter> outboundMessagesCounter;
DeFramer(
final Framer framer,
final List<SubProtocol> subProtocols,
final LocalNode localNode,
final Optional<Peer> expectedPeer,
final PeerConnectionEventDispatcher connectionEventDispatcher,
final CompletableFuture<PeerConnection> connectFuture,
final MetricsSystem metricsSystem,
final boolean inboundInitiated,
final PeerTable peerTable) {
this.framer = framer;
this.subProtocols = subProtocols;
this.localNode = localNode;
this.expectedPeer = expectedPeer;
this.connectFuture = connectFuture;
this.connectionEventDispatcher = connectionEventDispatcher;
this.inboundInitiated = inboundInitiated;
this.peerTable = peerTable;
this.outboundMessagesCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.NETWORK,
"p2p_messages_outbound",
"Count of each P2P message sent outbound.",
"protocol",
"name",
"code");
}
@Override
protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
MessageData message;
while ((message = framer.deframe(in)) != null) {
if (hellosExchanged) {
out.add(message);
} else if (message.getCode() == WireMessageCodes.HELLO) {
hellosExchanged = true;
// Decode first hello and use the payload to modify pipeline
final PeerInfo peerInfo;
try {
peerInfo = HelloMessage.readFrom(message).getPeerInfo();
} catch (final RLPException e) {
LOG.debug("Received invalid HELLO message, set log level to TRACE for message body", e);
connectFuture.completeExceptionally(e);
ctx.close();
return;
}
LOG.trace("Received HELLO message: {}", peerInfo);
if (peerInfo.getVersion() >= 5) {
LOG.trace("Enable compression for p2pVersion: {}", peerInfo.getVersion());
framer.enableCompression();
}
final CapabilityMultiplexer capabilityMultiplexer =
new CapabilityMultiplexer(
subProtocols,
localNode.getPeerInfo().getCapabilities(),
peerInfo.getCapabilities());
Optional<Peer> peer;
if (expectedPeer.isPresent()) {
peer = expectedPeer;
} else {
// This is an inbound "Hello" message. Create peer from information from the Hello message
peer = createPeer(peerInfo, ctx);
if (peer.isEmpty()) {
LOG.debug("Failed to create connection for peer {}", peerInfo);
connectFuture.completeExceptionally(new PeerChannelClosedException(peerInfo));
ctx.close();
return;
}
// If we can find the DiscoveryPeer for the peer in the PeerTable we use it, because
// it could contains additional information, like the fork id.
final Optional<DiscoveryPeer> discoveryPeer = peerTable.get(peer.get());
if (discoveryPeer.isPresent()) {
peer = Optional.of(discoveryPeer.get());
}
}
final PeerConnection connection =
new NettyPeerConnection(
ctx,
peer.get(),
peerInfo,
capabilityMultiplexer,
connectionEventDispatcher,
outboundMessagesCounter,
inboundInitiated);
// Check peer is who we expected
if (expectedPeer.isPresent()
&& !Objects.equals(expectedPeer.get().getId(), peerInfo.getNodeId())) {
final String unexpectedMsg =
String.format(
"Expected id %s, but got %s", expectedPeer.get().getId(), peerInfo.getNodeId());
connectFuture.completeExceptionally(new UnexpectedPeerConnectionException(unexpectedMsg));
LOG.debug("{}. Disconnecting.", unexpectedMsg);
connection.disconnect(DisconnectMessage.DisconnectReason.UNEXPECTED_ID);
}
// Check that we have shared caps
if (capabilityMultiplexer.getAgreedCapabilities().size() == 0) {
LOG.debug("Disconnecting because no capabilities are shared: {}", peerInfo);
connectFuture.completeExceptionally(
new IncompatiblePeerException("No shared capabilities"));
connection.disconnect(
DisconnectMessage.DisconnectReason.USELESS_PEER_NO_SHARED_CAPABILITIES);
}
// Setup next stage
final AtomicBoolean waitingForPong = new AtomicBoolean(false);
ctx.channel()
.pipeline()
.addLast(
new IdleStateHandler(15, 0, 0),
new WireKeepAlive(connection, waitingForPong),
new ApiHandler(
capabilityMultiplexer, connection, connectionEventDispatcher, waitingForPong),
new MessageFramer(capabilityMultiplexer, framer));
connectFuture.complete(connection);
} else if (message.getCode() == WireMessageCodes.DISCONNECT) {
final DisconnectMessage disconnectMessage = DisconnectMessage.readFrom(message);
LOG.debug(
"Peer {} disconnected before sending HELLO. Reason: {}",
expectedPeer.map(Peer::getEnodeURLString).orElse("unknown"),
disconnectMessage.getReason());
ctx.close();
connectFuture.completeExceptionally(
new PeerDisconnectedException(disconnectMessage.getReason()));
} else {
// Unexpected message - disconnect
LOG.debug(
"Message received before HELLO's exchanged (BREACH_OF_PROTOCOL), disconnecting. Peer: {}, Code: {}, Data: {}",
expectedPeer.map(Peer::getEnodeURLString).orElse("unknown"),
message.getCode(),
message.getData().toString());
ctx.writeAndFlush(
new OutboundMessage(
null,
DisconnectMessage.create(
DisconnectMessage.DisconnectReason
.BREACH_OF_PROTOCOL_MESSAGE_RECEIVED_BEFORE_HELLO_EXCHANGE)))
.addListener((f) -> ctx.close());
connectFuture.completeExceptionally(
new BreachOfProtocolException("Message received before HELLO's exchanged"));
}
}
}
private Optional<Peer> createPeer(final PeerInfo peerInfo, final ChannelHandlerContext ctx) {
final InetSocketAddress remoteAddress = ((InetSocketAddress) ctx.channel().remoteAddress());
if (remoteAddress == null) {
return Optional.empty();
}
final int port = peerInfo.getPort();
return Optional.of(
DefaultPeer.fromEnodeURL(
EnodeURLImpl.builder()
.nodeId(peerInfo.getNodeId())
.ipAddress(remoteAddress.getAddress())
.listeningPort(port)
// Discovery information is unknown, so disable it
.disableDiscovery()
.build()));
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable throwable)
throws Exception {
final Throwable cause =
throwable instanceof DecoderException && throwable.getCause() != null
? throwable.getCause()
: throwable;
if (cause instanceof FramingException
|| cause instanceof RLPException
|| cause instanceof IllegalArgumentException) {
LOG.debug("Invalid incoming message (BREACH_OF_PROTOCOL)", throwable);
if (connectFuture.isDone() && !connectFuture.isCompletedExceptionally()) {
connectFuture
.get()
.disconnect(
DisconnectMessage.DisconnectReason
.BREACH_OF_PROTOCOL_INVALID_MESSAGE_RECEIVED_CAUGHT_EXCEPTION);
return;
}
} else if (cause instanceof IOException) {
// IO failures are routine when communicating with random peers across the network.
LOG.debug("IO error while processing incoming message", throwable);
} else {
LOG.error("Exception while processing incoming message", throwable);
}
if (connectFuture.isDone() && !connectFuture.isCompletedExceptionally()) {
connectFuture
.get()
.terminateConnection(DisconnectMessage.DisconnectReason.TCP_SUBSYSTEM_ERROR, false);
} else {
connectFuture.completeExceptionally(throwable);
ctx.close();
}
}
}