KubernetesNatManager.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.nat.kubernetes;
import org.hyperledger.besu.nat.NatMethod;
import org.hyperledger.besu.nat.core.AbstractNatManager;
import org.hyperledger.besu.nat.core.IpDetector;
import org.hyperledger.besu.nat.core.domain.NatPortMapping;
import org.hyperledger.besu.nat.core.domain.NatServiceType;
import org.hyperledger.besu.nat.core.domain.NetworkProtocol;
import org.hyperledger.besu.nat.core.exception.NatInitializationException;
import org.hyperledger.besu.nat.kubernetes.service.KubernetesServiceType;
import org.hyperledger.besu.nat.kubernetes.service.LoadBalancerBasedDetector;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.KubeConfig;
import io.kubernetes.client.util.authenticators.GCPAuthenticator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class describes the behaviour of the Kubernetes NAT manager. Kubernetes Nat manager add
* support for Kubernetes’s NAT implementation when Besu is being run from a Kubernetes cluster
*/
public class KubernetesNatManager extends AbstractNatManager {
private static final Logger LOG = LoggerFactory.getLogger(KubernetesNatManager.class);
/** The constant DEFAULT_BESU_SERVICE_NAME_FILTER. */
public static final String DEFAULT_BESU_SERVICE_NAME_FILTER = "besu";
private String internalAdvertisedHost;
private final String besuServiceNameFilter;
private final List<NatPortMapping> forwardedPorts = new ArrayList<>();
/**
* Instantiates a new Kubernetes nat manager.
*
* @param besuServiceNameFilter the besu service name filter
*/
public KubernetesNatManager(final String besuServiceNameFilter) {
super(NatMethod.KUBERNETES);
this.besuServiceNameFilter = besuServiceNameFilter;
}
@Override
protected void doStart() throws NatInitializationException {
LOG.info("Starting kubernetes NAT manager.");
try {
KubeConfig.registerAuthenticator(new GCPAuthenticator());
LOG.debug("Trying to update information using Kubernetes client SDK.");
final ApiClient client = ClientBuilder.cluster().build();
// set the global default api-client to the in-cluster one from above
Configuration.setDefaultApiClient(client);
// the CoreV1Api loads default api-client from global configuration.
final CoreV1Api api = new CoreV1Api();
// invokes the CoreV1Api client
final V1Service service =
api
.listServiceForAllNamespaces(
null, null, null, null, null, null, null, null, null, null)
.getItems()
.stream()
.filter(
v1Service -> v1Service.getMetadata().getName().contains(besuServiceNameFilter))
.findFirst()
.orElseThrow(() -> new NatInitializationException("Service not found"));
updateUsingBesuService(service);
} catch (Exception e) {
throw new NatInitializationException(e.getMessage(), e);
}
}
/**
* Update using besu service. Visible for testing.
*
* @param service the service
* @throws RuntimeException the runtime exception
*/
@VisibleForTesting
void updateUsingBesuService(final V1Service service) throws RuntimeException {
try {
LOG.info("Found Besu service: {}", service.getMetadata().getName());
internalAdvertisedHost =
getIpDetector(service)
.detectAdvertisedIp()
.orElseThrow(
() -> new NatInitializationException("Unable to retrieve IP from service"));
LOG.info("Setting host IP to: {}.", internalAdvertisedHost);
final String internalHost = queryLocalIPAddress().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
service
.getSpec()
.getPorts()
.forEach(
v1ServicePort -> {
try {
final NatServiceType natServiceType =
NatServiceType.fromString(v1ServicePort.getName());
forwardedPorts.add(
new NatPortMapping(
natServiceType,
natServiceType.equals(NatServiceType.DISCOVERY)
? NetworkProtocol.UDP
: NetworkProtocol.TCP,
internalHost,
internalAdvertisedHost,
v1ServicePort.getPort(),
v1ServicePort.getTargetPort().getIntValue()));
} catch (IllegalStateException e) {
LOG.warn("Ignored unknown Besu port: {}", e.getMessage());
}
});
} catch (Exception e) {
throw new RuntimeException(
"Failed update information using pod metadata : " + e.getMessage(), e);
}
}
@Override
protected void doStop() {
LOG.info("Stopping kubernetes NAT manager.");
}
@Override
protected CompletableFuture<String> retrieveExternalIPAddress() {
return CompletableFuture.completedFuture(internalAdvertisedHost);
}
@Override
public CompletableFuture<List<NatPortMapping>> getPortMappings() {
return CompletableFuture.completedFuture(forwardedPorts);
}
private IpDetector getIpDetector(final V1Service v1Service) throws NatInitializationException {
final String serviceType = v1Service.getSpec().getType();
switch (KubernetesServiceType.fromName(serviceType)) {
case CLUSTER_IP:
return () -> Optional.ofNullable(v1Service.getSpec().getClusterIP());
case LOAD_BALANCER:
return new LoadBalancerBasedDetector(v1Service);
default:
throw new NatInitializationException(String.format("%s is not implemented", serviceType));
}
}
}