VertxRequestTransmitter.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.enclave;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.vertx.core.Future;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
/** The Vertx request transmitter. */
public class VertxRequestTransmitter implements RequestTransmitter {
private static final String APPLICATION_JSON = "application/json";
private final HttpClient client;
private static final long REQUEST_TIMEOUT_MS = 5000L;
/**
* Instantiates a new Vertx request transmitter.
*
* @param httpClient the http client
*/
public VertxRequestTransmitter(final HttpClient httpClient) {
this.client = httpClient;
}
@Override
public <T> T post(
final String contentType,
final String content,
final String endpoint,
final ResponseBodyHandler<T> responseHandler) {
return sendRequest(
HttpMethod.POST,
Optional.of(contentType),
Optional.of(content),
endpoint,
responseHandler,
false);
}
@Override
public <T> T get(
final String contentType,
final String content,
final String endpoint,
final ResponseBodyHandler<T> responseHandler,
final boolean withAcceptJsonHeader) {
return sendRequest(
HttpMethod.GET,
Optional.ofNullable(contentType),
Optional.ofNullable(content),
endpoint,
responseHandler,
withAcceptJsonHeader);
}
/**
* Send request operation.
*
* @param <T> the type parameter
* @param method the method
* @param contentType the content type
* @param content the content
* @param endpoint the endpoint
* @param responseHandler the response handler
* @param withAcceptJsonHeader the with accept json header
* @return the t
*/
protected <T> T sendRequest(
final HttpMethod method,
final Optional<String> contentType,
final Optional<String> content,
final String endpoint,
final ResponseBodyHandler<T> responseHandler,
final boolean withAcceptJsonHeader) {
try {
final CompletableFuture<T> result = new CompletableFuture<>();
RequestOptions options = new RequestOptions();
options.setTimeout(REQUEST_TIMEOUT_MS);
options.setMethod(method);
options.setURI(endpoint);
if (withAcceptJsonHeader) {
// this is needed when using Tessera GET /transaction/{hash} to choose the right RPC
options.putHeader(HttpHeaderNames.ACCEPT, APPLICATION_JSON);
}
contentType.ifPresent(ct -> options.putHeader(HttpHeaders.CONTENT_TYPE, ct));
final Future<HttpClientRequest> request = client.request(options);
request
.onComplete(
newRequest -> {
if (content.isPresent()) {
request.result().end(content.get());
} else {
request.result().end();
}
request
.result()
.send(
h -> {
if (h.succeeded()) {
handleResponse(
newRequest.result().response().result(), responseHandler, result);
} else {
result.completeExceptionally(h.cause());
}
});
})
.onFailure(result::completeExceptionally);
return result.get();
} catch (final ExecutionException | InterruptedException e) {
if (e.getCause() instanceof EnclaveClientException) {
throw (EnclaveClientException) e.getCause();
} else if (e.getCause() instanceof EnclaveServerException) {
throw (EnclaveServerException) e.getCause();
}
throw new EnclaveIOException("Enclave Communication Failed", e);
}
}
private <T> void handleResponse(
final HttpClientResponse response,
final ResponseBodyHandler<T> responseHandler,
final CompletableFuture<T> future) {
response.bodyHandler(
responseBody -> {
try {
future.complete(
responseHandler.convertResponse(response.statusCode(), responseBody.getBytes()));
} catch (final Exception e) {
future.completeExceptionally(e);
}
});
}
}