X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;ds=inline;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Fsonitus%2Fdata%2FPipeline.java;h=5bbcd636091231c4b64c74b528eaeacde258f21d;hb=633a841142f978235ed9f745b6ba16c278963e62;hp=c24502fcd1b4f05ac513650dc592852fbe72b035;hpb=cbeadf6d9eea57ab98cacd60e2419dd3c18bef89;p=sonitus.git
diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java
index c24502f..5bbcd63 100644
--- a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java
+++ b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java
@@ -34,28 +34,29 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.MoreExecutors;
/**
- * A pipeline is responsible for streaming audio data from a {@link Source} to
- * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
+ * A pipeline is responsible for streaming audio data from a {@link Filter} to
+ * an arbitrary number of connected {@link Filter}s.
*
* @author David âBombeâ Roden
*/
-public class Pipeline implements Iterable {
+public class Pipeline implements Iterable {
/** The logger. */
private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
/** The source of the audio stream. */
- private final Source source;
+ private final Filter source;
- /** The sinks for each source. */
- private final Multimap sinks;
+ /** The filters for each source. */
+ private final ListMultimap filters;
/** All started connections. */
private final List connections = Lists.newArrayList();
@@ -65,12 +66,25 @@ public class Pipeline implements Iterable {
*
* @param source
* The source of the audio stream
- * @param sinks
- * The sinks for each source
+ * @param filters
+ * The filters for each source
*/
- private Pipeline(Source source, Multimap sinks) {
+ private Pipeline(Filter source, Multimap filters) {
this.source = Preconditions.checkNotNull(source, "source must not be null");
- this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
+ this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null"));
+ for (Filter filter : Lists.reverse(filters())) {
+ logger.finest(String.format("Adding Listener to %s.", filter.name()));
+ filter.addMetadataListener(new MetadataListener() {
+
+ @Override
+ public void metadataUpdated(Filter filter, Metadata metadata) {
+ for (Filter sinks : filters(filter)) {
+ logger.fine(String.format("Updating Metadata from %s to %s as %s.", filter.name(), sinks.name(), metadata));
+ sinks.metadataUpdated(metadata);
+ }
+ }
+ });
+ }
}
//
@@ -82,38 +96,37 @@ public class Pipeline implements Iterable {
*
* @return This pipelineâs source
*/
- public Source source() {
+ public Filter source() {
return source;
}
/**
- * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
- * the given source.
+ * Returns all {@link Filter}s that are connected to the given filter.
*
- * @param source
- * The source to get the sinks for
- * @return The sinks connected to the given source, or an empty list if the
- * source does not exist in this pipeline
+ * @param filter
+ * The filter to get the connected filters for
+ * @return The filters connected to the given filter, or an empty list if the
+ * filter does not exist in this pipeline, or is not connected to any filters
*/
- public Collection sinks(Source source) {
- return sinks.get(source);
+ public List filters(Filter filter) {
+ return filters.get(filter);
}
/**
- * Returns the traffic counters of the given controlled component.
+ * Returns the traffic counters of the given filter.
*
- * @param controlledComponent
- * The controlled component to get the traffic counters for
- * @return The traffic counters for the given controlled component
+ * @param filter
+ * The filter to get the traffic counters for
+ * @return The traffic counters for the given filter
*/
- public TrafficCounter trafficCounter(ControlledComponent controlledComponent) {
+ public TrafficCounter trafficCounter(Filter filter) {
long input = -1;
long output = -1;
for (Connection connection : connections) {
/* the connection where the source matches knows the output. */
- if (connection.source.equals(controlledComponent)) {
+ if (connection.source.equals(filter)) {
output = connection.counter();
- } else if (connection.sinks.contains(controlledComponent)) {
+ } else if (connection.sinks.contains(filter)) {
input = connection.counter();
}
}
@@ -128,7 +141,7 @@ public class Pipeline implements Iterable {
* Starts the pipeline.
*
* @throws IOException
- * if any of the sinks can not be opened
+ * if any of the filters can not be opened
* @throws IllegalStateException
* if the pipeline is already running
*/
@@ -136,23 +149,29 @@ public class Pipeline implements Iterable {
if (!connections.isEmpty()) {
throw new IllegalStateException("Pipeline is already running!");
}
- List sources = Lists.newArrayList();
- sources.add(source);
+ List filters = Lists.newArrayList();
+ filters.add(source);
/* collect all source->sink pairs. */
- while (!sources.isEmpty()) {
- Source source = sources.remove(0);
- Collection sinks = this.sinks.get(source);
- connections.add(new Connection(source, sinks));
- for (Sink sink : sinks) {
- sink.open(source.metadata());
- if (sink instanceof Filter) {
- sources.add((Source) sink);
- }
+ while (!filters.isEmpty()) {
+ Filter filter = filters.remove(0);
+ Collection sinks = this.filters.get(filter);
+ connections.add(new Connection(filter, sinks));
+ for (Filter sink : sinks) {
+ logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata()));
+ sink.open(filter.metadata());
+ filters.add(sink);
}
}
for (Connection connection : connections) {
- logger.info(String.format("Starting Connection from %s to %s.", connection.source, connection.sinks));
- new Thread(connection).start();
+ String threadName = String.format("%s â %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function() {
+
+ @Override
+ public String apply(Filter sink) {
+ return sink.name();
+ }
+ }));
+ logger.info(String.format("Starting Thread: %s", threadName));
+ new Thread(connection, threadName).start();
}
}
@@ -171,8 +190,33 @@ public class Pipeline implements Iterable {
//
@Override
- public Iterator iterator() {
- return ImmutableSet.builder().add(source).addAll(sinks.values()).build().iterator();
+ public Iterator iterator() {
+ return filters().iterator();
+ }
+
+ //
+ // PRIVATE METHODS
+ //
+
+ /**
+ * Returns all filters of this pipeline, listed breadth-first, starting with
+ * the source.
+ *
+ * @return All filters of this pipeline
+ */
+ public List filters() {
+ ImmutableList.Builder filters = ImmutableList.builder();
+ List remainingFilters = Lists.newArrayList();
+ filters.add(source);
+ remainingFilters.add(source);
+ while (!remainingFilters.isEmpty()) {
+ Collection sinks = this.filters(remainingFilters.remove(0));
+ for (Filter sink : sinks) {
+ filters.add(sink);
+ remainingFilters.add(sink);
+ }
+ }
+ return filters.build();
}
//
@@ -186,7 +230,7 @@ public class Pipeline implements Iterable {
* The source at which to start
* @return A builder for a new pipeline
*/
- public static Builder builder(Source source) {
+ public static Builder builder(Filter source) {
return new Builder(source);
}
@@ -198,13 +242,13 @@ public class Pipeline implements Iterable {
public static class Builder {
/** The source of the pipeline. */
- private final Source source;
+ private final Filter source;
- /** The sinks to which each source streams. */
- private Multimap nextSinks = ArrayListMultimap.create();
+ /** The filters to which each source streams. */
+ private Multimap nextSinks = ArrayListMultimap.create();
/** The last added source. */
- private Source lastSource;
+ private Filter lastSource;
/**
* Creates a new builder.
@@ -212,31 +256,27 @@ public class Pipeline implements Iterable {
* @param source
* The source that starts the pipeline
*/
- private Builder(Source source) {
+ private Builder(Filter source) {
this.source = source;
lastSource = source;
}
/**
- * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
- * {@link Source}.
+ * Adds a {@link Filter} as a recipient for the last added source.
*
* @param sink
* The sink to add
* @return This builder
- * @throws IllegalStateException
- * if the last added {@link Sink} was not also a {@link Source}
*/
- public Builder to(Sink sink) {
- Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
+ public Builder to(Filter sink) {
nextSinks.put(lastSource, sink);
- lastSource = (sink instanceof Filter) ? (Source) sink : null;
+ lastSource = sink;
return this;
}
/**
* Locates the given source and sets it as the last added node so that the
- * next invocation of {@link #to(Sink)} can âforkâ the pipeline.
+ * next invocation of {@link #to(Filter)} can âforkâ the pipeline.
*
* @param source
* The source to locate
@@ -244,7 +284,7 @@ public class Pipeline implements Iterable {
* @throws IllegalStateException
* if the given source was not previously added as a sink
*/
- public Builder find(Source source) {
+ public Builder find(Filter source) {
Preconditions.checkState(nextSinks.containsValue(source));
lastSource = source;
return this;
@@ -262,8 +302,8 @@ public class Pipeline implements Iterable {
}
/**
- * A connection is responsible for streaming audio from one {@link Source} to
- * an arbitrary number of {@link Sink}s it is connected to. A connection is
+ * A connection is responsible for streaming audio from one {@link Filter} to
+ * an arbitrary number of {@link Filter}s it is connected to. A connection is
* started by creating a {@link Thread} wrapping it and starting said thread.
*
* @author David âBombeâ Roden
@@ -271,10 +311,10 @@ public class Pipeline implements Iterable {
public class Connection implements Runnable {
/** The source. */
- private final Source source;
+ private final Filter source;
- /** The sinks. */
- private final Collection sinks;
+ /** The filters. */
+ private final Collection sinks;
/** Whether the feeder was stopped. */
private final AtomicBoolean stopped = new AtomicBoolean(false);
@@ -282,6 +322,9 @@ public class Pipeline implements Iterable {
/** The executor service. */
private final ExecutorService executorService;
+ /** The time the connection was started. */
+ private long startTime;
+
/** The number of copied bytes. */
private long counter;
@@ -291,12 +334,12 @@ public class Pipeline implements Iterable {
* @param source
* The source of the stream
* @param sinks
- * The sinks to which to stream
+ * The filters to which to stream
*/
- public Connection(Source source, Collection sinks) {
+ public Connection(Filter source, Collection sinks) {
this.source = source;
this.sinks = sinks;
- if (sinks.size() == 1) {
+ if (sinks.size() < 2) {
executorService = MoreExecutors.sameThreadExecutor();
} else {
executorService = Executors.newCachedThreadPool();
@@ -308,6 +351,16 @@ public class Pipeline implements Iterable {
//
/**
+ * Returns the time this connection was started.
+ *
+ * @return The time this connection was started (in milliseconds since Jan 1,
+ * 1970 UTC)
+ */
+ public long startTime() {
+ return startTime;
+ }
+
+ /**
* Returns the number of bytes that this connection has received from its
* source during its lifetime.
*
@@ -332,31 +385,31 @@ public class Pipeline implements Iterable {
@Override
public void run() {
- Metadata firstMetadata = null;
+ startTime = System.currentTimeMillis();
while (!stopped.get()) {
try {
final byte[] buffer;
try {
- logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
+ logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
buffer = source.get(4096);
- logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
+ logger.finest(String.format("Got %d bytes from %s.", buffer.length, source.name()));
} catch (IOException ioe1) {
- throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
+ throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1);
}
- List> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function>() {
+ List> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function>() {
@Override
- public Callable apply(final Sink sink) {
+ public Callable apply(final Filter sink) {
return new Callable() {
@Override
public Void call() throws Exception {
try {
- logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
+ logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink.name()));
sink.process(buffer);
- logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
+ logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink.name()));
} catch (IOException ioe1) {
- throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
+ throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1);
}
return null;
}
@@ -422,7 +475,7 @@ public class Pipeline implements Iterable {
* Returns the number of input bytes.
*
* @return The number of input bytes, or {@link Optional#absent()} if the
- * component can not receive input
+ * filter did not receive input
*/
public Optional input() {
return (input == -1) ? Optional.absent() : Optional.of(input);
@@ -432,7 +485,7 @@ public class Pipeline implements Iterable {
* Returns the number of output bytes.
*
* @return The number of output bytes, or {@link Optional#absent()} if the
- * component can not send output
+ * filter did not send output
*/
public Optional output() {
return (output == -1) ? Optional.absent() : Optional.of(output);