import java.io.IOException;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.MoreExecutors;
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
-public class Pipeline {
+public class Pipeline implements Iterable<Controlled> {
/** The logger. */
private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
/** The sinks for each source. */
private final Multimap<Source, Sink> sinks;
- /** All started feeders. */
- private final List<Feeder> feeders = Lists.newArrayList();
+ /** All started connections. */
+ private final List<Connection> connections = Lists.newArrayList();
/**
* Creates a new pipeline.
* if the pipeline is already running
*/
public void start() throws IOException, IllegalStateException {
- if (!feeders.isEmpty()) {
+ if (!connections.isEmpty()) {
throw new IllegalStateException("Pipeline is already running!");
}
List<Source> sources = Lists.newArrayList();
while (!sources.isEmpty()) {
Source source = sources.remove(0);
Collection<Sink> sinks = this.sinks.get(source);
- feeders.add(new Feeder(source, sinks));
+ connections.add(new Connection(source, sinks));
for (Sink sink : sinks) {
sink.open(source.metadata());
if (sink instanceof Filter) {
}
}
}
- for (Feeder feeder : feeders) {
- logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks));
- new Thread(feeder).start();
+ for (Connection connection : connections) {
+ logger.info(String.format("Starting Connection from %s to %s.", connection.source, connection.sinks));
+ new Thread(connection).start();
}
}
public void stop() {
- if (!feeders.isEmpty()) {
+ if (!connections.isEmpty()) {
/* pipeline is not running. */
return;
}
- for (Feeder feeder : feeders) {
- feeder.stop();
+ for (Connection connection : connections) {
+ connection.stop();
}
}
//
+ // ITERABLE METHODS
+ //
+
+ @Override
+ public Iterator<Controlled> iterator() {
+ return ImmutableSet.<Controlled>builder().add(source).addAll(sinks.values()).build().iterator();
+ }
+
+ //
// STATIC METHODS
//
}
/**
- * A feeder is responsible for streaming audio from one {@link Source} to an
- * arbitrary number of {@link Sink}s it is connected to. A feeder is started by
- * creating a {@link Thread} wrapping it and starting said thread.
+ * A connection is responsible for streaming audio from one {@link Source} to
+ * an arbitrary number of {@link Sink}s it is connected to. A connection is
+ * started by creating a {@link Thread} wrapping it and starting said thread.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
- private class Feeder implements Runnable {
+ public class Connection implements Runnable {
/** The source. */
private final Source source;
/** The executor service. */
private final ExecutorService executorService;
+ /** The number of copied bytes. */
+ private long counter;
+
/**
- * Creates a new feeder.
+ * Creates a new connection.
*
* @param source
* The source of the stream
* @param sinks
* The sinks to which to stream
*/
- public Feeder(Source source, Collection<Sink> sinks) {
+ public Connection(Source source, Collection<Sink> sinks) {
this.source = source;
this.sinks = sinks;
if (sinks.size() == 1) {
}
//
+ // ACCESSORS
+ //
+
+ /**
+ * Returns the number of bytes that this connection has received from its
+ * source during its lifetime.
+ *
+ * @return The number of processed input bytes
+ */
+ public long counter() {
+ return counter;
+ }
+
+ //
// ACTIONS
//
- /** Stops this feeder. */
+ /** Stops this connection. */
public void stop() {
stopped.set(true);
}
for (Future<Void> future : futures) {
future.get();
}
+ counter += buffer.length;
} catch (IOException e) {
/* TODO */
e.printStackTrace();