Pull all interfaces into a single interface: Filter.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
index 9464f23..5bbcd63 100644 (file)
@@ -19,6 +19,7 @@ package net.pterodactylus.sonitus.data;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -29,45 +30,61 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
 
 import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.MoreExecutors;
 
 /**
- * A pipeline is responsible for streaming audio data from a {@link Source} to
- * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
+ * A pipeline is responsible for streaming audio data from a {@link Filter} to
+ * an arbitrary number of connected {@link Filter}s.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
-public class Pipeline {
+public class Pipeline implements Iterable<Filter> {
 
        /** The logger. */
        private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
 
        /** The source of the audio stream. */
-       private final Source source;
+       private final Filter source;
 
-       /** The sinks for each source. */
-       private final Multimap<Source, Sink> sinks;
+       /** The filters for each source. */
+       private final ListMultimap<Filter, Filter> filters;
 
-       /** All started feeders. */
-       private final List<Feeder> feeders = Lists.newArrayList();
+       /** All started connections. */
+       private final List<Connection> connections = Lists.newArrayList();
 
        /**
         * Creates a new pipeline.
         *
         * @param source
         *              The source of the audio stream
-        * @param sinks
-        *              The sinks for each source
+        * @param filters
+        *              The filters for each source
         */
-       private Pipeline(Source source, Multimap<Source, Sink> sinks) {
+       private Pipeline(Filter source, Multimap<Filter, Filter> filters) {
                this.source = Preconditions.checkNotNull(source, "source must not be null");
-               this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
+               this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null"));
+               for (Filter filter : Lists.reverse(filters())) {
+                       logger.finest(String.format("Adding Listener to %s.", filter.name()));
+                       filter.addMetadataListener(new MetadataListener() {
+
+                               @Override
+                               public void metadataUpdated(Filter filter, Metadata metadata) {
+                                       for (Filter sinks : filters(filter)) {
+                                               logger.fine(String.format("Updating Metadata from %s to %s as %s.", filter.name(), sinks.name(), metadata));
+                                               sinks.metadataUpdated(metadata);
+                                       }
+                               }
+                       });
+               }
        }
 
        //
@@ -79,21 +96,41 @@ public class Pipeline {
         *
         * @return This pipeline’s source
         */
-       public Source source() {
+       public Filter source() {
                return source;
        }
 
        /**
-        * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
-        * the given source.
+        * Returns all {@link Filter}s that are connected to the given filter.
         *
-        * @param source
-        *              The source to get the sinks for
-        * @return The sinks connected to the given source, or an empty list if the
-        *         source does not exist in this pipeline
+        * @param filter
+        *              The filter to get the connected filters for
+        * @return The filters connected to the given filter, or an empty list if the
+        *         filter does not exist in this pipeline, or is not connected to any filters
         */
-       public Collection<Sink> sinks(Source source) {
-               return sinks.get(source);
+       public List<Filter> filters(Filter filter) {
+               return filters.get(filter);
+       }
+
+       /**
+        * Returns the traffic counters of the given filter.
+        *
+        * @param filter
+        *              The filter to get the traffic counters for
+        * @return The traffic counters for the given filter
+        */
+       public TrafficCounter trafficCounter(Filter filter) {
+               long input = -1;
+               long output = -1;
+               for (Connection connection : connections) {
+                       /* the connection where the source matches knows the output. */
+                       if (connection.source.equals(filter)) {
+                               output = connection.counter();
+                       } else if (connection.sinks.contains(filter)) {
+                               input = connection.counter();
+                       }
+               }
+               return new TrafficCounter(input, output);
        }
 
        //
@@ -104,45 +141,85 @@ public class Pipeline {
         * Starts the pipeline.
         *
         * @throws IOException
-        *              if any of the sinks can not be opened
+        *              if any of the filters can not be opened
         * @throws IllegalStateException
         *              if the pipeline is already running
         */
        public void start() throws IOException, IllegalStateException {
-               if (!feeders.isEmpty()) {
+               if (!connections.isEmpty()) {
                        throw new IllegalStateException("Pipeline is already running!");
                }
-               List<Source> sources = Lists.newArrayList();
-               sources.add(source);
+               List<Filter> filters = Lists.newArrayList();
+               filters.add(source);
                /* collect all source->sink pairs. */
-               while (!sources.isEmpty()) {
-                       Source source = sources.remove(0);
-                       Collection<Sink> sinks = this.sinks.get(source);
-                       feeders.add(new Feeder(source, sinks));
-                       for (Sink sink : sinks) {
-                               sink.open(source.metadata());
-                               if (sink instanceof Filter) {
-                                       sources.add((Source) sink);
-                               }
+               while (!filters.isEmpty()) {
+                       Filter filter = filters.remove(0);
+                       Collection<Filter> sinks = this.filters.get(filter);
+                       connections.add(new Connection(filter, sinks));
+                       for (Filter sink : sinks) {
+                               logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata()));
+                               sink.open(filter.metadata());
+                               filters.add(sink);
                        }
                }
-               for (Feeder feeder : feeders) {
-                       logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks));
-                       new Thread(feeder).start();
+               for (Connection connection : connections) {
+                       String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Filter, String>() {
+
+                               @Override
+                               public String apply(Filter sink) {
+                                       return sink.name();
+                               }
+                       }));
+                       logger.info(String.format("Starting Thread: %s", threadName));
+                       new Thread(connection, threadName).start();
                }
        }
 
        public void stop() {
-               if (!feeders.isEmpty()) {
+               if (!connections.isEmpty()) {
                        /* pipeline is not running. */
                        return;
                }
-               for (Feeder feeder : feeders) {
-                       feeder.stop();
+               for (Connection connection : connections) {
+                       connection.stop();
                }
        }
 
        //
+       // ITERABLE METHODS
+       //
+
+       @Override
+       public Iterator<Filter> iterator() {
+               return filters().iterator();
+       }
+
+       //
+       // PRIVATE METHODS
+       //
+
+       /**
+        * Returns all filters of this pipeline, listed breadth-first, starting with
+        * the source.
+        *
+        * @return All filters of this pipeline
+        */
+       public List<Filter> filters() {
+               ImmutableList.Builder<Filter> filters = ImmutableList.builder();
+               List<Filter> remainingFilters = Lists.newArrayList();
+               filters.add(source);
+               remainingFilters.add(source);
+               while (!remainingFilters.isEmpty()) {
+                       Collection<Filter> sinks = this.filters(remainingFilters.remove(0));
+                       for (Filter sink : sinks) {
+                               filters.add(sink);
+                               remainingFilters.add(sink);
+                       }
+               }
+               return filters.build();
+       }
+
+       //
        // STATIC METHODS
        //
 
@@ -153,7 +230,7 @@ public class Pipeline {
         *              The source at which to start
         * @return A builder for a new pipeline
         */
-       public static Builder builder(Source source) {
+       public static Builder builder(Filter source) {
                return new Builder(source);
        }
 
@@ -165,13 +242,13 @@ public class Pipeline {
        public static class Builder {
 
                /** The source of the pipeline. */
-               private final Source source;
+               private final Filter source;
 
-               /** The sinks to which each source streams. */
-               private Multimap<Source, Sink> nextSinks = ArrayListMultimap.create();
+               /** The filters to which each source streams. */
+               private Multimap<Filter, Filter> nextSinks = ArrayListMultimap.create();
 
                /** The last added source. */
-               private Source lastSource;
+               private Filter lastSource;
 
                /**
                 * Creates a new builder.
@@ -179,31 +256,27 @@ public class Pipeline {
                 * @param source
                 *              The source that starts the pipeline
                 */
-               private Builder(Source source) {
+               private Builder(Filter source) {
                        this.source = source;
                        lastSource = source;
                }
 
                /**
-                * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
-                * {@link Source}.
+                * Adds a {@link Filter} as a recipient for the last added source.
                 *
                 * @param sink
                 *              The sink to add
                 * @return This builder
-                * @throws IllegalStateException
-                *              if the last added {@link Sink} was not also a {@link Source}
                 */
-               public Builder to(Sink sink) {
-                       Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
+               public Builder to(Filter sink) {
                        nextSinks.put(lastSource, sink);
-                       lastSource = (sink instanceof Filter) ? (Source) sink : null;
+                       lastSource = sink;
                        return this;
                }
 
                /**
                 * Locates the given source and sets it as the last added node so that the
-                * next invocation of {@link #to(Sink)} can “fork” the pipeline.
+                * next invocation of {@link #to(Filter)} can “fork” the pipeline.
                 *
                 * @param source
                 *              The source to locate
@@ -211,7 +284,7 @@ public class Pipeline {
                 * @throws IllegalStateException
                 *              if the given source was not previously added as a sink
                 */
-               public Builder find(Source source) {
+               public Builder find(Filter source) {
                        Preconditions.checkState(nextSinks.containsValue(source));
                        lastSource = source;
                        return this;
@@ -229,19 +302,19 @@ public class Pipeline {
        }
 
        /**
-        * A feeder is responsible for streaming audio from one {@link Source} to an
-        * arbitrary number of {@link Sink}s it is connected to. A feeder is started by
-        * creating a {@link Thread} wrapping it and starting said thread.
+        * A connection is responsible for streaming audio from one {@link Filter} to
+        * an arbitrary number of {@link Filter}s it is connected to. A connection is
+        * started by creating a {@link Thread} wrapping it and starting said thread.
         *
         * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
         */
-       private class Feeder implements Runnable {
+       public class Connection implements Runnable {
 
                /** The source. */
-               private final Source source;
+               private final Filter source;
 
-               /** The sinks. */
-               private final Collection<Sink> sinks;
+               /** The filters. */
+               private final Collection<Filter> sinks;
 
                /** Whether the feeder was stopped. */
                private final AtomicBoolean stopped = new AtomicBoolean(false);
@@ -249,18 +322,24 @@ public class Pipeline {
                /** The executor service. */
                private final ExecutorService executorService;
 
+               /** The time the connection was started. */
+               private long startTime;
+
+               /** The number of copied bytes. */
+               private long counter;
+
                /**
-                * Creates a new feeder.
+                * Creates a new connection.
                 *
                 * @param source
                 *              The source of the stream
                 * @param sinks
-                *              The sinks to which to stream
+                *              The filters to which to stream
                 */
-               public Feeder(Source source, Collection<Sink> sinks) {
+               public Connection(Filter source, Collection<Filter> sinks) {
                        this.source = source;
                        this.sinks = sinks;
-                       if (sinks.size() == 1) {
+                       if (sinks.size() < 2) {
                                executorService = MoreExecutors.sameThreadExecutor();
                        } else {
                                executorService = Executors.newCachedThreadPool();
@@ -268,10 +347,34 @@ public class Pipeline {
                }
 
                //
+               // ACCESSORS
+               //
+
+               /**
+                * Returns the time this connection was started.
+                *
+                * @return The time this connection was started (in milliseconds since Jan 1,
+                *         1970 UTC)
+                */
+               public long startTime() {
+                       return startTime;
+               }
+
+               /**
+                * Returns the number of bytes that this connection has received from its
+                * source during its lifetime.
+                *
+                * @return The number of processed input bytes
+                */
+               public long counter() {
+                       return counter;
+               }
+
+               //
                // ACTIONS
                //
 
-               /** Stops this feeder. */
+               /** Stops this connection. */
                public void stop() {
                        stopped.set(true);
                }
@@ -282,36 +385,31 @@ public class Pipeline {
 
                @Override
                public void run() {
-                       Metadata firstMetadata = null;
+                       startTime = System.currentTimeMillis();
                        while (!stopped.get()) {
                                try {
-                                       final Metadata lastMetadata = firstMetadata;
-                                       final Metadata metadata = firstMetadata = source.metadata();
                                        final byte[] buffer;
                                        try {
-                                               logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
+                                               logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
                                                buffer = source.get(4096);
-                                               logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
+                                               logger.finest(String.format("Got %d bytes from %s.", buffer.length, source.name()));
                                        } catch (IOException ioe1) {
-                                               throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
+                                               throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1);
                                        }
-                                       List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
+                                       List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Filter, Callable<Void>>() {
 
                                                @Override
-                                               public Callable<Void> apply(final Sink sink) {
+                                               public Callable<Void> apply(final Filter sink) {
                                                        return new Callable<Void>() {
 
                                                                @Override
                                                                public Void call() throws Exception {
-                                                                       if (!metadata.equals(lastMetadata)) {
-                                                                               sink.metadataUpdated(metadata);
-                                                                       }
                                                                        try {
-                                                                               logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
+                                                                               logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink.name()));
                                                                                sink.process(buffer);
-                                                                               logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
+                                                                               logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink.name()));
                                                                        } catch (IOException ioe1) {
-                                                                               throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
+                                                                               throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1);
                                                                        }
                                                                        return null;
                                                                }
@@ -322,6 +420,7 @@ public class Pipeline {
                                        for (Future<Void> future : futures) {
                                                future.get();
                                        }
+                                       counter += buffer.length;
                                } catch (IOException e) {
                                        /* TODO */
                                        e.printStackTrace();
@@ -340,4 +439,58 @@ public class Pipeline {
 
        }
 
+       /**
+        * Container for input and output counters.
+        *
+        * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+        */
+       public static class TrafficCounter {
+
+               /** The number of input bytes. */
+               private final long input;
+
+               /** The number of output bytes. */
+               private final long output;
+
+               /**
+                * Creates a new traffic counter.
+                *
+                * @param input
+                *              The number of input bytes (may be {@code -1} to signify non-available
+                *              input)
+                * @param output
+                *              The number of output bytes (may be {@code -1} to signify non-available
+                *              output)
+                */
+               public TrafficCounter(long input, long output) {
+                       this.input = input;
+                       this.output = output;
+               }
+
+               //
+               // ACCESSORS
+               //
+
+               /**
+                * Returns the number of input bytes.
+                *
+                * @return The number of input bytes, or {@link Optional#absent()} if the
+                *         filter did not receive input
+                */
+               public Optional<Long> input() {
+                       return (input == -1) ? Optional.<Long>absent() : Optional.of(input);
+               }
+
+               /**
+                * Returns the number of output bytes.
+                *
+                * @return The number of output bytes, or {@link Optional#absent()} if the
+                *         filter did not send output
+                */
+               public Optional<Long> output() {
+                       return (output == -1) ? Optional.<Long>absent() : Optional.of(output);
+               }
+
+       }
+
 }