TransactionsMessageProcessor.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;
import static java.time.Instant.now;
import static org.hyperledger.besu.ethereum.core.Transaction.toHashList;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.rlp.RLPException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class TransactionsMessageProcessor {
private static final Logger LOG = LoggerFactory.getLogger(TransactionsMessageProcessor.class);
static final String METRIC_LABEL = "transactions";
private final PeerTransactionTracker transactionTracker;
private final TransactionPool transactionPool;
private final TransactionPoolMetrics metrics;
public TransactionsMessageProcessor(
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final TransactionPoolMetrics metrics) {
this.transactionTracker = transactionTracker;
this.transactionPool = transactionPool;
this.metrics = metrics;
metrics.initExpiredMessagesCounter(METRIC_LABEL);
}
void processTransactionsMessage(
final EthPeer peer,
final TransactionsMessage transactionsMessage,
final Instant startedAt,
final Duration keepAlive) {
// Check if message is not expired.
if (startedAt.plus(keepAlive).isAfter(now())) {
this.processTransactionsMessage(peer, transactionsMessage);
} else {
metrics.incrementExpiredMessages(METRIC_LABEL);
}
}
private void processTransactionsMessage(
final EthPeer peer, final TransactionsMessage transactionsMessage) {
try {
final List<Transaction> incomingTransactions = transactionsMessage.transactions();
final Collection<Transaction> freshTransactions = skipSeenTransactions(incomingTransactions);
transactionTracker.markTransactionsAsSeen(peer, incomingTransactions);
metrics.incrementAlreadySeenTransactions(
METRIC_LABEL, incomingTransactions.size() - freshTransactions.size());
LOG.atTrace()
.setMessage(
"Received transactions message from {}, incoming transactions {}, incoming list {}"
+ ", fresh transactions {}, fresh list {}")
.addArgument(peer)
.addArgument(incomingTransactions::size)
.addArgument(() -> toHashList(incomingTransactions))
.addArgument(freshTransactions::size)
.addArgument(() -> toHashList(freshTransactions))
.log();
transactionPool.addRemoteTransactions(freshTransactions);
} catch (final RLPException ex) {
if (peer != null) {
LOG.debug(
"Malformed transaction message received (BREACH_OF_PROTOCOL), disconnecting: {}",
peer,
ex);
peer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
}
}
}
private Collection<Transaction> skipSeenTransactions(final List<Transaction> inTransactions) {
return inTransactions.stream()
.filter(tx -> !transactionTracker.hasSeenTransaction(tx.getHash()))
.collect(Collectors.toUnmodifiableList());
}
}