import java.util.logging.Logger;
import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.FluentIterable;
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
-public class Pipeline implements Iterable<Controlled> {
+public class Pipeline implements Iterable<ControlledComponent> {
/** The logger. */
private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
return sinks.get(source);
}
+ /**
+ * Returns the traffic counters of the given controlled component.
+ *
+ * @param controlledComponent
+ * The controlled component to get the traffic counters for
+ * @return The traffic counters for the given controlled component
+ */
+ public TrafficCounter trafficCounter(ControlledComponent controlledComponent) {
+ long input = -1;
+ long output = -1;
+ for (Connection connection : connections) {
+ /* the connection where the source matches knows the output. */
+ if (connection.source.equals(controlledComponent)) {
+ output = connection.counter();
+ } else if (connection.sinks.contains(controlledComponent)) {
+ input = connection.counter();
+ }
+ }
+ return new TrafficCounter(input, output);
+ }
+
//
// ACTIONS
//
//
@Override
- public Iterator<Controlled> iterator() {
- return ImmutableSet.<Controlled>builder().add(source).addAll(sinks.values()).build().iterator();
+ public Iterator<ControlledComponent> iterator() {
+ return ImmutableSet.<ControlledComponent>builder().add(source).addAll(sinks.values()).build().iterator();
}
//
/** The executor service. */
private final ExecutorService executorService;
+ /** The number of copied bytes. */
+ private long counter;
+
/**
* Creates a new connection.
*
}
//
+ // ACCESSORS
+ //
+
+ /**
+ * Returns the number of bytes that this connection has received from its
+ * source during its lifetime.
+ *
+ * @return The number of processed input bytes
+ */
+ public long counter() {
+ return counter;
+ }
+
+ //
// ACTIONS
//
Metadata firstMetadata = null;
while (!stopped.get()) {
try {
- final Metadata lastMetadata = firstMetadata;
- final Metadata metadata = firstMetadata = source.metadata();
final byte[] buffer;
try {
logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
@Override
public Void call() throws Exception {
- if (!metadata.equals(lastMetadata)) {
- sink.metadataUpdated(metadata);
- }
try {
logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
sink.process(buffer);
for (Future<Void> future : futures) {
future.get();
}
+ counter += buffer.length;
} catch (IOException e) {
/* TODO */
e.printStackTrace();
}
+ /**
+ * Container for input and output counters.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+ public static class TrafficCounter {
+
+ /** The number of input bytes. */
+ private final long input;
+
+ /** The number of output bytes. */
+ private final long output;
+
+ /**
+ * Creates a new traffic counter.
+ *
+ * @param input
+ * The number of input bytes (may be {@code -1} to signify non-available
+ * input)
+ * @param output
+ * The number of output bytes (may be {@code -1} to signify non-available
+ * output)
+ */
+ public TrafficCounter(long input, long output) {
+ this.input = input;
+ this.output = output;
+ }
+
+ //
+ // ACCESSORS
+ //
+
+ /**
+ * Returns the number of input bytes.
+ *
+ * @return The number of input bytes, or {@link Optional#absent()} if the
+ * component can not receive input
+ */
+ public Optional<Long> input() {
+ return (input == -1) ? Optional.<Long>absent() : Optional.of(input);
+ }
+
+ /**
+ * Returns the number of output bytes.
+ *
+ * @return The number of output bytes, or {@link Optional#absent()} if the
+ * component can not send output
+ */
+ public Optional<Long> output() {
+ return (output == -1) ? Optional.<Long>absent() : Optional.of(output);
+ }
+
+ }
+
}