X-Git-Url: https://git.pterodactylus.net/?p=sonitus.git;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Fsonitus%2Fdata%2FPipeline.java;h=ae4158df1614cc1bcf7b60d5871cf8805c7adeee;hp=9464f23491f897c745a1039e78c924acfb2e4e44;hb=ef71e567bf3eea60be94a5c56aa8f9bc423dd0f0;hpb=ada071e03e74811d9d23af3fc388733a1c3ff12f diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java index 9464f23..ae4158d 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java +++ b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java @@ -19,6 +19,7 @@ package net.pterodactylus.sonitus.data; import java.io.IOException; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -29,9 +30,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import com.google.common.base.Function; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; @@ -43,7 +46,7 @@ import com.google.common.util.concurrent.MoreExecutors; * * @author David ‘Bombe’ Roden */ -public class Pipeline { +public class Pipeline implements Iterable { /** The logger. */ private static final Logger logger = Logger.getLogger(Pipeline.class.getName()); @@ -54,8 +57,8 @@ public class Pipeline { /** The sinks for each source. */ private final Multimap sinks; - /** All started feeders. */ - private final List feeders = Lists.newArrayList(); + /** All started connections. */ + private final List connections = Lists.newArrayList(); /** * Creates a new pipeline. @@ -68,6 +71,21 @@ public class Pipeline { private Pipeline(Source source, Multimap sinks) { this.source = Preconditions.checkNotNull(source, "source must not be null"); this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null"); + for (ControlledComponent component : Lists.reverse(components())) { + logger.finest(String.format("Adding Listener to %s.", component.name())); + component.addMetadataListener(new MetadataListener() { + @Override + public void metadataUpdated(ControlledComponent component, Metadata metadata) { + if (!(component instanceof Source)) { + return; + } + for (ControlledComponent controlledComponent : sinks((Source) component)) { + logger.fine(String.format("Updating Metadata from %s to %s as %s.", component.name(), controlledComponent.name(), metadata)); + controlledComponent.metadataUpdated(metadata); + } + } + }); + } } // @@ -96,6 +114,27 @@ public class Pipeline { return sinks.get(source); } + /** + * Returns the traffic counters of the given controlled component. + * + * @param controlledComponent + * The controlled component to get the traffic counters for + * @return The traffic counters for the given controlled component + */ + public TrafficCounter trafficCounter(ControlledComponent controlledComponent) { + long input = -1; + long output = -1; + for (Connection connection : connections) { + /* the connection where the source matches knows the output. */ + if (connection.source.equals(controlledComponent)) { + output = connection.counter(); + } else if (connection.sinks.contains(controlledComponent)) { + input = connection.counter(); + } + } + return new TrafficCounter(input, output); + } + // // ACTIONS // @@ -109,7 +148,7 @@ public class Pipeline { * if the pipeline is already running */ public void start() throws IOException, IllegalStateException { - if (!feeders.isEmpty()) { + if (!connections.isEmpty()) { throw new IllegalStateException("Pipeline is already running!"); } List sources = Lists.newArrayList(); @@ -118,31 +157,73 @@ public class Pipeline { while (!sources.isEmpty()) { Source source = sources.remove(0); Collection sinks = this.sinks.get(source); - feeders.add(new Feeder(source, sinks)); + connections.add(new Connection(source, sinks)); for (Sink sink : sinks) { + logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata())); sink.open(source.metadata()); if (sink instanceof Filter) { sources.add((Source) sink); } } } - for (Feeder feeder : feeders) { - logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks)); - new Thread(feeder).start(); + for (Connection connection : connections) { + logger.info(String.format("Starting Connection from %s to %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function() { + @Override + public String apply(Sink sink) { + return sink.name(); + } + }))); + new Thread(connection).start(); } } public void stop() { - if (!feeders.isEmpty()) { + if (!connections.isEmpty()) { /* pipeline is not running. */ return; } - for (Feeder feeder : feeders) { - feeder.stop(); + for (Connection connection : connections) { + connection.stop(); } } // + // ITERABLE METHODS + // + + @Override + public Iterator iterator() { + return components().iterator(); + } + + // + // PRIVATE METHODS + // + + /** + * Returns all components of this pipeline, listed breadth-first, starting with + * the source. + * + * @return All components of this pipeline + */ + public List components() { + ImmutableList.Builder components = ImmutableList.builder(); + List currentComponents = Lists.newArrayList(); + components.add(source); + currentComponents.add(source); + while (!currentComponents.isEmpty()) { + Collection sinks = this.sinks((Source) currentComponents.remove(0)); + for (Sink sink : sinks) { + components.add(sink); + if (sink instanceof Source) { + currentComponents.add(sink); + } + } + } + return components.build(); + } + + // // STATIC METHODS // @@ -229,13 +310,13 @@ public class Pipeline { } /** - * A feeder is responsible for streaming audio from one {@link Source} to an - * arbitrary number of {@link Sink}s it is connected to. A feeder is started by - * creating a {@link Thread} wrapping it and starting said thread. + * A connection is responsible for streaming audio from one {@link Source} to + * an arbitrary number of {@link Sink}s it is connected to. A connection is + * started by creating a {@link Thread} wrapping it and starting said thread. * * @author David ‘Bombe’ Roden */ - private class Feeder implements Runnable { + public class Connection implements Runnable { /** The source. */ private final Source source; @@ -249,15 +330,18 @@ public class Pipeline { /** The executor service. */ private final ExecutorService executorService; + /** The number of copied bytes. */ + private long counter; + /** - * Creates a new feeder. + * Creates a new connection. * * @param source * The source of the stream * @param sinks * The sinks to which to stream */ - public Feeder(Source source, Collection sinks) { + public Connection(Source source, Collection sinks) { this.source = source; this.sinks = sinks; if (sinks.size() == 1) { @@ -268,10 +352,24 @@ public class Pipeline { } // + // ACCESSORS + // + + /** + * Returns the number of bytes that this connection has received from its + * source during its lifetime. + * + * @return The number of processed input bytes + */ + public long counter() { + return counter; + } + + // // ACTIONS // - /** Stops this feeder. */ + /** Stops this connection. */ public void stop() { stopped.set(true); } @@ -285,8 +383,6 @@ public class Pipeline { Metadata firstMetadata = null; while (!stopped.get()) { try { - final Metadata lastMetadata = firstMetadata; - final Metadata metadata = firstMetadata = source.metadata(); final byte[] buffer; try { logger.finest(String.format("Getting %d bytes from %s...", 4096, source)); @@ -303,9 +399,6 @@ public class Pipeline { @Override public Void call() throws Exception { - if (!metadata.equals(lastMetadata)) { - sink.metadataUpdated(metadata); - } try { logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink)); sink.process(buffer); @@ -322,6 +415,7 @@ public class Pipeline { for (Future future : futures) { future.get(); } + counter += buffer.length; } catch (IOException e) { /* TODO */ e.printStackTrace(); @@ -340,4 +434,58 @@ public class Pipeline { } + /** + * Container for input and output counters. + * + * @author David ‘Bombe’ Roden + */ + public static class TrafficCounter { + + /** The number of input bytes. */ + private final long input; + + /** The number of output bytes. */ + private final long output; + + /** + * Creates a new traffic counter. + * + * @param input + * The number of input bytes (may be {@code -1} to signify non-available + * input) + * @param output + * The number of output bytes (may be {@code -1} to signify non-available + * output) + */ + public TrafficCounter(long input, long output) { + this.input = input; + this.output = output; + } + + // + // ACCESSORS + // + + /** + * Returns the number of input bytes. + * + * @return The number of input bytes, or {@link Optional#absent()} if the + * component can not receive input + */ + public Optional input() { + return (input == -1) ? Optional.absent() : Optional.of(input); + } + + /** + * Returns the number of output bytes. + * + * @return The number of output bytes, or {@link Optional#absent()} if the + * component can not send output + */ + public Optional output() { + return (output == -1) ? Optional.absent() : Optional.of(output); + } + + } + }