Pipe.java
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.services.pipeline;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Forms the connection between two pipeline stages. A pipe is essentially a blocking queue with the
* added ability to signal when no further input is available because the pipe has been closed or
* the pipeline aborted.
*
* <p>In most cases a Pipe is used through one of the two narrower interfaces it supports: {@link
* ReadPipe} and {@link WritePipe}. These are designed to expose only the operations relevant to
* objects either reading from or publishing to the pipe respectively.
*
* @param <T> the type of item that flows through the pipe.
*/
public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
  private static final Logger LOG = LoggerFactory.getLogger(Pipe.class);

  /** Bounded buffer holding the items travelling between the two stages. */
  private final BlockingQueue<T> queue;

  /** Incremented once for every item accepted by {@link #put(Object)}. */
  private final Counter inputCounter;

  /** Incremented for every item handed out by {@link #get()}, {@link #poll()} or drainTo. */
  private final Counter outputCounter;

  /** Increased by the number of queued items at the moment the pipe is first aborted. */
  private final Counter abortedItemCounter;

  private final AtomicBoolean closed = new AtomicBoolean();
  private final AtomicBoolean aborted = new AtomicBoolean();

  /**
   * Creates a pipe backed by a bounded queue.
   *
   * @param capacity the maximum number of items the pipe can buffer
   * @param inputCounter the counter incremented for each item added to the pipe
   * @param outputCounter the counter incremented for each item removed from the pipe
   * @param abortedItemCounter the counter increased by the number of items still queued when the
   *     pipe is aborted
   */
  public Pipe(
      final int capacity,
      final Counter inputCounter,
      final Counter outputCounter,
      final Counter abortedItemCounter) {
    this.queue = new ArrayBlockingQueue<>(capacity);
    this.inputCounter = inputCounter;
    this.outputCounter = outputCounter;
    this.abortedItemCounter = abortedItemCounter;
  }

  /**
   * Reports whether the pipe still accepts input.
   *
   * @return true if the pipe has been neither closed nor aborted
   */
  @Override
  public boolean isOpen() {
    return !(closed.get() || aborted.get());
  }

  /**
   * Reports whether the pipe has been aborted.
   *
   * @return true once {@link #abort()} has been called
   */
  @Override
  public boolean isAborted() {
    return aborted.get();
  }

  /**
   * Reports whether an item could currently be added.
   *
   * @return true if the queue has at least one free slot and the pipe is open
   */
  @Override
  public boolean hasRemainingCapacity() {
    if (queue.remainingCapacity() <= 0) {
      return false;
    }
    return isOpen();
  }

  /**
   * Marks the pipe as closed. Items already queued are left in place, so {@link #hasMore()} keeps
   * returning true until they have been consumed (unless the pipe is aborted).
   */
  @Override
  public void close() {
    closed.set(true);
  }

  /**
   * Marks the pipe as aborted. Only the first call has any effect on the aborted item counter,
   * which is increased by the number of items queued at that moment. The queue itself is not
   * cleared.
   */
  @Override
  public void abort() {
    final boolean firstAbort = aborted.compareAndSet(false, true);
    if (!firstAbort) {
      return;
    }
    abortedItemCounter.inc(queue.size());
  }

  /**
   * Reports whether a reader may still obtain items from this pipe.
   *
   * @return false if the pipe is aborted, or if it is closed and its queue is empty; true otherwise
   */
  @Override
  public boolean hasMore() {
    return !aborted.get() && (!closed.get() || !queue.isEmpty());
  }

  /**
   * Waits for the next item, polling the queue in one second intervals for as long as {@link
   * #hasMore()} holds.
   *
   * @return the next item, or null if the pipe has no more items or the waiting thread was
   *     interrupted
   */
  @Override
  public T get() {
    try {
      while (hasMore()) {
        final T next = queue.poll(1, TimeUnit.SECONDS);
        if (next == null) {
          // Timed out; re-check the pipe state before waiting again.
          continue;
        }
        outputCounter.inc();
        return next;
      }
    } catch (final InterruptedException e) {
      // The interruption is only logged here; the thread's interrupt flag is not restored.
      LOG.trace("Interrupted while waiting for next item", e);
    }
    return null;
  }

  /**
   * Removes the next item without waiting. The closed and aborted flags are not consulted.
   *
   * @return the head of the queue, or null if the queue is currently empty
   */
  @Override
  public T poll() {
    final T head = queue.poll();
    if (head == null) {
      return null;
    }
    outputCounter.inc();
    return head;
  }

  /**
   * Moves up to {@code maxElements} queued items into {@code output} and adds the number moved to
   * the output counter.
   *
   * @param output the collection receiving the items
   * @param maxElements the maximum number of items to transfer
   * @return the number of items transferred
   */
  @Override
  public int drainTo(final Collection<T> output, final int maxElements) {
    final int drained = queue.drainTo(output, maxElements);
    outputCounter.inc(drained);
    return drained;
  }

  /**
   * Adds an item to the pipe, retrying with a one second timeout for as long as the pipe is open.
   * If the pipe is closed or aborted before the item is accepted, the method returns without
   * adding it. An interruption while waiting is logged and the attempt is retried.
   *
   * @param value the item to add
   */
  @Override
  public void put(final T value) {
    while (isOpen()) {
      boolean accepted = false;
      try {
        accepted = queue.offer(value, 1, TimeUnit.SECONDS);
      } catch (final InterruptedException e) {
        LOG.trace("Interrupted while waiting to add to output", e);
      }
      if (accepted) {
        inputCounter.inc();
        return;
      }
    }
  }
}