Pull all interfaces into a single interface: Filter.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
index ab6500e..5bbcd63 100644 (file)
@@ -36,26 +36,27 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.MoreExecutors;
 
 /**
- * A pipeline is responsible for streaming audio data from a {@link Source} to
- * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
+ * A pipeline is responsible for streaming audio data from a {@link Filter} to
+ * an arbitrary number of connected {@link Filter}s.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
-public class Pipeline implements Iterable<ControlledComponent> {
+public class Pipeline implements Iterable<Filter> {
 
        /** The logger. */
        private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
 
        /** The source of the audio stream. */
-       private final Source source;
+       private final Filter source;
 
-       /** The sinks for each source. */
-       private final Multimap<Source, Sink> sinks;
+       /** The filters for each source. */
+       private final ListMultimap<Filter, Filter> filters;
 
        /** All started connections. */
        private final List<Connection> connections = Lists.newArrayList();
@@ -65,23 +66,21 @@ public class Pipeline implements Iterable<ControlledComponent> {
         *
         * @param source
         *              The source of the audio stream
-        * @param sinks
-        *              The sinks for each source
+        * @param filters
+        *              The filters for each source
         */
-       private Pipeline(Source source, Multimap<Source, Sink> sinks) {
+       private Pipeline(Filter source, Multimap<Filter, Filter> filters) {
                this.source = Preconditions.checkNotNull(source, "source must not be null");
-               this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
-               for (ControlledComponent component : Lists.reverse(components())) {
-                       logger.finest(String.format("Adding Listener to %s.", component.name()));
-                       component.addMetadataListener(new MetadataListener() {
+               this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null"));
+               for (Filter filter : Lists.reverse(filters())) {
+                       logger.finest(String.format("Adding Listener to %s.", filter.name()));
+                       filter.addMetadataListener(new MetadataListener() {
+
                                @Override
-                               public void metadataUpdated(ControlledComponent component, Metadata metadata) {
-                                       if (!(component instanceof Source)) {
-                                               return;
-                                       }
-                                       for (ControlledComponent controlledComponent : sinks((Source) component)) {
-                                               logger.fine(String.format("Updating Metadata from %s to %s as %s.", component.name(), controlledComponent.name(), metadata));
-                                               controlledComponent.metadataUpdated(metadata);
+                               public void metadataUpdated(Filter filter, Metadata metadata) {
+                                       for (Filter sinks : filters(filter)) {
+                                               logger.fine(String.format("Updating Metadata from %s to %s as %s.", filter.name(), sinks.name(), metadata));
+                                               sinks.metadataUpdated(metadata);
                                        }
                                }
                        });
@@ -97,38 +96,37 @@ public class Pipeline implements Iterable<ControlledComponent> {
         *
         * @return This pipeline’s source
         */
-       public Source source() {
+       public Filter source() {
                return source;
        }
 
        /**
-        * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
-        * the given source.
+        * Returns all {@link Filter}s that are connected to the given filter.
         *
-        * @param source
-        *              The source to get the sinks for
-        * @return The sinks connected to the given source, or an empty list if the
-        *         source does not exist in this pipeline
+        * @param filter
+        *              The filter to get the connected filters for
+        * @return The filters connected to the given filter, or an empty list if the
+        *         filter does not exist in this pipeline, or is not connected to any filters
         */
-       public Collection<Sink> sinks(Source source) {
-               return sinks.get(source);
+       public List<Filter> filters(Filter filter) {
+               return filters.get(filter);
        }
 
        /**
-        * Returns the traffic counters of the given controlled component.
+        * Returns the traffic counters of the given filter.
         *
-        * @param controlledComponent
-        *              The controlled component to get the traffic counters for
-        * @return The traffic counters for the given controlled component
+        * @param filter
+        *              The filter to get the traffic counters for
+        * @return The traffic counters for the given filter
         */
-       public TrafficCounter trafficCounter(ControlledComponent controlledComponent) {
+       public TrafficCounter trafficCounter(Filter filter) {
                long input = -1;
                long output = -1;
                for (Connection connection : connections) {
                        /* the connection where the source matches knows the output. */
-                       if (connection.source.equals(controlledComponent)) {
+                       if (connection.source.equals(filter)) {
                                output = connection.counter();
-                       } else if (connection.sinks.contains(controlledComponent)) {
+                       } else if (connection.sinks.contains(filter)) {
                                input = connection.counter();
                        }
                }
@@ -143,7 +141,7 @@ public class Pipeline implements Iterable<ControlledComponent> {
         * Starts the pipeline.
         *
         * @throws IOException
-        *              if any of the sinks can not be opened
+        *              if any of the filters can not be opened
         * @throws IllegalStateException
         *              if the pipeline is already running
         */
@@ -151,26 +149,24 @@ public class Pipeline implements Iterable<ControlledComponent> {
                if (!connections.isEmpty()) {
                        throw new IllegalStateException("Pipeline is already running!");
                }
-               List<Source> sources = Lists.newArrayList();
-               sources.add(source);
+               List<Filter> filters = Lists.newArrayList();
+               filters.add(source);
                /* collect all source->sink pairs. */
-               while (!sources.isEmpty()) {
-                       Source source = sources.remove(0);
-                       Collection<Sink> sinks = this.sinks.get(source);
-                       connections.add(new Connection(source, sinks));
-                       for (Sink sink : sinks) {
+               while (!filters.isEmpty()) {
+                       Filter filter = filters.remove(0);
+                       Collection<Filter> sinks = this.filters.get(filter);
+                       connections.add(new Connection(filter, sinks));
+                       for (Filter sink : sinks) {
                                logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata()));
-                               sink.open(source.metadata());
-                               if (sink instanceof Filter) {
-                                       sources.add((Source) sink);
-                               }
+                               sink.open(filter.metadata());
+                               filters.add(sink);
                        }
                }
                for (Connection connection : connections) {
-                       String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Sink, String>() {
+                       String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Filter, String>() {
 
                                @Override
-                               public String apply(Sink sink) {
+                               public String apply(Filter sink) {
                                        return sink.name();
                                }
                        }));
@@ -194,8 +190,8 @@ public class Pipeline implements Iterable<ControlledComponent> {
        //
 
        @Override
-       public Iterator<ControlledComponent> iterator() {
-               return components().iterator();
+       public Iterator<Filter> iterator() {
+               return filters().iterator();
        }
 
        //
@@ -203,26 +199,24 @@ public class Pipeline implements Iterable<ControlledComponent> {
        //
 
        /**
-        * Returns all components of this pipeline, listed breadth-first, starting with
+        * Returns all filters of this pipeline, listed breadth-first, starting with
         * the source.
         *
-        * @return All components of this pipeline
+        * @return All filters of this pipeline
         */
-       public List<ControlledComponent> components() {
-               ImmutableList.Builder<ControlledComponent> components = ImmutableList.builder();
-               List<ControlledComponent> currentComponents = Lists.newArrayList();
-               components.add(source);
-               currentComponents.add(source);
-               while (!currentComponents.isEmpty()) {
-                       Collection<Sink> sinks = this.sinks((Source) currentComponents.remove(0));
-                       for (Sink sink : sinks) {
-                               components.add(sink);
-                               if (sink instanceof Source) {
-                                       currentComponents.add(sink);
-                               }
+       public List<Filter> filters() {
+               ImmutableList.Builder<Filter> filters = ImmutableList.builder();
+               List<Filter> remainingFilters = Lists.newArrayList();
+               filters.add(source);
+               remainingFilters.add(source);
+               while (!remainingFilters.isEmpty()) {
+                       Collection<Filter> sinks = this.filters(remainingFilters.remove(0));
+                       for (Filter sink : sinks) {
+                               filters.add(sink);
+                               remainingFilters.add(sink);
                        }
                }
-               return components.build();
+               return filters.build();
        }
 
        //
@@ -236,7 +230,7 @@ public class Pipeline implements Iterable<ControlledComponent> {
         *              The source at which to start
         * @return A builder for a new pipeline
         */
-       public static Builder builder(Source source) {
+       public static Builder builder(Filter source) {
                return new Builder(source);
        }
 
@@ -248,13 +242,13 @@ public class Pipeline implements Iterable<ControlledComponent> {
        public static class Builder {
 
                /** The source of the pipeline. */
-               private final Source source;
+               private final Filter source;
 
-               /** The sinks to which each source streams. */
-               private Multimap<Source, Sink> nextSinks = ArrayListMultimap.create();
+               /** The filters to which each source streams. */
+               private Multimap<Filter, Filter> nextSinks = ArrayListMultimap.create();
 
                /** The last added source. */
-               private Source lastSource;
+               private Filter lastSource;
 
                /**
                 * Creates a new builder.
@@ -262,31 +256,27 @@ public class Pipeline implements Iterable<ControlledComponent> {
                 * @param source
                 *              The source that starts the pipeline
                 */
-               private Builder(Source source) {
+               private Builder(Filter source) {
                        this.source = source;
                        lastSource = source;
                }
 
                /**
-                * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
-                * {@link Source}.
+                * Adds a {@link Filter} as a recipient for the last added source.
                 *
                 * @param sink
                 *              The sink to add
                 * @return This builder
-                * @throws IllegalStateException
-                *              if the last added {@link Sink} was not also a {@link Source}
                 */
-               public Builder to(Sink sink) {
-                       Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
+               public Builder to(Filter sink) {
                        nextSinks.put(lastSource, sink);
-                       lastSource = (sink instanceof Filter) ? (Source) sink : null;
+                       lastSource = sink;
                        return this;
                }
 
                /**
                 * Locates the given source and sets it as the last added node so that the
-                * next invocation of {@link #to(Sink)} can “fork” the pipeline.
+                * next invocation of {@link #to(Filter)} can “fork” the pipeline.
                 *
                 * @param source
                 *              The source to locate
@@ -294,7 +284,7 @@ public class Pipeline implements Iterable<ControlledComponent> {
                 * @throws IllegalStateException
                 *              if the given source was not previously added as a sink
                 */
-               public Builder find(Source source) {
+               public Builder find(Filter source) {
                        Preconditions.checkState(nextSinks.containsValue(source));
                        lastSource = source;
                        return this;
@@ -312,8 +302,8 @@ public class Pipeline implements Iterable<ControlledComponent> {
        }
 
        /**
-        * A connection is responsible for streaming audio from one {@link Source} to
-        * an arbitrary number of {@link Sink}s it is connected to. A connection is
+        * A connection is responsible for streaming audio from one {@link Filter} to
+        * an arbitrary number of {@link Filter}s it is connected to. A connection is
         * started by creating a {@link Thread} wrapping it and starting said thread.
         *
         * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
@@ -321,10 +311,10 @@ public class Pipeline implements Iterable<ControlledComponent> {
        public class Connection implements Runnable {
 
                /** The source. */
-               private final Source source;
+               private final Filter source;
 
-               /** The sinks. */
-               private final Collection<Sink> sinks;
+               /** The filters. */
+               private final Collection<Filter> sinks;
 
                /** Whether the feeder was stopped. */
                private final AtomicBoolean stopped = new AtomicBoolean(false);
@@ -344,9 +334,9 @@ public class Pipeline implements Iterable<ControlledComponent> {
                 * @param source
                 *              The source of the stream
                 * @param sinks
-                *              The sinks to which to stream
+                *              The filters to which to stream
                 */
-               public Connection(Source source, Collection<Sink> sinks) {
+               public Connection(Filter source, Collection<Filter> sinks) {
                        this.source = source;
                        this.sinks = sinks;
                        if (sinks.size() < 2) {
@@ -406,10 +396,10 @@ public class Pipeline implements Iterable<ControlledComponent> {
                                        } catch (IOException ioe1) {
                                                throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1);
                                        }
-                                       List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
+                                       List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Filter, Callable<Void>>() {
 
                                                @Override
-                                               public Callable<Void> apply(final Sink sink) {
+                                               public Callable<Void> apply(final Filter sink) {
                                                        return new Callable<Void>() {
 
                                                                @Override
@@ -485,7 +475,7 @@ public class Pipeline implements Iterable<ControlledComponent> {
                 * Returns the number of input bytes.
                 *
                 * @return The number of input bytes, or {@link Optional#absent()} if the
-                *         component can not receive input
+                *         filter did not receive input
                 */
                public Optional<Long> input() {
                        return (input == -1) ? Optional.<Long>absent() : Optional.of(input);
@@ -495,7 +485,7 @@ public class Pipeline implements Iterable<ControlledComponent> {
                 * Returns the number of output bytes.
                 *
                 * @return The number of output bytes, or {@link Optional#absent()} if the
-                *         component can not send output
+                *         filter did not send output
                 */
                public Optional<Long> output() {
                        return (output == -1) ? Optional.<Long>absent() : Optional.of(output);