X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Fsonitus%2Fdata%2FPipeline.java;h=5bbcd636091231c4b64c74b528eaeacde258f21d;hb=633a841142f978235ed9f745b6ba16c278963e62;hp=b45d34650f2d03858e7d9caa86ce69492b4a738e;hpb=df267bc370d157b3d7a20e8ffccdcb74b8655012;p=sonitus.git diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java index b45d346..5bbcd63 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java +++ b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java @@ -19,6 +19,7 @@ package net.pterodactylus.sonitus.data; import java.io.IOException; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -29,45 +30,107 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import com.google.common.base.Function; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.MoreExecutors; /** - * A pipeline is responsible for streaming audio data from a {@link Source} to - * an arbitrary number of connected {@link Filter}s and {@link Sink}s. + * A pipeline is responsible for streaming audio data from a {@link Filter} to + * an arbitrary number of connected {@link Filter}s. * * @author David ‘Bombe’ Roden */ -public class Pipeline { +public class Pipeline implements Iterable { /** The logger. */ private static final Logger logger = Logger.getLogger(Pipeline.class.getName()); /** The source of the audio stream. */ - private final Source source; + private final Filter source; - /** The sinks for each source. */ - private final Multimap sinks; + /** The filters for each source. */ + private final ListMultimap filters; - /** All started feeders. */ - private final List feeders = Lists.newArrayList(); + /** All started connections. */ + private final List connections = Lists.newArrayList(); /** * Creates a new pipeline. * * @param source * The source of the audio stream - * @param sinks - * The sinks for each source + * @param filters + * The filters for each source */ - private Pipeline(Source source, Multimap sinks) { + private Pipeline(Filter source, Multimap filters) { this.source = Preconditions.checkNotNull(source, "source must not be null"); - this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null"); + this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null")); + for (Filter filter : Lists.reverse(filters())) { + logger.finest(String.format("Adding Listener to %s.", filter.name())); + filter.addMetadataListener(new MetadataListener() { + + @Override + public void metadataUpdated(Filter filter, Metadata metadata) { + for (Filter sinks : filters(filter)) { + logger.fine(String.format("Updating Metadata from %s to %s as %s.", filter.name(), sinks.name(), metadata)); + sinks.metadataUpdated(metadata); + } + } + }); + } + } + + // + // ACCESSORS + // + + /** + * Expose this pipeline’s source. + * + * @return This pipeline’s source + */ + public Filter source() { + return source; + } + + /** + * Returns all {@link Filter}s that are connected to the given filter. + * + * @param filter + * The filter to get the connected filters for + * @return The filters connected to the given filter, or an empty list if the + * filter does not exist in this pipeline, or is not connected to any filters + */ + public List filters(Filter filter) { + return filters.get(filter); + } + + /** + * Returns the traffic counters of the given filter. + * + * @param filter + * The filter to get the traffic counters for + * @return The traffic counters for the given filter + */ + public TrafficCounter trafficCounter(Filter filter) { + long input = -1; + long output = -1; + for (Connection connection : connections) { + /* the connection where the source matches knows the output. */ + if (connection.source.equals(filter)) { + output = connection.counter(); + } else if (connection.sinks.contains(filter)) { + input = connection.counter(); + } + } + return new TrafficCounter(input, output); } // @@ -78,45 +141,85 @@ public class Pipeline { * Starts the pipeline. * * @throws IOException - * if any of the sinks can not be opened + * if any of the filters can not be opened * @throws IllegalStateException * if the pipeline is already running */ public void start() throws IOException, IllegalStateException { - if (!feeders.isEmpty()) { + if (!connections.isEmpty()) { throw new IllegalStateException("Pipeline is already running!"); } - List sources = Lists.newArrayList(); - sources.add(source); + List filters = Lists.newArrayList(); + filters.add(source); /* collect all source->sink pairs. */ - while (!sources.isEmpty()) { - Source source = sources.remove(0); - Collection sinks = this.sinks.get(source); - feeders.add(new Feeder(source, sinks)); - for (Sink sink : sinks) { - sink.open(source.metadata()); - if (sink instanceof Filter) { - sources.add((Source) sink); - } + while (!filters.isEmpty()) { + Filter filter = filters.remove(0); + Collection sinks = this.filters.get(filter); + connections.add(new Connection(filter, sinks)); + for (Filter sink : sinks) { + logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata())); + sink.open(filter.metadata()); + filters.add(sink); } } - for (Feeder feeder : feeders) { - logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks)); - new Thread(feeder).start(); + for (Connection connection : connections) { + String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function() { + + @Override + public String apply(Filter sink) { + return sink.name(); + } + })); + logger.info(String.format("Starting Thread: %s", threadName)); + new Thread(connection, threadName).start(); } } public void stop() { - if (!feeders.isEmpty()) { + if (!connections.isEmpty()) { /* pipeline is not running. */ return; } - for (Feeder feeder : feeders) { - feeder.stop(); + for (Connection connection : connections) { + connection.stop(); } } // + // ITERABLE METHODS + // + + @Override + public Iterator iterator() { + return filters().iterator(); + } + + // + // PRIVATE METHODS + // + + /** + * Returns all filters of this pipeline, listed breadth-first, starting with + * the source. + * + * @return All filters of this pipeline + */ + public List filters() { + ImmutableList.Builder filters = ImmutableList.builder(); + List remainingFilters = Lists.newArrayList(); + filters.add(source); + remainingFilters.add(source); + while (!remainingFilters.isEmpty()) { + Collection sinks = this.filters(remainingFilters.remove(0)); + for (Filter sink : sinks) { + filters.add(sink); + remainingFilters.add(sink); + } + } + return filters.build(); + } + + // // STATIC METHODS // @@ -127,7 +230,7 @@ public class Pipeline { * The source at which to start * @return A builder for a new pipeline */ - public static Builder builder(Source source) { + public static Builder builder(Filter source) { return new Builder(source); } @@ -139,13 +242,13 @@ public class Pipeline { public static class Builder { /** The source of the pipeline. */ - private final Source source; + private final Filter source; - /** The sinks to which each source streams. */ - private Multimap nextSinks = ArrayListMultimap.create(); + /** The filters to which each source streams. */ + private Multimap nextSinks = ArrayListMultimap.create(); /** The last added source. */ - private Source lastSource; + private Filter lastSource; /** * Creates a new builder. @@ -153,31 +256,27 @@ public class Pipeline { * @param source * The source that starts the pipeline */ - private Builder(Source source) { + private Builder(Filter source) { this.source = source; lastSource = source; } /** - * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added - * {@link Source}. + * Adds a {@link Filter} as a recipient for the last added source. * * @param sink * The sink to add * @return This builder - * @throws IllegalStateException - * if the last added {@link Sink} was not also a {@link Source} */ - public Builder to(Sink sink) { - Preconditions.checkState(lastSource != null, "last added Sink was not a Source"); + public Builder to(Filter sink) { nextSinks.put(lastSource, sink); - lastSource = (sink instanceof Filter) ? (Source) sink : null; + lastSource = sink; return this; } /** * Locates the given source and sets it as the last added node so that the - * next invocation of {@link #to(Sink)} can “fork” the pipeline. + * next invocation of {@link #to(Filter)} can “fork” the pipeline. * * @param source * The source to locate @@ -185,7 +284,7 @@ public class Pipeline { * @throws IllegalStateException * if the given source was not previously added as a sink */ - public Builder find(Source source) { + public Builder find(Filter source) { Preconditions.checkState(nextSinks.containsValue(source)); lastSource = source; return this; @@ -203,19 +302,19 @@ public class Pipeline { } /** - * A feeder is responsible for streaming audio from one {@link Source} to an - * arbitrary number of {@link Sink}s it is connected to. A feeder is started by - * creating a {@link Thread} wrapping it and starting said thread. + * A connection is responsible for streaming audio from one {@link Filter} to + * an arbitrary number of {@link Filter}s it is connected to. A connection is + * started by creating a {@link Thread} wrapping it and starting said thread. * * @author David ‘Bombe’ Roden */ - private class Feeder implements Runnable { + public class Connection implements Runnable { /** The source. */ - private final Source source; + private final Filter source; - /** The sinks. */ - private final Collection sinks; + /** The filters. */ + private final Collection sinks; /** Whether the feeder was stopped. */ private final AtomicBoolean stopped = new AtomicBoolean(false); @@ -223,18 +322,24 @@ public class Pipeline { /** The executor service. */ private final ExecutorService executorService; + /** The time the connection was started. */ + private long startTime; + + /** The number of copied bytes. */ + private long counter; + /** - * Creates a new feeder. + * Creates a new connection. * * @param source * The source of the stream * @param sinks - * The sinks to which to stream + * The filters to which to stream */ - public Feeder(Source source, Collection sinks) { + public Connection(Filter source, Collection sinks) { this.source = source; this.sinks = sinks; - if (sinks.size() == 1) { + if (sinks.size() < 2) { executorService = MoreExecutors.sameThreadExecutor(); } else { executorService = Executors.newCachedThreadPool(); @@ -242,10 +347,34 @@ public class Pipeline { } // + // ACCESSORS + // + + /** + * Returns the time this connection was started. + * + * @return The time this connection was started (in milliseconds since Jan 1, + * 1970 UTC) + */ + public long startTime() { + return startTime; + } + + /** + * Returns the number of bytes that this connection has received from its + * source during its lifetime. + * + * @return The number of processed input bytes + */ + public long counter() { + return counter; + } + + // // ACTIONS // - /** Stops this feeder. */ + /** Stops this connection. */ public void stop() { stopped.set(true); } @@ -256,32 +385,31 @@ public class Pipeline { @Override public void run() { - Metadata firstMetadata = null; + startTime = System.currentTimeMillis(); while (!stopped.get()) { try { - final Metadata lastMetadata = firstMetadata; - final Metadata metadata = firstMetadata = source.metadata(); final byte[] buffer; try { + logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name())); buffer = source.get(4096); + logger.finest(String.format("Got %d bytes from %s.", buffer.length, source.name())); } catch (IOException ioe1) { - throw new IOException(String.format("I/O error while reading from %s.", source), ioe1); + throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1); } - List> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function>() { + List> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function>() { @Override - public Callable apply(final Sink sink) { + public Callable apply(final Filter sink) { return new Callable() { @Override public Void call() throws Exception { - if (!metadata.equals(lastMetadata)) { - sink.metadataUpdated(metadata); - } try { + logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink.name())); sink.process(buffer); + logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink.name())); } catch (IOException ioe1) { - throw new IOException(String.format("I/O error while writing to %s", sink), ioe1); + throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1); } return null; } @@ -292,6 +420,7 @@ public class Pipeline { for (Future future : futures) { future.get(); } + counter += buffer.length; } catch (IOException e) { /* TODO */ e.printStackTrace(); @@ -310,4 +439,58 @@ public class Pipeline { } + /** + * Container for input and output counters. + * + * @author David ‘Bombe’ Roden + */ + public static class TrafficCounter { + + /** The number of input bytes. */ + private final long input; + + /** The number of output bytes. */ + private final long output; + + /** + * Creates a new traffic counter. + * + * @param input + * The number of input bytes (may be {@code -1} to signify non-available + * input) + * @param output + * The number of output bytes (may be {@code -1} to signify non-available + * output) + */ + public TrafficCounter(long input, long output) { + this.input = input; + this.output = output; + } + + // + // ACCESSORS + // + + /** + * Returns the number of input bytes. + * + * @return The number of input bytes, or {@link Optional#absent()} if the + * filter did not receive input + */ + public Optional input() { + return (input == -1) ? Optional.absent() : Optional.of(input); + } + + /** + * Returns the number of output bytes. + * + * @return The number of output bytes, or {@link Optional#absent()} if the + * filter did not send output + */ + public Optional output() { + return (output == -1) ? Optional.absent() : Optional.of(output); + } + + } + }