EthServer.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.encoding.EncodingContext;
import org.hyperledger.besu.ethereum.core.encoding.TransactionEncoder;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.EthPV63;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetNodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetPooledTransactionsMessage;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.NodeDataMessage;
import org.hyperledger.besu.ethereum.eth.messages.PooledTransactionsMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
class EthServer {
private final Blockchain blockchain;
private final WorldStateArchive worldStateArchive;
private final TransactionPool transactionPool;
private final EthMessages ethMessages;
private final EthProtocolConfiguration ethereumWireProtocolConfiguration;
EthServer(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final TransactionPool transactionPool,
final EthMessages ethMessages,
final EthProtocolConfiguration ethereumWireProtocolConfiguration) {
this.blockchain = blockchain;
this.worldStateArchive = worldStateArchive;
this.transactionPool = transactionPool;
this.ethMessages = ethMessages;
this.ethereumWireProtocolConfiguration = ethereumWireProtocolConfiguration;
this.registerResponseConstructors();
}
private void registerResponseConstructors() {
final int maxMessageSize = ethereumWireProtocolConfiguration.getMaxMessageSize();
ethMessages.registerResponseConstructor(
EthPV62.GET_BLOCK_HEADERS,
messageData ->
constructGetHeadersResponse(
blockchain,
messageData,
ethereumWireProtocolConfiguration.getMaxGetBlockHeaders(),
maxMessageSize));
ethMessages.registerResponseConstructor(
EthPV62.GET_BLOCK_BODIES,
messageData ->
constructGetBodiesResponse(
blockchain,
messageData,
ethereumWireProtocolConfiguration.getMaxGetBlockBodies(),
maxMessageSize));
ethMessages.registerResponseConstructor(
EthPV63.GET_RECEIPTS,
messageData ->
constructGetReceiptsResponse(
blockchain,
messageData,
ethereumWireProtocolConfiguration.getMaxGetReceipts(),
maxMessageSize));
ethMessages.registerResponseConstructor(
EthPV63.GET_NODE_DATA,
messageData ->
constructGetNodeDataResponse(
worldStateArchive,
messageData,
ethereumWireProtocolConfiguration.getMaxGetNodeData(),
maxMessageSize));
ethMessages.registerResponseConstructor(
EthPV65.GET_POOLED_TRANSACTIONS,
messageData ->
constructGetPooledTransactionsResponse(
transactionPool,
messageData,
ethereumWireProtocolConfiguration.getMaxGetPooledTransactions(),
maxMessageSize));
}
static MessageData constructGetHeadersResponse(
final Blockchain blockchain,
final MessageData message,
final int requestLimit,
final int maxMessageSize) {
// Extract parameters from request
final GetBlockHeadersMessage getHeaders = GetBlockHeadersMessage.readFrom(message);
final Optional<Hash> hash = getHeaders.hash();
final int skip = getHeaders.skip();
final int maxHeaders = Math.min(requestLimit, getHeaders.maxHeaders());
final boolean reversed = getHeaders.reverse();
final BlockHeader firstHeader;
// Query first header by hash or number depending on request arguments
if (hash.isPresent()) {
final Hash startHash = hash.get();
firstHeader = blockchain.getBlockHeader(startHash).orElse(null);
} else {
final long firstNumber = getHeaders.blockNumber().getAsLong();
firstHeader = blockchain.getBlockHeader(firstNumber).orElse(null);
}
// The initial header was not found, nothing to return
if (firstHeader == null) {
return BlockHeadersMessage.create(Collections.emptyList());
}
// Encode the first header
int responseSizeEstimate = RLP.MAX_PREFIX_SIZE;
final BytesValueRLPOutput rlp = new BytesValueRLPOutput();
rlp.startList();
final Bytes firstEncodedHeader = RLP.encode(firstHeader::writeTo);
if (responseSizeEstimate + firstEncodedHeader.size() > maxMessageSize) {
return BlockHeadersMessage.create(Collections.emptyList());
}
responseSizeEstimate += firstEncodedHeader.size();
rlp.writeRaw(firstEncodedHeader);
// Collect and encode the remaining headers
final long numberDelta = reversed ? -(skip + 1) : (skip + 1);
for (int i = 1; i < maxHeaders; i++) {
final long blockNumber = firstHeader.getNumber() + i * numberDelta;
if (blockNumber < BlockHeader.GENESIS_BLOCK_NUMBER) {
break;
}
final Optional<BlockHeader> maybeHeader = blockchain.getBlockHeader(blockNumber);
if (maybeHeader.isEmpty()) {
break;
}
final BytesValueRLPOutput headerRlp = new BytesValueRLPOutput();
maybeHeader.get().writeTo(headerRlp);
final int encodedSize = headerRlp.encodedSize();
if (responseSizeEstimate + encodedSize > maxMessageSize) {
break;
}
responseSizeEstimate += encodedSize;
rlp.writeRaw(headerRlp.encoded());
}
rlp.endList();
return BlockHeadersMessage.createUnsafe(rlp.encoded());
}
static MessageData constructGetBodiesResponse(
final Blockchain blockchain,
final MessageData message,
final int requestLimit,
final int maxMessageSize) {
final GetBlockBodiesMessage getBlockBodiesMessage = GetBlockBodiesMessage.readFrom(message);
final Iterable<Hash> hashes = getBlockBodiesMessage.hashes();
int responseSizeEstimate = RLP.MAX_PREFIX_SIZE;
final BytesValueRLPOutput rlp = new BytesValueRLPOutput();
rlp.startList();
int count = 0;
for (final Hash hash : hashes) {
if (count >= requestLimit) {
break;
}
count++;
final Optional<BlockBody> maybeBody = blockchain.getBlockBody(hash);
if (maybeBody.isEmpty()) {
continue;
}
final BlockBody body = maybeBody.get();
final BytesValueRLPOutput bodyOutput = new BytesValueRLPOutput();
body.writeWrappedBodyTo(bodyOutput);
final int encodedSize = bodyOutput.encodedSize();
if (responseSizeEstimate + encodedSize > maxMessageSize) {
break;
}
responseSizeEstimate += encodedSize;
rlp.writeRaw(bodyOutput.encoded());
}
rlp.endList();
return BlockBodiesMessage.createUnsafe(rlp.encoded());
}
static MessageData constructGetReceiptsResponse(
final Blockchain blockchain,
final MessageData message,
final int requestLimit,
final int maxMessageSize) {
final GetReceiptsMessage getReceipts = GetReceiptsMessage.readFrom(message);
final Iterable<Hash> hashes = getReceipts.hashes();
int responseSizeEstimate = RLP.MAX_PREFIX_SIZE;
final BytesValueRLPOutput rlp = new BytesValueRLPOutput();
rlp.startList();
int count = 0;
for (final Hash hash : hashes) {
if (count >= requestLimit) {
break;
}
count++;
final Optional<List<TransactionReceipt>> maybeReceipts = blockchain.getTxReceipts(hash);
if (maybeReceipts.isEmpty()) {
continue;
}
final BytesValueRLPOutput encodedReceipts = new BytesValueRLPOutput();
encodedReceipts.startList();
maybeReceipts.get().forEach(r -> r.writeToForNetwork(encodedReceipts));
encodedReceipts.endList();
final int encodedSize = encodedReceipts.encodedSize();
if (responseSizeEstimate + encodedSize > maxMessageSize) {
break;
}
responseSizeEstimate += encodedSize;
rlp.writeRaw(encodedReceipts.encoded());
}
rlp.endList();
return ReceiptsMessage.createUnsafe(rlp.encoded());
}
static MessageData constructGetPooledTransactionsResponse(
final TransactionPool transactionPool,
final MessageData message,
final int requestLimit,
final int maxMessageSize) {
final GetPooledTransactionsMessage getPooledTransactions =
GetPooledTransactionsMessage.readFrom(message);
final Iterable<Hash> hashes = getPooledTransactions.pooledTransactions();
int responseSizeEstimate = RLP.MAX_PREFIX_SIZE;
final BytesValueRLPOutput rlp = new BytesValueRLPOutput();
rlp.startList();
int count = 0;
for (final Hash hash : hashes) {
if (count >= requestLimit) {
break;
}
count++;
final Optional<Transaction> maybeTx = transactionPool.getTransactionByHash(hash);
if (maybeTx.isEmpty()) {
continue;
}
final BytesValueRLPOutput txRlp = new BytesValueRLPOutput();
TransactionEncoder.encodeRLP(maybeTx.get(), txRlp, EncodingContext.POOLED_TRANSACTION);
final int encodedSize = txRlp.encodedSize();
if (responseSizeEstimate + encodedSize > maxMessageSize) {
break;
}
responseSizeEstimate += encodedSize;
rlp.writeRaw(txRlp.encoded());
}
rlp.endList();
return PooledTransactionsMessage.createUnsafe(rlp.encoded());
}
static MessageData constructGetNodeDataResponse(
final WorldStateArchive worldStateArchive,
final MessageData message,
final int requestLimit,
final int maxMessageSize) {
final GetNodeDataMessage getNodeDataMessage = GetNodeDataMessage.readFrom(message);
final Iterable<Hash> hashes = getNodeDataMessage.hashes();
int responseSizeEstimate = RLP.MAX_PREFIX_SIZE;
final BytesValueRLPOutput rlp = new BytesValueRLPOutput();
rlp.startList();
int count = 0;
for (final Hash hash : hashes) {
if (count >= requestLimit) {
break;
}
count++;
final Optional<Bytes> maybeNodeData = worldStateArchive.getNodeData(hash);
if (maybeNodeData.isEmpty()) {
continue;
}
final BytesValueRLPOutput rlpNodeData = new BytesValueRLPOutput();
rlpNodeData.writeBytes(maybeNodeData.get());
final int encodedSize = rlpNodeData.encodedSize();
if (responseSizeEstimate + encodedSize > maxMessageSize) {
break;
}
responseSizeEstimate += encodedSize;
rlp.writeRaw(rlpNodeData.encoded());
}
rlp.endList();
return NodeDataMessage.createUnsafe(rlp.encoded());
}
}