Rename “Controlled” to “ControlledComponent.”
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
index a38d6fe..ee67eec 100644 (file)
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
 
 import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.FluentIterable;
@@ -45,7 +46,7 @@ import com.google.common.util.concurrent.MoreExecutors;
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
-public class Pipeline implements Iterable<Controlled> {
+public class Pipeline implements Iterable<ControlledComponent> {
 
        /** The logger. */
        private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
@@ -98,6 +99,27 @@ public class Pipeline implements Iterable<Controlled> {
                return sinks.get(source);
        }
 
+       /**
+        * Returns the traffic counters of the given controlled component.
+        *
+        * @param controlledComponent
+        *              The controlled component to get the traffic counters for
+        * @return The traffic counters for the given controlled component
+        */
+       public TrafficCounter trafficCounter(ControlledComponent controlledComponent) {
+               long input = -1;
+               long output = -1;
+               for (Connection connection : connections) {
+                       /* the connection where the source matches knows the output. */
+                       if (connection.source.equals(controlledComponent)) {
+                               output = connection.counter();
+                       } else if (connection.sinks.contains(controlledComponent)) {
+                               input = connection.counter();
+                       }
+               }
+               return new TrafficCounter(input, output);
+       }
+
        //
        // ACTIONS
        //
@@ -149,8 +171,8 @@ public class Pipeline implements Iterable<Controlled> {
        //
 
        @Override
-       public Iterator<Controlled> iterator() {
-               return ImmutableSet.<Controlled>builder().add(source).addAll(sinks.values()).build().iterator();
+       public Iterator<ControlledComponent> iterator() {
+               return ImmutableSet.<ControlledComponent>builder().add(source).addAll(sinks.values()).build().iterator();
        }
 
        //
@@ -260,6 +282,9 @@ public class Pipeline implements Iterable<Controlled> {
                /** The executor service. */
                private final ExecutorService executorService;
 
+               /** The number of copied bytes. */
+               private long counter;
+
                /**
                 * Creates a new connection.
                 *
@@ -279,6 +304,20 @@ public class Pipeline implements Iterable<Controlled> {
                }
 
                //
+               // ACCESSORS
+               //
+
+               /**
+                * Returns the number of bytes that this connection has received from its
+                * source during its lifetime.
+                *
+                * @return The number of processed input bytes
+                */
+               public long counter() {
+                       return counter;
+               }
+
+               //
                // ACTIONS
                //
 
@@ -333,6 +372,7 @@ public class Pipeline implements Iterable<Controlled> {
                                        for (Future<Void> future : futures) {
                                                future.get();
                                        }
+                                       counter += buffer.length;
                                } catch (IOException e) {
                                        /* TODO */
                                        e.printStackTrace();
@@ -351,4 +391,58 @@ public class Pipeline implements Iterable<Controlled> {
 
        }
 
+       /**
+        * Container for input and output counters.
+        *
+        * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+        */
+       public static class TrafficCounter {
+
+               /** The number of input bytes. */
+               private final long input;
+
+               /** The number of output bytes. */
+               private final long output;
+
+               /**
+                * Creates a new traffic counter.
+                *
+                * @param input
+                *              The number of input bytes (may be {@code -1} to signify non-available
+                *              input)
+                * @param output
+                *              The number of output bytes (may be {@code -1} to signify non-available
+                *              output)
+                */
+               public TrafficCounter(long input, long output) {
+                       this.input = input;
+                       this.output = output;
+               }
+
+               //
+               // ACCESSORS
+               //
+
+               /**
+                * Returns the number of input bytes.
+                *
+                * @return The number of input bytes, or {@link Optional#absent()} if the
+                *         component can not receive input
+                */
+               public Optional<Long> input() {
+                       return (input == -1) ? Optional.<Long>absent() : Optional.of(input);
+               }
+
+               /**
+                * Returns the number of output bytes.
+                *
+                * @return The number of output bytes, or {@link Optional#absent()} if the
+                *         component can not send output
+                */
+               public Optional<Long> output() {
+                       return (output == -1) ? Optional.<Long>absent() : Optional.of(output);
+               }
+
+       }
+
 }