JsonRpcIpcService.java
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.ipc;
import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.RpcErrorType.INVALID_REQUEST;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponseType;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.RpcErrorType;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JsonRpcIpcService {
private static final Logger LOG = LoggerFactory.getLogger(JsonRpcIpcService.class);
private static final ObjectWriter JSON_OBJECT_WRITER =
new ObjectMapper()
.registerModule(new Jdk8Module()) // Handle JDK8 Optionals (de)serialization
.writer()
.without(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM)
.with(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
private final Vertx vertx;
private final Path path;
private final JsonRpcExecutor jsonRpcExecutor;
private NetServer netServer;
public JsonRpcIpcService(final Vertx vertx, final Path path, final JsonRpcExecutor rpcExecutor) {
this.vertx = vertx;
this.path = path;
this.jsonRpcExecutor = rpcExecutor;
}
public Future<NetServer> start() {
netServer = vertx.createNetServer(buildNetServerOptions());
netServer.connectHandler(
socket -> {
AtomicBoolean closedSocket = new AtomicBoolean(false);
socket
.closeHandler(unused -> closedSocket.set(true))
.handler(
buffer -> {
if (buffer.length() == 0) {
errorReturn(socket, null, RpcErrorType.INVALID_REQUEST);
} else {
try {
final JsonObject jsonRpcRequest = buffer.toJsonObject();
vertx
.<JsonRpcResponse>executeBlocking(
promise -> {
final JsonRpcResponse jsonRpcResponse =
jsonRpcExecutor.execute(
Optional.empty(),
null,
null,
closedSocket::get,
jsonRpcRequest,
req -> req.mapTo(JsonRpcRequest.class));
promise.complete(jsonRpcResponse);
})
.onSuccess(
jsonRpcResponse -> {
try {
socket.write(
JSON_OBJECT_WRITER.writeValueAsString(jsonRpcResponse)
+ '\n');
} catch (JsonProcessingException e) {
LOG.error("Error streaming JSON-RPC response", e);
}
})
.onFailure(
throwable -> {
try {
final Integer id = jsonRpcRequest.getInteger("id", null);
errorReturn(socket, id, RpcErrorType.INTERNAL_ERROR);
} catch (ClassCastException idNotIntegerException) {
errorReturn(socket, null, RpcErrorType.INTERNAL_ERROR);
}
});
} catch (DecodeException jsonObjectDecodeException) {
try {
final JsonArray batchJsonRpcRequest = buffer.toJsonArray();
if (batchJsonRpcRequest.isEmpty()) {
errorReturn(socket, null, RpcErrorType.INVALID_REQUEST);
} else {
vertx
.<List<JsonRpcResponse>>executeBlocking(
promise -> {
List<JsonRpcResponse> responses = new ArrayList<>();
for (int i = 0; i < batchJsonRpcRequest.size(); i++) {
final JsonObject jsonRequest;
try {
jsonRequest = batchJsonRpcRequest.getJsonObject(i);
} catch (ClassCastException e) {
responses.add(
new JsonRpcErrorResponse(null, INVALID_REQUEST));
continue;
}
responses.add(
jsonRpcExecutor.execute(
Optional.empty(),
null,
null,
closedSocket::get,
jsonRequest,
req -> req.mapTo(JsonRpcRequest.class)));
}
promise.complete(responses);
})
.onSuccess(
jsonRpcBatchResponse -> {
try {
final JsonRpcResponse[] completed =
jsonRpcBatchResponse.stream()
.filter(
jsonRpcResponse ->
jsonRpcResponse.getType()
!= JsonRpcResponseType.NONE)
.toArray(JsonRpcResponse[]::new);
socket.write(
JSON_OBJECT_WRITER.writeValueAsString(completed)
+ '\n');
} catch (JsonProcessingException e) {
LOG.error("Error streaming JSON-RPC response", e);
}
})
.onFailure(
throwable ->
errorReturn(socket, null, RpcErrorType.INTERNAL_ERROR));
}
} catch (DecodeException jsonArrayDecodeException) {
errorReturn(socket, null, RpcErrorType.PARSE_ERROR);
}
}
}
});
});
return netServer
.listen(SocketAddress.domainSocketAddress(path.toString()))
.onSuccess(successServer -> LOG.info("IPC endpoint opened: {}", path))
.onFailure(throwable -> LOG.error("Unable to open IPC endpoint", throwable));
}
public Future<Void> stop() {
if (netServer == null) {
return Future.succeededFuture();
} else {
return netServer
.close()
.onComplete(
closeResult -> {
try {
Files.deleteIfExists(path);
} catch (IOException e) {
LOG.error("Unable to delete IPC file", e);
}
});
}
}
private Future<Void> errorReturn(
final NetSocket socket, final Integer id, final RpcErrorType rpcError) {
return socket.write(Buffer.buffer(Json.encode(new JsonRpcErrorResponse(id, rpcError)) + '\n'));
}
private NetServerOptions buildNetServerOptions() {
return new NetServerOptions();
}
}