// Stratum1EthProxyProtocol.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.stratum;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.EthGetWork;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.EthSubmitHashRate;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.EthSubmitWork;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.blockcreation.PoWMiningCoordinator;
import org.hyperledger.besu.ethereum.mainnet.PoWSolution;
import org.hyperledger.besu.ethereum.mainnet.PoWSolverInputs;
import java.util.ArrayList;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Function;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of the stratum1+tcp protocol.
*
* <p>This protocol allows miners to submit EthHash solutions over a persistent TCP connection.
*/
public class Stratum1EthProxyProtocol implements StratumProtocol {
private static final Logger LOG = LoggerFactory.getLogger(Stratum1EthProxyProtocol.class);
private static final JsonMapper mapper = new JsonMapper();
private final EthGetWork ethGetWork;
private final EthSubmitWork ethSubmitWork;
private final EthSubmitHashRate ethSubmitHashRate;
private final Collection<StratumConnection> activeConnections = new ArrayList<>();
public Stratum1EthProxyProtocol(final PoWMiningCoordinator miningCoordinator) {
ethGetWork = new EthGetWork(miningCoordinator);
ethSubmitWork = new EthSubmitWork(miningCoordinator);
ethSubmitHashRate = new EthSubmitHashRate(miningCoordinator);
}
@Override
public boolean maybeHandle(
final Buffer initialMessage, final StratumConnection conn, final Consumer<String> sender) {
JsonRpcRequest req;
try {
req = new JsonObject(initialMessage).mapTo(JsonRpcRequest.class);
} catch (DecodeException | IllegalArgumentException e) {
LOG.debug(e.getMessage(), e);
return false;
}
if (!"eth_submitLogin".equals(req.getMethod())) {
LOG.debug("Invalid first message method: {}", req.getMethod());
return false;
}
try {
String response = mapper.writeValueAsString(new JsonRpcSuccessResponse(req.getId(), true));
sender.accept(response);
activeConnections.add(conn);
} catch (JsonProcessingException e) {
LOG.debug(e.getMessage(), e);
conn.close();
}
return true;
}
@Override
public void onClose(final StratumConnection conn) {
activeConnections.remove(conn);
}
@Override
public void handle(
final StratumConnection conn, final Buffer message, final Consumer<String> sender) {
final JsonRpcRequest req = new JsonObject(message).mapTo(JsonRpcRequest.class);
final JsonRpcRequestContext reqContext = new JsonRpcRequestContext(req);
final JsonRpcResponse rpcResponse;
switch (req.getMethod()) {
case "eth_getWork" -> rpcResponse = ethGetWork.response(reqContext);
case "eth_submitWork" -> rpcResponse = ethSubmitWork.response(reqContext);
case "eth_submitHashrate" -> rpcResponse = ethSubmitHashRate.response(reqContext);
default -> {
LOG.debug("Invalid method: {}", req.getMethod());
throw new UnsupportedOperationException("Invalid method: " + req.getMethod());
}
}
try {
sender.accept(mapper.writeValueAsString(rpcResponse));
} catch (JsonProcessingException e) {
throw new IllegalStateException(e);
}
}
@Override
public void setCurrentWorkTask(final PoWSolverInputs input) {
activeConnections.forEach(
conn -> {
try {
conn.notificationSender()
.accept(
mapper.writeValueAsString(
new JsonRpcSuccessResponse(0, ethGetWork.rawResponse(input))));
} catch (JsonProcessingException e) {
LOG.error("Failed to announce new work", e);
}
});
}
@Override
public void setSubmitCallback(final Function<PoWSolution, Boolean> submitSolutionCallback) {}
}