Improve logging.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
index 4e6a457..ae4158d 100644 (file)
@@ -19,6 +19,7 @@ package net.pterodactylus.sonitus.data;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -29,9 +30,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
 
 import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
@@ -43,7 +46,7 @@ import com.google.common.util.concurrent.MoreExecutors;
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
-public class Pipeline {
+public class Pipeline implements Iterable<ControlledComponent> {
 
        /** The logger. */
        private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
@@ -54,8 +57,8 @@ public class Pipeline {
        /** The sinks for each source. */
        private final Multimap<Source, Sink> sinks;
 
-       /** All started feeders. */
-       private final List<Feeder> feeders = Lists.newArrayList();
+       /** All started connections. */
+       private final List<Connection> connections = Lists.newArrayList();
 
        /**
         * Creates a new pipeline.
@@ -68,6 +71,68 @@ public class Pipeline {
        private Pipeline(Source source, Multimap<Source, Sink> sinks) {
                this.source = Preconditions.checkNotNull(source, "source must not be null");
                this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
+               for (ControlledComponent component : Lists.reverse(components())) {
+                       logger.finest(String.format("Adding Listener to %s.", component.name()));
+                       component.addMetadataListener(new MetadataListener() {
+                               @Override
+                               public void metadataUpdated(ControlledComponent component, Metadata metadata) {
+                                       if (!(component instanceof Source)) {
+                                               return;
+                                       }
+                                       for (ControlledComponent controlledComponent : sinks((Source) component)) {
+                                               logger.fine(String.format("Updating Metadata from %s to %s as %s.", component.name(), controlledComponent.name(), metadata));
+                                               controlledComponent.metadataUpdated(metadata);
+                                       }
+                               }
+                       });
+               }
+       }
+
+       //
+       // ACCESSORS
+       //
+
+       /**
+        * Expose this pipeline’s source.
+        *
+        * @return This pipeline’s source
+        */
+       public Source source() {
+               return source;
+       }
+
+       /**
+        * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
+        * the given source.
+        *
+        * @param source
+        *              The source to get the sinks for
+        * @return The sinks connected to the given source, or an empty list if the
+        *         source does not exist in this pipeline
+        */
+       public Collection<Sink> sinks(Source source) {
+               return sinks.get(source);
+       }
+
+       /**
+        * Returns the traffic counters of the given controlled component.
+        *
+        * @param controlledComponent
+        *              The controlled component to get the traffic counters for
+        * @return The traffic counters for the given controlled component
+        */
+       public TrafficCounter trafficCounter(ControlledComponent controlledComponent) {
+               long input = -1;
+               long output = -1;
+               for (Connection connection : connections) {
+                       /* the connection where the source matches knows the output. */
+                       if (connection.source.equals(controlledComponent)) {
+                               output = connection.counter();
+                       } else if (connection.sinks.contains(controlledComponent)) {
+                               input = connection.counter();
+                       }
+               }
+               return new TrafficCounter(input, output);
        }
 
        //
@@ -83,7 +148,7 @@ public class Pipeline {
         *              if the pipeline is already running
         */
        public void start() throws IOException, IllegalStateException {
-               if (!feeders.isEmpty()) {
+               if (!connections.isEmpty()) {
                        throw new IllegalStateException("Pipeline is already running!");
                }
                List<Source> sources = Lists.newArrayList();
@@ -92,28 +157,70 @@ public class Pipeline {
                while (!sources.isEmpty()) {
                        Source source = sources.remove(0);
                        Collection<Sink> sinks = this.sinks.get(source);
-                       feeders.add(new Feeder(source, sinks));
+                       connections.add(new Connection(source, sinks));
                        for (Sink sink : sinks) {
+                               logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata()));
                                sink.open(source.metadata());
                                if (sink instanceof Filter) {
                                        sources.add((Source) sink);
                                }
                        }
                }
-               for (Feeder feeder : feeders) {
-                       logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks));
-                       new Thread(feeder).start();
+               for (Connection connection : connections) {
+                       logger.info(String.format("Starting Connection from %s to %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Sink, String>() {
+                               @Override
+                               public String apply(Sink sink) {
+                                       return sink.name();
+                               }
+                       })));
+                       new Thread(connection).start();
                }
        }
 
        public void stop() {
-               if (!feeders.isEmpty()) {
+               if (!connections.isEmpty()) {
                        /* pipeline is not running. */
                        return;
                }
-               for (Feeder feeder : feeders) {
-                       feeder.stop();
+               for (Connection connection : connections) {
+                       connection.stop();
+               }
+       }
+
+       //
+       // ITERABLE METHODS
+       //
+
+       @Override
+       public Iterator<ControlledComponent> iterator() {
+               return components().iterator();
+       }
+
+       //
+       // PRIVATE METHODS
+       //
+
+       /**
+        * Returns all components of this pipeline, listed breadth-first, starting with
+        * the source.
+        *
+        * @return All components of this pipeline
+        */
+       public List<ControlledComponent> components() {
+               ImmutableList.Builder<ControlledComponent> components = ImmutableList.builder();
+               List<ControlledComponent> currentComponents = Lists.newArrayList();
+               components.add(source);
+               currentComponents.add(source);
+               while (!currentComponents.isEmpty()) {
+                       Collection<Sink> sinks = this.sinks((Source) currentComponents.remove(0));
+                       for (Sink sink : sinks) {
+                               components.add(sink);
+                               if (sink instanceof Source) {
+                                       currentComponents.add(sink);
+                               }
+                       }
                }
+               return components.build();
        }
 
        //
@@ -203,13 +310,13 @@ public class Pipeline {
        }
 
        /**
-        * A feeder is responsible for streaming audio from one {@link Source} to an
-        * arbitrary number of {@link Sink}s it is connected to. A feeder is started by
-        * creating a {@link Thread} wrapping it and starting said thread.
+        * A connection is responsible for streaming audio from one {@link Source} to
+        * an arbitrary number of {@link Sink}s it is connected to. A connection is
+        * started by creating a {@link Thread} wrapping it and starting said thread.
         *
         * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
         */
-       private class Feeder implements Runnable {
+       public class Connection implements Runnable {
 
                /** The source. */
                private final Source source;
@@ -223,15 +330,18 @@ public class Pipeline {
                /** The executor service. */
                private final ExecutorService executorService;
 
+               /** The number of copied bytes. */
+               private long counter;
+
                /**
-                * Creates a new feeder.
+                * Creates a new connection.
                 *
                 * @param source
                 *              The source of the stream
                 * @param sinks
                 *              The sinks to which to stream
                 */
-               public Feeder(Source source, Collection<Sink> sinks) {
+               public Connection(Source source, Collection<Sink> sinks) {
                        this.source = source;
                        this.sinks = sinks;
                        if (sinks.size() == 1) {
@@ -242,10 +352,24 @@ public class Pipeline {
                }
 
                //
+               // ACCESSORS
+               //
+
+               /**
+                * Returns the number of bytes that this connection has received from its
+                * source during its lifetime.
+                *
+                * @return The number of processed input bytes
+                */
+               public long counter() {
+                       return counter;
+               }
+
+               //
                // ACTIONS
                //
 
-               /** Stops this feeder. */
+               /** Stops this connection. */
                public void stop() {
                        stopped.set(true);
                }
@@ -259,8 +383,6 @@ public class Pipeline {
                        Metadata firstMetadata = null;
                        while (!stopped.get()) {
                                try {
-                                       final Metadata lastMetadata = firstMetadata;
-                                       final Metadata metadata = firstMetadata = source.metadata();
                                        final byte[] buffer;
                                        try {
                                                logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
@@ -277,9 +399,6 @@ public class Pipeline {
 
                                                                @Override
                                                                public Void call() throws Exception {
-                                                                       if (!metadata.equals(lastMetadata)) {
-                                                                               sink.metadataUpdated(metadata);
-                                                                       }
                                                                        try {
                                                                                logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
                                                                                sink.process(buffer);
@@ -296,6 +415,7 @@ public class Pipeline {
                                        for (Future<Void> future : futures) {
                                                future.get();
                                        }
+                                       counter += buffer.length;
                                } catch (IOException e) {
                                        /* TODO */
                                        e.printStackTrace();
@@ -314,4 +434,58 @@ public class Pipeline {
 
        }
 
+       /**
+        * Container for input and output counters.
+        *
+        * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+        */
+       public static class TrafficCounter {
+
+               /** The number of input bytes. */
+               private final long input;
+
+               /** The number of output bytes. */
+               private final long output;
+
+               /**
+                * Creates a new traffic counter.
+                *
+                * @param input
+                *              The number of input bytes (may be {@code -1} to signify non-available
+                *              input)
+                * @param output
+                *              The number of output bytes (may be {@code -1} to signify non-available
+                *              output)
+                */
+               public TrafficCounter(long input, long output) {
+                       this.input = input;
+                       this.output = output;
+               }
+
+               //
+               // ACCESSORS
+               //
+
+               /**
+                * Returns the number of input bytes.
+                *
+                * @return The number of input bytes, or {@link Optional#absent()} if the
+                *         component can not receive input
+                */
+               public Optional<Long> input() {
+                       return (input == -1) ? Optional.<Long>absent() : Optional.of(input);
+               }
+
+               /**
+                * Returns the number of output bytes.
+                *
+                * @return The number of output bytes, or {@link Optional#absent()} if the
+                *         component can not send output
+                */
+               public Optional<Long> output() {
+                       return (output == -1) ? Optional.<Long>absent() : Optional.of(output);
+               }
+
+       }
+
 }