X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Fsonitus%2Fdata%2FPipeline.java;h=c24502fcd1b4f05ac513650dc592852fbe72b035;hb=cbeadf6d9eea57ab98cacd60e2419dd3c18bef89;hp=4e6a457ad8c87704fbb4e4489e6170ad04e132d1;hpb=0b264f13b26f68612de49e70010f41f50b6f7101;p=sonitus.git
diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java
index 4e6a457..c24502f 100644
--- a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java
+++ b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java
@@ -19,6 +19,7 @@ package net.pterodactylus.sonitus.data;
import java.io.IOException;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -29,10 +30,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.MoreExecutors;
@@ -43,7 +46,7 @@ import com.google.common.util.concurrent.MoreExecutors;
*
* @author David âBombeâ Roden
*/
-public class Pipeline {
+public class Pipeline implements Iterable {
/** The logger. */
private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
@@ -54,8 +57,8 @@ public class Pipeline {
/** The sinks for each source. */
private final Multimap sinks;
- /** All started feeders. */
- private final List feeders = Lists.newArrayList();
+ /** All started connections. */
+ private final List connections = Lists.newArrayList();
/**
* Creates a new pipeline.
@@ -71,6 +74,53 @@ public class Pipeline {
}
//
+ // ACCESSORS
+ //
+
+ /**
+ * Expose this pipelineâs source.
+ *
+ * @return This pipelineâs source
+ */
+ public Source source() {
+ return source;
+ }
+
+ /**
+ * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
+ * the given source.
+ *
+ * @param source
+ * The source to get the sinks for
+ * @return The sinks connected to the given source, or an empty list if the
+ * source does not exist in this pipeline
+ */
+ public Collection sinks(Source source) {
+ return sinks.get(source);
+ }
+
+ /**
+ * Returns the traffic counters of the given controlled component.
+ *
+ * @param controlledComponent
+ * The controlled component to get the traffic counters for
+ * @return The traffic counters for the given controlled component
+ */
+ public TrafficCounter trafficCounter(ControlledComponent controlledComponent) {
+ long input = -1;
+ long output = -1;
+ for (Connection connection : connections) {
+ /* the connection where the source matches knows the output. */
+ if (connection.source.equals(controlledComponent)) {
+ output = connection.counter();
+ } else if (connection.sinks.contains(controlledComponent)) {
+ input = connection.counter();
+ }
+ }
+ return new TrafficCounter(input, output);
+ }
+
+ //
// ACTIONS
//
@@ -83,7 +133,7 @@ public class Pipeline {
* if the pipeline is already running
*/
public void start() throws IOException, IllegalStateException {
- if (!feeders.isEmpty()) {
+ if (!connections.isEmpty()) {
throw new IllegalStateException("Pipeline is already running!");
}
List sources = Lists.newArrayList();
@@ -92,7 +142,7 @@ public class Pipeline {
while (!sources.isEmpty()) {
Source source = sources.remove(0);
Collection sinks = this.sinks.get(source);
- feeders.add(new Feeder(source, sinks));
+ connections.add(new Connection(source, sinks));
for (Sink sink : sinks) {
sink.open(source.metadata());
if (sink instanceof Filter) {
@@ -100,23 +150,32 @@ public class Pipeline {
}
}
}
- for (Feeder feeder : feeders) {
- logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks));
- new Thread(feeder).start();
+ for (Connection connection : connections) {
+ logger.info(String.format("Starting Connection from %s to %s.", connection.source, connection.sinks));
+ new Thread(connection).start();
}
}
public void stop() {
- if (!feeders.isEmpty()) {
+ if (!connections.isEmpty()) {
/* pipeline is not running. */
return;
}
- for (Feeder feeder : feeders) {
- feeder.stop();
+ for (Connection connection : connections) {
+ connection.stop();
}
}
//
+ // ITERABLE METHODS
+ //
+
+ @Override
+ public Iterator iterator() {
+ return ImmutableSet.builder().add(source).addAll(sinks.values()).build().iterator();
+ }
+
+ //
// STATIC METHODS
//
@@ -203,13 +262,13 @@ public class Pipeline {
}
/**
- * A feeder is responsible for streaming audio from one {@link Source} to an
- * arbitrary number of {@link Sink}s it is connected to. A feeder is started by
- * creating a {@link Thread} wrapping it and starting said thread.
+ * A connection is responsible for streaming audio from one {@link Source} to
+ * an arbitrary number of {@link Sink}s it is connected to. A connection is
+ * started by creating a {@link Thread} wrapping it and starting said thread.
*
* @author David âBombeâ Roden
*/
- private class Feeder implements Runnable {
+ public class Connection implements Runnable {
/** The source. */
private final Source source;
@@ -223,15 +282,18 @@ public class Pipeline {
/** The executor service. */
private final ExecutorService executorService;
+ /** The number of copied bytes. */
+ private long counter;
+
/**
- * Creates a new feeder.
+ * Creates a new connection.
*
* @param source
* The source of the stream
* @param sinks
* The sinks to which to stream
*/
- public Feeder(Source source, Collection sinks) {
+ public Connection(Source source, Collection sinks) {
this.source = source;
this.sinks = sinks;
if (sinks.size() == 1) {
@@ -242,10 +304,24 @@ public class Pipeline {
}
//
+ // ACCESSORS
+ //
+
+ /**
+ * Returns the number of bytes that this connection has received from its
+ * source during its lifetime.
+ *
+ * @return The number of processed input bytes
+ */
+ public long counter() {
+ return counter;
+ }
+
+ //
// ACTIONS
//
- /** Stops this feeder. */
+ /** Stops this connection. */
public void stop() {
stopped.set(true);
}
@@ -259,8 +335,6 @@ public class Pipeline {
Metadata firstMetadata = null;
while (!stopped.get()) {
try {
- final Metadata lastMetadata = firstMetadata;
- final Metadata metadata = firstMetadata = source.metadata();
final byte[] buffer;
try {
logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
@@ -277,9 +351,6 @@ public class Pipeline {
@Override
public Void call() throws Exception {
- if (!metadata.equals(lastMetadata)) {
- sink.metadataUpdated(metadata);
- }
try {
logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
sink.process(buffer);
@@ -296,6 +367,7 @@ public class Pipeline {
for (Future future : futures) {
future.get();
}
+ counter += buffer.length;
} catch (IOException e) {
/* TODO */
e.printStackTrace();
@@ -314,4 +386,58 @@ public class Pipeline {
}
+ /**
+ * Container for input and output counters.
+ *
+ * @author David âBombeâ Roden
+ */
+ public static class TrafficCounter {
+
+ /** The number of input bytes. */
+ private final long input;
+
+ /** The number of output bytes. */
+ private final long output;
+
+ /**
+ * Creates a new traffic counter.
+ *
+ * @param input
+ * The number of input bytes (may be {@code -1} to signify non-available
+ * input)
+ * @param output
+ * The number of output bytes (may be {@code -1} to signify non-available
+ * output)
+ */
+ public TrafficCounter(long input, long output) {
+ this.input = input;
+ this.output = output;
+ }
+
+ //
+ // ACCESSORS
+ //
+
+ /**
+ * Returns the number of input bytes.
+ *
+ * @return The number of input bytes, or {@link Optional#absent()} if the
+ * component can not receive input
+ */
+ public Optional input() {
+ return (input == -1) ? Optional.absent() : Optional.of(input);
+ }
+
+ /**
+ * Returns the number of output bytes.
+ *
+ * @return The number of output bytes, or {@link Optional#absent()} if the
+ * component can not send output
+ */
+ public Optional output() {
+ return (output == -1) ? Optional.absent() : Optional.of(output);
+ }
+
+ }
+
}