X-Git-Url: https://git.pterodactylus.net/?p=sonitus.git;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Fsonitus%2Fdata%2FPipeline.java;h=a38d6fe62d1e2f7ae3a73066f613740b8e89df38;hp=4e6a457ad8c87704fbb4e4489e6170ad04e132d1;hb=6ff6dcb1223b2c948b2456b5d4e63c9a8ee0971d;hpb=0b264f13b26f68612de49e70010f41f50b6f7101 diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java index 4e6a457..a38d6fe 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java +++ b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java @@ -19,6 +19,7 @@ package net.pterodactylus.sonitus.data; import java.io.IOException; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -33,6 +34,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.MoreExecutors; @@ -43,7 +45,7 @@ import com.google.common.util.concurrent.MoreExecutors; * * @author David ‘Bombe’ Roden */ -public class Pipeline { +public class Pipeline implements Iterable { /** The logger. */ private static final Logger logger = Logger.getLogger(Pipeline.class.getName()); @@ -54,8 +56,8 @@ public class Pipeline { /** The sinks for each source. */ private final Multimap sinks; - /** All started feeders. */ - private final List feeders = Lists.newArrayList(); + /** All started connections. */ + private final List connections = Lists.newArrayList(); /** * Creates a new pipeline. @@ -71,6 +73,32 @@ public class Pipeline { } // + // ACCESSORS + // + + /** + * Expose this pipeline’s source. + * + * @return This pipeline’s source + */ + public Source source() { + return source; + } + + /** + * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to + * the given source. + * + * @param source + * The source to get the sinks for + * @return The sinks connected to the given source, or an empty list if the + * source does not exist in this pipeline + */ + public Collection sinks(Source source) { + return sinks.get(source); + } + + // // ACTIONS // @@ -83,7 +111,7 @@ public class Pipeline { * if the pipeline is already running */ public void start() throws IOException, IllegalStateException { - if (!feeders.isEmpty()) { + if (!connections.isEmpty()) { throw new IllegalStateException("Pipeline is already running!"); } List sources = Lists.newArrayList(); @@ -92,7 +120,7 @@ public class Pipeline { while (!sources.isEmpty()) { Source source = sources.remove(0); Collection sinks = this.sinks.get(source); - feeders.add(new Feeder(source, sinks)); + connections.add(new Connection(source, sinks)); for (Sink sink : sinks) { sink.open(source.metadata()); if (sink instanceof Filter) { @@ -100,23 +128,32 @@ public class Pipeline { } } } - for (Feeder feeder : feeders) { - logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks)); - new Thread(feeder).start(); + for (Connection connection : connections) { + logger.info(String.format("Starting Connection from %s to %s.", connection.source, connection.sinks)); + new Thread(connection).start(); } } public void stop() { - if (!feeders.isEmpty()) { + if (!connections.isEmpty()) { /* pipeline is not running. */ return; } - for (Feeder feeder : feeders) { - feeder.stop(); + for (Connection connection : connections) { + connection.stop(); } } // + // ITERABLE METHODS + // + + @Override + public Iterator iterator() { + return ImmutableSet.builder().add(source).addAll(sinks.values()).build().iterator(); + } + + // // STATIC METHODS // @@ -203,13 +240,13 @@ public class Pipeline { } /** - * A feeder is responsible for streaming audio from one {@link Source} to an - * arbitrary number of {@link Sink}s it is connected to. A feeder is started by - * creating a {@link Thread} wrapping it and starting said thread. + * A connection is responsible for streaming audio from one {@link Source} to + * an arbitrary number of {@link Sink}s it is connected to. A connection is + * started by creating a {@link Thread} wrapping it and starting said thread. * * @author David ‘Bombe’ Roden */ - private class Feeder implements Runnable { + public class Connection implements Runnable { /** The source. */ private final Source source; @@ -224,14 +261,14 @@ public class Pipeline { private final ExecutorService executorService; /** - * Creates a new feeder. + * Creates a new connection. * * @param source * The source of the stream * @param sinks * The sinks to which to stream */ - public Feeder(Source source, Collection sinks) { + public Connection(Source source, Collection sinks) { this.source = source; this.sinks = sinks; if (sinks.size() == 1) { @@ -245,7 +282,7 @@ public class Pipeline { // ACTIONS // - /** Stops this feeder. */ + /** Stops this connection. */ public void stop() { stopped.set(true); }